[FLINK-1982] [record-api] Remove dependencies on Record API from flink-runtime tests
Rename Match*Test to Join*Test and MapTaskTest to FlatMapTaskTest This closes #1294 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c8a6588 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c8a6588 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c8a6588 Branch: refs/heads/master Commit: 3c8a6588ac9bd35984d6e3b1eef916461a262fe4 Parents: 7ff071f Author: Fabian Hueske <fhue...@apache.org> Authored: Thu Oct 22 21:10:41 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Oct 23 13:01:37 2015 +0200 ---------------------------------------------------------------------- .../operators/CombineTaskExternalITCase.java | 16 +- .../flink/runtime/operators/CrossTaskTest.java | 41 +- .../runtime/operators/DataSinkTaskTest.java | 52 +- .../runtime/operators/DataSourceTaskTest.java | 16 +- .../runtime/operators/FlatMapTaskTest.java | 149 +++ .../operators/JoinTaskExternalITCase.java | 165 +++ .../flink/runtime/operators/JoinTaskTest.java | 1017 +++++++++++++++++ .../flink/runtime/operators/MapTaskTest.java | 151 --- .../operators/MatchTaskExternalITCase.java | 167 --- .../flink/runtime/operators/MatchTaskTest.java | 1019 ------------------ .../operators/ReduceTaskExternalITCase.java | 16 +- .../flink/runtime/operators/ReduceTaskTest.java | 19 +- .../operators/chaining/ChainTaskTest.java | 25 +- .../operators/testutils/TaskTestBase.java | 18 +- 14 files changed, 1428 insertions(+), 1443 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java index 4905e57..800bca7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java @@ -22,11 +22,11 @@ import java.util.ArrayList; import java.util.HashMap; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.java.record.operators.ReduceOperator; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.util.Collector; import org.junit.Assert; import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.api.common.typeutils.record.RecordComparator; import org.apache.flink.runtime.operators.testutils.DriverTestBase; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; @@ -42,7 +42,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun private final double combine_frac; - private final ArrayList<Record> outList = new ArrayList<Record>(); + private final ArrayList<Record> outList = new ArrayList<>(); @SuppressWarnings("unchecked") private final RecordComparator comparator = new RecordComparator( @@ -69,7 +69,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun getTaskConfig().setRelativeMemoryDriver(combine_frac); getTaskConfig().setFilehandlesDriver(2); - final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>(); + final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<>(); try { testDriver(testTask, MockCombiningReduceStub.class); @@ -85,7 +85,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun // wee need to do the final aggregation manually in the test, because the // combiner is not guaranteed to do that - final HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>(); + final HashMap<IntValue, IntValue> aggMap = new HashMap<>(); for (Record record : this.outList) { IntValue key = new IntValue(); IntValue value = new IntValue(); @@ -123,7 +123,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun getTaskConfig().setRelativeMemoryDriver(combine_frac); getTaskConfig().setFilehandlesDriver(2); - final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>(); + final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<>(); try { testDriver(testTask, MockCombiningReduceStub.class); @@ -139,7 +139,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun // wee need to do the final aggregation manually in the test, because the // combiner is not guaranteed to do that - final HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>(); + final HashMap<IntValue, IntValue> aggMap = new HashMap<>(); for (Record record : this.outList) { IntValue key = new IntValue(); IntValue value = new IntValue(); @@ -166,7 +166,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun // ------------------------------------------------------------------------ // ------------------------------------------------------------------------ - @ReduceOperator.Combinable + @Combinable public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> { private static final long serialVersionUID = 1L; @@ -194,7 +194,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun } } - @ReduceOperator.Combinable + @Combinable public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java index 4c27a68..2d8838a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java @@ -31,7 +31,6 @@ import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; import org.apache.flink.types.Record; import org.junit.Test; -@SuppressWarnings("deprecation") public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>> { private static final long CROSS_MEM = 1024 * 1024; @@ -65,7 +64,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); try { testDriver(testTask, MockCrossStub.class); @@ -95,7 +94,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); try { testDriver(testTask, MockCrossStub.class); @@ -123,7 +122,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); try { testDriver(testTask, MockFailingCrossStub.class); @@ -153,7 +152,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); try { testDriver(testTask, MockFailingCrossStub.class); @@ -184,7 +183,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); try { testDriver(testTask, MockCrossStub.class); @@ -215,7 +214,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); try { testDriver(testTask, MockCrossStub.class); @@ -243,7 +242,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); try { testDriver(testTask, MockFailingCrossStub.class); @@ -272,7 +271,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); try { testDriver(testTask, MockFailingCrossStub.class); @@ -303,7 +302,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); try { testDriver(testTask, MockCrossStub.class); @@ -333,7 +332,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); try { testDriver(testTask, MockCrossStub.class); @@ -363,7 +362,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); try { testDriver(testTask, MockCrossStub.class); @@ -393,7 +392,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); try { testDriver(testTask, MockCrossStub.class); @@ -420,7 +419,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); final AtomicBoolean success = new AtomicBoolean(false); @@ -463,7 +462,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); final AtomicBoolean success = new AtomicBoolean(false); @@ -485,7 +484,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, try { tct.join(); - taskRunner.join(); + taskRunner.join(); } catch(InterruptedException ie) { Assert.fail("Joining threads failed"); } @@ -506,7 +505,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); final AtomicBoolean success = new AtomicBoolean(false); @@ -549,7 +548,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND); getTaskConfig().setRelativeMemoryDriver(cross_frac); - final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>(); + final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>(); final AtomicBoolean success = new AtomicBoolean(false); @@ -571,7 +570,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, try { tct.join(); - taskRunner.join(); + taskRunner.join(); } catch(InterruptedException ie) { Assert.fail("Joining threads failed"); } @@ -579,7 +578,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Assert.assertTrue("Exception was thrown despite proper canceling.", success.get()); } - public static final class MockCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction { + public static final class MockCrossStub implements CrossFunction<Record, Record, Record> { private static final long serialVersionUID = 1L; @Override @@ -588,7 +587,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, } } - public static final class MockFailingCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction { + public static final class MockFailingCrossStub implements CrossFunction<Record, Record, Record> { private static final long serialVersionUID = 1L; private int cnt = 0; http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java index e91d338..b741b64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory; -import org.apache.flink.api.java.record.io.DelimitedOutputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; @@ -30,7 +30,6 @@ import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; import org.apache.flink.runtime.operators.util.LocalStrategy; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; import org.apache.flink.types.Record; import org.junit.After; import org.junit.Assert; @@ -81,7 +80,7 @@ public class DataSinkTaskTest extends TaskTestBase super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0); - DataSinkTask<Record> testTask = new DataSinkTask<Record>(); + DataSinkTask<Record> testTask = new DataSinkTask<>(); super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString()); @@ -94,7 +93,7 @@ public class DataSinkTaskTest extends TaskTestBase fr = new FileReader(tempTestFile); br = new BufferedReader(fr); - HashMap<Integer, HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt); + HashMap<Integer, HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt); while (br.ready()) { String line = br.readLine(); @@ -144,7 +143,7 @@ public class DataSinkTaskTest extends TaskTestBase readers[2] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 2, 0, false), 0, false); readers[3] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 3, 0, false), 0, false); - DataSinkTask<Record> testTask = new DataSinkTask<Record>(); + DataSinkTask<Record> testTask = new DataSinkTask<>(); super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString()); @@ -171,7 +170,7 @@ public class DataSinkTaskTest extends TaskTestBase fr = new FileReader(tempTestFile); br = new BufferedReader(fr); - HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt); + HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt); while(br.ready()) { String line = br.readLine(); @@ -219,12 +218,12 @@ public class DataSinkTaskTest extends TaskTestBase super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0); - DataSinkTask<Record> testTask = new DataSinkTask<Record>(); + DataSinkTask<Record> testTask = new DataSinkTask<>(); // set sorting super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT); super.getTaskConfig().setInputComparator( - new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})), 0); + new RecordComparatorFactory(new int[]{1},(new Class[]{IntValue.class})), 0); super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction); super.getTaskConfig().setFilehandlesInput(0, 8); super.getTaskConfig().setSpillingThresholdInput(0, 0.8f); @@ -248,7 +247,7 @@ public class DataSinkTaskTest extends TaskTestBase fr = new FileReader(tempTestFile); br = new BufferedReader(fr); - Set<Integer> keys = new HashSet<Integer>(); + Set<Integer> keys = new HashSet<>(); int curVal = -1; while(br.ready()) { @@ -297,7 +296,7 @@ public class DataSinkTaskTest extends TaskTestBase super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0); - DataSinkTask<Record> testTask = new DataSinkTask<Record>(); + DataSinkTask<Record> testTask = new DataSinkTask<>(); Configuration stubParams = new Configuration(); super.getTaskConfig().setStubParameters(stubParams); @@ -323,21 +322,20 @@ public class DataSinkTaskTest extends TaskTestBase public void testFailingSortingDataSinkTask() { int keyCnt = 100; - int valCnt = 20;; + int valCnt = 20; double memoryFraction = 1.0; super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0); - DataSinkTask<Record> testTask = new DataSinkTask<Record>(); + DataSinkTask<Record> testTask = new DataSinkTask<>(); Configuration stubParams = new Configuration(); super.getTaskConfig().setStubParameters(stubParams); // set sorting super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT); super.getTaskConfig().setInputComparator( - new RecordComparatorFactory(new int[]{1}, ((Class<? extends Key<?>>[]) new Class[]{IntValue.class})), - 0); + new RecordComparatorFactory(new int[]{1}, ( new Class[]{IntValue.class})), 0); super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction); super.getTaskConfig().setFilehandlesInput(0, 8); super.getTaskConfig().setSpillingThresholdInput(0, 0.8f); @@ -365,7 +363,7 @@ public class DataSinkTaskTest extends TaskTestBase super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addInput(new InfiniteInputIterator(), 0); - final DataSinkTask<Record> testTask = new DataSinkTask<Record>(); + final DataSinkTask<Record> testTask = new DataSinkTask<>(); Configuration stubParams = new Configuration(); super.getTaskConfig().setStubParameters(stubParams); @@ -407,15 +405,14 @@ public class DataSinkTaskTest extends TaskTestBase super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addInput(new InfiniteInputIterator(), 0); - final DataSinkTask<Record> testTask = new DataSinkTask<Record>(); + final DataSinkTask<Record> testTask = new DataSinkTask<>(); Configuration stubParams = new Configuration(); super.getTaskConfig().setStubParameters(stubParams); // set sorting super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT); super.getTaskConfig().setInputComparator( - new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})), - 0); + new RecordComparatorFactory(new int[]{1},(new Class[]{IntValue.class})), 0); super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction); super.getTaskConfig().setFilehandlesInput(0, 8); super.getTaskConfig().setSpillingThresholdInput(0, 0.8f); @@ -447,7 +444,7 @@ public class DataSinkTaskTest extends TaskTestBase } - public static class MockOutputFormat extends DelimitedOutputFormat { + public static class MockOutputFormat extends FileOutputFormat<Record> { private static final long serialVersionUID = 1L; final StringBuilder bld = new StringBuilder(); @@ -458,8 +455,7 @@ public class DataSinkTaskTest extends TaskTestBase } @Override - public int serializeRecord(Record rec, byte[] target) throws Exception - { + public void writeRecord(Record rec) throws IOException { IntValue key = rec.getField(0, IntValue.class); IntValue value = rec.getField(1, IntValue.class); @@ -467,14 +463,11 @@ public class DataSinkTaskTest extends TaskTestBase this.bld.append(key.getValue()); this.bld.append('_'); this.bld.append(value.getValue()); + this.bld.append('\n'); byte[] bytes = this.bld.toString().getBytes(); - if (bytes.length <= target.length) { - System.arraycopy(bytes, 0, target, 0, bytes.length); - return bytes.length; - } - // else - return -bytes.length; + + this.stream.write(bytes); } } @@ -490,12 +483,11 @@ public class DataSinkTaskTest extends TaskTestBase } @Override - public int serializeRecord(Record rec, byte[] target) throws Exception - { + public void writeRecord(Record rec) throws IOException { if (++this.cnt >= 10) { throw new RuntimeException("Expected Test Exception"); } - return super.serializeRecord(rec, target); + super.writeRecord(rec); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java index 4548410..96ae700 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java @@ -28,11 +28,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import org.apache.flink.api.common.io.DelimitedInputFormat; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.taskmanager.Task; import org.junit.Assert; -import org.apache.flink.api.java.record.io.DelimitedInputFormat; import org.apache.flink.runtime.operators.testutils.NirvanaOutputList; import org.apache.flink.runtime.operators.testutils.TaskCancelThread; import org.apache.flink.runtime.operators.testutils.TaskTestBase; @@ -83,7 +83,7 @@ public class DataSourceTaskTest extends TaskTestBase { super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addOutput(this.outList); - DataSourceTask<Record> testTask = new DataSourceTask<Record>(); + DataSourceTask<Record> testTask = new DataSourceTask<>(); super.registerFileInputTask(testTask, MockInputFormat.class, new File(tempTestPath).toURI().toString(), "\n"); @@ -97,7 +97,7 @@ public class DataSourceTaskTest extends TaskTestBase { Assert.assertTrue("Invalid output size. Expected: "+(keyCnt*valCnt)+" Actual: "+this.outList.size(), this.outList.size() == keyCnt * valCnt); - HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt); + HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt); for (Record kvp : this.outList) { @@ -138,7 +138,7 @@ public class DataSourceTaskTest extends TaskTestBase { super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addOutput(this.outList); - DataSourceTask<Record> testTask = new DataSourceTask<Record>(); + DataSourceTask<Record> testTask = new DataSourceTask<>(); super.registerFileInputTask(testTask, MockFailingInputFormat.class, new File(tempTestPath).toURI().toString(), "\n"); @@ -172,7 +172,7 @@ public class DataSourceTaskTest extends TaskTestBase { Assert.fail("Unable to set-up test input file"); } - final DataSourceTask<Record> testTask = new DataSourceTask<Record>(); + final DataSourceTask<Record> testTask = new DataSourceTask<>(); super.registerFileInputTask(testTask, MockDelayingInputFormat.class, new File(tempTestPath).toURI().toString(), "\n"); @@ -232,7 +232,7 @@ public class DataSourceTaskTest extends TaskTestBase { } } - public static class MockInputFormat extends DelimitedInputFormat { + public static class MockInputFormat extends DelimitedInputFormat<Record> { private static final long serialVersionUID = 1L; private final IntValue key = new IntValue(); @@ -257,7 +257,7 @@ public class DataSourceTaskTest extends TaskTestBase { } } - public static class MockDelayingInputFormat extends DelimitedInputFormat { + public static class MockDelayingInputFormat extends DelimitedInputFormat<Record> { private static final long serialVersionUID = 1L; private final IntValue key = new IntValue(); @@ -289,7 +289,7 @@ public class DataSourceTaskTest extends TaskTestBase { } - public static class MockFailingInputFormat extends DelimitedInputFormat { + public static class MockFailingInputFormat extends DelimitedInputFormat<Record> { private static final long serialVersionUID = 1L; private final IntValue key = new IntValue(); http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java new file mode 100644 index 0000000..3d7f99e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.runtime.operators; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; +import org.apache.flink.runtime.operators.testutils.DriverTestBase; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator; +import org.apache.flink.runtime.operators.testutils.TaskCancelThread; +import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; +import org.apache.flink.types.Record; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Test; + +public class FlatMapTaskTest extends DriverTestBase<FlatMapFunction<Record, Record>> { + + private static final Logger LOG = LoggerFactory.getLogger(FlatMapTaskTest.class); + + private final CountingOutputCollector output = new CountingOutputCollector(); + + + public FlatMapTaskTest(ExecutionConfig config) { + super(config, 0, 0); + } + + @Test + public void testMapTask() { + final int keyCnt = 100; + final int valCnt = 20; + + addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); + setOutput(this.output); + + final FlatMapDriver<Record, Record> testDriver = new FlatMapDriver<>(); + + try { + testDriver(testDriver, MockMapStub.class); + } catch (Exception e) { + LOG.debug("Exception while running the test driver.", e); + Assert.fail("Invoke method caused exception."); + } + + Assert.assertEquals("Wrong result set size.", keyCnt*valCnt, this.output.getNumberOfRecords()); + } + + @Test + public void testFailingMapTask() { + final int keyCnt = 100; + final int valCnt = 20; + + addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); + setOutput(new DiscardingOutputCollector<Record>()); + + final FlatMapDriver<Record, Record> testTask = new FlatMapDriver<>(); + try { + testDriver(testTask, MockFailingMapStub.class); + Assert.fail("Function exception was not forwarded."); + } catch (ExpectedTestException e) { + // good! + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Exception in test."); + } + } + + @Test + public void testCancelMapTask() { + addInput(new InfiniteInputIterator()); + setOutput(new DiscardingOutputCollector<Record>()); + + final FlatMapDriver<Record, Record> testTask = new FlatMapDriver<>(); + + final AtomicBoolean success = new AtomicBoolean(false); + + final Thread taskRunner = new Thread() { + @Override + public void run() { + try { + testDriver(testTask, MockMapStub.class); + success.set(true); + } catch (Exception ie) { + ie.printStackTrace(); + } + } + }; + taskRunner.start(); + + TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this); + tct.start(); + + try { + tct.join(); + taskRunner.join(); + } catch(InterruptedException ie) { + Assert.fail("Joining threads failed"); + } + + Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); + } + + public static class MockMapStub extends RichFlatMapFunction<Record, Record> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Record record, Collector<Record> out) throws Exception { + out.collect(record); + } + + } + + public static class MockFailingMapStub extends RichFlatMapFunction<Record, Record> { + private static final long serialVersionUID = 1L; + + private int cnt = 0; + + @Override + public void flatMap(Record record, Collector<Record> out) throws Exception { + if (++this.cnt >= 10) { + throw new ExpectedTestException(); + } + out.collect(record); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java new file mode 100644 index 0000000..5b2e6eb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.runtime.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.junit.Assert; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.record.RecordComparator; +import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory; +import org.apache.flink.runtime.operators.testutils.DriverTestBase; +import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.Key; +import org.apache.flink.types.Record; +import org.apache.flink.util.Collector; +import org.junit.Test; + +public class JoinTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> { + + private static final long HASH_MEM = 4*1024*1024; + + private static final long SORT_MEM = 3*1024*1024; + + private static final long BNLJN_MEM = 10 * PAGE_SIZE; + + private final double bnljn_frac; + + private final double hash_frac; + + @SuppressWarnings("unchecked") + private final RecordComparator comparator1 = new RecordComparator( + new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + + @SuppressWarnings("unchecked") + private final RecordComparator comparator2 = new RecordComparator( + new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + + private final CountingOutputCollector output = new CountingOutputCollector(); + + public JoinTaskExternalITCase(ExecutionConfig config) { + super(config, HASH_MEM, 2, SORT_MEM); + bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize(); + hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize(); + } + + @Test + public void testExternalSort1MatchTask() { + final int keyCnt1 = 16384*4; + final int valCnt1 = 2; + + final int keyCnt2 = 8192; + final int valCnt2 = 4*2; + + final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + + setOutput(this.output); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); + addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + + Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords()); + } + + @Test + public void testExternalHash1MatchTask() { + final int keyCnt1 = 32768; + final int valCnt1 = 8; + + final int keyCnt2 = 65536; + final int valCnt2 = 8; + + final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + + addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); + addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + setOutput(this.output); + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Test caused an exception."); + } + + Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords()); + } + + @Test + public void testExternalHash2MatchTask() { + final int keyCnt1 = 32768; + final int valCnt1 = 8; + + final int keyCnt2 = 65536; + final int valCnt2 = 8; + + final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + + addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); + addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + setOutput(this.output); + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Test caused an exception."); + } + + Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords()); + } + + public static final class MockMatchStub implements FlatJoinFunction<Record, Record, Record> { + private static final long serialVersionUID = 1L; + + @Override + public void join(Record value1, Record value2, Collector<Record> out) throws Exception { + out.collect(value1); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java new file mode 100644 index 0000000..ecde59e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java @@ -0,0 +1,1017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.record.RecordComparator; +import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory; +import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator; +import org.apache.flink.runtime.operators.testutils.DriverTestBase; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.runtime.operators.testutils.NirvanaOutputList; +import org.apache.flink.runtime.operators.testutils.TaskCancelThread; +import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.Key; +import org.apache.flink.types.Record; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class JoinTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> { + + private static final long HASH_MEM = 6*1024*1024; + + private static final long SORT_MEM = 3*1024*1024; + + private static final int NUM_SORTER = 2; + + private static final long BNLJN_MEM = 10 * PAGE_SIZE; + + private final double bnljn_frac; + + private final double hash_frac; + + @SuppressWarnings("unchecked") + private final RecordComparator comparator1 = new RecordComparator( + new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class }); + + @SuppressWarnings("unchecked") + private final RecordComparator comparator2 = new RecordComparator( + new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class }); + + private final List<Record> outList = new ArrayList<>(); + + + public JoinTaskTest(ExecutionConfig config) { + super(config, HASH_MEM, NUM_SORTER, SORT_MEM); + bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize(); + hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize(); + } + + + @Test + public void testSortBoth1MatchTask() { + final int keyCnt1 = 20; + final int valCnt1 = 1; + + final int keyCnt2 = 10; + final int valCnt2 = 2; + + setOutput(this.outList); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); + addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + + final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + expCnt, this.outList.size() == expCnt); + + this.outList.clear(); + } + + @Test + public void testSortBoth2MatchTask() { + + int keyCnt1 = 20; + int valCnt1 = 1; + + int keyCnt2 = 20; + int valCnt2 = 1; + + setOutput(this.outList); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); + addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + + int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + + Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); + + this.outList.clear(); + + } + + @Test + public void testSortBoth3MatchTask() { + + int keyCnt1 = 20; + int valCnt1 = 1; + + int keyCnt2 = 20; + int valCnt2 = 20; + + setOutput(this.outList); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); + addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + + int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + + Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); + + this.outList.clear(); + + } + + @Test + public void testSortBoth4MatchTask() { + + int keyCnt1 = 20; + int valCnt1 = 20; + + int keyCnt2 = 20; + int valCnt2 = 1; + + setOutput(this.outList); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); + addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + + int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + + Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); + + this.outList.clear(); + + } + + @Test + public void testSortBoth5MatchTask() { + + int keyCnt1 = 20; + int valCnt1 = 20; + + int keyCnt2 = 20; + int valCnt2 = 20; + + setOutput(this.outList); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); + addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + + int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + + Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); + + this.outList.clear(); + + } + + @Test + public void testSortFirstMatchTask() { + + int keyCnt1 = 20; + int valCnt1 = 20; + + int keyCnt2 = 20; + int valCnt2 = 20; + + setOutput(this.outList); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); + addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true)); + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + + int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + + Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); + + this.outList.clear(); + + } + + @Test + public void testSortSecondMatchTask() { + + int keyCnt1 = 20; + int valCnt1 = 20; + + int keyCnt2 = 20; + int valCnt2 = 20; + + setOutput(this.outList); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true)); + addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + + int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + + Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); + + this.outList.clear(); + + } + + @Test + public void testMergeMatchTask() { + int keyCnt1 = 20; + int valCnt1 = 20; + + int keyCnt2 = 20; + int valCnt2 = 20; + + setOutput(this.outList); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true)); + addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true)); + + try { + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + + int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + + Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); + + this.outList.clear(); + + } + + @Test + public void testFailingMatchTask() { + int keyCnt1 = 20; + int valCnt1 = 20; + + int keyCnt2 = 20; + int valCnt2 = 20; + + setOutput(new NirvanaOutputList()); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true)); + addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true)); + + try { + testDriver(testTask, MockFailingMatchStub.class); + Assert.fail("Driver did not forward Exception."); + } catch (ExpectedTestException e) { + // good! + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + } + + @Test + public void testCancelMatchTaskWhileSort1() { + final int keyCnt = 20; + final int valCnt = 20; + + try { + setOutput(new NirvanaOutputList()); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate()); + addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + + final AtomicReference<Throwable> error = new AtomicReference<>(); + + Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort1()") { + @Override + public void run() { + try { + testDriver(testTask, MockMatchStub.class); + } + catch (Throwable t) { + error.set(t); + } + } + }; + taskRunner.start(); + + Thread.sleep(1000); + + cancel(); + taskRunner.interrupt(); + + taskRunner.join(60000); + + assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive()); + + Throwable taskError = error.get(); + if (taskError != null) { + taskError.printStackTrace(); + fail("Error in task while canceling: " + taskError.getMessage()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCancelMatchTaskWhileSort2() { + final int keyCnt = 20; + final int valCnt = 20; + + try { + setOutput(new NirvanaOutputList()); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); + addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test caused an exception."); + } + + final AtomicReference<Throwable> error = new AtomicReference<>(); + + Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()") { + @Override + public void run() { + try { + testDriver(testTask, MockMatchStub.class); + } + catch (Throwable t) { + error.set(t); + } + } + }; + taskRunner.start(); + + Thread.sleep(1000); + + cancel(); + taskRunner.interrupt(); + + taskRunner.join(60000); + + assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive()); + + Throwable taskError = error.get(); + if (taskError != null) { + taskError.printStackTrace(); + fail("Error in task while canceling: " + taskError.getMessage()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCancelMatchTaskWhileMatching() { + final int keyCnt = 20; + final int valCnt = 20; + + try { + setOutput(new NirvanaOutputList()); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); + getTaskConfig().setRelativeMemoryDriver(bnljn_frac); + setNumFileHandlesForSort(4); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); + addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); + + final AtomicReference<Throwable> error = new AtomicReference<>(); + + Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileMatching()") { + @Override + public void run() { + try { + testDriver(testTask, MockDelayingMatchStub.class); + } + catch (Throwable t) { + error.set(t); + } + } + }; + taskRunner.start(); + + Thread.sleep(1000); + + cancel(); + taskRunner.interrupt(); + + taskRunner.join(60000); + + assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive()); + + Throwable taskError = error.get(); + if (taskError != null) { + taskError.printStackTrace(); + fail("Error in task while canceling: " + taskError.getMessage()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testHash1MatchTask() { + int keyCnt1 = 20; + int valCnt1 = 1; + + int keyCnt2 = 10; + int valCnt2 = 2; + + addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); + addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + setOutput(this.outList); + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Test caused an exception."); + } + + final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size()); + this.outList.clear(); + } + + @Test + public void testHash2MatchTask() { + int keyCnt1 = 20; + int valCnt1 = 1; + + int keyCnt2 = 20; + int valCnt2 = 1; + + addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); + addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + setOutput(this.outList); + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Test caused an exception."); + } + + final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size()); + this.outList.clear(); + } + + @Test + public void testHash3MatchTask() { + int keyCnt1 = 20; + int valCnt1 = 1; + + int keyCnt2 = 20; + int valCnt2 = 20; + + addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); + addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + setOutput(this.outList); + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Test caused an exception."); + } + + final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size()); + this.outList.clear(); + } + + @Test + public void testHash4MatchTask() { + int keyCnt1 = 20; + int valCnt1 = 20; + + int keyCnt2 = 20; + int valCnt2 = 1; + + addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); + addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + setOutput(this.outList); + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Test caused an exception."); + } + + final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size()); + this.outList.clear(); + } + + @Test + public void testHash5MatchTask() { + int keyCnt1 = 20; + int valCnt1 = 20; + + int keyCnt2 = 20; + int valCnt2 = 20; + + addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); + addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + setOutput(this.outList); + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + testDriver(testTask, MockMatchStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Test caused an exception."); + } + + final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); + Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size()); + this.outList.clear(); + } + + @Test + public void testFailingHashFirstMatchTask() { + int keyCnt1 = 20; + int valCnt1 = 20; + + int keyCnt2 = 20; + int valCnt2 = 20; + + addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); + addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + setOutput(new NirvanaOutputList()); + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + testDriver(testTask, MockFailingMatchStub.class); + Assert.fail("Function exception was not forwarded."); + } catch (ExpectedTestException etex) { + // good! + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Test caused an exception."); + } + } + + @Test + public void testFailingHashSecondMatchTask() { + int keyCnt1 = 20; + int valCnt1 = 20; + + int keyCnt2 = 20; + int valCnt2 = 20; + + addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); + addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + setOutput(new NirvanaOutputList()); + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + try { + testDriver(testTask, MockFailingMatchStub.class); + Assert.fail("Function exception was not forwarded."); + } catch (ExpectedTestException etex) { + // good! + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Test caused an exception."); + } + } + + @Test + public void testCancelHashMatchTaskWhileBuildFirst() { + final int keyCnt = 20; + final int valCnt = 20; + + try { + addInput(new DelayingInfinitiveInputIterator(100)); + addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); + + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + + setOutput(new NirvanaOutputList()); + + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + final AtomicBoolean success = new AtomicBoolean(false); + + Thread taskRunner = new Thread() { + @Override + public void run() { + try { + testDriver(testTask, MockMatchStub.class); + success.set(true); + } catch (Exception ie) { + ie.printStackTrace(); + } + } + }; + taskRunner.start(); + + Thread.sleep(1000); + cancel(); + + try { + taskRunner.join(); + } + catch (InterruptedException ie) { + Assert.fail("Joining threads failed"); + } + + Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testHashCancelMatchTaskWhileBuildSecond() { + final int keyCnt = 20; + final int valCnt = 20; + + try { + addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); + addInput(new DelayingInfinitiveInputIterator(100)); + + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + + setOutput(new NirvanaOutputList()); + + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + final AtomicBoolean success = new AtomicBoolean(false); + + Thread taskRunner = new Thread() { + @Override + public void run() { + try { + testDriver(testTask, MockMatchStub.class); + success.set(true); + } catch (Exception ie) { + ie.printStackTrace(); + } + } + }; + taskRunner.start(); + + Thread.sleep(1000); + cancel(); + + try { + taskRunner.join(); + } + catch (InterruptedException ie) { + Assert.fail("Joining threads failed"); + } + + Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testHashFirstCancelMatchTaskWhileMatching() { + int keyCnt = 20; + int valCnt = 20; + + addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); + addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + setOutput(new NirvanaOutputList()); + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + final AtomicBoolean success = new AtomicBoolean(false); + + Thread taskRunner = new Thread() { + @Override + public void run() { + try { + testDriver(testTask, MockMatchStub.class); + success.set(true); + } catch (Exception ie) { + ie.printStackTrace(); + } + } + }; + taskRunner.start(); + + TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this); + tct.start(); + + try { + tct.join(); + taskRunner.join(); + } catch(InterruptedException ie) { + Assert.fail("Joining threads failed"); + } + + Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); + } + + @Test + public void testHashSecondCancelMatchTaskWhileMatching() { + int keyCnt = 20; + int valCnt = 20; + + addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); + addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); + addDriverComparator(this.comparator1); + addDriverComparator(this.comparator2); + getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); + setOutput(new NirvanaOutputList()); + getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); + getTaskConfig().setRelativeMemoryDriver(hash_frac); + + final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>(); + + final AtomicBoolean success = new AtomicBoolean(false); + + Thread taskRunner = new Thread() { + @Override + public void run() { + try { + testDriver(testTask, MockMatchStub.class); + success.set(true); + } catch (Exception ie) { + ie.printStackTrace(); + } + } + }; + taskRunner.start(); + + TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this); + tct.start(); + + try { + tct.join(); + taskRunner.join(); + } catch(InterruptedException ie) { + Assert.fail("Joining threads failed"); + } + + Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); + } + + // ================================================================================================= + + public static final class MockMatchStub implements FlatJoinFunction<Record, Record, Record> { + private static final long serialVersionUID = 1L; + + @Override + public void join(Record record1, Record record2, Collector<Record> out) throws Exception { + out.collect(record1); + } + } + + public static final class MockFailingMatchStub implements FlatJoinFunction<Record, Record, Record> { + private static final long serialVersionUID = 1L; + + private int cnt = 0; + + @Override + public void join(Record record1, Record record2, Collector<Record> out) throws Exception { + if (++this.cnt >= 10) { + throw new ExpectedTestException(); + } + out.collect(record1); + } + } + + public static final class MockDelayingMatchStub implements FlatJoinFunction<Record, Record, Record> { + private static final long serialVersionUID = 1L; + + @Override + public void join(Record record1, Record record2, Collector<Record> out) throws Exception { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java deleted file mode 100644 index bfc6c44..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.operators; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.flink.api.common.ExecutionConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.functions.GenericCollectorMap; -import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.runtime.operators.CollectorMapDriver; -import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; -import org.apache.flink.runtime.operators.testutils.DriverTestBase; -import org.apache.flink.runtime.operators.testutils.ExpectedTestException; -import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator; -import org.apache.flink.runtime.operators.testutils.TaskCancelThread; -import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; -import org.junit.Assert; -import org.junit.Test; - -@SuppressWarnings("deprecation") -public class MapTaskTest extends DriverTestBase<GenericCollectorMap<Record, Record>> { - - private static final Logger LOG = LoggerFactory.getLogger(MapTaskTest.class); - - private final CountingOutputCollector output = new CountingOutputCollector(); - - - public MapTaskTest(ExecutionConfig config) { - super(config, 0, 0); - } - - @Test - public void testMapTask() { - final int keyCnt = 100; - final int valCnt = 20; - - addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); - setOutput(this.output); - - final CollectorMapDriver<Record, Record> testDriver = new CollectorMapDriver<Record, Record>(); - - try { - testDriver(testDriver, MockMapStub.class); - } catch (Exception e) { - LOG.debug("Exception while running the test driver.", e); - Assert.fail("Invoke method caused exception."); - } - - Assert.assertEquals("Wrong result set size.", keyCnt*valCnt, this.output.getNumberOfRecords()); - } - - @Test - public void testFailingMapTask() { - final int keyCnt = 100; - final int valCnt = 20; - - addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); - setOutput(new DiscardingOutputCollector<Record>()); - - final CollectorMapDriver<Record, Record> testTask = new CollectorMapDriver<Record, Record>(); - try { - testDriver(testTask, MockFailingMapStub.class); - Assert.fail("Function exception was not forwarded."); - } catch (ExpectedTestException e) { - // good! - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Exception in test."); - } - } - - @Test - public void testCancelMapTask() { - addInput(new InfiniteInputIterator()); - setOutput(new DiscardingOutputCollector<Record>()); - - final CollectorMapDriver<Record, Record> testTask = new CollectorMapDriver<Record, Record>(); - - final AtomicBoolean success = new AtomicBoolean(false); - - final Thread taskRunner = new Thread() { - @Override - public void run() { - try { - testDriver(testTask, MockMapStub.class); - success.set(true); - } catch (Exception ie) { - ie.printStackTrace(); - } - } - }; - taskRunner.start(); - - TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this); - tct.start(); - - try { - tct.join(); - taskRunner.join(); - } catch(InterruptedException ie) { - Assert.fail("Joining threads failed"); - } - - Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); - } - - public static class MockMapStub extends MapFunction { - private static final long serialVersionUID = 1L; - - @Override - public void map(Record record, Collector<Record> out) throws Exception { - out.collect(record); - } - - } - - public static class MockFailingMapStub extends MapFunction { - private static final long serialVersionUID = 1L; - - private int cnt = 0; - - @Override - public void map(Record record, Collector<Record> out) throws Exception { - if (++this.cnt >= 10) { - throw new ExpectedTestException(); - } - out.collect(record); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java deleted file mode 100644 index 6f7fb21..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.operators; - -import org.apache.flink.api.common.ExecutionConfig; -import org.junit.Assert; -import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.typeutils.record.RecordComparator; -import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory; -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.runtime.operators.testutils.DriverTestBase; -import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; -import org.junit.Test; - -@SuppressWarnings("deprecation") -public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> { - - private static final long HASH_MEM = 4*1024*1024; - - private static final long SORT_MEM = 3*1024*1024; - - private static final long BNLJN_MEM = 10 * PAGE_SIZE; - - private final double bnljn_frac; - - private final double hash_frac; - - @SuppressWarnings("unchecked") - private final RecordComparator comparator1 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); - - @SuppressWarnings("unchecked") - private final RecordComparator comparator2 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); - - private final CountingOutputCollector output = new CountingOutputCollector(); - - public MatchTaskExternalITCase(ExecutionConfig config) { - super(config, HASH_MEM, 2, SORT_MEM); - bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize(); - hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize(); - } - - @Test - public void testExternalSort1MatchTask() { - final int keyCnt1 = 16384*4; - final int valCnt1 = 2; - - final int keyCnt2 = 8192; - final int valCnt2 = 4*2; - - final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - - setOutput(this.output); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); - addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords()); - } - - @Test - public void testExternalHash1MatchTask() { - final int keyCnt1 = 32768; - final int valCnt1 = 8; - - final int keyCnt2 = 65536; - final int valCnt2 = 8; - - final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - - addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); - addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - setOutput(this.output); - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Test caused an exception."); - } - - Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords()); - } - - @Test - public void testExternalHash2MatchTask() { - final int keyCnt1 = 32768; - final int valCnt1 = 8; - - final int keyCnt2 = 65536; - final int valCnt2 = 8; - - final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - - addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); - addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - setOutput(this.output); - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Test caused an exception."); - } - - Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords()); - } - - public static final class MockMatchStub extends JoinFunction { - private static final long serialVersionUID = 1L; - - @Override - public void join(Record value1, Record value2, Collector<Record> out) throws Exception { - out.collect(value1); - } - } -}