Repository: flink
Updated Branches:
  refs/heads/release-0.10 712c868eb -> 15d3f10c1
[FLINK-2895] Duplicate immutable object creation

Operators defer object creation when object reuse is disabled.

This closes #1288

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15d3f10c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15d3f10c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15d3f10c

Branch: refs/heads/release-0.10
Commit: 15d3f10c1a52b2b1cc8fff793648da4a56e72dcc
Parents: 8ec828c
Author: Greg Hogan <c...@greghogan.com>
Authored: Thu Oct 22 09:31:09 2015 -0400
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Oct 23 16:05:26 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/operators/AllReduceDriver.java | 8 +++-----
 .../java/org/apache/flink/runtime/operators/NoOpDriver.java | 4 +---
 .../org/apache/flink/runtime/operators/ReduceDriver.java | 4 ++--
 3 files changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/15d3f10c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java index 06f22c5..1d35fdb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java @@ -108,7 +108,6 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> { final MutableObjectIterator<T> input = this.input; final TypeSerializer<T> serializer = this.serializer; - if (objectReuseEnabled) { T val1 = serializer.createInstance(); @@ -123,14 +122,13 @@ public class AllReduceDriver<T> implements
Driver<ReduceFunction<T>, T> { this.taskContext.getOutputCollector().collect(val1); } else { - T val1 = serializer.createInstance(); - - if ((val1 = input.next(val1)) == null) { + T val1; + if ((val1 = input.next()) == null) { return; } T val2; - while (running && (val2 = input.next(serializer.createInstance())) != null) { + while (running && (val2 = input.next()) != null) { val1 = stub.reduce(val1, val2); } http://git-wip-us.apache.org/repos/asf/flink/blob/15d3f10c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java index fcd2716..428cfe4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.slf4j.Logger; @@ -87,8 +86,7 @@ public class NoOpDriver<T> implements Driver<AbstractRichFunction, T> { } } else { T record; - TypeSerializer<T> serializer = this.taskContext.<T>getInputSerializer(0).getSerializer(); - while (this.running && ((record = input.next(serializer.createInstance())) != null)) { + while (this.running && ((record = input.next()) != null)) { output.collect(record); } http://git-wip-us.apache.org/repos/asf/flink/blob/15d3f10c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java index 970441e..6a7c42c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java @@ -148,7 +148,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> { } } } else { - T value = input.next(serializer.createInstance()); + T value = input.next(); // iterate over key groups while (this.running && value != null) { @@ -156,7 +156,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> { T res = value; // iterate within a key group - while ((value = input.next(serializer.createInstance())) != null) { + while ((value = input.next()) != null) { if (comparator.equalToReference(value)) { // same group, reduce res = function.reduce(res, value);