Repository: flink
Updated Branches:
  refs/heads/release-1.2 52b6e2fda -> a820f662d


[FLINK-6394] [runtime] Respect object reuse configuration when executing group combining function

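CombiningUnilateralSortMerger now honors the object reuse setting when it invokes the combine function. When object reuse is disabled, both the in-memory combine path (CombineValueIterator) and the spill-merge path (NonReusingKeyGroupedIterator) hand the combiner fresh record instances. Combiners that keep references to their input records therefore no longer see those records overwritten.

This closes #3803.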


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a820f662
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a820f662
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a820f662

Branch: refs/heads/release-1.2
Commit: a820f662d20d35915f21ab6bf0be4004e79b8c6b
Parents: 52b6e2f
Author: Kurt Young <ykt...@gmail.com>
Authored: Sun Apr 30 16:56:00 2017 +0800
Committer: Kurt Young <k...@apache.org>
Committed: Mon May 8 20:22:36 2017 +0800

----------------------------------------------------------------------
 .../sort/CombiningUnilateralSortMerger.java     | 48 +++++++++----
 .../util/NonReusingKeyGroupedIterator.java      |  1 +
 .../CombiningUnilateralSortMergerITCase.java    | 71 +++++++++++++++++++-
 3 files changed, 107 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a820f662/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index a02ced2..5500f37 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
@@ -162,7 +163,8 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                List<MemorySegment> sortReadMemory, List<MemorySegment> 
writeMemory, int maxFileHandles)
        {
                return new CombiningSpillingThread(exceptionHandler, queues, 
parentTask,
-                       memoryManager, ioManager, 
serializerFactory.getSerializer(), comparator, sortReadMemory, writeMemory, 
maxFileHandles);
+                               memoryManager, ioManager, 
serializerFactory.getSerializer(),
+                               comparator, sortReadMemory, writeMemory, 
maxFileHandles, objectReuseEnabled);
        }
 
        // 
------------------------------------------------------------------------
@@ -172,16 +174,20 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
        protected class CombiningSpillingThread extends SpillingThread {
                
                private final TypeComparator<E> comparator2;
-               
+
+               private final boolean objectReuseEnabled;
+
                public CombiningSpillingThread(ExceptionHandler<IOException> 
exceptionHandler, CircularQueues<E> queues,
                                AbstractInvokable parentTask, MemoryManager 
memManager, IOManager ioManager, 
                                TypeSerializer<E> serializer, TypeComparator<E> 
comparator, 
-                               List<MemorySegment> sortReadMemory, 
List<MemorySegment> writeMemory, int maxNumFileHandles)
+                               List<MemorySegment> sortReadMemory, 
List<MemorySegment> writeMemory, int maxNumFileHandles,
+                               boolean objectReuseEnabled)
                {
                        super(exceptionHandler, queues, parentTask, memManager, 
ioManager, serializer, comparator, 
                                sortReadMemory, writeMemory, maxNumFileHandles);
                        
                        this.comparator2 = comparator.duplicate();
+                       this.objectReuseEnabled = objectReuseEnabled;
                }
 
                /**
@@ -315,7 +321,8 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
 
                                // set up the combining helpers
                                final InMemorySorter<E> buffer = element.buffer;
-                               final CombineValueIterator<E> iter = new 
CombineValueIterator<E>(buffer, this.serializer.createInstance());
+                               final CombineValueIterator<E> iter = new 
CombineValueIterator<E>(
+                                               buffer, 
this.serializer.createInstance(), this.objectReuseEnabled);
                                final WriterCollector<E> collector = new 
WriterCollector<E>(output, this.serializer);
 
                                int i = 0;
@@ -454,7 +461,6 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
 
                        // the list with the target iterators
                        final MergeIterator<E> mergeIterator = 
getMergingIterator(channelIDs, readBuffers, channelAccesses, null);
-                       final ReusingKeyGroupedIterator<E> groupedIter = new 
ReusingKeyGroupedIterator<E>(mergeIterator, this.serializer, this.comparator2);
 
                        // create a new channel writer
                        final FileIOChannel.ID mergedChannelID = 
this.ioManager.createChannel();
@@ -469,8 +475,18 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
 
                        // combine and write to disk
                        try {
-                               while (groupedIter.nextKey()) {
-                                       
combineStub.combine(groupedIter.getValues(), collector);
+                               if (objectReuseEnabled) {
+                                       final ReusingKeyGroupedIterator<E> 
groupedIter = new ReusingKeyGroupedIterator<>(
+                                                       mergeIterator, 
this.serializer, this.comparator2);
+                                       while (groupedIter.nextKey()) {
+                                               
combineStub.combine(groupedIter.getValues(), collector);
+                                       }
+                               } else {
+                                       final NonReusingKeyGroupedIterator<E> 
groupedIter = new NonReusingKeyGroupedIterator<>(
+                                                       mergeIterator, 
this.comparator2);
+                                       while (groupedIter.nextKey()) {
+                                               
combineStub.combine(groupedIter.getValues(), collector);
+                                       }
                                }
                        }
                        catch (Exception e) {
@@ -505,7 +521,9 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                
                private final InMemorySorter<E> buffer; // the buffer from 
which values are returned
                
-               private E record;
+               private E recordReuse;
+
+               private final boolean objectReuseEnabled;
 
                private int last; // the position of the last value to be 
returned
 
@@ -519,9 +537,10 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                 * @param buffer
                 *        The buffer to get the values from.
                 */
-               public CombineValueIterator(InMemorySorter<E> buffer, E 
instance) {
+               public CombineValueIterator(InMemorySorter<E> buffer, E 
instance, boolean objectReuseEnabled) {
                        this.buffer = buffer;
-                       this.record = instance;
+                       this.recordReuse = instance;
+                       this.objectReuseEnabled = objectReuseEnabled;
                }
 
                /**
@@ -547,9 +566,14 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                public E next() {
                        if (this.position <= this.last) {
                                try {
-                                       this.record = 
this.buffer.getRecord(this.record, this.position);
+                                       E record;
+                                       if (objectReuseEnabled) {
+                                               record = 
this.buffer.getRecord(this.recordReuse, this.position);
+                                       } else {
+                                               record = 
this.buffer.getRecord(this.position);
+                                       }
                                        this.position++;
-                                       return this.record;
+                                       return record;
                                }
                                catch (IOException ioex) {
                                        LOG.error("Error retrieving a value 
from a buffer.", ioex);

http://git-wip-us.apache.org/repos/asf/flink/blob/a820f662/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
index 6f4448c..221cf24 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
@@ -164,6 +164,7 @@ public final class NonReusingKeyGroupedIterator<E> 
implements KeyGroupedIterator
         * 
         * @return Iterator over all values that belong to the current key.
         */
+       @Override
        public ValuesIterator getValues() {
                return this.valuesIterator;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a820f662/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index 0f636ef..2c875f2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -183,6 +184,42 @@ public class CombiningUnilateralSortMergerITCase {
        }
 
        @Test
+       public void testCombineSpillingDisableObjectReuse() throws Exception {
+               int noKeys = 100;
+               int noKeyCnt = 10000;
+
+               TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = 
TestData.getIntIntTupleReader();
+
+               LOG.debug("initializing sortmerger");
+
+               MaterializedCountCombiner comb = new 
MaterializedCountCombiner();
+
+               // set maxNumFileHandles = 2 to trigger multiple channel merging
+               Sorter<Tuple2<Integer, Integer>> merger = new 
CombiningUnilateralSortMerger<>(comb,
+                               this.memoryManager, this.ioManager, reader, 
this.parentTask, this.serializerFactory2, this.comparator2,
+                               0.01, 2, 0.005f, true /* use large record 
handler */, false);
+
+               final Tuple2<Integer, Integer> rec = new Tuple2<>();
+
+               for (int i = 0; i < noKeyCnt; i++) {
+                       rec.setField(i, 0);
+                       for (int j = 0; j < noKeys; j++) {
+                               rec.setField(j, 1);
+                               reader.emit(rec);
+                       }
+               }
+               reader.close();
+
+               MutableObjectIterator<Tuple2<Integer, Integer>> iterator = 
merger.getIterator();
+               Iterator<Integer> result = getReducingIterator(iterator, 
serializerFactory2.getSerializer(), comparator2.duplicate());
+               while (result.hasNext()) {
+                       Assert.assertEquals(4950, result.next().intValue());
+               }
+
+               merger.close();
+       }
+
+       @Test
        public void testSortAndValidate() throws Exception
        {
                final Hashtable<Integer, Integer> countTable = new 
Hashtable<>(KEY_MAX);
@@ -331,7 +368,39 @@ public class CombiningUnilateralSortMergerITCase {
                        closed = true;
                }
        }
-       
+
+       // 
--------------------------------------------------------------------------------------------
+
+       public static class MaterializedCountCombiner
+                       extends RichGroupReduceFunction<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>>
+                       implements GroupCombineFunction<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>>
+       {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void combine(Iterable<Tuple2<Integer, Integer>> values, 
Collector<Tuple2<Integer, Integer>> out) {
+                       ArrayList<Tuple2<Integer, Integer>> valueList = new 
ArrayList<>();
+                       for (Tuple2<Integer, Integer> next : values) {
+                               valueList.add(next);
+                       }
+
+                       int count = 0;
+                       Tuple2<Integer, Integer> rec = new Tuple2<>();
+                       for (Tuple2<Integer, Integer> tuple : valueList) {
+                               rec.setField(tuple.f0, 0);
+                               count += tuple.f1;
+                       }
+                       rec.setField(count, 1);
+                       out.collect(rec);
+               }
+
+               @Override
+               public void reduce(Iterable<Tuple2<Integer, Integer>> values,
+                               Collector<Tuple2<Integer, Integer>> out) throws 
Exception
+               {
+               }
+       }
+
        private static Iterator<Integer> 
getReducingIterator(MutableObjectIterator<Tuple2<Integer, Integer>> data, 
TypeSerializer<Tuple2<Integer, Integer>> serializer, 
TypeComparator<Tuple2<Integer, Integer>>  comparator) {
                
                final ReusingKeyGroupedIterator<Tuple2<Integer, Integer>>  
groupIter = new ReusingKeyGroupedIterator<> (data, serializer, comparator);

Reply via email to