[FLINK-2653] [runtime] Enable object reuse in MergeIterator

This closes #1115


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a8df6d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a8df6d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a8df6d5

Branch: refs/heads/master
Commit: 0a8df6d513fa59d650ff875bdf3a1613d0f14af5
Parents: 6891212
Author: Greg Hogan <[email protected]>
Authored: Thu Sep 10 09:35:39 2015 -0400
Committer: Stephan Ewen <[email protected]>
Committed: Tue Sep 29 12:21:34 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/operators/DataSinkTask.java   |  3 +-
 .../runtime/operators/RegularPactTask.java      |  4 +-
 .../sort/CombiningUnilateralSortMerger.java     |  9 ++--
 .../operators/sort/LargeRecordHandler.java      |  3 +-
 .../runtime/operators/sort/MergeIterator.java   | 47 +++++++++++++++++---
 .../operators/sort/UnilateralSortMerger.java    | 47 ++++++++++++++------
 .../operators/ReduceTaskExternalITCase.java     |  4 +-
 .../flink/runtime/operators/ReduceTaskTest.java |  2 +-
 .../CombiningUnilateralSortMergerITCase.java    |  6 +--
 .../operators/sort/ExternalSortITCase.java      | 10 ++---
 .../sort/ExternalSortLargeRecordsITCase.java    |  8 ++--
 .../testutils/BinaryOperatorTestBase.java       |  3 +-
 .../operators/testutils/DriverTestBase.java     |  2 +-
 .../testutils/UnaryOperatorTestBase.java        |  2 +-
 .../operators/util/HashVsSortMiniBenchmark.java |  4 +-
 .../flink/tez/runtime/DataSinkProcessor.java    |  2 +-
 .../org/apache/flink/tez/runtime/TezTask.java   |  4 +-
 .../flink/test/manual/MassiveStringSorting.java |  4 +-
 .../test/manual/MassiveStringValueSorting.java  |  4 +-
 .../manual/MassiveCaseClassSortingITCase.scala  |  2 +-
 20 files changed, 114 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 39a0a28..1002bae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -164,7 +164,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
                                                        
getEnvironment().getIOManager(),
                                                        this.reader, this, 
this.inputTypeSerializerFactory, compFact.createComparator(),
                                                        
this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0),
-                                                       
this.config.getSpillingThresholdInput(0));
+                                                       
this.config.getSpillingThresholdInput(0),
+                                                       
this.getExecutionConfig().isObjectReuseEnabled());
                                        
                                        this.localStrategy = sorter;
                                        input1 = sorter.getIterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 6d35f92..89963af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -923,7 +923,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                                UnilateralSortMerger<?> sorter = new 
UnilateralSortMerger(getMemoryManager(), getIOManager(),
                                        this.inputIterators[inputNum], this, 
this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
                                        
this.config.getRelativeMemoryInput(inputNum), 
this.config.getFilehandlesInput(inputNum),
-                                       
this.config.getSpillingThresholdInput(inputNum));
+                                       
this.config.getSpillingThresholdInput(inputNum), 
this.getExecutionConfig().isObjectReuseEnabled());
                                // set the input to null such that it will be 
lazily fetched from the input strategy
                                this.inputs[inputNum] = null;
                                this.localStrategies[inputNum] = sorter;
@@ -959,7 +959,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                                        (GroupCombineFunction) localStub, 
getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
                                        this, this.inputSerializers[inputNum], 
getLocalStrategyComparator(inputNum),
                                        
this.config.getRelativeMemoryInput(inputNum), 
this.config.getFilehandlesInput(inputNum),
-                                       
this.config.getSpillingThresholdInput(inputNum));
+                                       
this.config.getSpillingThresholdInput(inputNum), 
this.getExecutionConfig().isObjectReuseEnabled());
                                
cSorter.setUdfConfiguration(this.config.getStubParameters());
 
                                // set the input to null such that it will be 
lazily fetched from the input strategy

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index f662a7e..855ee21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -103,11 +103,11 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
        public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> 
combineStub, MemoryManager memoryManager, IOManager ioManager,
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
-                       double memoryFraction, int maxNumFileHandles, float 
startSpillingFraction)
+                       double memoryFraction, int maxNumFileHandles, float 
startSpillingFraction, boolean objectReuseEnabled)
        throws IOException, MemoryAllocationException
        {
                this(combineStub, memoryManager, ioManager, input, parentTask, 
serializerFactory, comparator,
-                       memoryFraction, -1, maxNumFileHandles, 
startSpillingFraction);
+                       memoryFraction, -1, maxNumFileHandles, 
startSpillingFraction, objectReuseEnabled);
        }
        
        /**
@@ -136,11 +136,12 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
                        double memoryFraction, int numSortBuffers, int 
maxNumFileHandles,
-                       float startSpillingFraction)
+                       float startSpillingFraction, boolean objectReuseEnabled)
        throws IOException, MemoryAllocationException
        {
                super(memoryManager, ioManager, input, parentTask, 
serializerFactory, comparator,
-                       memoryFraction, numSortBuffers, maxNumFileHandles, 
startSpillingFraction, false, true);
+                       memoryFraction, numSortBuffers, maxNumFileHandles, 
startSpillingFraction, false, true,
+                       objectReuseEnabled);
                
                this.combineStub = combineStub;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
index e4a99fb..518f44c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
@@ -254,7 +254,8 @@ public class LargeRecordHandler<T> {
                InputViewIterator<Tuple> keyIterator = new 
InputViewIterator<Tuple>(keysReader, keySerializer);
                
                keySorter = new UnilateralSortMerger<Tuple>(memManager, memory, 
ioManager, 
-                               keyIterator, memoryOwner, keySerializerFactory, 
keyComparator, 1, maxFilehandles, 1.0f, false);
+                               keyIterator, memoryOwner, keySerializerFactory, 
keyComparator, 1, maxFilehandles, 1.0f, false,
+                               this.executionConfig.isObjectReuseEnabled());
 
                // wait for the sorter to sort the keys
                MutableObjectIterator<Tuple> result;

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
index 9da429d..0792dbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
@@ -55,18 +55,43 @@ public class MergeIterator<E> implements 
MutableObjectIterator<E> {
 
        /**
         * Gets the next smallest element, with respect to the definition of 
order implied by
-        * the {@link TypeSerializer} provided to this iterator. This method 
does in fact not
-        * reuse the given element (which would here imply potentially 
expensive copying), 
-        * but always returns a new element.
+        * the {@link TypeSerializer} provided to this iterator.
         * 
-        * @param reuse Ignored.
-        * @return The next smallest element, or null, if the iterator is 
exhausted. 
+        * @param reuse Object that may be reused.
+        * @return The next element if the iterator has another element, null 
otherwise.
         * 
         * @see 
org.apache.flink.util.MutableObjectIterator#next(java.lang.Object)
         */
        @Override
        public E next(E reuse) throws IOException {
-               return next();
+               /* There are three ways to handle object reuse:
+                * 1) reuse and return the given object
+                * 2) ignore the given object and return a new object
+                * 3) exchange the given object for an existing object
+                *
+                * The first option is not available here as the return value 
has
+                * already been deserialized from the heap's top iterator. The 
second
+                * option avoids object reuse. The third option is implemented 
below
+                * by passing the given object to the heap's top iterator into 
which
+                * the next value will be deserialized.
+                */
+
+               if (this.heap.size() > 0) {
+                       // get the smallest element
+                       final HeadStream<E> top = this.heap.peek();
+                       E result = top.getHead();
+
+                       // read an element
+                       if (!top.nextHead(reuse)) {
+                               this.heap.poll();
+                       } else {
+                               this.heap.adjustTop();
+                       }
+                       return result;
+               }
+               else {
+                       return null;
+               }
        }
 
        /**
@@ -122,6 +147,16 @@ public class MergeIterator<E> implements 
MutableObjectIterator<E> {
                        return this.head;
                }
 
+               public boolean nextHead(E reuse) throws IOException {
+                       if ((this.head = this.iterator.next(reuse)) != null) {
+                               this.comparator.setReference(this.head);
+                               return true;
+                       }
+                       else {
+                               return false;
+                       }
+               }
+
                public boolean nextHead() throws IOException {
                        if ((this.head = this.iterator.next()) != null) {
                                this.comparator.setReference(this.head);

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 13159d9..0fa24f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -146,6 +146,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
         */
        protected volatile boolean closed;
 
+       /**
+        * Whether to reuse objects during deserialization.
+        */
+       protected final boolean objectReuseEnabled;
+
        // 
------------------------------------------------------------------------
        //                         Constructor & Shutdown
        // 
------------------------------------------------------------------------
@@ -153,22 +158,24 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
        public UnilateralSortMerger(MemoryManager memoryManager, IOManager 
ioManager,
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
-                       double memoryFraction, int maxNumFileHandles, float 
startSpillingFraction)
+                       double memoryFraction, int maxNumFileHandles, float 
startSpillingFraction,
+                       boolean objectReuseEnabled)
        throws IOException, MemoryAllocationException
        {
                this(memoryManager, ioManager, input, parentTask, 
serializerFactory, comparator,
-                       memoryFraction, -1, maxNumFileHandles, 
startSpillingFraction);
+                       memoryFraction, -1, maxNumFileHandles, 
startSpillingFraction, objectReuseEnabled);
        }
        
        public UnilateralSortMerger(MemoryManager memoryManager, IOManager 
ioManager,
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
                        double memoryFraction, int numSortBuffers, int 
maxNumFileHandles,
-                       float startSpillingFraction)
+                       float startSpillingFraction, boolean objectReuseEnabled)
        throws IOException, MemoryAllocationException
        {
                this(memoryManager, ioManager, input, parentTask, 
serializerFactory, comparator,
-                       memoryFraction, numSortBuffers, maxNumFileHandles, 
startSpillingFraction, false, true);
+                       memoryFraction, numSortBuffers, maxNumFileHandles, 
startSpillingFraction, false, true,
+                       objectReuseEnabled);
        }
        
        public UnilateralSortMerger(MemoryManager memoryManager, 
List<MemorySegment> memory,
@@ -176,11 +183,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
                        int numSortBuffers, int maxNumFileHandles,
-                       float startSpillingFraction, boolean handleLargeRecords)
+                       float startSpillingFraction, boolean 
handleLargeRecords, boolean objectReuseEnabled)
        throws IOException
        {
                this(memoryManager, memory, ioManager, input, parentTask, 
serializerFactory, comparator,
-                       numSortBuffers, maxNumFileHandles, 
startSpillingFraction, false, handleLargeRecords);
+                       numSortBuffers, maxNumFileHandles, 
startSpillingFraction, false, handleLargeRecords,
+                       objectReuseEnabled);
        }
        
        protected UnilateralSortMerger(MemoryManager memoryManager,
@@ -188,12 +196,14 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
                        double memoryFraction, int numSortBuffers, int 
maxNumFileHandles,
-                       float startSpillingFraction, boolean noSpillingMemory, 
boolean handleLargeRecords)
+                       float startSpillingFraction, boolean noSpillingMemory, 
boolean handleLargeRecords,
+                       boolean objectReuseEnabled)
        throws IOException, MemoryAllocationException
        {
                this(memoryManager, memoryManager.allocatePages(parentTask, 
memoryManager.computeNumberOfPages(memoryFraction)),
                                ioManager, input, parentTask, 
serializerFactory, comparator,
-                               numSortBuffers, maxNumFileHandles, 
startSpillingFraction, noSpillingMemory, true);
+                               numSortBuffers, maxNumFileHandles, 
startSpillingFraction, noSpillingMemory, true,
+                               objectReuseEnabled);
        }
        
        protected UnilateralSortMerger(MemoryManager memoryManager, 
List<MemorySegment> memory,
@@ -201,7 +211,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
                        int numSortBuffers, int maxNumFileHandles,
-                       float startSpillingFraction, boolean noSpillingMemory, 
boolean handleLargeRecords)
+                       float startSpillingFraction, boolean noSpillingMemory, 
boolean handleLargeRecords,
+                       boolean objectReuseEnabled)
        throws IOException
        {
                // sanity checks
@@ -216,7 +227,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                }
                
                this.memoryManager = memoryManager;
-               
+               this.objectReuseEnabled = objectReuseEnabled;
+
                // adjust the memory quotas to the page size
                final int numPagesTotal = memory.size();
 
@@ -1595,10 +1607,17 @@ public class UnilateralSortMerger<E> implements 
Sorter<E> {
                                                                                
                                                                        
this.memManager.getPageSize());
 
                        // read the merged stream and write the data back
-                       final TypeSerializer<E> serializer = this.serializer;
-                       E rec = serializer.createInstance();
-                       while ((rec = mergeIterator.next(rec)) != null) {
-                               serializer.serialize(rec, output);
+                       if (objectReuseEnabled) {
+                               final TypeSerializer<E> serializer = 
this.serializer;
+                               E rec = serializer.createInstance();
+                               while ((rec = mergeIterator.next(rec)) != null) 
{
+                                       serializer.serialize(rec, output);
+                               }
+                       } else {
+                               E rec;
+                               while ((rec = mergeIterator.next()) != null) {
+                                       serializer.serialize(rec, output);
+                               }
                        }
                        output.close();
                        final int numBlocksWritten = output.getBlockCount();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index d83e92e..f59c4a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -134,7 +134,7 @@ public class ReduceTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFunc
                                getMemoryManager(), getIOManager(), new 
UniformRecordGenerator(keyCnt, valCnt, false), 
                                getOwningNepheleTask(), 
RecordSerializerFactory.get(), this.comparator.duplicate(),
                                        this.perSortFractionMem,
-                                       2, 0.8f);
+                                       2, 0.8f, true);
                        addInput(sorter.getIterator());
                        
                        GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();
@@ -180,7 +180,7 @@ public class ReduceTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFunc
                                getMemoryManager(), getIOManager(), new 
UniformRecordGenerator(keyCnt, valCnt, false), 
                                getOwningNepheleTask(), 
RecordSerializerFactory.get(), this.comparator.duplicate(),
                                        this.perSortFractionMem,
-                                       2, 0.8f);
+                                       2, 0.8f, false);
                        addInput(sorter.getIterator());
                        
                        GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index 964f646..cc25c99 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -128,7 +128,7 @@ public class ReduceTaskTest extends 
DriverTestBase<RichGroupReduceFunction<Recor
                        sorter = new CombiningUnilateralSortMerger<Record>(new 
MockCombiningReduceStub(), 
                                getMemoryManager(), getIOManager(), new 
UniformRecordGenerator(keyCnt, valCnt, false), 
                                getOwningNepheleTask(), 
RecordSerializerFactory.get(), this.comparator.duplicate(), 
this.perSortFractionMem,
-                                       4, 0.8f);
+                                       4, 0.8f, true);
                        addInput(sorter.getIterator());
                        
                        GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index 75593b8..e1e2c0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -115,7 +115,7 @@ public class CombiningUnilateralSortMergerITCase {
                
                Sorter<Record> merger = new 
CombiningUnilateralSortMerger<Record>(comb, 
                                this.memoryManager, this.ioManager, reader, 
this.parentTask, this.serializerFactory, this.comparator,
-                               0.25, 64, 0.7f);
+                               0.25, 64, 0.7f, false);
 
                final Record rec = new Record();
                rec.setField(1, new IntValue(1));
@@ -156,7 +156,7 @@ public class CombiningUnilateralSortMergerITCase {
                
                Sorter<Record> merger = new 
CombiningUnilateralSortMerger<Record>(comb, 
                                this.memoryManager, this.ioManager, reader, 
this.parentTask, this.serializerFactory, this.comparator,
-                               0.01, 64, 0.005f);
+                               0.01, 64, 0.005f, true);
 
                final Record rec = new Record();
                rec.setField(1, new IntValue(1));
@@ -205,7 +205,7 @@ public class CombiningUnilateralSortMergerITCase {
                
                Sorter<Record> merger = new 
CombiningUnilateralSortMerger<Record>(comb, 
                                this.memoryManager, this.ioManager, reader, 
this.parentTask, this.serializerFactory, this.comparator,
-                               0.25, 2, 0.7f);
+                               0.25, 2, 0.7f, false);
 
                // emit data
                LOG.debug("emitting data");

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index 5aa9efb..9f0b3d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -119,7 +119,7 @@ public class ExternalSortITCase {
                        
                        Sorter<Record> merger = new 
UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
                                source, this.parentTask, 
this.pactRecordSerializer, this.pactRecordComparator,
-                                       (double)64/78, 2, 0.9f);
+                                       (double)64/78, 2, 0.9f, true);
        
                        // emit data
                        LOG.debug("Reading and sorting data...");
@@ -172,7 +172,7 @@ public class ExternalSortITCase {
                        
                        Sorter<Record> merger = new 
UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
                                        source, this.parentTask, 
this.pactRecordSerializer, this.pactRecordComparator,
-                                       (double)64/78, 10, 2, 0.9f);
+                                       (double)64/78, 10, 2, 0.9f, false);
        
                        // emit data
                        LOG.debug("Reading and sorting data...");
@@ -225,7 +225,7 @@ public class ExternalSortITCase {
                        
                        Sorter<Record> merger = new 
UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
                                        source, this.parentTask, 
this.pactRecordSerializer, this.pactRecordComparator,
-                                       (double)16/78, 64, 0.7f);
+                                       (double)16/78, 64, 0.7f, true);
        
                        // emit data
                        LOG.debug("Reading and sorting data...");
@@ -281,7 +281,7 @@ public class ExternalSortITCase {
                        
                        Sorter<Record> merger = new 
UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
                                        source, this.parentTask, 
this.pactRecordSerializer, this.pactRecordComparator,
-                                       (double)64/78, 16, 0.7f);
+                                       (double)64/78, 16, 0.7f, false);
                        
                        // emit data
                        LOG.debug("Emitting data...");
@@ -341,7 +341,7 @@ public class ExternalSortITCase {
                        LOG.debug("Initializing sortmerger...");
                        
                        Sorter<IntPair> merger = new 
UnilateralSortMerger<IntPair>(this.memoryManager, this.ioManager, 
-                                       generator, this.parentTask, 
serializerFactory, comparator, (double)64/78, 4, 0.7f);
+                                       generator, this.parentTask, 
serializerFactory, comparator, (double)64/78, 4, 0.7f, true);
        
                        // emit data
                        LOG.debug("Emitting data...");

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index 951ce30..c806766 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -128,7 +128,7 @@ public class ExternalSortLargeRecordsITCase {
                                        this.memoryManager, this.ioManager, 
                                        source, this.parentTask,
                                        new 
RuntimeSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, 
(Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
-                                       comparator, 1.0, 1, 128, 0.7f);
+                                       comparator, 1.0, 1, 128, 0.7f, false);
                        
                        // check order
                        MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
iterator = sorter.getIterator();
@@ -198,7 +198,7 @@ public class ExternalSortLargeRecordsITCase {
                                        this.memoryManager, this.ioManager, 
                                        source, this.parentTask,
                                        new 
RuntimeSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, 
(Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
-                                       comparator, 1.0, 1, 128, 0.7f);
+                                       comparator, 1.0, 1, 128, 0.7f, true);
                        
                        // check order
                        MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
iterator = sorter.getIterator();
@@ -283,7 +283,7 @@ public class ExternalSortLargeRecordsITCase {
                                        this.memoryManager, this.ioManager, 
                                        source, this.parentTask,
                                        new 
RuntimeSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, 
(Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class),
-                                       comparator, 1.0, 1, 128, 0.7f);
+                                       comparator, 1.0, 1, 128, 0.7f, false);
                        
                        // check order
                        MutableObjectIterator<Tuple2<Long, 
SmallOrMediumOrLargeValue>> iterator = sorter.getIterator();
@@ -354,7 +354,7 @@ public class ExternalSortLargeRecordsITCase {
                                        this.memoryManager, this.ioManager, 
                                        source, this.parentTask,
                                        new 
RuntimeSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, 
(Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class),
-                                       comparator, 1.0, 1, 128, 0.7f);
+                                       comparator, 1.0, 1, 128, 0.7f, true);
                        
                        // check order
                        MutableObjectIterator<Tuple2<Long, 
SmallOrMediumOrLargeValue>> iterator = sorter.getIterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index ece20ff..5136aea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -147,7 +147,8 @@ public class BinaryOperatorTestBase<S extends Function, IN, 
OUT> extends TestLog
                                comp,
                                this.perSortFractionMem,
                                32,
-                               0.8f
+                               0.8f,
+                               false
                );
                this.sorters.add(sorter);
                this.inputs.add(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 1737349..116fdec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -142,7 +142,7 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Pa
        public void addInputSorted(MutableObjectIterator<Record> input, 
RecordComparator comp) throws Exception {
                UnilateralSortMerger<Record> sorter = new 
UnilateralSortMerger<Record>(
                                this.memManager, this.ioManager, input, 
this.owner, RecordSerializerFactory.get(), comp,
-                               this.perSortFractionMem, 32, 0.8f);
+                               this.perSortFractionMem, 32, 0.8f, true);
                this.sorters.add(sorter);
                this.inputs.add(null);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 924a16b..e2b2430 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -146,7 +146,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, 
OUT> extends TestLogg
                                this.memManager, this.ioManager, input, 
this.owner,
                                this.<IN>getInputSerializer(0),
                                comp,
-                               this.perSortFractionMem, 32, 0.8f);
+                               this.perSortFractionMem, 32, 0.8f, false);
        }
        
        public void addDriverComparator(TypeComparator<IN> comparator) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 1060e55..f112ff8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -133,11 +133,11 @@ public class HashVsSortMiniBenchmark {
                        
                        final UnilateralSortMerger<Record> sorter1 = new 
UnilateralSortMerger<Record>(
                                        this.memoryManager, this.ioManager, 
input1, this.parentTask, this.serializer1, 
-                                       this.comparator1.duplicate(), 
MEMORY_FOR_SORTER, 128, 0.8f);
+                                       this.comparator1.duplicate(), 
MEMORY_FOR_SORTER, 128, 0.8f, true);
                        
                        final UnilateralSortMerger<Record> sorter2 = new 
UnilateralSortMerger<Record>(
                                        this.memoryManager, this.ioManager, 
input2, this.parentTask, this.serializer2, 
-                                       this.comparator2.duplicate(), 
MEMORY_FOR_SORTER, 128, 0.8f);
+                                       this.comparator2.duplicate(), 
MEMORY_FOR_SORTER, 128, 0.8f, true);
                        
                        final MutableObjectIterator<Record> sortedInput1 = 
sorter1.getIterator();
                        final MutableObjectIterator<Record> sortedInput2 = 
sorter2.getIterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
index 01dbbc5..8011d21 100644
--- 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
+++ 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
@@ -146,7 +146,7 @@ public class DataSinkProcessor<IT> extends 
AbstractLogicalIOProcessor {
                                                                
this.runtimeEnvironment.getIOManager(),
                                                                this.reader, 
this.invokable, this.inputTypeSerializerFactory, compFact.createComparator(),
                                                                
this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0),
-                                                               
this.config.getSpillingThresholdInput(0));
+                                                               
this.config.getSpillingThresholdInput(0), false);
 
                                                this.localStrategy = sorter;
                                                this.input = 
sorter.getIterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
index a745177..b7cbfb4 100644
--- 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
+++ 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
@@ -378,7 +378,7 @@ public class TezTask<S extends Function,OT>  implements 
PactTaskContext<S, OT> {
                                        UnilateralSortMerger<?> sorter = new 
UnilateralSortMerger(getMemoryManager(), getIOManager(),
                                                        
this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], 
getLocalStrategyComparator(inputNum),
                                                        
this.config.getRelativeMemoryInput(inputNum), 
this.config.getFilehandlesInput(inputNum),
-                                                       
this.config.getSpillingThresholdInput(inputNum));
+                                                       
this.config.getSpillingThresholdInput(inputNum), 
this.executionConfig.isObjectReuseEnabled());
                                        // set the input to null such that it 
will be lazily fetched from the input strategy
                                        this.inputs[inputNum] = null;
                                        this.localStrategies[inputNum] = sorter;
@@ -414,7 +414,7 @@ public class TezTask<S extends Function,OT>  implements 
PactTaskContext<S, OT> {
                                                        (GroupCombineFunction) 
localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
                                                        this.invokable, 
this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
                                                        
this.config.getRelativeMemoryInput(inputNum), 
this.config.getFilehandlesInput(inputNum),
-                                                       
this.config.getSpillingThresholdInput(inputNum));
+                                                       
this.config.getSpillingThresholdInput(inputNum), 
this.executionConfig.isObjectReuseEnabled());
                                        
cSorter.setUdfConfiguration(this.config.getStubParameters());
 
                                        // set the input to null such that it 
will be lazily fetched from the input strategy

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
index c11b93c..c9bd56b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
@@ -91,7 +91,7 @@ public class MassiveStringSorting {
                                MutableObjectIterator<String> inputIterator = 
new StringReaderMutableObjectIterator(reader);
                                
                                sorter = new UnilateralSortMerger<String>(mm, 
ioMan, inputIterator, new DummyInvokable(),
-                                               new 
RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 
0.8f);
+                                               new 
RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 
0.8f, false);
 
                                MutableObjectIterator<String> sortedData = 
sorter.getIterator();
                                
@@ -184,7 +184,7 @@ public class MassiveStringSorting {
                                MutableObjectIterator<Tuple2<String, String[]>> 
inputIterator = new StringTupleReaderMutableObjectIterator(reader);
                                
                                sorter = new 
UnilateralSortMerger<Tuple2<String, String[]>>(mm, ioMan, inputIterator, new 
DummyInvokable(),
-                                               new 
RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, 
(Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 
0.8f);
+                                               new 
RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, 
(Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 
0.8f, false);
 
                                
                                

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
index 7a484e7..9a016cc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
@@ -91,7 +91,7 @@ public class MassiveStringValueSorting {
                                MutableObjectIterator<StringValue> 
inputIterator = new StringValueReaderMutableObjectIterator(reader);
                                
                                sorter = new 
UnilateralSortMerger<StringValue>(mm, ioMan, inputIterator, new 
DummyInvokable(),
-                                               new 
RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), 
comparator, 1.0, 4, 0.8f);
+                                               new 
RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), 
comparator, 1.0, 4, 0.8f, true);
 
                                MutableObjectIterator<StringValue> sortedData = 
sorter.getIterator();
                                
@@ -187,7 +187,7 @@ public class MassiveStringValueSorting {
                                MutableObjectIterator<Tuple2<StringValue, 
StringValue[]>> inputIterator = new 
StringValueTupleReaderMutableObjectIterator(reader);
                                
                                sorter = new 
UnilateralSortMerger<Tuple2<StringValue, StringValue[]>>(mm, ioMan, 
inputIterator, new DummyInvokable(),
-                                               new 
RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, 
(Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), 
comparator, 1.0, 4, 0.8f);
+                                               new 
RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, 
(Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), 
comparator, 1.0, 4, 0.8f, false);
 
                                
                                

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
index 7385fa2..a38a19b 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
@@ -98,7 +98,7 @@ class MassiveCaseClassSortingITCase {
         sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, 
inputIterator,
               new DummyInvokable(), 
               new RuntimeSerializerFactory[StringTuple](serializer, 
classOf[StringTuple]),
-              comparator, 1.0, 4, 0.8f)
+              comparator, 1.0, 4, 0.8f, false)
             
         val sortedData = sorter.getIterator
         reader.close()

Reply via email to