Repository: flink Updated Branches: refs/heads/master dfbf83ff6 -> e7a060947
[FLINK-5967] Add support for AggregatingState in user functions This also adds documentation for Aggregating state in user functions. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7a06094 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7a06094 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7a06094 Branch: refs/heads/master Commit: e7a06094792e2a6f79c723dc1d70544601c41299 Parents: dfbf83f Author: Bowen Li <[email protected]> Authored: Mon Oct 23 10:43:51 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Oct 26 13:13:38 2017 +0200 ---------------------------------------------------------------------- docs/dev/stream/state/state.md | 10 ++-- .../api/common/functions/RuntimeContext.java | 50 +++++++++++++++++++- .../util/AbstractRuntimeUDFContext.java | 10 ++++ .../flink/api/common/state/KeyedStateStore.java | 43 +++++++++++++++++ .../runtime/state/DefaultKeyedStateStore.java | 13 +++++ .../api/functions/async/RichAsyncFunction.java | 7 +++ .../api/operators/StreamingRuntimeContext.java | 9 ++++ .../operators/windowing/WindowOperator.java | 16 +++++++ .../functions/async/RichAsyncFunctionTest.java | 30 ++++++++++++ .../operators/StreamingRuntimeContextTest.java | 31 ++++++++++++ 10 files changed, 214 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/docs/dev/stream/state/state.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md index 0f80a9d..c9a68ab 100644 --- a/docs/dev/stream/state/state.md +++ b/docs/dev/stream/state/state.md @@ -101,6 +101,11 @@ be retrieved using `Iterable<T> get()`. added to the state. The interface is the same as for `ListState` but elements added using `add(T)` are reduced to an aggregate using a specified `ReduceFunction`. +* `AggregatingState<IN, OUT>`: This keeps a single value that represents the aggregation of all values +added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type +of elements that are added to the state. The interface is the same as for `ListState` but elements +added using `add(IN)` are aggregated using a specified `AggregateFunction`. + * `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type of elements that are added to the state. The interface is the same as for `ListState` but elements @@ -114,9 +119,7 @@ views for mappings, keys and values can be retrieved using `entries()`, `keys()` All types of state also have a method `clear()` that clears the state for the currently active key, i.e. the key of the input element. -<span class="label label-danger">Attention</span> `FoldingState` will be deprecated in one of -the next versions of Flink and will be completely removed in the future. A more general -alternative will be provided. +<span class="label label-danger">Attention</span> `FoldingState` and `FoldingStateDescriptor` have been deprecated in Flink 1.4 and will be completely removed in the future. A more general alternative will be provided. It is important to keep in mind that these state objects are only used for interfacing with state. The state is not necessarily stored inside but might reside on disk or somewhere else. @@ -139,6 +142,7 @@ is available in a `RichFunction` has these methods for accessing state: * `ValueState<T> getState(ValueStateDescriptor<T>)` * `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)` * `ListState<T> getListState(ListStateDescriptor<T>)` +* `AggregatingState<IN, OUT> getAggregatingState(AggregatingState<IN, OUT>)` * `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)` * `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)` http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index 2e8097f..7a89e10 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -27,6 +27,8 @@ import org.apache.flink.api.common.accumulators.Histogram; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; @@ -359,6 +361,49 @@ public interface RuntimeContext { <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties); /** + * Gets a handle to the system's key/value aggregating state. This state is similar to the state + * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that + * aggregates values with different types. + * + * <p>This state is only accessible if the function is executed on a KeyedStream. + * + * <pre>{@code + * DataStream<MyType> stream = ...; + * KeyedStream<MyType> keyedStream = stream.keyBy("id"); + * AggregateFunction<...> aggregateFunction = ... + * + * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() { + * + * private AggregatingState<MyType, Long> state; + * + * public void open(Configuration cfg) { + * state = getRuntimeContext().getAggregatingState( + * new AggregatingStateDescriptor<>("sum", aggregateFunction, Long.class)); + * } + * + * public Tuple2<MyType, Long> map(MyType value) { + * state.add(value); + * return new Tuple2<>(value, state.get()); + * } + * }); + * + * }</pre> + * + * @param stateProperties The descriptor defining the properties of the stats. + * + * @param <IN> The type of the values that are added to the state. + * @param <ACC> The type of the accumulator (intermediate aggregation state). + * @param <OUT> The type of the values that are returned from the state. + * + * @return The partitioned state object. + * + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + @PublicEvolving + <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties); + + /** * Gets a handle to the system's key/value folding state. This state is similar to the state * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that * aggregates values with different types. @@ -374,7 +419,7 @@ public interface RuntimeContext { * private FoldingState<MyType, Long> state; * * public void open(Configuration cfg) { - * state = getRuntimeContext().getReducingState( + * state = getRuntimeContext().getFoldingState( * new FoldingStateDescriptor<>("sum", 0L, (a, b) -> a.count() + b, Long.class)); * } * @@ -388,7 +433,8 @@ public interface RuntimeContext { * * @param stateProperties The descriptor defining the properties of the stats. * - * @param <T> The type of value stored in the state. + * @param <T> Type of the values folded in the other state + * @param <ACC> Type of the value in the state * * @return The partitioned state object. * http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index bcd0ad0..d0d7e52 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -29,6 +29,8 @@ import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; @@ -217,6 +219,14 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { @Override @PublicEvolving + public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { + throw new UnsupportedOperationException( + "This state is only accessible by functions executed on a KeyedStream"); + } + + @Override + @PublicEvolving + @Deprecated public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { throw new UnsupportedOperationException( "This state is only accessible by functions executed on a KeyedStream"); http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java index e0a044f..7f45214 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java @@ -167,6 +167,49 @@ public interface KeyedStateStore { * <pre>{@code * DataStream<MyType> stream = ...; * KeyedStream<MyType> keyedStream = stream.keyBy("id"); + * AggregateFunction<...> aggregateFunction = ... + * + * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() { + * + * private AggregatingState<MyType, Long> state; + * + * public void open(Configuration cfg) { + * state = getRuntimeContext().getAggregatingState( + * new AggregatingStateDescriptor<>("sum", aggregateFunction, Long.class)); + * } + * + * public Tuple2<MyType, Long> map(MyType value) { + * state.add(value); + * return new Tuple2<>(value, state.get()); + * } + * }); + * + * }</pre> + * + * @param stateProperties The descriptor defining the properties of the stats. + * + * @param <IN> The type of the values that are added to the state. + * @param <ACC> The type of the accumulator (intermediate aggregation state). + * @param <OUT> The type of the values that are returned from the state. + * + * @return The partitioned state object. + * + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + @PublicEvolving + <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties); + + /** + * Gets a handle to the system's key/value folding state. This state is similar to the state + * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that + * aggregates values with different types. + * + * <p>This state is only accessible if the function is executed on a KeyedStream. + * + * <pre>{@code + * DataStream<MyType> stream = ...; + * KeyedStream<MyType> keyedStream = stream.keyBy("id"); * * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() { * http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java index a32cebd..e0bb7b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.KeyedStateStore; @@ -85,6 +87,17 @@ public class DefaultKeyedStateStore implements KeyedStateStore { } @Override + public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); + try { + stateProperties.initializeSerializerUnlessSet(executionConfig); + return getPartitionedState(stateProperties); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); + } + } + + @Override public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { requireNonNull(stateProperties, "The state properties must not be null"); try { http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java index b6ce862..8d9ac8f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java @@ -32,6 +32,8 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; @@ -172,6 +174,11 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction im } @Override + public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { + throw new UnsupportedOperationException("State is not supported in rich async functions."); + } + + @Override public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { throw new UnsupportedOperationException("State is not supported in rich async functions."); } http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index bd3d500..cb40214 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -22,6 +22,8 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.KeyedStateStore; @@ -133,6 +135,13 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { } @Override + public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { + KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); + stateProperties.initializeSerializerUnlessSet(getExecutionConfig()); + return keyedStateStore.getAggregatingState(stateProperties); + } + + @Override public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); stateProperties.initializeSerializerUnlessSet(getExecutionConfig()); http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index a921625..cf606bc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.AppendingState; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; @@ -679,6 +681,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } @Override + public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { + throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows."); + } + + @Override public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) { throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows."); } @@ -722,6 +729,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } @Override + public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { + try { + return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve state", e); + } + } + + @Override public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { try { return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties); http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java index aba37df..d6f5e61 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java @@ -20,11 +20,13 @@ package org.apache.flink.streaming.api.functions.async; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; @@ -165,6 +167,34 @@ public class RichAsyncFunctionTest { } catch (UnsupportedOperationException e) { // expected } + + try { + runtimeContext.getAggregatingState(new AggregatingStateDescriptor<>("foobar", new AggregateFunction<Integer, Integer, Integer>() { + + @Override + public Integer createAccumulator() { + return null; + } + + @Override + public Integer add(Integer value, Integer accumulator) { + return null; + } + + @Override + public Integer getResult(Integer accumulator) { + return null; + } + + @Override + public Integer merge(Integer a, Integer b) { + return null; + } + }, Integer.class)); + } catch (UnsupportedOperationException e) { + // expected + } + try { runtimeContext.getFoldingState(new FoldingStateDescriptor<>("foobar", 0, new FoldFunction<Integer, Integer>() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index 2e24f4c..1072eec 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -22,8 +22,10 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -126,6 +128,35 @@ public class StreamingRuntimeContextTest { } @Test + public void testAggregatingStateInstantiation() throws Exception { + + final ExecutionConfig config = new ExecutionConfig(); + config.registerKryoType(Path.class); + + final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); + + StreamingRuntimeContext context = new StreamingRuntimeContext( + createDescriptorCapturingMockOp(descriptorCapture, config), + createMockEnvironment(), + Collections.<String, Accumulator<?, ?>>emptyMap()); + + @SuppressWarnings("unchecked") + AggregateFunction<String, TaskInfo, String> aggregate = (AggregateFunction<String, TaskInfo, String>) mock(AggregateFunction.class); + + AggregatingStateDescriptor<String, TaskInfo, String> descr = + new AggregatingStateDescriptor<>("name", aggregate, TaskInfo.class); + + context.getAggregatingState(descr); + + AggregatingStateDescriptor<?, ?, ?> descrIntercepted = (AggregatingStateDescriptor<?, ?, ?>) descriptorCapture.get(); + TypeSerializer<?> serializer = descrIntercepted.getSerializer(); + + // check that the Path class is really registered, i.e., the execution config was applied + assertTrue(serializer instanceof KryoSerializer); + assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0); + } + + @Test public void testFoldingStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig();
