Repository: flink
Updated Branches:
  refs/heads/release-1.2 52b6e2fda -> a820f662d
[FLINK-6394] [runtime] Respect object reuse configuration when executing group combining function

This closes #3803.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a820f662
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a820f662
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a820f662

Branch: refs/heads/release-1.2
Commit: a820f662d20d35915f21ab6bf0be4004e79b8c6b
Parents: 52b6e2f
Author: Kurt Young <ykt...@gmail.com>
Authored: Sun Apr 30 16:56:00 2017 +0800
Committer: Kurt Young <k...@apache.org>
Committed: Mon May 8 20:22:36 2017 +0800

----------------------------------------------------------------------
 .../sort/CombiningUnilateralSortMerger.java     | 48 +++++++++----
 .../util/NonReusingKeyGroupedIterator.java      |  1 +
 .../CombiningUnilateralSortMergerITCase.java    | 71 +++++++++++++++++++-
 3 files changed, 107 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a820f662/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index a02ced2..5500f37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
+import
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -162,7 +163,8 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles) { return new CombiningSpillingThread(exceptionHandler, queues, parentTask, - memoryManager, ioManager, serializerFactory.getSerializer(), comparator, sortReadMemory, writeMemory, maxFileHandles); + memoryManager, ioManager, serializerFactory.getSerializer(), + comparator, sortReadMemory, writeMemory, maxFileHandles, objectReuseEnabled); } // ------------------------------------------------------------------------ @@ -172,16 +174,20 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { protected class CombiningSpillingThread extends SpillingThread { private final TypeComparator<E> comparator2; - + + private final boolean objectReuseEnabled; + public CombiningSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memManager, IOManager ioManager, TypeSerializer<E> serializer, TypeComparator<E> comparator, - List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxNumFileHandles) + List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxNumFileHandles, + boolean objectReuseEnabled) { super(exceptionHandler, queues, parentTask, memManager, ioManager, serializer, comparator, sortReadMemory, writeMemory, maxNumFileHandles); this.comparator2 = comparator.duplicate(); + this.objectReuseEnabled = objectReuseEnabled; } /** @@ -315,7 +321,8 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { // set up the combining helpers final InMemorySorter<E> buffer = element.buffer; - final CombineValueIterator<E> 
iter = new CombineValueIterator<E>(buffer, this.serializer.createInstance()); + final CombineValueIterator<E> iter = new CombineValueIterator<E>( + buffer, this.serializer.createInstance(), this.objectReuseEnabled); final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer); int i = 0; @@ -454,7 +461,6 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { // the list with the target iterators final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers, channelAccesses, null); - final ReusingKeyGroupedIterator<E> groupedIter = new ReusingKeyGroupedIterator<E>(mergeIterator, this.serializer, this.comparator2); // create a new channel writer final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel(); @@ -469,8 +475,18 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { // combine and write to disk try { - while (groupedIter.nextKey()) { - combineStub.combine(groupedIter.getValues(), collector); + if (objectReuseEnabled) { + final ReusingKeyGroupedIterator<E> groupedIter = new ReusingKeyGroupedIterator<>( + mergeIterator, this.serializer, this.comparator2); + while (groupedIter.nextKey()) { + combineStub.combine(groupedIter.getValues(), collector); + } + } else { + final NonReusingKeyGroupedIterator<E> groupedIter = new NonReusingKeyGroupedIterator<>( + mergeIterator, this.comparator2); + while (groupedIter.nextKey()) { + combineStub.combine(groupedIter.getValues(), collector); + } } } catch (Exception e) { @@ -505,7 +521,9 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { private final InMemorySorter<E> buffer; // the buffer from which values are returned - private E record; + private E recordReuse; + + private final boolean objectReuseEnabled; private int last; // the position of the last value to be returned @@ -519,9 +537,10 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { * 
@param buffer * The buffer to get the values from. */ - public CombineValueIterator(InMemorySorter<E> buffer, E instance) { + public CombineValueIterator(InMemorySorter<E> buffer, E instance, boolean objectReuseEnabled) { this.buffer = buffer; - this.record = instance; + this.recordReuse = instance; + this.objectReuseEnabled = objectReuseEnabled; } /** @@ -547,9 +566,14 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { public E next() { if (this.position <= this.last) { try { - this.record = this.buffer.getRecord(this.record, this.position); + E record; + if (objectReuseEnabled) { + record = this.buffer.getRecord(this.recordReuse, this.position); + } else { + record = this.buffer.getRecord(this.position); + } this.position++; - return this.record; + return record; } catch (IOException ioex) { LOG.error("Error retrieving a value from a buffer.", ioex); http://git-wip-us.apache.org/repos/asf/flink/blob/a820f662/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java index 6f4448c..221cf24 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java @@ -164,6 +164,7 @@ public final class NonReusingKeyGroupedIterator<E> implements KeyGroupedIterator * * @return Iterator over all values that belong to the current key. 
*/ + @Override public ValuesIterator getValues() { return this.valuesIterator; } http://git-wip-us.apache.org/repos/asf/flink/blob/a820f662/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java index 0f636ef..2c875f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.operators.sort; import java.io.IOException; +import java.util.ArrayList; import java.util.Hashtable; import java.util.Iterator; import java.util.NoSuchElementException; @@ -183,6 +184,42 @@ public class CombiningUnilateralSortMergerITCase { } @Test + public void testCombineSpillingDisableObjectReuse() throws Exception { + int noKeys = 100; + int noKeyCnt = 10000; + + TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader(); + + LOG.debug("initializing sortmerger"); + + MaterializedCountCombiner comb = new MaterializedCountCombiner(); + + // set maxNumFileHandles = 2 to trigger multiple channel merging + Sorter<Tuple2<Integer, Integer>> merger = new CombiningUnilateralSortMerger<>(comb, + this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2, + 0.01, 2, 0.005f, true /* use large record handler */, false); + + final Tuple2<Integer, Integer> rec = new Tuple2<>(); + + for (int i = 0; i < noKeyCnt; i++) { + rec.setField(i, 0); + for (int j = 0; j < noKeys; j++) { + rec.setField(j, 1); + reader.emit(rec); + } + } + reader.close(); + + 
MutableObjectIterator<Tuple2<Integer, Integer>> iterator = merger.getIterator(); + Iterator<Integer> result = getReducingIterator(iterator, serializerFactory2.getSerializer(), comparator2.duplicate()); + while (result.hasNext()) { + Assert.assertEquals(4950, result.next().intValue()); + } + + merger.close(); + } + + @Test public void testSortAndValidate() throws Exception { final Hashtable<Integer, Integer> countTable = new Hashtable<>(KEY_MAX); @@ -331,7 +368,39 @@ public class CombiningUnilateralSortMergerITCase { closed = true; } } - + + // -------------------------------------------------------------------------------------------- + + public static class MaterializedCountCombiner + extends RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> + implements GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> + { + private static final long serialVersionUID = 1L; + + @Override + public void combine(Iterable<Tuple2<Integer, Integer>> values, Collector<Tuple2<Integer, Integer>> out) { + ArrayList<Tuple2<Integer, Integer>> valueList = new ArrayList<>(); + for (Tuple2<Integer, Integer> next : values) { + valueList.add(next); + } + + int count = 0; + Tuple2<Integer, Integer> rec = new Tuple2<>(); + for (Tuple2<Integer, Integer> tuple : valueList) { + rec.setField(tuple.f0, 0); + count += tuple.f1; + } + rec.setField(count, 1); + out.collect(rec); + } + + @Override + public void reduce(Iterable<Tuple2<Integer, Integer>> values, + Collector<Tuple2<Integer, Integer>> out) throws Exception + { + } + } + private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Tuple2<Integer, Integer>> data, TypeSerializer<Tuple2<Integer, Integer>> serializer, TypeComparator<Tuple2<Integer, Integer>> comparator) { final ReusingKeyGroupedIterator<Tuple2<Integer, Integer>> groupIter = new ReusingKeyGroupedIterator<> (data, serializer, comparator);