Repository: flink
Updated Branches:
  refs/heads/release-0.9.0-milestone-1 90dc58b55 -> 287b4e9a3 (forced update)
[FLINK-1560] [streaming] Streaming example ITCases cleanup This closes #519 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/954beca7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/954beca7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/954beca7 Branch: refs/heads/release-0.9.0-milestone-1 Commit: 954beca7ec9c8ebadfaf11c77c37e493f190554b Parents: 464e782 Author: mbalassi <mbala...@apache.org> Authored: Mon Apr 6 23:46:06 2015 +0200 Committer: mbalassi <mbala...@apache.org> Committed: Tue Apr 7 16:09:37 2015 +0200 ---------------------------------------------------------------------- .../apache/flink/streaming/util/TestStreamEnvironment.java | 1 + .../test/socket/SocketTextStreamWordCountITCase.java | 8 +++++--- .../examples/test/windowing/SessionWindowingITCase.java | 1 - .../test/windowing/TopSpeedWindowingExampleITCase.java | 2 +- .../main/java/org/apache/flink/test/util/TestBaseUtils.java | 2 ++ 5 files changed, 9 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index 8ddf9e6..f7843cf 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -46,6 +46,7 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment { public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism){ this.executor = executor; + setDefaultLocalParallelism(parallelism); setParallelism(parallelism); } http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java index b16a85f..0af8fe2 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.examples.test.socket; +import org.apache.flink.runtime.net.NetUtils; import org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount; import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; @@ -29,14 +30,15 @@ import java.net.Socket; public class SocketTextStreamWordCountITCase extends StreamingProgramTestBase { private static final String HOST = "localhost"; - private static final String PORT = "9999"; + private static Integer port; protected String resultPath; private ServerSocket temporarySocket; @Override protected void preSubmit() throws Exception { - temporarySocket = createSocket(HOST, Integer.valueOf(PORT), WordCountData.TEXT); + port = NetUtils.getAvailablePort(); + temporarySocket = 
createSocket(HOST, port, WordCountData.TEXT); resultPath = getTempDirPath("result"); } @@ -48,7 +50,7 @@ public class SocketTextStreamWordCountITCase extends StreamingProgramTestBase { @Override protected void testProgram() throws Exception { - SocketTextStreamWordCount.main(new String[]{HOST, PORT, resultPath}); + SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath}); } public ServerSocket createSocket(String host, int port, String contents) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java index 5e332f1..2318aa4 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java @@ -27,7 +27,6 @@ public class SessionWindowingITCase extends StreamingProgramTestBase { @Override protected void preSubmit() throws Exception { - setParallelism(2); resultPath = getTempDirPath("result"); } http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java index f973bd1..d1fa9c6 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java @@ -27,7 +27,7 @@ public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase { @Override protected void preSubmit() throws Exception { - setParallelism(1); + setParallelism(1); //needed to ensure total ordering for windows textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA); resultPath = getTempDirPath("result"); } http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index 754314b..1fc9af2 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -351,6 +351,8 @@ public class TestBaseUtils { } } catch (URISyntaxException e) { throw new IllegalArgumentException("This path does not describe a valid local file URI."); + } catch (NullPointerException e) { + throw new IllegalArgumentException("This path does not describe a valid local file URI."); } }