[FLINK-3755] Fix a variety of test problems caused by Keyed-State Refactoring
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f44b57cc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f44b57cc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f44b57cc Branch: refs/heads/master Commit: f44b57ccf8f088f2ad4c1f10f479ed62be17eb8b Parents: 6d43061 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Mon Aug 29 16:10:15 2016 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed Aug 31 19:10:02 2016 +0200 ---------------------------------------------------------------------- .../state/RocksDBAsyncSnapshotTest.java | 58 ++++++++++++-------- .../flink/cep/operator/CEPOperatorTest.java | 6 ++ .../flink/runtime/executiongraph/Execution.java | 4 +- .../checkpoint/CheckpointCoordinatorTest.java | 18 ------ .../runtime/state/StateBackendTestBase.java | 20 +++---- .../streaming/runtime/tasks/StreamTask.java | 8 ++- .../api/graph/StreamGraphGeneratorTest.java | 5 +- .../KeyedOneInputStreamOperatorTestHarness.java | 7 +++ .../api/scala/StreamingOperatorsITCase.scala | 1 + .../WindowCheckpointingITCase.java | 14 +++-- .../test/streaming/runtime/IterateITCase.java | 1 + .../translation/CustomPartitioningTest.scala | 5 +- .../DeltaIterationTranslationTest.scala | 1 + 13 files changed, 83 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 624905c..d5b9b46 100644 --- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -69,6 +69,8 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -173,10 +175,10 @@ public class RocksDBAsyncSnapshotTest { } } - testHarness.processElement(new StreamRecord<>("Wohoo", 0)); - task.triggerCheckpoint(42, 17); + testHarness.processElement(new StreamRecord<>("Wohoo", 0)); + // now we allow the checkpoint delayCheckpointLatch.trigger(); @@ -184,7 +186,13 @@ public class RocksDBAsyncSnapshotTest { ensureCheckpointLatch.await(); testHarness.endInput(); + + ExecutorService threadPool = task.getAsyncOperationsThreadPool(); + threadPool.shutdown(); + Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS)); + testHarness.waitForTaskCompletion(); + task.checkTimerException(); } /** @@ -199,9 +207,6 @@ public class RocksDBAsyncSnapshotTest { final OneInputStreamTask<String, String> task = new OneInputStreamTask<>(); - //ensure that the async threads complete before invoke method of the tasks returns. 
- task.setThreadPoolTerminationTimeout(Long.MAX_VALUE); - final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); testHarness.configureForKeyedStream(new KeySelector<String, String>() { @@ -232,6 +237,9 @@ public class RocksDBAsyncSnapshotTest { new MockInputSplitProvider(), testHarness.bufferSize); + BlockingStreamMemoryStateBackend.waitFirstWriteLatch = new OneShotLatch(); + BlockingStreamMemoryStateBackend.unblockCancelLatch = new OneShotLatch(); + testHarness.invoke(mockEnv); // wait for the task to be running @@ -241,36 +249,40 @@ public class RocksDBAsyncSnapshotTest { while (!field.getBoolean(task)) { Thread.sleep(10); } - } } - testHarness.processElement(new StreamRecord<>("Wohoo", 0)); - task.triggerCheckpoint(42, 17); - + testHarness.processElement(new StreamRecord<>("Wohoo", 0)); BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await(); task.cancel(); - BlockingStreamMemoryStateBackend.unblockCancelLatch.trigger(); - testHarness.endInput(); try { + + ExecutorService threadPool = task.getAsyncOperationsThreadPool(); + threadPool.shutdown(); + Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS)); testHarness.waitForTaskCompletion(); + task.checkTimerException(); + Assert.fail("Operation completed. 
Cancel failed."); } catch (Exception expected) { - // we expect the exception from canceling snapshots - Throwable cause = expected.getCause(); - if(cause instanceof AsynchronousException) { - AsynchronousException asynchronousException = (AsynchronousException) cause; - cause = asynchronousException.getCause(); - Assert.assertTrue("Unexpected Exception: " + cause, - cause instanceof CancellationException //future canceled - || cause instanceof InterruptedException); //thread interrupted + AsynchronousException asynchronousException = null; + if (expected instanceof AsynchronousException) { + asynchronousException = (AsynchronousException) expected; + } else if (expected.getCause() instanceof AsynchronousException) { + asynchronousException = (AsynchronousException) expected.getCause(); } else { - Assert.fail(); + Assert.fail("Unexpected exception: " + expected); } + + // we expect the exception from canceling snapshots + Throwable innerCause = asynchronousException.getCause(); + Assert.assertTrue("Unexpected inner cause: " + innerCause, + innerCause instanceof CancellationException //future canceled + || innerCause instanceof InterruptedException); //thread interrupted } } @@ -301,11 +313,11 @@ public class RocksDBAsyncSnapshotTest { */ static class BlockingStreamMemoryStateBackend extends MemoryStateBackend { - public static OneShotLatch waitFirstWriteLatch = new OneShotLatch(); + public static volatile OneShotLatch waitFirstWriteLatch = null; - public static OneShotLatch unblockCancelLatch = new OneShotLatch(); + public static volatile OneShotLatch unblockCancelLatch = null; - volatile boolean closed = false; + private volatile boolean closed = false; @Override public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 52a02d1..1fd8de8 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -136,6 +136,7 @@ public class CEPOperatorTest extends TestLogger { // simulate snapshot/restore with some elements in internal sorting queue StreamStateHandle snapshot = harness.snapshot(0, 0); + harness.close(); harness = new OneInputStreamOperatorTestHarness<>( new CEPPatternOperator<>( @@ -157,6 +158,7 @@ public class CEPOperatorTest extends TestLogger { // simulate snapshot/restore with empty element queue but NFA state StreamStateHandle snapshot2 = harness.snapshot(1, 1); + harness.close(); harness = new OneInputStreamOperatorTestHarness<>( new CEPPatternOperator<>( @@ -227,6 +229,7 @@ public class CEPOperatorTest extends TestLogger { // simulate snapshot/restore with some elements in internal sorting queue StreamStateHandle snapshot = harness.snapshot(0, 0); + harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( @@ -252,6 +255,7 @@ public class CEPOperatorTest extends TestLogger { // simulate snapshot/restore with empty element queue but NFA state StreamStateHandle snapshot2 = harness.snapshot(1, 1); + harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( @@ -334,6 +338,7 @@ public class CEPOperatorTest extends TestLogger { // simulate snapshot/restore with some elements in internal sorting queue StreamStateHandle snapshot = harness.snapshot(0, 0); + harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( @@ -364,6 +369,7 @@ 
public class CEPOperatorTest extends TestLogger { // simulate snapshot/restore with empty element queue but NFA state StreamStateHandle snapshot2 = harness.snapshot(1, 1); + harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 1981f5b..efddecc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -387,8 +387,8 @@ public class Execution { final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor( attemptId, slot, - chainedStateHandle, - keyGroupsStateHandles, + chainedStateHandle, + keyGroupsStateHandles, attemptNumber); // register this execution at the execution graph, to receive call backs http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 4972c51..bc61742 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -1669,7 +1669,6 @@ public class CheckpointCoordinatorTest { 200000, 0L, 1, // max one checkpoint at a time => should not affect 
savepoints - 42, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, @@ -1721,7 +1720,6 @@ public class CheckpointCoordinatorTest { 200000, 100000000L, // very long min delay => should not affect savepoints 1, - 42, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, @@ -1742,22 +1740,6 @@ public class CheckpointCoordinatorTest { // Utilities // ------------------------------------------------------------------------ - private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) { - return mockExecutionVertex(attemptID, ExecutionState.RUNNING); - } - - private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, - ExecutionState state, ExecutionState ... successiveStates) { - final Execution exec = mock(Execution.class); - when(exec.getAttemptId()).thenReturn(attemptID); - when(exec.getState()).thenReturn(state, successiveStates); - - ExecutionVertex vertex = mock(ExecutionVertex.class); - when(vertex.getJobvertexId()).thenReturn(new JobVertexID()); - when(vertex.getCurrentExecutionAttempt()).thenReturn(exec); - - return vertex; - } /** * Tests that the checkpointed partitioned and non-partitioned state is assigned properly to * the {@link Execution} upon recovery. 
http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 5984aca..33ec182 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -1148,20 +1148,20 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { @Test public void testEmptyStateCheckpointing() { + try { - DummyEnvironment env = new DummyEnvironment("test", 1, 0); - backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE); + CheckpointStreamFactory streamFactory = createStreamFactory(); + KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + + ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class); - HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot = backend - .snapshotPartitionedState(682375462379L, 1); - + // draw a snapshot + KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory)); assertNull(snapshot); - backend.dispose(); + backend.close(); - // Make sure we can restore from empty state - backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot); - backend.dispose(); + backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot); + backend.close(); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 701281b..bedc8fa 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -614,7 +614,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> private boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception { LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName()); - synchronized (lock) { if (isRunning) { @@ -677,7 +676,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> synchronized (cancelables) { cancelables.add(asyncCheckpointRunnable); } - asyncOperationsThreadPool.submit(asyncCheckpointRunnable); return true; } else { @@ -685,7 +683,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } } } - + + public ExecutorService getAsyncOperationsThreadPool() { + return asyncOperationsThreadPool; + } + @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { synchronized (lock) { http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 874274f..c93a439 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -22,6 
+22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -371,8 +372,8 @@ public class StreamGraphGeneratorTest { StreamNode keyedResult3Node = graph.getStreamNode(keyedResult3.getId()); StreamNode keyedResult4Node = graph.getStreamNode(keyedResult4.getId()); - assertEquals(globalParallelism, keyedResult1Node.getMaxParallelism()); - assertEquals(mapParallelism, keyedResult2Node.getMaxParallelism()); + assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, keyedResult1Node.getMaxParallelism()); + assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, keyedResult2Node.getMaxParallelism()); assertEquals(maxParallelism, keyedResult3Node.getMaxParallelism()); assertEquals(maxParallelism, keyedResult4Node.getMaxParallelism()); } http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 03f50f9..7e86da0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -113,6 +113,10 @@ public class 
KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> final int numberOfKeyGroups = (Integer) invocationOnMock.getArguments()[1]; final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2]; + if(keyedStateBackend != null) { + keyedStateBackend.close(); + } + if (restoredKeyedState == null) { keyedStateBackend = stateBackend.createKeyedStateBackend( mockTask.getEnvironment(), @@ -195,5 +199,8 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> */ public void close() throws Exception { super.close(); + if(keyedStateBackend != null) { + keyedStateBackend.close(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala index d353468..c57c29c 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala @@ -69,6 +69,7 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(2) + env.getConfig.setMaxParallelism(2); val sourceStream = env.addSource(new SourceFunction[(Int, Int)] { http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index 2d634de..2e6ce78 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -337,7 +337,6 @@ public class WindowCheckpointingITCase extends TestLogger { // we loop longer than we have elements, to permit delayed checkpoints // to still cause a failure while (running) { - if (!failedBefore) { // delay a bit, if we have not failed before Thread.sleep(1); @@ -350,17 +349,15 @@ public class WindowCheckpointingITCase extends TestLogger { } if (numElementsEmitted < numElementsToEmit && - (failedBefore || numElementsEmitted <= failureAfterNumElements)) - { + (failedBefore || numElementsEmitted <= failureAfterNumElements)) { // the function failed before, or we are in the elements before the failure synchronized (ctx.getCheckpointLock()) { int next = numElementsEmitted++; ctx.collect(new Tuple2<Long, IntType>((long) next, new IntType(next))); } - } - else { + } else { // if our work is done, delay a bit to prevent busy waiting - Thread.sleep(1); + Thread.sleep(10); } } } @@ -409,6 +406,7 @@ public class WindowCheckpointingITCase extends TestLogger { public void open(Configuration parameters) throws Exception { // this sink can only work with DOP 1 assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + checkSuccess(); } @Override @@ -423,6 +421,10 @@ public class WindowCheckpointingITCase extends TestLogger { // check if we have seen all we expect aggCount += value.f1.value; + checkSuccess(); + } + + private void checkSuccess() throws SuccessException { if (aggCount >= elementCountExpected * countPerElementExpected) { // we are done. 
validate assertEquals(elementCountExpected, counts.size()); http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java index 1fbebd0..e49f832 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java @@ -524,6 +524,7 @@ public class IterateITCase extends StreamingMultipleProgramsTestBase { try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM - 1); + env.getConfig().setMaxParallelism(env.getParallelism()); KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() { http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala index 7ebf378..2ef5f01 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala @@ -39,6 +39,7 @@ class CustomPartitioningTest extends CompilerTestBase { val env = ExecutionEnvironment.getExecutionEnvironment env.setParallelism(parallelism) + env.getConfig.setMaxParallelism(parallelism); val data = env.fromElements( (0,0) 
).rebalance() @@ -108,7 +109,8 @@ class CustomPartitioningTest extends CompilerTestBase { val env = ExecutionEnvironment.getExecutionEnvironment env.setParallelism(parallelism) - + env.getConfig.setMaxParallelism(parallelism); + val data = env.fromElements(new Pojo()).rebalance() data @@ -179,6 +181,7 @@ class CustomPartitioningTest extends CompilerTestBase { val env = ExecutionEnvironment.getExecutionEnvironment env.setParallelism(parallelism) + env.getConfig.setMaxParallelism(parallelism); val data = env.fromElements(new Pojo()).rebalance() http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala index 3121d68..05294b9 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala @@ -52,6 +52,7 @@ class DeltaIterationTranslationTest { val env = ExecutionEnvironment.getExecutionEnvironment env.setParallelism(DEFAULT_PARALLELISM) + env.getConfig.setMaxParallelism(DEFAULT_PARALLELISM); val initialSolutionSet = env.fromElements((3.44, 5L, "abc")) val initialWorkSet = env.fromElements((1.23, "abc"))