Repository: flink
Updated Branches:
  refs/heads/master 5156a1b3f -> 6f07c5f3a


[FLINK-2528] [tests] Increase robustness and error reporting of MatchTaskTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f07c5f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f07c5f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f07c5f3

Branch: refs/heads/master
Commit: 6f07c5f3af1b4fd67cb61ad140b99095f7ee6c45
Parents: 5156a1b
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 25 14:42:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 25 14:42:47 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/operators/MatchTaskTest.java  | 280 ++++++++++---------
 .../operators/testutils/DriverTestBase.java     |  22 +-
 2 files changed, 170 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f07c5f3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index 8fbf05e..15f3d0c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -38,9 +38,12 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 @SuppressWarnings("deprecation")
 public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, 
Record, Record>> {
        
@@ -58,11 +61,11 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator1 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ 
IntValue.class });
+               new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ 
IntValue.class });
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator2 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ 
IntValue.class });
+               new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ 
IntValue.class });
        
        private final List<Record> outList = new ArrayList<Record>();
        
@@ -393,150 +396,177 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
        
        @Test
        public void testCancelMatchTaskWhileSort1() {
-               int keyCnt = 20;
-               int valCnt = 20;
-               
-               setOutput(new NirvanaOutputList());
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
+               final int keyCnt = 20;
+               final int valCnt = 20;
                
                try {
-                       addInputSorted(new 
DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
-                       addInput(new UniformRecordGenerator(keyCnt, valCnt, 
true));
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("The test caused an exception.");
-               }
-               
-               final AtomicBoolean success = new AtomicBoolean(false);
-               
-               Thread taskRunner = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       testDriver(testTask, 
MockMatchStub.class);
-                                       success.set(true);
-                               } catch (Exception ie) {
-                                       ie.printStackTrace();
+                       setOutput(new NirvanaOutputList());
+                       addDriverComparator(this.comparator1);
+                       addDriverComparator(this.comparator2);
+                       
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+                       getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+                       getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+                       setNumFileHandlesForSort(4);
+                       
+                       final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
+                       
+                       try {
+                               addInputSorted(new 
DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
+                               addInput(new UniformRecordGenerator(keyCnt, 
valCnt, true));
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                               Assert.fail("The test caused an exception.");
+                       }
+       
+                       final AtomicReference<Throwable> error = new 
AtomicReference<>();
+
+                       Thread taskRunner = new Thread("Task runner for 
testCancelMatchTaskWhileSort1()") {
+                               @Override
+                               public void run() {
+                                       try {
+                                               testDriver(testTask, 
MockMatchStub.class);
+                                       }
+                                       catch (Throwable t) {
+                                               error.set(t);
+                                       }
                                }
+                       };
+                       taskRunner.start();
+
+                       Thread.sleep(1000);
+
+                       cancel();
+                       taskRunner.interrupt();
+
+                       taskRunner.join(60000);
+
+                       assertFalse("Task thread did not finish within 60 
seconds", taskRunner.isAlive());
+
+                       Throwable taskError = error.get();
+                       if (taskError != null) {
+                               taskError.printStackTrace();
+                               fail("Error in task while canceling: " + 
taskError.getMessage());
                        }
-               };
-               taskRunner.start();
-               
-               TaskCancelThread tct = new TaskCancelThread(1, taskRunner, 
this);
-               tct.start();
-               
-               try {
-                       tct.join();
-                       taskRunner.join();
-               } catch(InterruptedException ie) {
-                       Assert.fail("Joining threads failed");
                }
-               
-               Assert.assertTrue("Test threw an exception even though it was 
properly canceled.", success.get());
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
        
        @Test
        public void testCancelMatchTaskWhileSort2() {
-               int keyCnt = 20;
-               int valCnt = 20;
-               
-               setOutput(new NirvanaOutputList());
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
+               final int keyCnt = 20;
+               final int valCnt = 20;
                
                try {
-                       addInput(new UniformRecordGenerator(keyCnt, valCnt, 
true));
-                       addInputSorted(new 
DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("The test caused an exception.");
-               }
-               
-               final AtomicBoolean success = new AtomicBoolean(false);
-               
-               Thread taskRunner = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       testDriver(testTask, 
MockMatchStub.class);
-                                       success.set(true);
-                               } catch (Exception ie) {
-                                       ie.printStackTrace();
+                       setOutput(new NirvanaOutputList());
+                       addDriverComparator(this.comparator1);
+                       addDriverComparator(this.comparator2);
+                       
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+                       getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+                       getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+                       setNumFileHandlesForSort(4);
+                       
+                       final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
+                       
+                       try {
+                               addInput(new UniformRecordGenerator(keyCnt, 
valCnt, true));
+                               addInputSorted(new 
DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                               Assert.fail("The test caused an exception.");
+                       }
+
+                       final AtomicReference<Throwable> error = new 
AtomicReference<>();
+
+                       Thread taskRunner = new Thread("Task runner for 
testCancelMatchTaskWhileSort2()") {
+                               @Override
+                               public void run() {
+                                       try {
+                                               testDriver(testTask, 
MockMatchStub.class);
+                                       }
+                                       catch (Throwable t) {
+                                               error.set(t);
+                                       }
                                }
+                       };
+                       taskRunner.start();
+
+                       Thread.sleep(1000);
+
+                       cancel();
+                       taskRunner.interrupt();
+
+                       taskRunner.join(60000);
+
+                       assertFalse("Task thread did not finish within 60 
seconds", taskRunner.isAlive());
+
+                       Throwable taskError = error.get();
+                       if (taskError != null) {
+                               taskError.printStackTrace();
+                               fail("Error in task while canceling: " + 
taskError.getMessage());
                        }
-               };
-               taskRunner.start();
-               
-               TaskCancelThread tct = new TaskCancelThread(1, taskRunner, 
this);
-               tct.start();
-               
-               try {
-                       tct.join();
-                       taskRunner.join();
-               } catch(InterruptedException ie) {
-                       Assert.fail("Joining threads failed");
                }
-               
-               Assert.assertTrue("Test threw an exception even though it was 
properly canceled.", success.get());
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
        
        @Test
        public void testCancelMatchTaskWhileMatching() {
-               int keyCnt = 20;
-               int valCnt = 20;
-               
-               setOutput(new NirvanaOutputList());
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-               addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-               
-               final AtomicBoolean success = new AtomicBoolean(false);
+               final int keyCnt = 20;
+               final int valCnt = 20;
                
-               Thread taskRunner = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       testDriver(testTask, 
MockDelayingMatchStub.class);
-                                       success.set(true);
-                               } catch (Exception ie) {
-                                       ie.printStackTrace();
+               try {
+                       setOutput(new NirvanaOutputList());
+                       addDriverComparator(this.comparator1);
+                       addDriverComparator(this.comparator2);
+                       
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+                       getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+                       getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+                       setNumFileHandlesForSort(4);
+                       
+                       final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
+                       
+                       addInput(new UniformRecordGenerator(keyCnt, valCnt, 
true));
+                       addInput(new UniformRecordGenerator(keyCnt, valCnt, 
true));
+                       
+                       final AtomicReference<Throwable> error = new 
AtomicReference<>();
+                       
+                       Thread taskRunner = new Thread("Task runner for 
testCancelMatchTaskWhileMatching()") {
+                               @Override
+                               public void run() {
+                                       try {
+                                               testDriver(testTask, 
MockDelayingMatchStub.class);
+                                       }
+                                       catch (Throwable t) {
+                                               error.set(t);
+                                       }
                                }
+                       };
+                       taskRunner.start();
+                       
+                       Thread.sleep(1000);
+                       
+                       cancel();
+                       taskRunner.interrupt();
+                       
+                       taskRunner.join(60000);
+                       
+                       assertFalse("Task thread did not finish within 60 
seconds", taskRunner.isAlive());
+                       
+                       Throwable taskError = error.get();
+                       if (taskError != null) {
+                               taskError.printStackTrace();
+                               fail("Error in task while canceling: " + 
taskError.getMessage());
                        }
-               };
-               taskRunner.start();
-               
-               TaskCancelThread tct = new TaskCancelThread(1, taskRunner, 
this);
-               tct.start();
-               
-               try {
-                       tct.join();
-                       taskRunner.join();
-               } catch(InterruptedException ie) {
-                       Assert.fail("Joining threads failed");
                }
-               
-               Assert.assertTrue("Test threw an exception even though it was 
properly canceled.", success.get());
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
        
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/6f07c5f3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index e4aad98..12ca909 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -88,7 +86,7 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Pa
        
        private PactDriver<S, Record> driver;
        
-       private volatile boolean running;
+       private volatile boolean running = true;
 
        private ExecutionConfig executionConfig;
        
@@ -119,7 +117,7 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Pa
        }
 
        @Parameterized.Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
+       public static Collection<Object[]> getConfigurations() {
 
                LinkedList<Object[]> configs = new LinkedList<Object[]>();
 
@@ -184,7 +182,6 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Pa
                this.stub = (S)stubClass.newInstance();
 
                // regular running logic
-               this.running = true;
                boolean stubOpen = false;
 
                try {
@@ -205,6 +202,10 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Pa
                                throw new Exception("The user defined 'open()' 
method caused an exception: " + t.getMessage(), t);
                        }
 
+                       if (!running) {
+                               return;
+                       }
+                       
                        // run the user code
                        driver.run();
 
@@ -222,10 +223,10 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Pa
                                try {
                                        FunctionUtils.closeFunction(this.stub);
                                }
-                               catch (Throwable t) {}
+                               catch (Throwable ignored) {}
                        }
 
-                       // if resettable driver invoke treardown
+                       // if resettable driver invoke tear down
                        if (this.driver instanceof ResettablePactDriver) {
                                final ResettablePactDriver<?, ?> resDriver = 
(ResettablePactDriver<?, ?>) this.driver;
                                try {
@@ -269,6 +270,13 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Pa
        
        public void cancel() throws Exception {
                this.running = false;
+               
+               // compensate for races, where cancel is called before the 
driver is set
+               // note that this is an artifact of a bad design of this test 
base, where the setup
+               // of the basic properties is not separated from the invocation 
of the execution logic 
+               while (this.driver == null) {
+                       Thread.sleep(200);
+               }
                this.driver.cancel();
        }
 

Reply via email to