[streaming] Added immutability for window and filter operators

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/446cc125
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/446cc125
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/446cc125

Branch: refs/heads/release-0.8
Commit: 446cc1253554f924664d7fe753f3cee46ee87c13
Parents: 2ac4985
Author: Gyula Fora <[email protected]>
Authored: Thu Dec 18 14:52:15 2014 +0100
Committer: mbalassi <[email protected]>
Committed: Thu Dec 18 18:53:18 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/invokable/StreamInvokable.java    |  7 +++++++
 .../streaming/api/invokable/operator/FilterInvokable.java |  2 +-
 .../invokable/operator/WindowGroupReduceInvokable.java    | 10 +++++++++-
 .../api/invokable/operator/WindowReduceInvokable.java     |  2 +-
 4 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/446cc125/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index d19d7ad..87ad4e0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -49,6 +50,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 
        protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
        protected StreamRecordSerializer<IN> inSerializer;
+       protected TypeSerializer<IN> objectSerializer;
        protected StreamRecord<IN> nextRecord;
        protected boolean isMutable;
 
@@ -72,6 +74,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
                this.inSerializer = taskContext.getInputSerializer(0);
                if (this.inSerializer != null) {
                        this.nextRecord = inSerializer.createInstance();
+                       this.objectSerializer = inSerializer.getObjectSerializer();
                }
                this.taskContext = taskContext;
        }
@@ -141,4 +144,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
        public void setRuntimeContext(RuntimeContext t) {
                FunctionUtils.setFunctionRuntimeContext(userFunction, t);
        }
+
+       protected IN copy(IN record) {
+               return objectSerializer.copy(record);
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/446cc125/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 796196d..48b8ad0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -44,6 +44,6 @@ public class FilterInvokable<IN> extends StreamInvokable<IN, IN> {
 
        @Override
        protected void callUserFunction() throws Exception {
-               collect = filterFunction.filter(nextRecord.getObject());
+               collect = filterFunction.filter(copy(nextRecord.getObject()));
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/446cc125/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
index 9d0b584..b3fdfe8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -37,7 +37,15 @@ public class WindowGroupReduceInvokable<IN, OUT> extends WindowInvokable<IN, OUT
 
        @Override
        protected void callUserFunction() throws Exception {
-               reducer.reduce(buffer, collector);
+               reducer.reduce(copyBuffer(), collector);
+       }
+
+       public LinkedList<IN> copyBuffer() {
+               LinkedList<IN> copy = new LinkedList<IN>();
+               for (IN element : buffer) {
+                       copy.add(copy(element));
+               }
+               return copy;
        }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/446cc125/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index b6456e1..ed246c8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -49,7 +49,7 @@ public class WindowReduceInvokable<IN> extends WindowInvokable<IN, IN> {
                while (reducedIterator.hasNext()) {
                        IN next = reducedIterator.next();
                        if (next != null) {
-                               reduced = reducer.reduce(reduced, next);
+                               reduced = reducer.reduce(copy(reduced), copy(next));
                        }
                }
                if (reduced != null) {

Reply via email to