Repository: flink
Updated Branches:
  refs/heads/master 4343e448c -> b56688453


[FLINK-1841] [streaming] More permissive WindowJoinITCase

Instead of checking against a concrete output, the test now only verifies
that each result line is well-formed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5668845
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5668845
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5668845

Branch: refs/heads/master
Commit: b56688453f0a3bd8edf5c1ccce25ce32b6a65296
Parents: 4343e44
Author: mbalassi <mbala...@apache.org>
Authored: Wed Apr 8 11:54:56 2015 +0200
Committer: mbalassi <mbala...@apache.org>
Committed: Wed Apr 8 13:42:08 2015 +0200

----------------------------------------------------------------------
 .../examples/join/util/WindowJoinData.java      |  5 -----
 .../examples/test/join/WindowJoinITCase.java    |  5 ++++-
 .../apache/flink/test/util/TestBaseUtils.java   | 23 ++++++++++++++++++++
 3 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5668845/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
index 7d0c746..23d29b1 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
@@ -56,11 +56,6 @@ public class WindowJoinData {
                        "(jerry,2138)\n" + "(alice,7503)\n" + "(alice,6424)\n" 
+ "(tom,140)\n" + "(john,9802)\n" +
                        "(grace,2977)\n" + "(grace,889)\n" + "(john,1338)";
 
-       public static final String WINDOW_JOIN_RESULTS = "(bob,2,9018)\n" + 
"(bob,2,5472)\n" + "(bob,2,4685)\n" +
-                       "(bob,2,710)\n" + "(bob,2,5936)\n" + "(bob,2,9896)\n" + 
"(bob,2,2048)\n" + "(bob,2,958)\n" +
-                       "(bob,2,5258)\n" + "(bob,2,609)\n" + "(bob,2,1022)\n" + 
"(bob,2,6213)\n" + "(bob,2,3720)\n" +
-                       "(bob,2,7612)\n" + "(bob,2,8302)\n" + "(bob,2,3968)\n" 
+ "(bob,2,7461)";
-
        private WindowJoinData() {
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5668845/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
index ddab597..0c1fb39 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
@@ -38,7 +38,10 @@ public class WindowJoinITCase extends 
StreamingProgramTestBase {
 
        @Override
        protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(WindowJoinData.WINDOW_JOIN_RESULTS, resultPath);
+               // since the two sides of the join might have different speed
+               // the exact output can not be checked just whether it is 
well-formed
+               // checks that the result lines look like e.g. (bob, 2, 2015)
+               checkLinesAgainstRegexp(resultPath, 
"^\\([a-z]+,(\\d),(\\d)+\\)");
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b5668845/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 1fc9af2..9a38b4d 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -58,6 +58,8 @@ import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class TestBaseUtils {
 
@@ -276,6 +278,27 @@ public class TestBaseUtils {
                Assert.assertArrayEquals(expected, result);
        }
 
+       public void checkLinesAgainstRegexp(String resultPath, String regexp){
+               Pattern pattern = Pattern.compile(regexp);
+               Matcher matcher = pattern.matcher("");
+
+               ArrayList<String> list = new ArrayList<String>();
+               try {
+                       readAllResultLines(list, resultPath, new String[]{}, 
false);
+               } catch (IOException e1) {
+                       Assert.fail("Error reading the result");
+               }
+
+               for (String line : list){
+                       matcher.reset(line);
+                       if (!matcher.find()){
+                               String msg = "Line is not well-formed: " + line;
+                               Assert.fail(msg);
+                       }
+               }
+
+       }
+
        public void compareKeyValueParisWithDelta(String expectedLines, String 
resultPath,
                                                                                
        String delimiter, double maxDelta) throws Exception {
                compareKeyValueParisWithDelta(expectedLines, resultPath, new 
String[]{}, delimiter, maxDelta);

Reply via email to