Repository: flink
Updated Branches:
  refs/heads/master d33b44549 -> 954beca7e


[FLINK-1560] [streaming] Streaming example ITCases cleanup

This closes #519


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/954beca7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/954beca7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/954beca7

Branch: refs/heads/master
Commit: 954beca7ec9c8ebadfaf11c77c37e493f190554b
Parents: 464e782
Author: mbalassi <mbala...@apache.org>
Authored: Mon Apr 6 23:46:06 2015 +0200
Committer: mbalassi <mbala...@apache.org>
Committed: Tue Apr 7 16:09:37 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/util/TestStreamEnvironment.java   | 1 +
 .../test/socket/SocketTextStreamWordCountITCase.java         | 8 +++++---
 .../examples/test/windowing/SessionWindowingITCase.java      | 1 -
 .../test/windowing/TopSpeedWindowingExampleITCase.java       | 2 +-
 .../main/java/org/apache/flink/test/util/TestBaseUtils.java  | 2 ++
 5 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 8ddf9e6..f7843cf 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -46,6 +46,7 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
 
        public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int 
parallelism){
                this.executor = executor;
+               setDefaultLocalParallelism(parallelism);
                setParallelism(parallelism);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java
index b16a85f..0af8fe2 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.test.socket;
 
+import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
@@ -29,14 +30,15 @@ import java.net.Socket;
 public class SocketTextStreamWordCountITCase extends StreamingProgramTestBase {
 
        private static final String HOST = "localhost";
-       private static final String PORT = "9999";
+       private static Integer port;
        protected String resultPath;
 
        private ServerSocket temporarySocket;
 
        @Override
        protected void preSubmit() throws Exception {
-               temporarySocket = createSocket(HOST, Integer.valueOf(PORT), 
WordCountData.TEXT);
+               port = NetUtils.getAvailablePort();
+               temporarySocket = createSocket(HOST, port, WordCountData.TEXT);
                resultPath = getTempDirPath("result");
        }
 
@@ -48,7 +50,7 @@ public class SocketTextStreamWordCountITCase extends 
StreamingProgramTestBase {
 
        @Override
        protected void testProgram() throws Exception {
-               SocketTextStreamWordCount.main(new String[]{HOST, PORT, 
resultPath});
+               SocketTextStreamWordCount.main(new String[]{HOST, 
port.toString(), resultPath});
        }
 
        public ServerSocket createSocket(String host, int port, String 
contents) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java
index 5e332f1..2318aa4 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java
@@ -27,7 +27,6 @@ public class SessionWindowingITCase extends 
StreamingProgramTestBase {
 
        @Override
        protected void preSubmit() throws Exception {
-               setParallelism(2);
                resultPath = getTempDirPath("result");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
index f973bd1..d1fa9c6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
@@ -27,7 +27,7 @@ public class TopSpeedWindowingExampleITCase extends 
StreamingProgramTestBase {
 
        @Override
        protected void preSubmit() throws Exception {
-               setParallelism(1);
+               setParallelism(1); //needed to ensure total ordering for windows
                textPath = createTempFile("text.txt", 
TopSpeedWindowingExampleData.CAR_DATA);
                resultPath = getTempDirPath("result");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 754314b..1fc9af2 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -351,6 +351,8 @@ public class TestBaseUtils {
                        }
                } catch (URISyntaxException e) {
                        throw new IllegalArgumentException("This path does not 
describe a valid local file URI.");
+               } catch (NullPointerException e) {
+                       throw new IllegalArgumentException("This path does not 
describe a valid local file URI.");
                }
        }
 

Reply via email to