Repository: flink Updated Branches: refs/heads/master 4343e448c -> b56688453
[FLINK-1841] [streaming] More permissive WindowJoinITCase Instead of checking against a concrete output, it only determines well-formedness. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5668845 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5668845 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5668845 Branch: refs/heads/master Commit: b56688453f0a3bd8edf5c1ccce25ce32b6a65296 Parents: 4343e44 Author: mbalassi <mbala...@apache.org> Authored: Wed Apr 8 11:54:56 2015 +0200 Committer: mbalassi <mbala...@apache.org> Committed: Wed Apr 8 13:42:08 2015 +0200 ---------------------------------------------------------------------- .../examples/join/util/WindowJoinData.java | 5 ----- .../examples/test/join/WindowJoinITCase.java | 5 ++++- .../apache/flink/test/util/TestBaseUtils.java | 23 ++++++++++++++++++++ 3 files changed, 27 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b5668845/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java index 7d0c746..23d29b1 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java @@ -56,11 +56,6 @@ public class WindowJoinData { "(jerry,2138)\n" + "(alice,7503)\n" + "(alice,6424)\n" + 
"(tom,140)\n" + "(john,9802)\n" + "(grace,2977)\n" + "(grace,889)\n" + "(john,1338)"; - public static final String WINDOW_JOIN_RESULTS = "(bob,2,9018)\n" + "(bob,2,5472)\n" + "(bob,2,4685)\n" + - "(bob,2,710)\n" + "(bob,2,5936)\n" + "(bob,2,9896)\n" + "(bob,2,2048)\n" + "(bob,2,958)\n" + - "(bob,2,5258)\n" + "(bob,2,609)\n" + "(bob,2,1022)\n" + "(bob,2,6213)\n" + "(bob,2,3720)\n" + - "(bob,2,7612)\n" + "(bob,2,8302)\n" + "(bob,2,3968)\n" + "(bob,2,7461)"; - private WindowJoinData() { } } http://git-wip-us.apache.org/repos/asf/flink/blob/b5668845/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java index ddab597..0c1fb39 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java @@ -38,7 +38,10 @@ public class WindowJoinITCase extends StreamingProgramTestBase { @Override protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WindowJoinData.WINDOW_JOIN_RESULTS, resultPath); + // since the two sides of the join might have different speed + // the exact output can not be checked just whether it is well-formed + // checks that the result lines look like e.g. 
(bob, 2, 2015) + checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)"); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b5668845/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index 1fc9af2..9a38b4d 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -58,6 +58,8 @@ import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class TestBaseUtils { @@ -276,6 +278,27 @@ public class TestBaseUtils { Assert.assertArrayEquals(expected, result); } + public void checkLinesAgainstRegexp(String resultPath, String regexp){ + Pattern pattern = Pattern.compile(regexp); + Matcher matcher = pattern.matcher(""); + + ArrayList<String> list = new ArrayList<String>(); + try { + readAllResultLines(list, resultPath, new String[]{}, false); + } catch (IOException e1) { + Assert.fail("Error reading the result"); + } + + for (String line : list){ + matcher.reset(line); + if (!matcher.find()){ + String msg = "Line is not well-formed: " + line; + Assert.fail(msg); + } + } + + } + public void compareKeyValueParisWithDelta(String expectedLines, String resultPath, String delimiter, double maxDelta) throws Exception { compareKeyValueParisWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);