[FLINK-1085] [tests] Make the combiner tests generic. Add more coverage for oversized records.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01c74338 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01c74338 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01c74338 Branch: refs/heads/master Commit: 01c74338ff44ea7f3735a7eb94b2ce01ababc505 Parents: 7271881 Author: Stephan Ewen <[email protected]> Authored: Mon Jul 13 15:12:04 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Jul 13 16:29:38 2015 +0200 ---------------------------------------------------------------------- .../operators/GroupReduceCombineDriver.java | 60 +-- .../operators/CombineTaskExternalITCase.java | 87 +++- .../runtime/operators/CombineTaskTest.java | 354 ++++++++-------- .../operators/CombinerOversizedRecordsTest.java | 236 +++++++++++ .../operators/testutils/DelayingIterator.java | 59 +++ .../testutils/InfiniteIntTupleIterator.java | 38 ++ .../runtime/operators/testutils/TestData.java | 2 +- .../testutils/UnaryOperatorTestBase.java | 410 +++++++++++++++++++ .../UniformIntStringTupleGenerator.java | 77 ++++ .../testutils/UniformIntTupleGenerator.java | 75 ++++ .../operators/testutils/UnionIterator.java | 16 +- 11 files changed, 1199 insertions(+), 215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java index c426295..2bf778e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java @@ -16,13 
+16,9 @@ * limitations under the License. */ - package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -33,10 +29,15 @@ import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter; import org.apache.flink.runtime.operators.sort.InMemorySorter; import org.apache.flink.runtime.operators.sort.NormalizedKeySorter; import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; import java.util.List; /** @@ -44,11 +45,12 @@ import java.util.List; * the user supplied a RichGroupReduceFunction with a combine method. The combining is performed in memory with a * lazy approach which only combines elements which currently fit in the sorter. This may lead to a partial solution. * In the case of the RichGroupReduceFunction this partial result is transformed into a proper deterministic result. - * The CombineGroup uses the GroupCombineFunction interface which allows to combine values of type <IN> to any type - * of type <OUT>. In contrast, the RichGroupReduceFunction requires the combine method to have the same input and - * output type to be able to reduce the elements after the combine from <IN> to <OUT>. + * The CombineGroup uses the GroupCombineFunction interface, which allows combining values of type {@code IN} + * into values of any type {@code OUT}. 
In contrast, the RichGroupReduceFunction requires the combine method + * to have the same input and output type to be able to reduce the elements after the combine from + * {@code IN} to {@code OUT}. * - * The CombineTask uses a combining iterator over its input. The output of the iterator is emitted. + * <p>The CombineTask uses a combining iterator over its input. The output of the iterator is emitted.</p> * * @param <IN> The data type consumed by the combiner. * @param <OUT> The data type produced by the combiner. @@ -67,8 +69,6 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin private GroupCombineFunction<IN, OUT> combiner; private TypeSerializer<IN> serializer; - - private TypeComparator<IN> sortingComparator; private TypeComparator<IN> groupingComparator; @@ -78,7 +78,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin private Collector<OUT> output; - private long oversizedRecordCount = 0L; + private long oversizedRecordCount; private volatile boolean running = true; @@ -112,9 +112,8 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin @Override public void prepare() throws Exception { final DriverStrategy driverStrategy = this.taskContext.getTaskConfig().getDriverStrategy(); - if(driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){ - throw new Exception("Invalid strategy " + driverStrategy + " for " + - "group reduce combinder."); + if (driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){ + throw new Exception("Invalid strategy " + driverStrategy + " for group reduce combiner."); } this.memManager = this.taskContext.getMemoryManager(); @@ -122,7 +121,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0); this.serializer = serializerFactory.getSerializer(); - this.sortingComparator = this.taskContext.getDriverComparator(0); + + final 
TypeComparator<IN> sortingComparator = this.taskContext.getDriverComparator(0); + this.groupingComparator = this.taskContext.getDriverComparator(1); this.combiner = this.taskContext.getStub(); this.output = this.taskContext.getOutputCollector(); @@ -131,12 +132,12 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin numMemoryPages); // instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter - if (this.sortingComparator.supportsSerializationWithKeyNormalization() && + if (sortingComparator.supportsSerializationWithKeyNormalization() && this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING) { - this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory); + this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, sortingComparator, memory); } else { - this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory); + this.sorter = new NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), memory); } ExecutionConfig executionConfig = taskContext.getExecutionConfig(); @@ -171,10 +172,14 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin // write the value again if (!this.sorter.write(value)) { + ++oversizedRecordCount; - LOG.debug("Cannot write record to fresh sort buffer. Record too large. Oversized record count: {}", oversizedRecordCount); - // simply forward the record - this.output.collect((OUT)value); + LOG.debug("Cannot write record to fresh sort buffer, record is too large. " + + "Oversized record count: {}", oversizedRecordCount); + + // simply forward the record. 
We need to pass it through the combine function to convert it + Iterable<IN> input = Collections.singleton(value); + this.combiner.combine(input, this.output); } } @@ -210,7 +215,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin @Override public void cleanup() throws Exception { - if(this.sorter != null) { + if (this.sorter != null) { this.memManager.release(this.sorter.dispose()); } } @@ -218,8 +223,17 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin @Override public void cancel() { this.running = false; - if(this.sorter != null) { + if (this.sorter != null) { this.memManager.release(this.sorter.dispose()); } } + + /** + * Gets the number of oversized records handled by this combiner. + * + * @return The number of oversized records handled by this combiner. + */ + public long getOversizedRecordCount() { + return oversizedRecordCount; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java index d957fa1..4905e57 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java @@ -16,17 +16,18 @@ * limitations under the License. 
*/ - package org.apache.flink.runtime.operators; import java.util.ArrayList; import java.util.HashMap; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.record.operators.ReduceOperator; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.util.Collector; import org.junit.Assert; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.typeutils.record.RecordComparator; -import org.apache.flink.runtime.operators.CombineTaskTest.MockCombiningReduceStub; import org.apache.flink.runtime.operators.testutils.DriverTestBase; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; import org.apache.flink.types.IntValue; @@ -45,7 +46,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun @SuppressWarnings("unchecked") private final RecordComparator comparator = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? 
extends Key<?>>[])new Class<?>[]{ IntValue.class }); public CombineTaskExternalITCase(ExecutionConfig config) { super(config, COMBINE_MEM, 0); @@ -161,4 +162,84 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun this.outList.clear(); } + + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + + @ReduceOperator.Combinable + public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> { + private static final long serialVersionUID = 1L; + + private final IntValue theInteger = new IntValue(); + + @Override + public void reduce(Iterable<Record> records, Collector<Record> out) { + Record element = null; + int sum = 0; + + for (Record next : records) { + element = next; + element.getField(1, this.theInteger); + + sum += this.theInteger.getValue(); + } + this.theInteger.setValue(sum); + element.setField(1, this.theInteger); + out.collect(element); + } + + @Override + public void combine(Iterable<Record> records, Collector<Record> out) throws Exception { + reduce(records, out); + } + } + + @ReduceOperator.Combinable + public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> { + private static final long serialVersionUID = 1L; + + private int cnt = 0; + + private final IntValue key = new IntValue(); + private final IntValue value = new IntValue(); + private final IntValue combineValue = new IntValue(); + + @Override + public void reduce(Iterable<Record> records, Collector<Record> out) { + Record element = null; + int sum = 0; + + for (Record next : records) { + element = next; + element.getField(1, this.value); + + sum += this.value.getValue(); + } + element.getField(0, this.key); + this.value.setValue(sum - this.key.getValue()); + element.setField(1, this.value); + out.collect(element); + } + + @Override + public void combine(Iterable<Record> records, 
Collector<Record> out) { + Record element = null; + int sum = 0; + + for (Record next : records) { + element = next; + element.getField(1, this.combineValue); + + sum += this.combineValue.getValue(); + } + + if (++this.cnt >= 10) { + throw new ExpectedTestException(); + } + + this.combineValue.setValue(sum); + element.setField(1, this.combineValue); + out.collect(element); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java index 7772151..932e746 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java @@ -18,254 +18,244 @@ package org.apache.flink.runtime.operators; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.runtime.operators.testutils.*; -import org.junit.Assert; import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.typeutils.record.RecordComparator; -import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; -import org.apache.flink.types.Record; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntComparator; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import 
org.apache.flink.api.java.typeutils.runtime.TupleComparator; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.runtime.operators.testutils.DelayingIterator; +import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.runtime.operators.testutils.InfiniteIntTupleIterator; +import org.apache.flink.runtime.operators.testutils.UnaryOperatorTestBase; +import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator; + import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; -import org.apache.flink.runtime.operators.testutils.TestData.Generator; + import org.junit.Test; -public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Record, ?>> -{ +import java.util.ArrayList; + +import static org.junit.Assert.*; + +public class CombineTaskTest + extends UnaryOperatorTestBase<RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, + Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> { + private static final long COMBINE_MEM = 3 * 1024 * 1024; private final double combine_frac; - private final ArrayList<Record> outList = new ArrayList<Record>(); + private final ArrayList<Tuple2<Integer, Integer>> outList = new ArrayList<Tuple2<Integer, Integer>>(); + + private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<Tuple2<Integer, Integer>>( + (Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class, + new TypeSerializer<?>[] { IntSerializer.INSTANCE, IntSerializer.INSTANCE }); - @SuppressWarnings("unchecked") - private final RecordComparator comparator = new RecordComparator( - new int[]{0}, (Class<? 
extends Key<?>>[])new Class[]{ IntValue.class }); + private final TypeComparator<Tuple2<Integer, Integer>> comparator = new TupleComparator<Tuple2<Integer, Integer>>( + new int[]{0}, + new TypeComparator<?>[] { new IntComparator(true) }, + new TypeSerializer<?>[] { IntSerializer.INSTANCE }); + public CombineTaskTest(ExecutionConfig config) { super(config, COMBINE_MEM, 0); - combine_frac = (double)COMBINE_MEM/this.getMemoryManager().getMemorySize(); + combine_frac = (double)COMBINE_MEM / this.getMemoryManager().getMemorySize(); } + @Test public void testCombineTask() { - int keyCnt = 100; - int valCnt = 20; - - addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); - addDriverComparator(this.comparator); - addDriverComparator(this.comparator); - setOutput(this.outList); - - getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE); - getTaskConfig().setRelativeMemoryDriver(combine_frac); - getTaskConfig().setFilehandlesDriver(2); - - final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>(); - try { + int keyCnt = 100; + int valCnt = 20; + + setInput(new UniformIntTupleGenerator(keyCnt, valCnt, false), serializer); + addDriverComparator(this.comparator); + addDriverComparator(this.comparator); + setOutput(this.outList, serializer); + + getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE); + getTaskConfig().setRelativeMemoryDriver(combine_frac); + getTaskConfig().setFilehandlesDriver(2); + + final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = + new GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(); + testDriver(testTask, MockCombiningReduceStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Invoke method caused exception."); - } - - int expSum = 0; - for (int i = 1;i < valCnt; i++) { - expSum += i; - } - - Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+keyCnt, this.outList.size() == keyCnt); - - for(Record record : this.outList) { - Assert.assertTrue("Incorrect result", record.getField(1, IntValue.class).getValue() == expSum); + + int expSum = 0; + for (int i = 1;i < valCnt; i++) { + expSum += i; + } + + assertTrue(this.outList.size() == keyCnt); + + for (Tuple2<Integer, Integer> record : this.outList) { + assertTrue(record.f1 == expSum); + } + + this.outList.clear(); } - - this.outList.clear(); - } - - @Test - public void testOversizedRecordCombineTask() { - int tenMil = 10000000; - Generator g = new Generator(561349061987311L, 1, tenMil); - //generate 10 records each of size 10MB - final TestData.GeneratorIterator gi = new TestData.GeneratorIterator(g, 10); - List<MutableObjectIterator<Record>> inputs = new ArrayList<MutableObjectIterator<Record>>(); - inputs.add(gi); - - addInput(new UnionIterator<Record>(inputs)); - addDriverComparator(this.comparator); - addDriverComparator(this.comparator); - setOutput(this.outList); - - getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE); - getTaskConfig().setRelativeMemoryDriver(combine_frac); - getTaskConfig().setFilehandlesDriver(2); - - final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>(); - - try { - testDriver(testTask, MockCombiningReduceStub.class); - } catch (Exception e) { + catch (Exception e) { e.printStackTrace(); - Assert.fail("Invoke method caused exception."); + fail(e.getMessage()); } - - Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+10, this.outList.size() == 10); - - this.outList.clear(); } @Test public void testFailingCombineTask() { - int keyCnt = 100; - int valCnt = 20; - - addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); - addDriverComparator(this.comparator); - addDriverComparator(this.comparator); - setOutput(new DiscardingOutputCollector<Record>()); - - getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE); - getTaskConfig().setRelativeMemoryDriver(combine_frac); - getTaskConfig().setFilehandlesDriver(2); - - final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>(); - try { - testDriver(testTask, MockFailingCombiningReduceStub.class); - Assert.fail("Exception not forwarded."); - } catch (ExpectedTestException etex) { - // good! - } catch (Exception e) { + int keyCnt = 100; + int valCnt = 20; + + setInput(new UniformIntTupleGenerator(keyCnt, valCnt, false), serializer); + addDriverComparator(this.comparator); + addDriverComparator(this.comparator); + setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>()); + + getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE); + getTaskConfig().setRelativeMemoryDriver(combine_frac); + getTaskConfig().setFilehandlesDriver(2); + + final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = + new GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(); + + try { + testDriver(testTask, MockFailingCombiningReduceStub.class); + fail("Exception not forwarded."); + } + catch (ExpectedTestException etex) { + // good! 
+ } + } + catch (Exception e) { e.printStackTrace(); - Assert.fail("Test failed due to an exception."); + fail(e.getMessage()); } } @Test - public void testCancelCombineTaskSorting() - { - addInput(new DelayingInfinitiveInputIterator(100)); - addDriverComparator(this.comparator); - addDriverComparator(this.comparator); - setOutput(new DiscardingOutputCollector<Record>()); - - getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE); - getTaskConfig().setRelativeMemoryDriver(combine_frac); - getTaskConfig().setFilehandlesDriver(2); - - final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>(); - - final AtomicBoolean success = new AtomicBoolean(false); - - Thread taskRunner = new Thread() { - @Override - public void run() { - try { - testDriver(testTask, MockFailingCombiningReduceStub.class); - success.set(true); - } catch (Exception ie) { - ie.printStackTrace(); - } - } - }; - taskRunner.start(); - - TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this); - tct.start(); - + public void testCancelCombineTaskSorting() { try { - tct.join(); - taskRunner.join(); - } catch(InterruptedException ie) { - Assert.fail("Joining threads failed"); + MutableObjectIterator<Tuple2<Integer, Integer>> slowInfiniteInput = + new DelayingIterator<Tuple2<Integer, Integer>>(new InfiniteIntTupleIterator(), 1); + + setInput(slowInfiniteInput, serializer); + addDriverComparator(this.comparator); + addDriverComparator(this.comparator); + setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>()); + + getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE); + getTaskConfig().setRelativeMemoryDriver(combine_frac); + getTaskConfig().setFilehandlesDriver(2); + + final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = + new GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(); + + Thread taskRunner = new Thread() { + @Override + public 
void run() { + try { + testDriver(testTask, MockFailingCombiningReduceStub.class); + } + catch (Exception e) { + // exceptions may happen during canceling + } + } + }; + taskRunner.start(); + + // give the task some time + Thread.sleep(500); + + // cancel + testTask.cancel(); + + // make sure it reacts to the canceling in some time + taskRunner.join(5000); + + assertFalse("Task did not cancel properly within in 5 seconds.", taskRunner.isAlive()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); } - - Assert.assertTrue("Exception was thrown despite proper canceling.", success.get()); } - @Combinable - public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> { + // ------------------------------------------------------------------------ + // Test Combiners + // ------------------------------------------------------------------------ + + @RichGroupReduceFunction.Combinable + public static class MockCombiningReduceStub extends + RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> + { private static final long serialVersionUID = 1L; - - private final IntValue theInteger = new IntValue(); @Override - public void reduce(Iterable<Record> records, Collector<Record> out) { - Record element = null; + public void reduce(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) { + int key = 0; int sum = 0; - - for (Record next : records) { - element = next; - element.getField(1, this.theInteger); - - sum += this.theInteger.getValue(); + + for (Tuple2<Integer, Integer> next : records) { + key = next.f0; + sum += next.f1; } - this.theInteger.setValue(sum); - element.setField(1, this.theInteger); - out.collect(element); + + out.collect(new Tuple2<Integer, Integer>(key, sum)); } @Override - public void combine(Iterable<Record> records, Collector<Record> out) throws Exception { + public void combine(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, 
Integer>> out) { reduce(records, out); } } - @Combinable - public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> { + @RichGroupReduceFunction.Combinable + public static final class MockFailingCombiningReduceStub extends + RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> + { private static final long serialVersionUID = 1L; - private int cnt = 0; - - private final IntValue key = new IntValue(); - private final IntValue value = new IntValue(); - private final IntValue combineValue = new IntValue(); + private int cnt; @Override - public void reduce(Iterable<Record> records, Collector<Record> out) { - Record element = null; + public void reduce(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) { + int key = 0; int sum = 0; - for (Record next : records) { - element = next; - element.getField(1, this.value); - - sum += this.value.getValue(); + for (Tuple2<Integer, Integer> next : records) { + key = next.f0; + sum += next.f1; } - element.getField(0, this.key); - this.value.setValue(sum - this.key.getValue()); - element.setField(1, this.value); - out.collect(element); + + int resultValue = sum - key; + out.collect(new Tuple2<Integer, Integer>(key, resultValue)); } @Override - public void combine(Iterable<Record> records, Collector<Record> out) { - Record element = null; + public void combine(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) { + int key = 0; int sum = 0; - - for (Record next : records) { - element = next; - element.getField(1, this.combineValue); - - sum += this.combineValue.getValue(); + + for (Tuple2<Integer, Integer> next : records) { + key = next.f0; + sum += next.f1; } if (++this.cnt >= 10) { throw new ExpectedTestException(); } - - this.combineValue.setValue(sum); - element.setField(1, this.combineValue); - out.collect(element); + + int resultValue = sum - key; + out.collect(new Tuple2<Integer, 
Integer>(key, resultValue)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java new file mode 100644 index 0000000..58d1676 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.DoubleSerializer; +import org.apache.flink.api.common.typeutils.base.IntComparator; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.TupleComparator; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.runtime.operators.testutils.UnaryOperatorTestBase; +import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator; +import org.apache.flink.runtime.operators.testutils.UnionIterator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Random; + +import static org.junit.Assert.*; + +/** + * Test that checks how the combiner handles very large records that are too large to be written into + * a fresh sort buffer. 
+ */ +public class CombinerOversizedRecordsTest + extends UnaryOperatorTestBase<GroupCombineFunction<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>>, + Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>> { + + private static final long COMBINE_MEM = 3 * 1024 * 1024; + + private final double combine_frac; + + private final ArrayList<Tuple3<Integer, Double, String>> outList = new ArrayList<Tuple3<Integer, Double, String>>(); + + private final TypeSerializer<Tuple3<Integer, Integer, String>> serializer = + new TupleSerializer<Tuple3<Integer, Integer, String>>( + (Class<Tuple3<Integer, Integer, String>>) (Class<?>) Tuple3.class, + new TypeSerializer<?>[] { IntSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE }); + + private final TypeSerializer<Tuple3<Integer, Double, String>> outSerializer = + new TupleSerializer<Tuple3<Integer, Double, String>>( + (Class<Tuple3<Integer, Double, String>>) (Class<?>) Tuple3.class, + new TypeSerializer<?>[] { IntSerializer.INSTANCE, DoubleSerializer.INSTANCE, StringSerializer.INSTANCE }); + + private final TypeComparator<Tuple3<Integer, Integer, String>> comparator = + new TupleComparator<Tuple3<Integer, Integer, String>>( + new int[] { 0 }, + new TypeComparator<?>[] { new IntComparator(true) }, + new TypeSerializer<?>[] { IntSerializer.INSTANCE }); + + // ------------------------------------------------------------------------ + + public CombinerOversizedRecordsTest(ExecutionConfig config) { + super(config, COMBINE_MEM, 0); + combine_frac = (double)COMBINE_MEM / getMemoryManager().getMemorySize(); + } + + @Test + public void testOversizedRecordCombineTask() { + try { + final int keyCnt = 100; + final int valCnt = 20; + + // create a long heavy string payload + StringBuilder bld = new StringBuilder(10 * 1024 * 1024); + Random rnd = new Random(); + + for (int i = 0; i < 10000000; i++) { + bld.append((char) (rnd.nextInt(26) + 'a')); + } + + String longString = bld.toString(); + bld = 
null; + + // construct the input as a union of + // 1) long string + // 2) some random values + // 3) long string + // 4) random values + // 5) long string + + // random values 1 + MutableObjectIterator<Tuple2<Integer, Integer>> gen1 = + new UniformIntTupleGenerator(keyCnt, valCnt, false); + + // random values 2 + MutableObjectIterator<Tuple2<Integer, Integer>> gen2 = + new UniformIntTupleGenerator(keyCnt, valCnt, false); + + @SuppressWarnings("unchecked") + MutableObjectIterator<Tuple3<Integer, Integer, String>> input = + new UnionIterator<Tuple3<Integer, Integer, String>>( + new SingleValueIterator<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(-1, -1, longString)), + new StringIteratorDecorator(gen1), + new SingleValueIterator<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(-1, -1, longString)), + new StringIteratorDecorator(gen2), + new SingleValueIterator<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(-1, -1, longString))); + + setInput(input, serializer); + addDriverComparator(this.comparator); + addDriverComparator(this.comparator); + setOutput(this.outList, this.outSerializer); + + getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE); + getTaskConfig().setRelativeMemoryDriver(combine_frac); + getTaskConfig().setFilehandlesDriver(2); + + GroupReduceCombineDriver<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>> testTask = + new GroupReduceCombineDriver<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>>(); + + testDriver(testTask, TestCombiner.class); + + assertEquals(3, testTask.getOversizedRecordCount()); + assertTrue(keyCnt + 3 == outList.size() || 2*keyCnt + 3 == outList.size()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------ + + public static final class TestCombiner + implements GroupCombineFunction<Tuple3<Integer, 
Integer, String>, Tuple3<Integer, Double, String>> + { + private static final long serialVersionUID = 1L; + + + @Override + public void combine(Iterable<Tuple3<Integer, Integer, String>> values, + Collector<Tuple3<Integer, Double, String>> out) + { + int key = 0; + int sum = 0; + String someString = null; + + for (Tuple3<Integer, Integer, String> next : values) { + key = next.f0; + sum += next.f1; + someString = next.f2; + } + + out.collect(new Tuple3<Integer, Double, String>(key, (double) sum, someString)); + } + } + + // ------------------------------------------------------------------------ + + private static class StringIteratorDecorator implements MutableObjectIterator<Tuple3<Integer, Integer, String>> { + + private final MutableObjectIterator<Tuple2<Integer, Integer>> input; + + private StringIteratorDecorator(MutableObjectIterator<Tuple2<Integer, Integer>> input) { + this.input = input; + } + + @Override + public Tuple3<Integer, Integer, String> next(Tuple3<Integer, Integer, String> reuse) throws IOException { + Tuple2<Integer, Integer> next = input.next(); + if (next == null) { + return null; + } + else { + reuse.f0 = next.f0; + reuse.f1 = next.f1; + reuse.f2 = "test string"; + return reuse; + } + } + + @Override + public Tuple3<Integer, Integer, String> next() throws IOException { + Tuple2<Integer, Integer> next = input.next(); + if (next == null) { + return null; + } + else { + return new Tuple3<Integer, Integer, String>(next.f0, next.f1, "test string"); + } + } + } + + // ------------------------------------------------------------------------ + + private static class SingleValueIterator<T> implements MutableObjectIterator<T> { + + private final T value; + + private boolean pending = true; + + private SingleValueIterator(T value) { + this.value = value; + } + + @Override + public T next(T reuse) { + return next(); + } + + @Override + public T next() { + if (pending) { + pending = false; + return value; + } else { + return null; + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java new file mode 100644 index 0000000..b3d53c7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.testutils; + +import org.apache.flink.util.MutableObjectIterator; + +import java.io.IOException; + +public class DelayingIterator<T> implements MutableObjectIterator<T> { + + private final MutableObjectIterator<T> iterator; + private final int delay; + + + public DelayingIterator(MutableObjectIterator<T> iterator, int delay) { + this.iterator = iterator; + this.delay = delay; + } + + @Override + public T next(T reuse) throws IOException { + try { + Thread.sleep(delay); + } + catch (InterruptedException e) { + // ignore, but restore interrupted state + Thread.currentThread().interrupt(); + } + return iterator.next(reuse); + } + + @Override + public T next() throws IOException { + try { + Thread.sleep(delay); + } + catch (InterruptedException e) { + // ignore, but restore interrupted state + Thread.currentThread().interrupt(); + } + return iterator.next(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java new file mode 100644 index 0000000..ba2181b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.testutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.MutableObjectIterator; + +/** + * A simple iterator that returns an infinite amount of (0, 0) tuples. + */ +public class InfiniteIntTupleIterator implements MutableObjectIterator<Tuple2<Integer, Integer>> { + + @Override + public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> reuse) { + return next(); + } + + @Override + public Tuple2<Integer, Integer> next() { + return new Tuple2<Integer, Integer>(0, 0); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java index 400e798..fd34a3b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java @@ -226,7 +226,7 @@ public final class TestData { length = valueLength - random.nextInt(valueLength / 3); } - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); for (int i = 0; i < length; i++) { sb.append(alpha[random.nextInt(alpha.length)]); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java new file mode 100644 index 0000000..1e25bab --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.testutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.PactDriver; +import org.apache.flink.runtime.operators.PactTaskContext; +import org.apache.flink.runtime.operators.ResettablePactDriver; +import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; +import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import org.junit.After; +import org.junit.Assert; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +@RunWith(Parameterized.class) +public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactTaskContext<S, OUT> { + + protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024; + + protected static final int PAGE_SIZE = 32 * 1024; + + private final IOManager ioManager; + + private final MemoryManager memManager; + + private MutableObjectIterator<IN> input; + + private TypeSerializer<IN> inputSerializer; + + private 
List<TypeComparator<IN>> comparators; + + private UnilateralSortMerger<IN> sorter; + + private final AbstractInvokable owner; + + private final TaskConfig taskConfig; + + protected final long perSortMem; + + protected final double perSortFractionMem; + + private Collector<OUT> output; + + protected int numFileHandles; + + private S stub; + + private PactDriver<S, OUT> driver; + + private volatile boolean running; + + private ExecutionConfig executionConfig; + + protected UnaryOperatorTestBase(ExecutionConfig executionConfig, long memory, int maxNumSorters) { + this(executionConfig, memory, maxNumSorters, DEFAULT_PER_SORT_MEM); + } + + protected UnaryOperatorTestBase(ExecutionConfig executionConfig, long memory, int maxNumSorters, long perSortMemory) { + if (memory < 0 || maxNumSorters < 0 || perSortMemory < 0) { + throw new IllegalArgumentException(); + } + + final long totalMem = Math.max(memory, 0) + (Math.max(maxNumSorters, 0) * perSortMemory); + + this.perSortMem = perSortMemory; + this.perSortFractionMem = (double)perSortMemory/totalMem; + this.ioManager = new IOManagerAsync(); + this.memManager = totalMem > 0 ? 
new DefaultMemoryManager(totalMem,1) : null; + this.owner = new DummyInvokable(); + + Configuration config = new Configuration(); + this.taskConfig = new TaskConfig(config); + + this.executionConfig = executionConfig; + this.comparators = new ArrayList<TypeComparator<IN>>(2); + } + + @Parameterized.Parameters + public static Collection<Object[]> getConfigurations() { + ExecutionConfig withReuse = new ExecutionConfig(); + withReuse.enableObjectReuse(); + + ExecutionConfig withoutReuse = new ExecutionConfig(); + withoutReuse.disableObjectReuse(); + + Object[] a = { withoutReuse }; + Object[] b = { withReuse }; + return Arrays.asList(a, b); + } + + public void setInput(MutableObjectIterator<IN> input, TypeSerializer<IN> serializer) { + this.input = input; + this.inputSerializer = serializer; + this.sorter = null; + } + + public void addInputSorted(MutableObjectIterator<IN> input, + TypeSerializer<IN> serializer, + TypeComparator<IN> comp) throws Exception + { + this.input = null; + this.inputSerializer = serializer; + this.sorter = new UnilateralSortMerger<IN>( + this.memManager, this.ioManager, input, this.owner, + this.<IN>getInputSerializer(0), + comp, + this.perSortFractionMem, 32, 0.8f); + } + + public void addDriverComparator(TypeComparator<IN> comparator) { + this.comparators.add(comparator); + } + + public void setOutput(Collector<OUT> output) { + this.output = output; + } + public void setOutput(List<OUT> output, TypeSerializer<OUT> outSerializer) { + this.output = new ListOutputCollector<OUT>(output, outSerializer); + } + + public int getNumFileHandlesForSort() { + return numFileHandles; + } + + + public void setNumFileHandlesForSort(int numFileHandles) { + this.numFileHandles = numFileHandles; + } + + @SuppressWarnings("rawtypes") + public void testDriver(PactDriver driver, Class stubClass) throws Exception { + testDriverInternal(driver, stubClass); + } + + @SuppressWarnings({"unchecked","rawtypes"}) + public void testDriverInternal(PactDriver driver, Class 
stubClass) throws Exception { + + this.driver = driver; + driver.setup(this); + + this.stub = (S)stubClass.newInstance(); + + // regular running logic + this.running = true; + boolean stubOpen = false; + + try { + // run the data preparation + try { + driver.prepare(); + } + catch (Throwable t) { + throw new Exception("The data preparation caused an error: " + t.getMessage(), t); + } + + // open stub implementation + try { + FunctionUtils.openFunction(this.stub, getTaskConfig().getStubParameters()); + stubOpen = true; + } + catch (Throwable t) { + throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t); + } + + // run the user code + driver.run(); + + // close. We close here such that a regular close throwing an exception marks a task as failed. + if (this.running) { + FunctionUtils.closeFunction (this.stub); + stubOpen = false; + } + + this.output.close(); + } + catch (Exception ex) { + // close the input, but do not report any exceptions, since we already have another root cause + if (stubOpen) { + try { + FunctionUtils.closeFunction(this.stub); + } + catch (Throwable t) { + // ignore + } + } + + // if resettable driver invoke tear-down + if (this.driver instanceof ResettablePactDriver) { + final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver; + try { + resDriver.teardown(); + } catch (Throwable t) { + throw new Exception("Error while shutting down an iterative operator: " + t.getMessage(), t); + } + } + + // drop exception, if the task was canceled + if (this.running) { + throw ex; + } + + } + finally { + driver.cleanup(); + } + } + + @SuppressWarnings({"unchecked","rawtypes"}) + public void testResettableDriver(ResettablePactDriver driver, Class stubClass, int iterations) throws Exception { + driver.setup(this); + + for (int i = 0; i < iterations; i++) { + if (i == 0) { + driver.initialize(); + } + else { + driver.reset(); + } + testDriver(driver, stubClass); + } + + driver.teardown(); + 
} + + public void cancel() throws Exception { + this.running = false; + this.driver.cancel(); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public TaskConfig getTaskConfig() { + return this.taskConfig; + } + + @Override + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + + @Override + public ClassLoader getUserCodeClassLoader() { + return getClass().getClassLoader(); + } + + @Override + public IOManager getIOManager() { + return this.ioManager; + } + + @Override + public MemoryManager getMemoryManager() { + return this.memManager; + } + + @Override + public <X> MutableObjectIterator<X> getInput(int index) { + MutableObjectIterator<IN> in = this.input; + if (in == null) { + // waiting from sorter + try { + in = this.sorter.getIterator(); + } + catch (InterruptedException e) { + throw new RuntimeException("Interrupted"); + } + this.input = in; + } + + @SuppressWarnings("unchecked") + MutableObjectIterator<X> input = (MutableObjectIterator<X>) this.input; + return input; + } + + @Override + public <X> TypeSerializerFactory<X> getInputSerializer(int index) { + if (index != 0) { + throw new IllegalArgumentException(); + } + + @SuppressWarnings("unchecked") + TypeSerializer<X> ser = (TypeSerializer<X>) inputSerializer; + return new RuntimeSerializerFactory<X>(ser, (Class<X>) ser.createInstance().getClass()); + } + + @Override + public <X> TypeComparator<X> getDriverComparator(int index) { + @SuppressWarnings("unchecked") + TypeComparator<X> comparator = (TypeComparator<X>) this.comparators.get(index); + return comparator; + } + + @Override + public S getStub() { + return this.stub; + } + + @Override + public Collector<OUT> getOutputCollector() { + return this.output; + } + + @Override + public AbstractInvokable getOwningNepheleTask() { + return this.owner; + } + + @Override + public String formatLogString(String message) { + return "Driver Tester: " + message; + } + + // 
-------------------------------------------------------------------------------------------- + + @After + public void shutdownAll() throws Exception { + // 1st, shutdown sorters + if (this.sorter != null) { + sorter.close(); + } + + // 2nd, shutdown I/O + this.ioManager.shutdown(); + Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown()); + + // last, verify all memory is returned and shutdown mem manager + MemoryManager memMan = getMemoryManager(); + if (memMan != null) { + Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty()); + memMan.shutdown(); + } + } + + // -------------------------------------------------------------------------------------------- + + private static final class ListOutputCollector<OUT> implements Collector<OUT> { + + private final List<OUT> output; + private final TypeSerializer<OUT> serializer; + + public ListOutputCollector(List<OUT> outputList, TypeSerializer<OUT> serializer) { + this.output = outputList; + this.serializer = serializer; + } + + + @Override + public void collect(OUT record) { + this.output.add(serializer.copy(record)); + } + + @Override + public void close() {} + } + + public static final class CountingOutputCollector<OUT> implements Collector<OUT> { + + private int num; + + @Override + public void collect(OUT record) { + this.num++; + } + + @Override + public void close() {} + + public int getNumberOfRecords() { + return this.num; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java new file mode 100644 index 
0000000..451cf9e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.testutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.MutableObjectIterator; + +public class UniformIntStringTupleGenerator implements MutableObjectIterator<Tuple2<Integer, String>> { + + private final int numKeys; + private final int numVals; + + private int keyCnt; + private int valCnt; + + private boolean repeatKey; + + + public UniformIntStringTupleGenerator(int numKeys, int numVals, boolean repeatKey) { + this.numKeys = numKeys; + this.numVals = numVals; + this.repeatKey = repeatKey; + } + + @Override + public Tuple2<Integer, String> next(Tuple2<Integer, String> target) { + if (!repeatKey) { + if(valCnt >= numVals) { + return null; + } + + target.f0 = keyCnt++; + target.f1 = Integer.toBinaryString(valCnt); + + if(keyCnt == numKeys) { + keyCnt = 0; + valCnt++; + } + } + else { + if (keyCnt >= numKeys) { + return null; + } + + target.f0 = keyCnt; + target.f1 = Integer.toBinaryString(valCnt++); + + if (valCnt == numVals) 
{ + valCnt = 0; + keyCnt++; + } + } + + return target; + } + + @Override + public Tuple2<Integer, String> next() { + return next(new Tuple2<Integer, String>()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java new file mode 100644 index 0000000..457b4ad --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.testutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.MutableObjectIterator; + +public class UniformIntTupleGenerator implements MutableObjectIterator<Tuple2<Integer, Integer>> { + + private final int numKeys; + private final int numVals; + + private int keyCnt = 0; + private int valCnt = 0; + private boolean repeatKey; + + public UniformIntTupleGenerator(int numKeys, int numVals, boolean repeatKey) { + this.numKeys = numKeys; + this.numVals = numVals; + this.repeatKey = repeatKey; + } + + @Override + public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> target) { + if (!repeatKey) { + if(valCnt >= numVals) { + return null; + } + + target.f0 = keyCnt++; + target.f1 = valCnt; + + if (keyCnt == numKeys) { + keyCnt = 0; + valCnt++; + } + } + else { + if (keyCnt >= numKeys) { + return null; + } + + target.f0 = keyCnt; + target.f1 = valCnt++; + + if (valCnt == numVals) { + valCnt = 0; + keyCnt++; + } + } + + return target; + } + + @Override + public Tuple2<Integer, Integer> next() { + return next(new Tuple2<Integer, Integer>()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java index 3a76ebd..1127fca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java @@ -16,26 +16,30 @@ * limitations under the License. 
*/ - package org.apache.flink.runtime.operators.testutils; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.flink.util.MutableObjectIterator; - /** * An iterator that returns the union of a given set of iterators. */ -public class UnionIterator<E> implements MutableObjectIterator<E> -{ +public class UnionIterator<E> implements MutableObjectIterator<E> { + private MutableObjectIterator<E> currentSource; private List<MutableObjectIterator<E>> nextSources; + + + public UnionIterator(MutableObjectIterator<E>... iterators) { + this(new ArrayList<MutableObjectIterator<E>>(Arrays.asList(iterators))); + } - public UnionIterator(List<MutableObjectIterator<E>> sources) - { + public UnionIterator(List<MutableObjectIterator<E>> sources) { this.currentSource = sources.remove(0); this.nextSources = sources; }
