[FLINK-2895] Duplicate immutable object creation

Operators defer object creation when object reuse is disabled.

This closes #1288


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbb75c59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbb75c59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbb75c59

Branch: refs/heads/master
Commit: bbb75c599aba1fffa3f52b45af77ee9c7ece3ca0
Parents: d2e4a27
Author: Greg Hogan <c...@greghogan.com>
Authored: Thu Oct 22 09:31:09 2015 -0400
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Oct 23 13:06:29 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/operators/AllReduceDriver.java  | 8 +++-----
 .../java/org/apache/flink/runtime/operators/NoOpDriver.java  | 4 +---
 .../org/apache/flink/runtime/operators/ReduceDriver.java     | 4 ++--
 3 files changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbb75c59/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 06f22c5..1d35fdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -108,7 +108,6 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
                final MutableObjectIterator<T> input = this.input;
                final TypeSerializer<T> serializer = this.serializer;
 
-
                if (objectReuseEnabled) {
                        T val1 = serializer.createInstance();
 
@@ -123,14 +122,13 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 
                        this.taskContext.getOutputCollector().collect(val1);
                } else {
-                       T val1 = serializer.createInstance();
-
-                       if ((val1 = input.next(val1)) == null) {
+                       T val1;
+                       if ((val1 = input.next()) == null) {
                                return;
                        }
 
                        T val2;
-                       while (running && (val2 = input.next(serializer.createInstance())) != null) {
+                       while (running && (val2 = input.next()) != null) {
                                val1 = stub.reduce(val1, val2);
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb75c59/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index fcd2716..428cfe4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.slf4j.Logger;
@@ -87,8 +86,7 @@ public class NoOpDriver<T> implements Driver<AbstractRichFunction, T> {
                        }
                } else {
                        T record;
-                       TypeSerializer<T> serializer = this.taskContext.<T>getInputSerializer(0).getSerializer();
-                       while (this.running && ((record = input.next(serializer.createInstance())) != null)) {
+                       while (this.running && ((record = input.next()) != null)) {
                                output.collect(record);
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb75c59/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index 970441e..6a7c42c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -148,7 +148,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
                                }
                        }
                } else {
-                       T value = input.next(serializer.createInstance());
+                       T value = input.next();
 
                        // iterate over key groups
                        while (this.running && value != null) {
@@ -156,7 +156,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
                                T res = value;
 
                                // iterate within a key group
-                               while ((value = input.next(serializer.createInstance())) != null) {
+                               while ((value = input.next()) != null) {
                                        if (comparator.equalToReference(value)) {
                                                // same group, reduce
                                                res = function.reduce(res, value);

Reply via email to