Repository: flink
Updated Branches:
  refs/heads/master 89866a5ad -> cd5527417


http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index b1c94cb..8aa76a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.util.ExceptionUtils;
@@ -30,8 +30,8 @@ import java.util.concurrent.RunnableFuture;
  */
 public class OperatorSnapshotResult {
 
-       private RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture;
-       private RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture;
+       private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture;
+       private RunnableFuture<KeyedStateHandle> keyedStateRawFuture;
        private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture;
        private RunnableFuture<OperatorStateHandle> operatorStateRawFuture;
 
@@ -40,8 +40,8 @@ public class OperatorSnapshotResult {
        }
 
        public OperatorSnapshotResult(
-                       RunnableFuture<KeyGroupsStateHandle> 
keyedStateManagedFuture,
-                       RunnableFuture<KeyGroupsStateHandle> 
keyedStateRawFuture,
+                       RunnableFuture<KeyedStateHandle> 
keyedStateManagedFuture,
+                       RunnableFuture<KeyedStateHandle> keyedStateRawFuture,
                        RunnableFuture<OperatorStateHandle> 
operatorStateManagedFuture,
                        RunnableFuture<OperatorStateHandle> 
operatorStateRawFuture) {
                this.keyedStateManagedFuture = keyedStateManagedFuture;
@@ -50,19 +50,19 @@ public class OperatorSnapshotResult {
                this.operatorStateRawFuture = operatorStateRawFuture;
        }
 
-       public RunnableFuture<KeyGroupsStateHandle> 
getKeyedStateManagedFuture() {
+       public RunnableFuture<KeyedStateHandle> getKeyedStateManagedFuture() {
                return keyedStateManagedFuture;
        }
 
-       public void 
setKeyedStateManagedFuture(RunnableFuture<KeyGroupsStateHandle> 
keyedStateManagedFuture) {
+       public void setKeyedStateManagedFuture(RunnableFuture<KeyedStateHandle> 
keyedStateManagedFuture) {
                this.keyedStateManagedFuture = keyedStateManagedFuture;
        }
 
-       public RunnableFuture<KeyGroupsStateHandle> getKeyedStateRawFuture() {
+       public RunnableFuture<KeyedStateHandle> getKeyedStateRawFuture() {
                return keyedStateRawFuture;
        }
 
-       public void setKeyedStateRawFuture(RunnableFuture<KeyGroupsStateHandle> 
keyedStateRawFuture) {
+       public void setKeyedStateRawFuture(RunnableFuture<KeyedStateHandle> 
keyedStateRawFuture) {
                this.keyedStateRawFuture = keyedStateRawFuture;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
index 7abf8d9..30d07b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
@@ -42,16 +42,16 @@ public class OperatorStateHandles {
 
        private final StreamStateHandle legacyOperatorState;
 
-       private final Collection<KeyGroupsStateHandle> managedKeyedState;
-       private final Collection<KeyGroupsStateHandle> rawKeyedState;
+       private final Collection<KeyedStateHandle> managedKeyedState;
+       private final Collection<KeyedStateHandle> rawKeyedState;
        private final Collection<OperatorStateHandle> managedOperatorState;
        private final Collection<OperatorStateHandle> rawOperatorState;
 
        public OperatorStateHandles(
                        int operatorChainIndex,
                        StreamStateHandle legacyOperatorState,
-                       Collection<KeyGroupsStateHandle> managedKeyedState,
-                       Collection<KeyGroupsStateHandle> rawKeyedState,
+                       Collection<KeyedStateHandle> managedKeyedState,
+                       Collection<KeyedStateHandle> rawKeyedState,
                        Collection<OperatorStateHandle> managedOperatorState,
                        Collection<OperatorStateHandle> rawOperatorState) {
 
@@ -83,11 +83,11 @@ public class OperatorStateHandles {
                return legacyOperatorState;
        }
 
-       public Collection<KeyGroupsStateHandle> getManagedKeyedState() {
+       public Collection<KeyedStateHandle> getManagedKeyedState() {
                return managedKeyedState;
        }
 
-       public Collection<KeyGroupsStateHandle> getRawKeyedState() {
+       public Collection<KeyedStateHandle> getRawKeyedState() {
                return rawKeyedState;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 76b2b98..11e8e0d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
@@ -849,8 +849,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
                private final List<OperatorSnapshotResult> 
snapshotInProgressList;
 
-               private RunnableFuture<KeyGroupsStateHandle> 
futureKeyedBackendStateHandles;
-               private RunnableFuture<KeyGroupsStateHandle> 
futureKeyedStreamStateHandles;
+               private RunnableFuture<KeyedStateHandle> 
futureKeyedBackendStateHandles;
+               private RunnableFuture<KeyedStateHandle> 
futureKeyedStreamStateHandles;
 
                private List<StreamStateHandle> nonPartitionedStateHandles;
 
@@ -892,8 +892,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                public void run() {
                        try {
                                // Keyed state handle future, currently only 
one (the head) operator can have this
-                               KeyGroupsStateHandle keyedStateHandleBackend = 
FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
-                               KeyGroupsStateHandle keyedStateHandleStream = 
FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
+                               KeyedStateHandle keyedStateHandleBackend = 
FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
+                               KeyedStateHandle keyedStateHandleStream = 
FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
 
                                List<OperatorStateHandle> operatorStatesBackend 
= new ArrayList<>(snapshotInProgressList.size());
                                List<OperatorStateHandle> operatorStatesStream 
= new ArrayList<>(snapshotInProgressList.size());
@@ -987,8 +987,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                ChainedStateHandle<StreamStateHandle> 
chainedNonPartitionedOperatorsState,
                                ChainedStateHandle<OperatorStateHandle> 
chainedOperatorStateBackend,
                                ChainedStateHandle<OperatorStateHandle> 
chainedOperatorStateStream,
-                               KeyGroupsStateHandle keyedStateHandleBackend,
-                               KeyGroupsStateHandle keyedStateHandleStream) {
+                               KeyedStateHandle keyedStateHandleBackend,
+                               KeyedStateHandle keyedStateHandleStream) {
 
                        boolean hasAnyState = keyedStateHandleBackend != null
                                        || keyedStateHandleStream != null

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index eeee8dc..8f42c1a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
@@ -559,11 +559,11 @@ public class AbstractStreamOperatorTest {
 
                final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
 
-               RunnableFuture<KeyGroupsStateHandle> futureKeyGroupStateHandle 
= mock(RunnableFuture.class);
+               RunnableFuture<KeyedStateHandle> futureKeyedStateHandle = 
mock(RunnableFuture.class);
                RunnableFuture<OperatorStateHandle> futureOperatorStateHandle = 
mock(RunnableFuture.class);
 
                StateSnapshotContextSynchronousImpl context = 
mock(StateSnapshotContextSynchronousImpl.class);
-               
when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyGroupStateHandle);
+               
when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
                
when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);
 
                OperatorSnapshotResult operatorSnapshotResult = spy(new 
OperatorSnapshotResult());
@@ -609,9 +609,9 @@ public class AbstractStreamOperatorTest {
                verify(context).close();
                verify(operatorSnapshotResult).cancel();
 
-               verify(futureKeyGroupStateHandle).cancel(anyBoolean());
+               verify(futureKeyedStateHandle).cancel(anyBoolean());
                verify(futureOperatorStateHandle).cancel(anyBoolean());
-               verify(futureKeyGroupStateHandle).cancel(anyBoolean());
+               verify(futureKeyedStateHandle).cancel(anyBoolean());
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
index 490df52..f57eed1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -41,12 +41,12 @@ public class OperatorSnapshotResultTest extends TestLogger {
 
                operatorSnapshotResult.cancel();
 
-               KeyGroupsStateHandle keyedManagedStateHandle = 
mock(KeyGroupsStateHandle.class);
-               RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture = 
mock(RunnableFuture.class);
+               KeyedStateHandle keyedManagedStateHandle = 
mock(KeyedStateHandle.class);
+               RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = 
mock(RunnableFuture.class);
                
when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
 
-               KeyGroupsStateHandle keyedRawStateHandle = 
mock(KeyGroupsStateHandle.class);
-               RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture = 
mock(RunnableFuture.class);
+               KeyedStateHandle keyedRawStateHandle = 
mock(KeyedStateHandle.class);
+               RunnableFuture<KeyedStateHandle> keyedStateRawFuture = 
mock(RunnableFuture.class);
                when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle);
 
                OperatorStateHandle operatorManagedStateHandle = 
mock(OperatorStateHandle.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index 963c42c..8e0edfc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
@@ -75,7 +76,7 @@ public class StateInitializationContextImplTest {
 
                ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos(64);
 
-               List<KeyGroupsStateHandle> keyGroupsStateHandles = new 
ArrayList<>(NUM_HANDLES);
+               List<KeyedStateHandle> keyedStateHandles = new 
ArrayList<>(NUM_HANDLES);
                int prev = 0;
                for (int i = 0; i < NUM_HANDLES; ++i) {
                        out.reset();
@@ -91,10 +92,10 @@ public class StateInitializationContextImplTest {
                                ++writtenKeyGroups;
                        }
 
-                       KeyGroupsStateHandle handle =
+                       KeyedStateHandle handle =
                                        new KeyGroupsStateHandle(offsets, new 
ByteStateHandleCloseChecking("kg-" + i, out.toByteArray()));
 
-                       keyGroupsStateHandles.add(handle);
+                       keyedStateHandles.add(handle);
                }
 
                List<OperatorStateHandle> operatorStateHandles = new 
ArrayList<>(NUM_HANDLES);
@@ -125,7 +126,7 @@ public class StateInitializationContextImplTest {
                                                true,
                                                stateStore,
                                                mock(KeyedStateStore.class),
-                                               keyGroupsStateHandles,
+                                               keyedStateHandles,
                                                operatorStateHandles,
                                                closableRegistry);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 58cfefd..4435247 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -186,8 +187,8 @@ public class InterruptSensitiveRestoreTest {
 
 
                ChainedStateHandle<StreamStateHandle> operatorState = null;
-               List<KeyGroupsStateHandle> keyGroupStateFromBackend = 
Collections.emptyList();
-               List<KeyGroupsStateHandle> keyGroupStateFromStream = 
Collections.emptyList();
+               List<KeyedStateHandle> keyedStateFromBackend = 
Collections.emptyList();
+               List<KeyedStateHandle> keyedStateFromStream = 
Collections.emptyList();
                List<Collection<OperatorStateHandle>> operatorStateBackend = 
Collections.emptyList();
                List<Collection<OperatorStateHandle>> operatorStateStream = 
Collections.emptyList();
 
@@ -201,8 +202,8 @@ public class InterruptSensitiveRestoreTest {
                Collection<OperatorStateHandle> operatorStateHandles =
                                Collections.singletonList(new 
OperatorStateHandle(operatorStateMetadata, state));
 
-               List<KeyGroupsStateHandle> keyGroupsStateHandles =
-                               Collections.singletonList(new 
KeyGroupsStateHandle(keyGroupRangeOffsets, state));
+               List<KeyedStateHandle> keyedStateHandles =
+                               Collections.<KeyedStateHandle>singletonList(new 
KeyGroupsStateHandle(keyGroupRangeOffsets, state));
 
                switch (mode) {
                        case OPERATOR_MANAGED:
@@ -212,10 +213,10 @@ public class InterruptSensitiveRestoreTest {
                                operatorStateStream = 
Collections.singletonList(operatorStateHandles);
                                break;
                        case KEYED_MANAGED:
-                               keyGroupStateFromBackend = 
keyGroupsStateHandles;
+                               keyedStateFromBackend = keyedStateHandles;
                                break;
                        case KEYED_RAW:
-                               keyGroupStateFromStream = keyGroupsStateHandles;
+                               keyedStateFromStream = keyedStateHandles;
                                break;
                        case LEGACY:
                                operatorState = new 
ChainedStateHandle<>(Collections.singletonList(state));
@@ -228,8 +229,8 @@ public class InterruptSensitiveRestoreTest {
                        operatorState,
                        operatorStateBackend,
                        operatorStateStream,
-                       keyGroupStateFromBackend,
-                       keyGroupStateFromStream);
+                       keyedStateFromBackend,
+                       keyedStateFromStream);
 
                JobInformation jobInformation = new JobInformation(
                        new JobID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d7e3d6c..f34522b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -61,7 +61,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackendFactory;
@@ -458,8 +458,8 @@ public class StreamTaskTest extends TestLogger {
 
                StreamOperator<?> streamOperator = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 
-               KeyGroupsStateHandle managedKeyedStateHandle = 
mock(KeyGroupsStateHandle.class);
-               KeyGroupsStateHandle rawKeyedStateHandle = 
mock(KeyGroupsStateHandle.class);
+               KeyedStateHandle managedKeyedStateHandle = 
mock(KeyedStateHandle.class);
+               KeyedStateHandle rawKeyedStateHandle = 
mock(KeyedStateHandle.class);
                OperatorStateHandle managedOperatorStateHandle = 
mock(OperatorStateHandle.class);
                OperatorStateHandle rawOperatorStateHandle = 
mock(OperatorStateHandle.class);
 
@@ -563,8 +563,8 @@ public class StreamTaskTest extends TestLogger {
                                        
(ChainedStateHandle<StreamStateHandle>)invocation.getArguments()[0],
                                        
(ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[1],
                                        
(ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[2],
-                                       
(KeyGroupsStateHandle)invocation.getArguments()[3],
-                                       
(KeyGroupsStateHandle)invocation.getArguments()[4]);
+                                       
(KeyedStateHandle)invocation.getArguments()[3],
+                                       
(KeyedStateHandle)invocation.getArguments()[4]);
                        }
                });
 
@@ -574,8 +574,8 @@ public class StreamTaskTest extends TestLogger {
 
                StreamOperator<?> streamOperator = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 
-               KeyGroupsStateHandle managedKeyedStateHandle = 
mock(KeyGroupsStateHandle.class);
-               KeyGroupsStateHandle rawKeyedStateHandle = 
mock(KeyGroupsStateHandle.class);
+               KeyedStateHandle managedKeyedStateHandle = 
mock(KeyedStateHandle.class);
+               KeyedStateHandle rawKeyedStateHandle = 
mock(KeyedStateHandle.class);
                OperatorStateHandle managedOperatorStateHandle = 
mock(OperatorStateHandle.class);
                OperatorStateHandle rawOperatorStateHandle = 
mock(OperatorStateHandle.class);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 945103c..912d579 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -318,7 +319,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
                StreamStateHandle stateHandle = 
SavepointV0Serializer.convertOperatorAndFunctionState(state);
 
-               List<KeyGroupsStateHandle> keyGroupStatesList = new 
ArrayList<>();
+               List<KeyedStateHandle> keyGroupStatesList = new ArrayList<>();
                if (state.getKvStates() != null) {
                        KeyGroupsStateHandle keyedStateHandle = 
SavepointV0Serializer.convertKeyedBackendState(
                                        state.getKvStates(),
@@ -331,7 +332,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                initializeState(new OperatorStateHandles(0,
                                stateHandle,
                                keyGroupStatesList,
-                               Collections.<KeyGroupsStateHandle>emptyList(),
+                               Collections.<KeyedStateHandle>emptyList(),
                                Collections.<OperatorStateHandle>emptyList(),
                                Collections.<OperatorStateHandle>emptyList()));
        }
@@ -364,16 +365,16 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                        KeyGroupRange localKeyGroupRange =
                                        keyGroupPartitions.get(subtaskIndex);
 
-                       List<KeyGroupsStateHandle> localManagedKeyGroupState = 
null;
+                       List<KeyedStateHandle> localManagedKeyGroupState = null;
                        if (operatorStateHandles.getManagedKeyedState() != 
null) {
-                               localManagedKeyGroupState = 
StateAssignmentOperation.getKeyGroupsStateHandles(
+                               localManagedKeyGroupState = 
StateAssignmentOperation.getKeyedStateHandles(
                                                
operatorStateHandles.getManagedKeyedState(),
                                                localKeyGroupRange);
                        }
 
-                       List<KeyGroupsStateHandle> localRawKeyGroupState = null;
+                       List<KeyedStateHandle> localRawKeyGroupState = null;
                        if (operatorStateHandles.getRawKeyedState() != null) {
-                               localRawKeyGroupState = 
StateAssignmentOperation.getKeyGroupsStateHandles(
+                               localRawKeyGroupState = 
StateAssignmentOperation.getKeyedStateHandles(
                                                
operatorStateHandles.getRawKeyedState(),
                                                localKeyGroupRange);
                        }
@@ -442,15 +443,15 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                List<OperatorStateHandle> mergedManagedOperatorState = new 
ArrayList<>(handles.length);
                List<OperatorStateHandle> mergedRawOperatorState = new 
ArrayList<>(handles.length);
 
-               List<KeyGroupsStateHandle> mergedManagedKeyedState = new 
ArrayList<>(handles.length);
-               List<KeyGroupsStateHandle> mergedRawKeyedState = new 
ArrayList<>(handles.length);
+               List<KeyedStateHandle> mergedManagedKeyedState = new 
ArrayList<>(handles.length);
+               List<KeyedStateHandle> mergedRawKeyedState = new 
ArrayList<>(handles.length);
 
                for (OperatorStateHandles handle: handles) {
 
                        Collection<OperatorStateHandle> managedOperatorState = 
handle.getManagedOperatorState();
                        Collection<OperatorStateHandle> rawOperatorState = 
handle.getRawOperatorState();
-                       Collection<KeyGroupsStateHandle> managedKeyedState = 
handle.getManagedKeyedState();
-                       Collection<KeyGroupsStateHandle> rawKeyedState = 
handle.getRawKeyedState();
+                       Collection<KeyedStateHandle> managedKeyedState = 
handle.getManagedKeyedState();
+                       Collection<KeyedStateHandle> rawKeyedState = 
handle.getRawKeyedState();
 
                        if (managedOperatorState != null) {
                                
mergedManagedOperatorState.addAll(managedOperatorState);
@@ -502,8 +503,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                        timestamp,
                        CheckpointOptions.forFullCheckpoint());
 
-               KeyGroupsStateHandle keyedManaged = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
-               KeyGroupsStateHandle keyedRaw = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
+               KeyedStateHandle keyedManaged = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
+               KeyedStateHandle keyedRaw = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
 
                OperatorStateHandle opManaged = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
                OperatorStateHandle opRaw = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index d45ae21..d9c7387 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -65,7 +66,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 
        // when we restore we keep the state here so that we can call restore
        // when the operator requests the keyed state backend
-       private List<KeyGroupsStateHandle> restoredKeyedState = null;
+       private List<KeyedStateHandle> restoredKeyedState = null;
 
        public KeyedOneInputStreamOperatorTestHarness(
                        OneInputStreamOperator<IN, OUT> operator,
@@ -144,7 +145,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
                }
 
                if (keyedStateBackend != null) {
-                       RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(
+                       RunnableFuture<KeyedStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(
                                        checkpointId,
                                        timestamp,
                                        streamFactory,
@@ -177,14 +178,14 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
                        byte keyedStatePresent = (byte) inStream.read();
                        if (keyedStatePresent == 1) {
                                ObjectInputStream ois = new ObjectInputStream(inStream);
-                               this.restoredKeyedState = Collections.singletonList((KeyGroupsStateHandle) ois.readObject());
+                               this.restoredKeyedState = Collections.singletonList((KeyedStateHandle) ois.readObject());
                        }
                }
        }
 
 
-       private static boolean hasMigrationHandles(Collection<KeyGroupsStateHandle> allKeyGroupsHandles) {
-               for (KeyGroupsStateHandle handle : allKeyGroupsHandles) {
+       private static boolean hasMigrationHandles(Collection<KeyedStateHandle> allKeyGroupsHandles) {
+               for (KeyedStateHandle handle : allKeyGroupsHandles) {
                        if (handle instanceof Migration) {
                                return true;
                        }
@@ -225,17 +226,17 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
                                        keyGroupPartitions.get(subtaskIndex);
 
                        restoredKeyedState = null;
-                       Collection<KeyGroupsStateHandle> managedKeyedState = operatorStateHandles.getManagedKeyedState();
+                       Collection<KeyedStateHandle> managedKeyedState = operatorStateHandles.getManagedKeyedState();
                        if (managedKeyedState != null) {
 
                                // if we have migration handles, don't reshuffle state and preserve
                                // the migration tag
                                if (hasMigrationHandles(managedKeyedState)) {
-                                       List<KeyGroupsStateHandle> result = new ArrayList<>(managedKeyedState.size());
+                                       List<KeyedStateHandle> result = new ArrayList<>(managedKeyedState.size());
                                        result.addAll(managedKeyedState);
                                        restoredKeyedState = result;
                                } else {
-                                       restoredKeyedState = StateAssignmentOperation.getKeyGroupsStateHandles(
+                                       restoredKeyedState = StateAssignmentOperation.getKeyedStateHandles(
                                                        managedKeyedState,
                                                        localKeyGroupRange);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 8e76f70..41a083a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.mockito.invocation.InvocationOnMock;
@@ -50,7 +51,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
 
        // when we restore we keep the state here so that we can call restore
        // when the operator requests the keyed state backend
-       private Collection<KeyGroupsStateHandle> restoredKeyedState = null;
+       private Collection<KeyedStateHandle> restoredKeyedState = null;
 
        public KeyedTwoInputStreamOperatorTestHarness(
                        TwoInputStreamOperator<IN1, IN2, OUT> operator,

Reply via email to