[streaming] Added immutability for window and filter operators
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/446cc125
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/446cc125
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/446cc125

Branch: refs/heads/release-0.8
Commit: 446cc1253554f924664d7fe753f3cee46ee87c13
Parents: 2ac4985
Author: Gyula Fora <[email protected]>
Authored: Thu Dec 18 14:52:15 2014 +0100
Committer: mbalassi <[email protected]>
Committed: Thu Dec 18 18:53:18 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/invokable/StreamInvokable.java    |  7 +++++++
 .../streaming/api/invokable/operator/FilterInvokable.java |  2 +-
 .../invokable/operator/WindowGroupReduceInvokable.java    | 10 +++++++++-
 .../api/invokable/operator/WindowReduceInvokable.java     |  2 +-
 4 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/446cc125/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index d19d7ad..87ad4e0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -49,6 +50,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
     protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
     protected StreamRecordSerializer<IN> inSerializer;
+    protected TypeSerializer<IN> objectSerializer;
     protected StreamRecord<IN> nextRecord;
     protected boolean isMutable;
@@ -72,6 +74,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
         this.inSerializer = taskContext.getInputSerializer(0);
         if (this.inSerializer != null) {
             this.nextRecord = inSerializer.createInstance();
+            this.objectSerializer = inSerializer.getObjectSerializer();
         }
         this.taskContext = taskContext;
     }
@@ -141,4 +144,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
     public void setRuntimeContext(RuntimeContext t) {
         FunctionUtils.setFunctionRuntimeContext(userFunction, t);
     }
+
+    protected IN copy(IN record) {
+        return objectSerializer.copy(record);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/446cc125/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 796196d..48b8ad0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -44,6 +44,6 @@ public class FilterInvokable<IN> extends StreamInvokable<IN, IN> {
     @Override
     protected void callUserFunction() throws Exception {
-        collect = filterFunction.filter(nextRecord.getObject());
+        collect = filterFunction.filter(copy(nextRecord.getObject()));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/446cc125/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
index 9d0b584..b3fdfe8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -37,7 +37,15 @@ public class WindowGroupReduceInvokable<IN, OUT> extends WindowInvokable<IN, OUT
     @Override
     protected void callUserFunction() throws Exception {
-        reducer.reduce(buffer, collector);
+        reducer.reduce(copyBuffer(), collector);
+    }
+
+    public LinkedList<IN> copyBuffer() {
+        LinkedList<IN> copy = new LinkedList<IN>();
+        for (IN element : buffer) {
+            copy.add(copy(element));
+        }
+        return copy;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/446cc125/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index b6456e1..ed246c8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -49,7 +49,7 @@ public class WindowReduceInvokable<IN> extends WindowInvokable<IN, IN> {
         while (reducedIterator.hasNext()) {
             IN next = reducedIterator.next();
             if (next != null) {
-                reduced = reducer.reduce(reduced, next);
+                reduced = reducer.reduce(copy(reduced), copy(next));
             }
         }
         if (reduced != null) {
