[hotfix] Fix Mutable Object window aggregator/Disable Object Copy This fixes the aggregators to make copies of the objects so that they work with window operators that are not mutable-object safe.
This also disables object copy in WindowOperator and NonKeyedWindowOperator. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/712c868e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/712c868e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/712c868e Branch: refs/heads/release-0.10 Commit: 712c868eb77643ae07542d5a073365e0862a5e97 Parents: 45ab0eb Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Fri Oct 23 13:08:35 2015 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Oct 23 15:46:28 2015 +0200 ---------------------------------------------------------------------- .../aggregation/AggregationFunction.java | 8 +-- .../aggregation/ComparableAggregator.java | 51 ++++++++++---------- .../functions/aggregation/SumAggregator.java | 35 +++++++++++--- .../functions/windowing/FoldWindowFunction.java | 12 ++--- .../windowing/NonKeyedWindowOperator.java | 5 +- .../operators/windowing/WindowOperator.java | 5 +- 6 files changed, 66 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java index 23cca90..ed39103 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java @@ -22,13 +22,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction; 
public abstract class AggregationFunction<T> extends RichReduceFunction<T> { private static final long serialVersionUID = 1L; - public int position; - - public AggregationFunction(int pos) { - this.position = pos; - } - - public static enum AggregationType { + public enum AggregationType { SUM, MIN, MAX, MINBY, MAXBY, } http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java index e5501a0..e70e30a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java @@ -25,35 +25,39 @@ public class ComparableAggregator<T> extends AggregationFunction<T> { private static final long serialVersionUID = 1L; - public Comparator comparator; - public boolean byAggregate; - public boolean first; - FieldAccessor<T, Object> fieldAccessor; + private Comparator comparator; + private boolean byAggregate; + private boolean first; + private final FieldAccessor<T, Object> fieldAccessor; - private ComparableAggregator(int pos, AggregationType aggregationType, boolean first) { - super(pos); + private ComparableAggregator(AggregationType aggregationType, FieldAccessor<T, Object> fieldAccessor, boolean first) { this.comparator = Comparator.getForAggregation(aggregationType); this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY); this.first = first; + this.fieldAccessor = fieldAccessor; } - public ComparableAggregator(int 
positionToAggregate, TypeInformation<T> typeInfo, AggregationType aggregationType - , ExecutionConfig config) { + public ComparableAggregator(int positionToAggregate, + TypeInformation<T> typeInfo, + AggregationType aggregationType, + ExecutionConfig config) { this(positionToAggregate, typeInfo, aggregationType, false, config); } - public ComparableAggregator(int positionToAggregate, TypeInformation<T> typeInfo, AggregationType aggregationType, - boolean first, ExecutionConfig config) { - this(positionToAggregate, aggregationType, first); - this.fieldAccessor = FieldAccessor.create(positionToAggregate, typeInfo, config); - this.first = first; + public ComparableAggregator(int positionToAggregate, + TypeInformation<T> typeInfo, + AggregationType aggregationType, + boolean first, + ExecutionConfig config) { + this(aggregationType, FieldAccessor.create(positionToAggregate, typeInfo, config), first); } public ComparableAggregator(String field, - TypeInformation<T> typeInfo, AggregationType aggregationType, boolean first, ExecutionConfig config) { - this(0, aggregationType, first); - this.fieldAccessor = FieldAccessor.create(field, typeInfo, config); - this.first = first; + TypeInformation<T> typeInfo, + AggregationType aggregationType, + boolean first, + ExecutionConfig config) { + this(aggregationType, FieldAccessor.create(field, typeInfo, config), first); } @@ -66,16 +70,13 @@ public class ComparableAggregator<T> extends AggregationFunction<T> { int c = comparator.isExtremal(o1, o2); if (byAggregate) { - if (c == 1) { - return value1; - } - if (first) { - if (c == 0) { - return value1; - } + // if they are the same we choose based on whether we want to first or last + // element with the min/max. + if (c == 0) { + return first ? value1 : value2; } - return value2; + return c == 1 ? 
value1 : value2; } else { if (c == 0) { http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java index b045233..8c9cf7a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java @@ -19,30 +19,53 @@ package org.apache.flink.streaming.api.functions.aggregation; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.util.FieldAccessor; public class SumAggregator<T> extends AggregationFunction<T> { private static final long serialVersionUID = 1L; - FieldAccessor<T, Object> fieldAccessor; - SumFunction adder; + private final FieldAccessor<T, Object> fieldAccessor; + private final SumFunction adder; + private final TypeSerializer<T> serializer; + private final boolean isTuple; public SumAggregator(int pos, TypeInformation<T> typeInfo, ExecutionConfig config) { - super(pos); fieldAccessor = FieldAccessor.create(pos, typeInfo, config); adder = SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass()); + if (typeInfo instanceof TupleTypeInfo) { + isTuple = true; + serializer = null; + } else { + isTuple = false; + this.serializer = typeInfo.createSerializer(config); + } } public SumAggregator(String field, TypeInformation<T> typeInfo, 
ExecutionConfig config) { - super(0); fieldAccessor = FieldAccessor.create(field, typeInfo, config); adder = SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass()); + if (typeInfo instanceof TupleTypeInfo) { + isTuple = true; + serializer = null; + } else { + isTuple = false; + this.serializer = typeInfo.createSerializer(config); + } } - @SuppressWarnings("unchecked") @Override + @SuppressWarnings("unchecked") public T reduce(T value1, T value2) throws Exception { - return fieldAccessor.set(value1, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2))); + if (isTuple) { + Tuple result = ((Tuple)value1).copy(); + return fieldAccessor.set((T) result, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2))); + } else { + T result = serializer.copy(value1); + return fieldAccessor.set(result, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2))); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java index 04d2ac7..1d29e36 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.operators.translation.WrappingFunction; -import org.apache.flink.configuration.Configuration; import 
org.apache.flink.core.memory.InputViewDataInputStreamWrapper; import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; @@ -34,6 +33,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.ObjectInputStream; public class FoldWindowFunction<K, W extends Window, T, R> extends WrappingFunction<FoldFunction<T, R>> @@ -49,9 +49,8 @@ public class FoldWindowFunction<K, W extends Window, T, R> this.initialValue = initialValue; } - @Override - public void open(Configuration configuration) throws Exception { - super.open(configuration); + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); if (serializedInitialValue == null) { throw new RuntimeException("No initial value was serialized for the fold " + @@ -59,10 +58,11 @@ public class FoldWindowFunction<K, W extends Window, T, R> } ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); - InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper( + InputViewDataInputStreamWrapper inStream = new InputViewDataInputStreamWrapper( new DataInputStream(bais) ); - initialValue = outSerializer.deserialize(in); + + initialValue = outSerializer.deserialize(inStream); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java index a002b23..03e8c4c 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java @@ -231,9 +231,8 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> context = new Context(window, windowBuffer); windows.put(window, context); } - StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp()); - context.windowBuffer.storeElement(elementCopy); - Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context); + context.windowBuffer.storeElement(element); + Trigger.TriggerResult triggerResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, context); processTriggerResult(triggerResult, window); } } http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index a80f971..30ce477 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -276,9 +276,8 @@ public class WindowOperator<K, IN, OUT, W extends Window> context = new Context(key, window, windowBuffer); keyWindows.put(window, context); } - StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp()); - context.windowBuffer.storeElement(elementCopy); - 
Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context); + context.windowBuffer.storeElement(element); + Trigger.TriggerResult triggerResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, context); processTriggerResult(triggerResult, key, window); } }