Repository: flink
Updated Branches:
  refs/heads/master dfbf83ff6 -> e7a060947


[FLINK-5967] Add support for AggregatingState in user functions

This also adds documentation for Aggregating state in user functions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7a06094
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7a06094
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7a06094

Branch: refs/heads/master
Commit: e7a06094792e2a6f79c723dc1d70544601c41299
Parents: dfbf83f
Author: Bowen Li <[email protected]>
Authored: Mon Oct 23 10:43:51 2017 +0800
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Oct 26 13:13:38 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/state/state.md                  | 10 ++--
 .../api/common/functions/RuntimeContext.java    | 50 +++++++++++++++++++-
 .../util/AbstractRuntimeUDFContext.java         | 10 ++++
 .../flink/api/common/state/KeyedStateStore.java | 43 +++++++++++++++++
 .../runtime/state/DefaultKeyedStateStore.java   | 13 +++++
 .../api/functions/async/RichAsyncFunction.java  |  7 +++
 .../api/operators/StreamingRuntimeContext.java  |  9 ++++
 .../operators/windowing/WindowOperator.java     | 16 +++++++
 .../functions/async/RichAsyncFunctionTest.java  | 30 ++++++++++++
 .../operators/StreamingRuntimeContextTest.java  | 31 ++++++++++++
 10 files changed, 214 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/docs/dev/stream/state/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index 0f80a9d..c9a68ab 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -101,6 +101,11 @@ be retrieved using `Iterable<T> get()`.
 added to the state. The interface is the same as for `ListState` but elements 
added using
 `add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
 
+* `AggregatingState<IN, OUT>`: This keeps a single value that represents the 
aggregation of all values
+added to the state. Contrary to `ReducingState`, the aggregate type may be 
different from the type
+of elements that are added to the state. The interface is the same as for 
`ListState` but elements
+added using `add(IN)` are aggregated using a specified `AggregateFunction`.
+
 * `FoldingState<T, ACC>`: This keeps a single value that represents the 
aggregation of all values
 added to the state. Contrary to `ReducingState`, the aggregate type may be 
different from the type
 of elements that are added to the state. The interface is the same as for 
`ListState` but elements
@@ -114,9 +119,7 @@ views for mappings, keys and values can be retrieved using 
`entries()`, `keys()`
 All types of state also have a method `clear()` that clears the state for the 
currently
 active key, i.e. the key of the input element.
 
-<span class="label label-danger">Attention</span> `FoldingState` will be 
deprecated in one of
-the next versions of Flink and will be completely removed in the future. A 
more general
-alternative will be provided.
+<span class="label label-danger">Attention</span> `FoldingState` and 
`FoldingStateDescriptor` have been deprecated in Flink 1.4 and will be 
completely removed in the future. Please use `AggregatingState` and 
`AggregatingStateDescriptor` instead.
 
 It is important to keep in mind that these state objects are only used for 
interfacing
 with state. The state is not necessarily stored inside but might reside on 
disk or somewhere else.
@@ -139,6 +142,7 @@ is available in a `RichFunction` has these methods for 
accessing state:
 * `ValueState<T> getState(ValueStateDescriptor<T>)`
 * `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
 * `ListState<T> getListState(ListStateDescriptor<T>)`
+* `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)`
 * `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
 * `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 2e8097f..7a89e10 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.accumulators.Histogram;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
@@ -359,6 +361,49 @@ public interface RuntimeContext {
        <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> 
stateProperties);
 
        /**
+        * Gets a handle to the system's key/value aggregating state. This 
state is similar to the state
+        * accessed via {@link #getState(ValueStateDescriptor)}, but is 
optimized for state that
+        * aggregates values with different types.
+        *
+        * <p>This state is only accessible if the function is executed on a 
KeyedStream.
+        *
+        * <pre>{@code
+        * DataStream<MyType> stream = ...;
+        * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+        * AggregateFunction<...> aggregateFunction = ...
+        *
+        * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
+        *
+        *     private AggregatingState<MyType, Long> state;
+        *
+        *     public void open(Configuration cfg) {
+        *         state = getRuntimeContext().getAggregatingState(
+        *                 new AggregatingStateDescriptor<>("sum", 
aggregateFunction, Long.class));
+        *     }
+        *
+        *     public Tuple2<MyType, Long> map(MyType value) {
+        *         state.add(value);
+        *         return new Tuple2<>(value, state.get());
+        *     }
+        * });
+        *
+        * }</pre>
+        *
+        * @param stateProperties The descriptor defining the properties of the 
state.
+        *
+        * @param <IN> The type of the values that are added to the state.
+        * @param <ACC> The type of the accumulator (intermediate aggregation 
state).
+        * @param <OUT> The type of the values that are returned from the state.
+        *
+        * @return The partitioned state object.
+        *
+        * @throws UnsupportedOperationException Thrown, if no partitioned 
state is available for the
+        *                                       function (function is not part 
of a KeyedStream).
+        */
+       @PublicEvolving
+       <IN, ACC, OUT> AggregatingState<IN, OUT> 
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);
+
+       /**
         * Gets a handle to the system's key/value folding state. This state is 
similar to the state
         * accessed via {@link #getState(ValueStateDescriptor)}, but is 
optimized for state that
         * aggregates values with different types.
@@ -374,7 +419,7 @@ public interface RuntimeContext {
         *     private FoldingState<MyType, Long> state;
         *
         *     public void open(Configuration cfg) {
-        *         state = getRuntimeContext().getReducingState(
+        *         state = getRuntimeContext().getFoldingState(
         *                 new FoldingStateDescriptor<>("sum", 0L, (a, b) -> 
a.count() + b, Long.class));
         *     }
         *
@@ -388,7 +433,8 @@ public interface RuntimeContext {
         *
         * @param stateProperties The descriptor defining the properties of the 
stats.
         *
-        * @param <T> The type of value stored in the state.
+        * @param <T> Type of the values folded into the state
+        * @param <ACC> Type of the value in the state
         *
         * @return The partitioned state object.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index bcd0ad0..d0d7e52 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -29,6 +29,8 @@ import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
@@ -217,6 +219,14 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
 
        @Override
        @PublicEvolving
+       public <IN, ACC, OUT> AggregatingState<IN, OUT> 
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+               throw new UnsupportedOperationException(
+                               "This state is only accessible by functions 
executed on a KeyedStream");
+       }
+
+       @Override
+       @PublicEvolving
+       @Deprecated
        public <T, ACC> FoldingState<T, ACC> 
getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions 
executed on a KeyedStream");

http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index e0a044f..7f45214 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -167,6 +167,49 @@ public interface KeyedStateStore {
         * <pre>{@code
         * DataStream<MyType> stream = ...;
         * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+        * AggregateFunction<...> aggregateFunction = ...
+        *
+        * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
+        *
+        *     private AggregatingState<MyType, Long> state;
+        *
+        *     public void open(Configuration cfg) {
+        *         state = getRuntimeContext().getAggregatingState(
+        *                 new AggregatingStateDescriptor<>("sum", 
aggregateFunction, Long.class));
+        *     }
+        *
+        *     public Tuple2<MyType, Long> map(MyType value) {
+        *         state.add(value);
+        *         return new Tuple2<>(value, state.get());
+        *     }
+        * });
+        *
+        * }</pre>
+        *
+        * @param stateProperties The descriptor defining the properties of the 
state.
+        *
+        * @param <IN> The type of the values that are added to the state.
+        * @param <ACC> The type of the accumulator (intermediate aggregation 
state).
+        * @param <OUT> The type of the values that are returned from the state.
+        *
+        * @return The partitioned state object.
+        *
+        * @throws UnsupportedOperationException Thrown, if no partitioned 
state is available for the
+        *                                       function (function is not part 
of a KeyedStream).
+        */
+       @PublicEvolving
+       <IN, ACC, OUT> AggregatingState<IN, OUT> 
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);
+
+       /**
+        * Gets a handle to the system's key/value folding state. This state is 
similar to the state
+        * accessed via {@link #getState(ValueStateDescriptor)}, but is 
optimized for state that
+        * aggregates values with different types.
+        *
+        * <p>This state is only accessible if the function is executed on a 
KeyedStream.
+        *
+        * <pre>{@code
+        * DataStream<MyType> stream = ...;
+        * KeyedStream<MyType> keyedStream = stream.keyBy("id");
         *
         * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
index a32cebd..e0bb7b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.KeyedStateStore;
@@ -85,6 +87,17 @@ public class DefaultKeyedStateStore implements 
KeyedStateStore {
        }
 
        @Override
+       public <IN, ACC, OUT> AggregatingState<IN, OUT> 
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+               requireNonNull(stateProperties, "The state properties must not 
be null");
+               try {
+                       
stateProperties.initializeSerializerUnlessSet(executionConfig);
+                       return getPartitionedState(stateProperties);
+               } catch (Exception e) {
+                       throw new RuntimeException("Error while getting state", 
e);
+               }
+       }
+
+       @Override
        public <T, ACC> FoldingState<T, ACC> 
getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
                requireNonNull(stateProperties, "The state properties must not 
be null");
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index b6ce862..8d9ac8f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -32,6 +32,8 @@ import 
org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
@@ -172,6 +174,11 @@ public abstract class RichAsyncFunction<IN, OUT> extends 
AbstractRichFunction im
                }
 
                @Override
+               public <IN, ACC, OUT> AggregatingState<IN, OUT> 
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+                       throw new UnsupportedOperationException("State is not 
supported in rich async functions.");
+               }
+
+               @Override
                public <T, ACC> FoldingState<T, ACC> 
getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
                        throw new UnsupportedOperationException("State is not 
supported in rich async functions.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index bd3d500..cb40214 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.KeyedStateStore;
@@ -133,6 +135,13 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
        }
 
        @Override
+       public <IN, ACC, OUT> AggregatingState<IN, OUT> 
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+               KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
+               
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+               return keyedStateStore.getAggregatingState(stateProperties);
+       }
+
+       @Override
        public <T, ACC> FoldingState<T, ACC> 
getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
                KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
                
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());

http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index a921625..cf606bc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -20,6 +20,8 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.AppendingState;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -679,6 +681,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                }
 
                @Override
+               public <IN, ACC, OUT> AggregatingState<IN, OUT> 
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+                       throw new UnsupportedOperationException("Per-window 
state is not allowed when using merging windows.");
+               }
+
+               @Override
                public <T, A> FoldingState<T, A> 
getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
                        throw new UnsupportedOperationException("Per-window 
state is not allowed when using merging windows.");
                }
@@ -722,6 +729,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                }
 
                @Override
+               public <IN, ACC, OUT> AggregatingState<IN, OUT> 
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+                       try {
+                               return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Could not retrieve 
state", e);
+                       }
+               }
+
+               @Override
                public <T, ACC> FoldingState<T, ACC> 
getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
                        try {
                                return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);

http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index aba37df..d6f5e61 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -20,11 +20,13 @@ package org.apache.flink.streaming.api.functions.async;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -165,6 +167,34 @@ public class RichAsyncFunctionTest {
                } catch (UnsupportedOperationException e) {
                        // expected
                }
+
+               try {
+                       runtimeContext.getAggregatingState(new 
AggregatingStateDescriptor<>("foobar", new AggregateFunction<Integer, Integer, 
Integer>() {
+
+                               @Override
+                               public Integer createAccumulator() {
+                                       return null;
+                               }
+
+                               @Override
+                               public Integer add(Integer value, Integer 
accumulator) {
+                                       return null;
+                               }
+
+                               @Override
+                               public Integer getResult(Integer accumulator) {
+                                       return null;
+                               }
+
+                               @Override
+                               public Integer merge(Integer a, Integer b) {
+                                       return null;
+                               }
+                       }, Integer.class));
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
                try {
                        runtimeContext.getFoldingState(new 
FoldingStateDescriptor<>("foobar", 0, new FoldFunction<Integer, Integer>() {
                                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e7a06094/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 2e24f4c..1072eec 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -22,8 +22,10 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -126,6 +128,35 @@ public class StreamingRuntimeContextTest {
        }
 
        @Test
+       public void testAggregatingStateInstantiation() throws Exception {
+
+               final ExecutionConfig config = new ExecutionConfig();
+               config.registerKryoType(Path.class);
+
+               final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
+
+               StreamingRuntimeContext context = new StreamingRuntimeContext(
+                               
createDescriptorCapturingMockOp(descriptorCapture, config),
+                               createMockEnvironment(),
+                               Collections.<String, Accumulator<?, 
?>>emptyMap());
+
+               @SuppressWarnings("unchecked")
+               AggregateFunction<String, TaskInfo, String> aggregate = 
(AggregateFunction<String, TaskInfo, String>) mock(AggregateFunction.class);
+
+               AggregatingStateDescriptor<String, TaskInfo, String> descr =
+                               new AggregatingStateDescriptor<>("name", 
aggregate, TaskInfo.class);
+
+               context.getAggregatingState(descr);
+
+               AggregatingStateDescriptor<?, ?, ?> descrIntercepted = 
(AggregatingStateDescriptor<?, ?, ?>) descriptorCapture.get();
+               TypeSerializer<?> serializer = descrIntercepted.getSerializer();
+
+               // check that the Path class is really registered, i.e., the 
execution config was applied
+               assertTrue(serializer instanceof KryoSerializer);
+               assertTrue(((KryoSerializer<?>) 
serializer).getKryo().getRegistration(Path.class).getId() > 0);
+       }
+
+       @Test
        public void testFoldingStateInstantiation() throws Exception {
 
                final ExecutionConfig config = new ExecutionConfig();

Reply via email to