[hotfix] Fix Mutable Object window aggregator/Disable Object Copy

This fixes the aggregators to make copies of the objects so that they
work with window operators that are not mutable-object safe.

This also disables object copy in WindowOperator and
NonKeyedWindowOperator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/712c868e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/712c868e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/712c868e

Branch: refs/heads/release-0.10
Commit: 712c868eb77643ae07542d5a073365e0862a5e97
Parents: 45ab0eb
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Oct 23 13:08:35 2015 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Oct 23 15:46:28 2015 +0200

----------------------------------------------------------------------
 .../aggregation/AggregationFunction.java        |  8 +--
 .../aggregation/ComparableAggregator.java       | 51 ++++++++++----------
 .../functions/aggregation/SumAggregator.java    | 35 +++++++++++---
 .../functions/windowing/FoldWindowFunction.java | 12 ++---
 .../windowing/NonKeyedWindowOperator.java       |  5 +-
 .../operators/windowing/WindowOperator.java     |  5 +-
 6 files changed, 66 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
index 23cca90..ed39103 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
@@ -22,13 +22,7 @@ import 
org.apache.flink.api.common.functions.RichReduceFunction;
 public abstract class AggregationFunction<T> extends RichReduceFunction<T> {
        private static final long serialVersionUID = 1L;
 
-       public int position;
-
-       public AggregationFunction(int pos) {
-               this.position = pos;
-       }
-
-       public static enum AggregationType {
+       public enum AggregationType {
                SUM, MIN, MAX, MINBY, MAXBY,
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
index e5501a0..e70e30a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@@ -25,35 +25,39 @@ public class ComparableAggregator<T> extends 
AggregationFunction<T> {
 
        private static final long serialVersionUID = 1L;
 
-       public Comparator comparator;
-       public boolean byAggregate;
-       public boolean first;
-       FieldAccessor<T, Object> fieldAccessor;
+       private Comparator comparator;
+       private boolean byAggregate;
+       private boolean first;
+       private final FieldAccessor<T, Object> fieldAccessor;
        
-       private ComparableAggregator(int pos, AggregationType aggregationType, 
boolean first) {
-               super(pos);
+       private ComparableAggregator(AggregationType aggregationType, 
FieldAccessor<T, Object> fieldAccessor, boolean first) {
                this.comparator = Comparator.getForAggregation(aggregationType);
                this.byAggregate = (aggregationType == AggregationType.MAXBY) 
|| (aggregationType == AggregationType.MINBY);
                this.first = first;
+               this.fieldAccessor = fieldAccessor;
        }
 
-       public ComparableAggregator(int positionToAggregate, TypeInformation<T> 
typeInfo, AggregationType aggregationType
-                       , ExecutionConfig config) {
+       public ComparableAggregator(int positionToAggregate,
+                       TypeInformation<T> typeInfo,
+                       AggregationType aggregationType,
+                       ExecutionConfig config) {
                this(positionToAggregate, typeInfo, aggregationType, false, 
config);
        }
 
-       public ComparableAggregator(int positionToAggregate, TypeInformation<T> 
typeInfo, AggregationType aggregationType,
-                                                               boolean first, 
ExecutionConfig config) {
-               this(positionToAggregate, aggregationType, first);
-               this.fieldAccessor = FieldAccessor.create(positionToAggregate, 
typeInfo, config);
-               this.first = first;
+       public ComparableAggregator(int positionToAggregate,
+                       TypeInformation<T> typeInfo,
+                       AggregationType aggregationType,
+                       boolean first,
+                       ExecutionConfig config) {
+               this(aggregationType, FieldAccessor.create(positionToAggregate, 
typeInfo, config), first);
        }
 
        public ComparableAggregator(String field,
-                       TypeInformation<T> typeInfo, AggregationType 
aggregationType, boolean first, ExecutionConfig config) {
-               this(0, aggregationType, first);
-               this.fieldAccessor = FieldAccessor.create(field, typeInfo, 
config);
-               this.first = first;
+                       TypeInformation<T> typeInfo,
+                       AggregationType aggregationType,
+                       boolean first,
+                       ExecutionConfig config) {
+               this(aggregationType, FieldAccessor.create(field, typeInfo, 
config), first);
        }
 
 
@@ -66,16 +70,13 @@ public class ComparableAggregator<T> extends 
AggregationFunction<T> {
                int c = comparator.isExtremal(o1, o2);
 
                if (byAggregate) {
-                       if (c == 1) {
-                               return value1;
-                       }
-                       if (first) {
-                               if (c == 0) {
-                                       return value1;
-                               }
+                       // if they are the same we choose based on whether we 
want the first or last
+                       // element with the min/max.
+                       if (c == 0) {
+                               return first ? value1 : value2;
                        }
 
-                       return value2;
+                       return c == 1 ? value1 : value2;
 
                } else {
                        if (c == 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
index b045233..8c9cf7a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
@@ -19,30 +19,53 @@ package 
org.apache.flink.streaming.api.functions.aggregation;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.util.FieldAccessor;
 
 public class SumAggregator<T> extends AggregationFunction<T> {
 
        private static final long serialVersionUID = 1L;
 
-       FieldAccessor<T, Object> fieldAccessor;
-       SumFunction adder;
+       private final FieldAccessor<T, Object> fieldAccessor;
+       private final SumFunction adder;
+       private final TypeSerializer<T> serializer;
+       private final boolean isTuple;
 
        public SumAggregator(int pos, TypeInformation<T> typeInfo, 
ExecutionConfig config) {
-               super(pos);
                fieldAccessor = FieldAccessor.create(pos, typeInfo, config);
                adder = 
SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass());
+               if (typeInfo instanceof TupleTypeInfo) {
+                       isTuple = true;
+                       serializer = null;
+               } else {
+                       isTuple = false;
+                       this.serializer = typeInfo.createSerializer(config);
+               }
        }
 
        public SumAggregator(String field, TypeInformation<T> typeInfo, 
ExecutionConfig config) {
-               super(0);
                fieldAccessor = FieldAccessor.create(field, typeInfo, config);
                adder = 
SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass());
+               if (typeInfo instanceof TupleTypeInfo) {
+                       isTuple = true;
+                       serializer = null;
+               } else {
+                       isTuple = false;
+                       this.serializer = typeInfo.createSerializer(config);
+               }
        }
 
-       @SuppressWarnings("unchecked")
        @Override
+       @SuppressWarnings("unchecked")
        public T reduce(T value1, T value2) throws Exception {
-               return fieldAccessor.set(value1, 
adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+               if (isTuple) {
+                       Tuple result = ((Tuple)value1).copy();
+                       return fieldAccessor.set((T) result, 
adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+               } else {
+                       T result = serializer.copy(value1);
+                       return fieldAccessor.set(result, 
adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
index 04d2ac7..1d29e36 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
@@ -34,6 +33,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 
 public class FoldWindowFunction<K, W extends Window, T, R>
                extends WrappingFunction<FoldFunction<T, R>>
@@ -49,9 +49,8 @@ public class FoldWindowFunction<K, W extends Window, T, R>
                this.initialValue = initialValue;
        }
 
-       @Override
-       public void open(Configuration configuration) throws Exception {
-               super.open(configuration);
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               in.defaultReadObject();
 
                if (serializedInitialValue == null) {
                        throw new RuntimeException("No initial value was 
serialized for the fold " +
@@ -59,10 +58,11 @@ public class FoldWindowFunction<K, W extends Window, T, R>
                }
 
                ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
-               InputViewDataInputStreamWrapper in = new 
InputViewDataInputStreamWrapper(
+               InputViewDataInputStreamWrapper inStream = new 
InputViewDataInputStreamWrapper(
                                new DataInputStream(bais)
                );
-               initialValue = outSerializer.deserialize(in);
+
+               initialValue = outSerializer.deserialize(inStream);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index a002b23..03e8c4c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -231,9 +231,8 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                                context = new Context(window, windowBuffer);
                                windows.put(window, context);
                        }
-                       StreamRecord<IN> elementCopy = new 
StreamRecord<>(inputSerializer.copy(element.getValue()), 
element.getTimestamp());
-                       context.windowBuffer.storeElement(elementCopy);
-                       Trigger.TriggerResult triggerResult = 
trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, 
context);
+                       context.windowBuffer.storeElement(element);
+                       Trigger.TriggerResult triggerResult = 
trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
                        processTriggerResult(triggerResult, window);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index a80f971..30ce477 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -276,9 +276,8 @@ public class WindowOperator<K, IN, OUT, W extends Window>
                                context = new Context(key, window, 
windowBuffer);
                                keyWindows.put(window, context);
                        }
-                       StreamRecord<IN> elementCopy = new 
StreamRecord<>(inputSerializer.copy(element.getValue()), 
element.getTimestamp());
-                       context.windowBuffer.storeElement(elementCopy);
-                       Trigger.TriggerResult triggerResult = 
trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, 
context);
+                       context.windowBuffer.storeElement(element);
+                       Trigger.TriggerResult triggerResult = 
trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
                        processTriggerResult(triggerResult, key, window);
                }
        }

Reply via email to