Repository: flink Updated Branches: refs/heads/master 5156a1b3f -> 6f07c5f3a
[FLINK-2528] [tests] Increase robustness and error reporting of MatchTaskTest Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f07c5f3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f07c5f3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f07c5f3 Branch: refs/heads/master Commit: 6f07c5f3af1b4fd67cb61ad140b99095f7ee6c45 Parents: 5156a1b Author: Stephan Ewen <se...@apache.org> Authored: Tue Aug 25 14:42:47 2015 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Aug 25 14:42:47 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/operators/MatchTaskTest.java | 280 ++++++++++--------- .../operators/testutils/DriverTestBase.java | 22 +- 2 files changed, 170 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6f07c5f3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java index 8fbf05e..15f3d0c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java @@ -16,12 +16,12 @@ * limitations under the License. 
*/ - package org.apache.flink.runtime.operators; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatJoinFunction; @@ -38,9 +38,12 @@ import org.apache.flink.types.IntValue; import org.apache.flink.types.Key; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; + import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.*; + @SuppressWarnings("deprecation") public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> { @@ -58,11 +61,11 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor @SuppressWarnings("unchecked") private final RecordComparator comparator1 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class }); @SuppressWarnings("unchecked") private final RecordComparator comparator2 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? 
extends Key<?>>[])new Class<?>[]{ IntValue.class }); private final List<Record> outList = new ArrayList<Record>(); @@ -393,150 +396,177 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor @Test public void testCancelMatchTaskWhileSort1() { - int keyCnt = 20; - int valCnt = 20; - - setOutput(new NirvanaOutputList()); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); + final int keyCnt = 20; + final int valCnt = 20; try { - addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate()); - addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - final AtomicBoolean success = new AtomicBoolean(false); - - Thread taskRunner = new Thread() { - @Override - public void run() { - try { - testDriver(testTask, MockMatchStub.class); - success.set(true); - } catch (Exception ie) { - ie.printStackTrace(); + setOutput(new NirvanaOutputList()); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); + + try { + addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate()); + addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test 
caused an exception."); + } + + final AtomicReference<Throwable> error = new AtomicReference<>(); + + Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort1()") { + @Override + public void run() { + try { + testDriver(testTask, MockMatchStub.class); + } + catch (Throwable t) { + error.set(t); + } } + }; + taskRunner.start(); + + Thread.sleep(1000); + + cancel(); + taskRunner.interrupt(); + + taskRunner.join(60000); + + assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive()); + + Throwable taskError = error.get(); + if (taskError != null) { + taskError.printStackTrace(); + fail("Error in task while canceling: " + taskError.getMessage()); } - }; - taskRunner.start(); - - TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this); - tct.start(); - - try { - tct.join(); - taskRunner.join(); - } catch(InterruptedException ie) { - Assert.fail("Joining threads failed"); } - - Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } @Test public void testCancelMatchTaskWhileSort2() { - int keyCnt = 20; - int valCnt = 20; - - setOutput(new NirvanaOutputList()); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); + final int keyCnt = 20; + final int valCnt = 20; try { - addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); - addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate()); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - final AtomicBoolean success = new 
AtomicBoolean(false); - - Thread taskRunner = new Thread() { - @Override - public void run() { - try { - testDriver(testTask, MockMatchStub.class); - success.set(true); - } catch (Exception ie) { - ie.printStackTrace(); + setOutput(new NirvanaOutputList()); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); + + try { + addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); + addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + + final AtomicReference<Throwable> error = new AtomicReference<>(); + + Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()") { + @Override + public void run() { + try { + testDriver(testTask, MockMatchStub.class); + } + catch (Throwable t) { + error.set(t); + } } + }; + taskRunner.start(); + + Thread.sleep(1000); + + cancel(); + taskRunner.interrupt(); + + taskRunner.join(60000); + + assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive()); + + Throwable taskError = error.get(); + if (taskError != null) { + taskError.printStackTrace(); + fail("Error in task while canceling: " + taskError.getMessage()); } - }; - taskRunner.start(); - - TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this); - tct.start(); - - try { - tct.join(); - taskRunner.join(); - } catch(InterruptedException ie) { - Assert.fail("Joining threads failed"); } - - Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); + catch (Exception e) { + e.printStackTrace(); + 
fail(e.getMessage()); + } } @Test public void testCancelMatchTaskWhileMatching() { - int keyCnt = 20; - int valCnt = 20; - - setOutput(new NirvanaOutputList()); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); - addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); - - final AtomicBoolean success = new AtomicBoolean(false); + final int keyCnt = 20; + final int valCnt = 20; - Thread taskRunner = new Thread() { - @Override - public void run() { - try { - testDriver(testTask, MockDelayingMatchStub.class); - success.set(true); - } catch (Exception ie) { - ie.printStackTrace(); + try { + setOutput(new NirvanaOutputList()); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); + + addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); + addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); + + final AtomicReference<Throwable> error = new AtomicReference<>(); + + Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileMatching()") { + @Override + public void run() { + try { + testDriver(testTask, MockDelayingMatchStub.class); + } + catch (Throwable t) { + error.set(t); + } } + }; + taskRunner.start(); + + Thread.sleep(1000); + + cancel(); + taskRunner.interrupt(); + + 
taskRunner.join(60000); + + assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive()); + + Throwable taskError = error.get(); + if (taskError != null) { + taskError.printStackTrace(); + fail("Error in task while canceling: " + taskError.getMessage()); } - }; - taskRunner.start(); - - TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this); - tct.start(); - - try { - tct.join(); - taskRunner.join(); - } catch(InterruptedException ie) { - Assert.fail("Joining threads failed"); } - - Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/6f07c5f3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index e4aad98..12ca909 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.operators.testutils; -import java.io.FileNotFoundException; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; @@ -88,7 +86,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa private PactDriver<S, Record> driver; - private volatile boolean running; + private volatile boolean running = true; private ExecutionConfig executionConfig; @@ -119,7 +117,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa } @Parameterized.Parameters - public static 
Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + public static Collection<Object[]> getConfigurations() { LinkedList<Object[]> configs = new LinkedList<Object[]>(); @@ -184,7 +182,6 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa this.stub = (S)stubClass.newInstance(); // regular running logic - this.running = true; boolean stubOpen = false; try { @@ -205,6 +202,10 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t); } + if (!running) { + return; + } + // run the user code driver.run(); @@ -222,10 +223,10 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa try { FunctionUtils.closeFunction(this.stub); } - catch (Throwable t) {} + catch (Throwable ignored) {} } - // if resettable driver invoke treardown + // if resettable driver invoke tear down if (this.driver instanceof ResettablePactDriver) { final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver; try { @@ -269,6 +270,13 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa public void cancel() throws Exception { this.running = false; + + // compensate for races, where cancel is called before the driver is set + // note that this is an artifact of a bad design of this test base, where the setup + // of the basic properties is not separated from the invocation of the execution logic + while (this.driver == null) { + Thread.sleep(200); + } this.driver.cancel(); }