[FLINK-4380] Remove KeyGroupAssigner in favor of static method/Have default 
max. parallelism at 128


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d430618
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d430618
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d430618

Branch: refs/heads/master
Commit: 6d4306186e09be6f2557600ed7a853c33d3ae6bd
Parents: 2b7a8d6
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Mon Aug 29 11:53:22 2016 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Aug 31 19:10:01 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |   3 +-
 .../state/RocksDBKeyedStateBackend.java         |   9 +-
 .../streaming/state/RocksDBStateBackend.java    |   9 +-
 .../state/RocksDBStateBackendConfigTest.java    |   9 +-
 .../api/common/state/KeyGroupAssigner.java      |  53 -----
 .../java/org/apache/flink/util/MathUtils.java   |  15 ++
 .../org/apache/flink/util/MathUtilTest.java     |  31 +++
 .../checkpoint/CheckpointCoordinator.java       |   3 +-
 .../flink/runtime/jobgraph/JobVertex.java       |   3 +-
 .../runtime/state/AbstractStateBackend.java     |   5 +-
 .../runtime/state/HashKeyGroupAssigner.java     |  66 ------
 .../flink/runtime/state/KeyGroupRange.java      |  42 ----
 .../runtime/state/KeyGroupRangeAssignment.java  |  97 +++++++++
 .../flink/runtime/state/KeyedStateBackend.java  |  17 +-
 .../state/filesystem/FsStateBackend.java        |   9 +-
 .../runtime/state/heap/AbstractHeapState.java   |   4 +-
 .../state/heap/HeapKeyedStateBackend.java       |   9 +-
 .../flink/runtime/state/heap/HeapListState.java |   3 +-
 .../state/memory/MemoryStateBackend.java        |   9 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |   3 +-
 .../runtime/query/QueryableStateClientTest.java |   4 +-
 .../runtime/query/netty/KvStateClientTest.java  |   5 +-
 .../query/netty/KvStateServerHandlerTest.java   |  16 +-
 .../runtime/query/netty/KvStateServerTest.java  |   6 +-
 .../runtime/state/StateBackendTestBase.java     |  49 ++---
 .../streaming/api/datastream/KeyedStream.java   |   5 +-
 .../flink/streaming/api/graph/StreamConfig.java |  22 +-
 .../api/graph/StreamGraphGenerator.java         |  24 ++-
 .../api/graph/StreamingJobGraphGenerator.java   |   9 +-
 .../api/operators/AbstractStreamOperator.java   |   5 +-
 .../partitioner/KeyGroupStreamPartitioner.java  |  24 +--
 .../streaming/runtime/tasks/StreamTask.java     |   7 +-
 .../api/graph/StreamGraphGeneratorTest.java     |  26 +--
 .../graph/StreamingJobGraphGeneratorTest.java   | 200 -------------------
 .../operators/StreamingRuntimeContextTest.java  |   5 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  19 +-
 .../KeyGroupStreamPartitionerTest.java          |   3 +-
 .../tasks/OneInputStreamTaskTestHarness.java    |   3 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |  16 +-
 .../util/OneInputStreamOperatorTestHarness.java |   4 -
 .../test/checkpointing/RescalingITCase.java     |  27 +--
 .../streaming/runtime/StateBackendITCase.java   |   5 +-
 42 files changed, 297 insertions(+), 586 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index cbc2757..e878ad5 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 import org.rocksdb.ColumnFamilyHandle;
@@ -130,7 +131,7 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
                                backend.getKeySerializer(),
                                namespaceSerializer);
 
-               int keyGroup = 
backend.getKeyGroupAssigner().getKeyGroupIndex(des.f0);
+               int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, 
backend.getNumberOfKeyGroups());
                writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
                return backend.db.get(columnFamily, 
keySerializationStream.toByteArray());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a1634b2..177c09f 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -21,7 +21,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -131,11 +130,11 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
                        ColumnFamilyOptions columnFamilyOptions,
                        TaskKvStateRegistry kvStateRegistry,
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange
        ) throws Exception {
 
-               super(kvStateRegistry, keySerializer, keyGroupAssigner, 
keyGroupRange);
+               super(kvStateRegistry, keySerializer, numberOfKeyGroups, 
keyGroupRange);
 
                this.operatorIdentifier = operatorIdentifier;
                this.jobId = jobId;
@@ -183,7 +182,7 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
                        ColumnFamilyOptions columnFamilyOptions,
                        TaskKvStateRegistry kvStateRegistry,
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        List<KeyGroupsStateHandle> restoreState
        ) throws Exception {
@@ -195,7 +194,7 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
                        columnFamilyOptions,
                        kvStateRegistry,
                        keySerializer,
-                       keyGroupAssigner,
+                       numberOfKeyGroups,
                        keyGroupRange);
 
                LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index f950751..0fdbd5f 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -18,7 +18,6 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.state.StateBackend;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.Path;
@@ -230,7 +229,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                        JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry) throws Exception {
 
@@ -246,7 +245,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                                getColumnOptions(),
                                kvStateRegistry,
                                keySerializer,
-                               keyGroupAssigner,
+                               numberOfKeyGroups,
                                keyGroupRange);
        }
 
@@ -254,7 +253,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment 
env, JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        List<KeyGroupsStateHandle> restoredState,
                        TaskKvStateRegistry kvStateRegistry) throws Exception {
@@ -270,7 +269,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                                getColumnOptions(),
                                kvStateRegistry,
                                keySerializer,
-                               keyGroupAssigner,
+                               numberOfKeyGroups,
                                keyGroupRange,
                                restoredState);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index acf6cb8..3b851be 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.util.OperatingSystem;
 import org.junit.Assume;
@@ -93,7 +92,7 @@ public class RocksDBStateBackendConfigTest {
                                env.getJobID(),
                                "test_op",
                                IntSerializer.INSTANCE,
-                               new HashKeyGroupAssigner<Integer>(1),
+                               1,
                                new KeyGroupRange(0, 0),
                                env.getTaskKvStateRegistry());
 
@@ -147,7 +146,7 @@ public class RocksDBStateBackendConfigTest {
                                env.getJobID(),
                                "test_op",
                                IntSerializer.INSTANCE,
-                               new HashKeyGroupAssigner<Integer>(1),
+                               1,
                                new KeyGroupRange(0, 0),
                                env.getTaskKvStateRegistry());
 
@@ -182,7 +181,7 @@ public class RocksDBStateBackendConfigTest {
                                                env.getJobID(),
                                                "foobar",
                                                IntSerializer.INSTANCE,
-                                               new 
HashKeyGroupAssigner<Integer>(1),
+                                               1,
                                                new KeyGroupRange(0, 0),
                                                new 
KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
                        }
@@ -224,7 +223,7 @@ public class RocksDBStateBackendConfigTest {
                                                env.getJobID(),
                                                "foobar",
                                                IntSerializer.INSTANCE,
-                                               new 
HashKeyGroupAssigner<Integer>(1),
+                                               1,
                                                new KeyGroupRange(0, 0),
                                                new 
KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
deleted file mode 100644
index bb0691e..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.Internal;
-
-import java.io.Serializable;
-
-/**
- * Assigns a key to a key group index. A key group is the smallest unit of 
partitioned state
- * which is assigned to an operator. An operator can be assigned multiple key 
groups.
- *
- * @param <K> Type of the key
- */
-@Internal
-public interface KeyGroupAssigner<K> extends Serializable {
-       /**
-        * Calculates the key group index for the given key.
-        *
-        * @param key Key to be used
-        * @return Key group index for the given key
-        */
-       int getKeyGroupIndex(K key);
-
-       /**
-        * Setups the key group assigner with the maximum parallelism (= number 
of key groups).
-        *
-        * @param numberOfKeygroups Maximum parallelism (= number of key groups)
-        */
-       void setup(int numberOfKeygroups);
-
-       /**
-        *
-        * @return configured maximum parallelism
-        */
-       int getNumberKeyGroups();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index f40c83a..49056cc 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -155,6 +155,21 @@ public final class MathUtils {
                }
        }
 
+       /**
+        * Round the given number to the next power of two
+        * @param x number to round
+        * @return x rounded up to the next power of two
+        */
+       public static int roundUpToPowerOfTwo(int x) {
+               x = x - 1;
+               x |= x >> 1;
+               x |= x >> 2;
+               x |= x >> 4;
+               x |= x >> 8;
+               x |= x >> 16;
+               return x + 1;
+       }
+
        // 
============================================================================================
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java 
b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
index 7917a7b..c98b7fc 100644
--- a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
@@ -81,6 +81,37 @@ public class MathUtilTest {
        }
 
        @Test
+       public void testRoundUpToPowerOf2() {
+               assertEquals(0, MathUtils.roundUpToPowerOfTwo(0));
+               assertEquals(1, MathUtils.roundUpToPowerOfTwo(1));
+               assertEquals(2, MathUtils.roundUpToPowerOfTwo(2));
+               assertEquals(4, MathUtils.roundUpToPowerOfTwo(3));
+               assertEquals(4, MathUtils.roundUpToPowerOfTwo(4));
+               assertEquals(8, MathUtils.roundUpToPowerOfTwo(5));
+               assertEquals(8, MathUtils.roundUpToPowerOfTwo(6));
+               assertEquals(8, MathUtils.roundUpToPowerOfTwo(7));
+               assertEquals(8, MathUtils.roundUpToPowerOfTwo(8));
+               assertEquals(16, MathUtils.roundUpToPowerOfTwo(9));
+               assertEquals(16, MathUtils.roundUpToPowerOfTwo(15));
+               assertEquals(16, MathUtils.roundUpToPowerOfTwo(16));
+               assertEquals(32, MathUtils.roundUpToPowerOfTwo(17));
+               assertEquals(32, MathUtils.roundUpToPowerOfTwo(31));
+               assertEquals(32, MathUtils.roundUpToPowerOfTwo(32));
+               assertEquals(64, MathUtils.roundUpToPowerOfTwo(33));
+               assertEquals(64, MathUtils.roundUpToPowerOfTwo(42));
+               assertEquals(64, MathUtils.roundUpToPowerOfTwo(63));
+               assertEquals(64, MathUtils.roundUpToPowerOfTwo(64));
+               assertEquals(128, MathUtils.roundUpToPowerOfTwo(125));
+               assertEquals(32768, MathUtils.roundUpToPowerOfTwo(25654));
+               assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(34366363));
+               assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(67108863));
+               assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(67108864));
+               assertEquals(0x40000000, 
MathUtils.roundUpToPowerOfTwo(0x3FFFFFFE));
+               assertEquals(0x40000000, 
MathUtils.roundUpToPowerOfTwo(0x3FFFFFFF));
+               assertEquals(0x40000000, 
MathUtils.roundUpToPowerOfTwo(0x40000000));
+       }
+
+       @Test
        public void testPowerOfTwo() {
                assertTrue(MathUtils.isPowerOf2(1));
                assertTrue(MathUtils.isPowerOf2(2));

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e751e08..52f6d9a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
@@ -886,7 +887,7 @@ public class CheckpointCoordinator {
                List<KeyGroupRange> result = new ArrayList<>(parallelism);
                int start = 0;
                for (int i = 0; i < parallelism; ++i) {
-                       
result.add(KeyGroupRange.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, 
parallelism, i));
+                       
result.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(numberKeyGroups,
 parallelism, i));
                }
                return result;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index a623295..8ddc9f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -253,7 +253,8 @@ public class JobVertex implements java.io.Serializable {
         */
        public void setMaxParallelism(int maxParallelism) {
                org.apache.flink.util.Preconditions.checkArgument(
-                               maxParallelism > 0 && maxParallelism <= 
Short.MAX_VALUE, "The max parallelism must be at least 1.");
+                               maxParallelism > 0 && maxParallelism <= (1 << 
15),
+                               "The max parallelism must be at least 1 and 
smaller than 2^15.");
 
                this.maxParallelism = maxParallelism;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index e6093a8..0d2bf45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -53,7 +52,7 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
                        JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry) throws Exception;
 
@@ -66,7 +65,7 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
                        JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        List<KeyGroupsStateHandle> restoredState,
                        TaskKvStateRegistry kvStateRegistry) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
deleted file mode 100644
index 9ee4b90..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.KeyGroupAssigner;
-import org.apache.flink.util.MathUtils;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Hash based key group assigner. The assigner assigns each key to a key group 
using the hash value
- * of the key.
- *
- * @param <K> Type of the key
- */
-public class HashKeyGroupAssigner<K> implements KeyGroupAssigner<K> {
-       private static final long serialVersionUID = -6319826921798945448L;
-
-       private static final int UNDEFINED_NUMBER_KEY_GROUPS = 
Integer.MIN_VALUE;
-
-       private int numberKeyGroups;
-
-       public HashKeyGroupAssigner() {
-               this(UNDEFINED_NUMBER_KEY_GROUPS);
-       }
-
-       public HashKeyGroupAssigner(int numberKeyGroups) {
-               Preconditions.checkArgument(numberKeyGroups > 0 || 
numberKeyGroups == UNDEFINED_NUMBER_KEY_GROUPS,
-                       "The number of key groups has to be greater than 0 or 
undefined. Use " +
-                       "setMaxParallelism() to specify the number of key 
groups.");
-               this.numberKeyGroups = numberKeyGroups;
-       }
-
-       public int getNumberKeyGroups() {
-               return numberKeyGroups;
-       }
-
-       @Override
-       public int getKeyGroupIndex(K key) {
-               return MathUtils.murmurHash(key.hashCode()) % numberKeyGroups;
-       }
-
-       @Override
-       public void setup(int numberOfKeygroups) {
-               Preconditions.checkArgument(numberOfKeygroups > 0, "The number 
of key groups has to be " +
-                       "greater than 0. Use setMaxParallelism() to specify the 
number of key " +
-                       "groups.");
-
-               this.numberKeyGroups = numberOfKeygroups;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
index 9e74036..3a9d3d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
@@ -175,46 +175,4 @@ public class KeyGroupRange implements Iterable<Integer>, 
Serializable {
                return startKeyGroup <= endKeyGroup ? new 
KeyGroupRange(startKeyGroup, endKeyGroup) : EMPTY_KEY_GROUP;
        }
 
-       /**
-        * Computes the range of key-groups that are assigned to a given 
operator under the given parallelism and maximum
-        * parallelism.
-        *
-        * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid 
rounding problems in this method. If we ever want
-        * to go beyond this boundary, this method must perform arithmetic on 
long values.
-        *
-        * @param maxParallelism Maximal parallelism that the job was initially 
created with.
-        * @param parallelism    The current parallelism under which the job 
runs. Must be <= maxParallelism.
-        * @param operatorIndex  Id of a key-group. 0 <= keyGroupID < 
maxParallelism.
-        * @return
-        */
-       public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
-                       int maxParallelism,
-                       int parallelism,
-                       int operatorIndex) {
-               Preconditions.checkArgument(parallelism > 0, "Parallelism must 
not be smaller than zero.");
-               Preconditions.checkArgument(maxParallelism >= parallelism, 
"Maximum parallelism must not be smaller than parallelism.");
-               Preconditions.checkArgument(maxParallelism <= Short.MAX_VALUE, 
"Maximum parallelism must be smaller than Short.MAX_VALUE.");
-
-               int start = operatorIndex == 0 ? 0 : ((operatorIndex * 
maxParallelism - 1) / parallelism) + 1;
-               int end = ((operatorIndex + 1) * maxParallelism - 1) / 
parallelism;
-               return new KeyGroupRange(start, end);
-       }
-
-       /**
-        * Computes the index of the operator to which a key-group belongs 
under the given parallelism and maximum
-        * parallelism.
-        *
-        * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid 
rounding problems in this method. If we ever want
-        * to go beyond this boundary, this method must perform arithmetic on 
long values.
-        *
-        * @param maxParallelism Maximal parallelism that the job was initially 
created with.
-        *                       0 < parallelism <= maxParallelism <= 
Short.MAX_VALUE must hold.
-        * @param parallelism    The current parallelism under which the job 
runs. Must be <= maxParallelism.
-        * @param keyGroupId     Id of a key-group. 0 <= keyGroupID < 
maxParallelism.
-        * @return The index of the operator to which elements from the given 
key-group should be routed under the given
-        * parallelism and maxParallelism.
-        */
-       public static final int computeOperatorIndexForKeyGroup(int 
maxParallelism, int parallelism, int keyGroupId) {
-               return keyGroupId * parallelism / maxParallelism;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
new file mode 100644
index 0000000..eceb6f4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+public final class KeyGroupRangeAssignment {
+
+       public static final int DEFAULT_MAX_PARALLELISM = 128;
+
+       private KeyGroupRangeAssignment() {
+               throw new AssertionError();
+       }
+
+       /**
+        * Assigns the given key to a parallel operator index.
+        *
+        * @param key the key to assign
+        * @param maxParallelism the maximum supported parallelism, aka the 
number of key-groups.
+        * @param parallelism the current parallelism of the operator
+        * @return the index of the parallel operator to which the given key 
should be routed.
+        */
+       public static int assignKeyToParallelOperator(Object key, int 
maxParallelism, int parallelism) {
+               return computeOperatorIndexForKeyGroup(maxParallelism, 
parallelism, assignToKeyGroup(key, maxParallelism));
+       }
+
+       /**
+        * Assigns the given key to a key-group index.
+        *
+        * @param key the key to assign
+        * @param maxParallelism the maximum supported parallelism, aka the 
number of key-groups.
+        * @return the key-group to which the given key is assigned
+        */
+       public static final int assignToKeyGroup(Object key, int 
maxParallelism) {
+               return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
+       }
+
+       /**
+        * Computes the range of key-groups that are assigned to a given 
operator under the given parallelism and maximum
+        * parallelism.
+        *
+        * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid 
rounding problems in this method. If we ever want
+        * to go beyond this boundary, this method must perform arithmetic on 
long values.
+        *
+        * @param maxParallelism Maximal parallelism that the job was initially 
created with.
+        * @param parallelism    The current parallelism under which the job 
runs. Must be <= maxParallelism.
+        * @param operatorIndex  Id of a key-group. 0 <= keyGroupID < 
maxParallelism.
+        * @return
+        */
+       public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
+                       int maxParallelism,
+                       int parallelism,
+                       int operatorIndex) {
+               Preconditions.checkArgument(parallelism > 0, "Parallelism must 
not be smaller than zero.");
+               Preconditions.checkArgument(maxParallelism >= parallelism, 
"Maximum parallelism must not be smaller than parallelism.");
+               Preconditions.checkArgument(maxParallelism <= (1 << 15), 
"Maximum parallelism must be smaller than 2^15.");
+
+               int start = operatorIndex == 0 ? 0 : ((operatorIndex * 
maxParallelism - 1) / parallelism) + 1;
+               int end = ((operatorIndex + 1) * maxParallelism - 1) / 
parallelism;
+               return new KeyGroupRange(start, end);
+       }
+
+       /**
+        * Computes the index of the operator to which a key-group belongs 
under the given parallelism and maximum
+        * parallelism.
+        *
+        * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid 
rounding problems in this method. If we ever want
+        * to go beyond this boundary, this method must perform arithmetic on 
long values.
+        *
+        * @param maxParallelism Maximal parallelism that the job was initially 
created with.
+        *                       0 < parallelism <= maxParallelism <= 
Short.MAX_VALUE must hold.
+        * @param parallelism    The current parallelism under which the job 
runs. Must be <= maxParallelism.
+        * @param keyGroupId     Id of a key-group. 0 <= keyGroupID < 
maxParallelism.
+        * @return The index of the operator to which elements from the given 
key-group should be routed under the given
+        * parallelism and maxParallelism.
+        */
+       public static int computeOperatorIndexForKeyGroup(int maxParallelism, 
int parallelism, int keyGroupId) {
+               return keyGroupId * parallelism / maxParallelism;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 2d1d25c..bf9018e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MergingState;
@@ -69,8 +68,8 @@ public abstract class KeyedStateBackend<K> {
        @SuppressWarnings("rawtypes")
        private KvState lastState;
 
-       /** KeyGroupAssigner which determines the key group for each keys */
-       protected final KeyGroupAssigner<K> keyGroupAssigner;
+       /** The number of key-groups aka max parallelism */
+       protected final int numberOfKeyGroups;
 
        /** Range of key-groups for which this backend is responsible */
        protected final KeyGroupRange keyGroupRange;
@@ -81,12 +80,12 @@ public abstract class KeyedStateBackend<K> {
        public KeyedStateBackend(
                        TaskKvStateRegistry kvStateRegistry,
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange) {
 
                this.kvStateRegistry = 
Preconditions.checkNotNull(kvStateRegistry);
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
-               this.keyGroupAssigner = 
Preconditions.checkNotNull(keyGroupAssigner);
+               this.numberOfKeyGroups = 
Preconditions.checkNotNull(numberOfKeyGroups);
                this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
        }
 
@@ -157,7 +156,7 @@ public abstract class KeyedStateBackend<K> {
         */
        public void setCurrentKey(K newKey) {
                this.currentKey = newKey;
-               this.currentKeyGroup = 
keyGroupAssigner.getKeyGroupIndex(newKey);
+               this.currentKeyGroup = 
KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
        }
 
        /**
@@ -179,11 +178,7 @@ public abstract class KeyedStateBackend<K> {
        }
 
        public int getNumberOfKeyGroups() {
-               return keyGroupAssigner.getNumberKeyGroups();
-       }
-
-       public KeyGroupAssigner<K> getKeyGroupAssigner() {
-               return keyGroupAssigner;
+               return numberOfKeyGroups;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 5495244..6d92a4d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -181,13 +180,13 @@ public class FsStateBackend extends AbstractStateBackend {
                        JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry) throws Exception {
                return new HeapKeyedStateBackend<>(
                                kvStateRegistry,
                                keySerializer,
-                               keyGroupAssigner,
+                               numberOfKeyGroups,
                                keyGroupRange);
        }
 
@@ -197,14 +196,14 @@ public class FsStateBackend extends AbstractStateBackend {
                        JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        List<KeyGroupsStateHandle> restoredState,
                        TaskKvStateRegistry kvStateRegistry) throws Exception {
                return new HeapKeyedStateBackend<>(
                                kvStateRegistry,
                                keySerializer,
-                               keyGroupAssigner,
+                               numberOfKeyGroups,
                                keyGroupRange,
                                restoredState);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 9863c93..18d1bc7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
@@ -138,7 +138,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends 
State, SD extends St
                Preconditions.checkState(key != null, "No key given.");
 
                Map<N, Map<K, SV>> namespaceMap =
-                               
stateTable.get(backend.getKeyGroupAssigner().getKeyGroupIndex(key));
+                               
stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, 
backend.getNumberOfKeyGroups()));
 
                if (namespaceMap == null) {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index fcb4bef..8d13941 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -76,20 +75,20 @@ public class HeapKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
        public HeapKeyedStateBackend(
                        TaskKvStateRegistry kvStateRegistry,
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange) {
 
-               super(kvStateRegistry, keySerializer, keyGroupAssigner, 
keyGroupRange);
+               super(kvStateRegistry, keySerializer, numberOfKeyGroups, 
keyGroupRange);
 
                LOG.info("Initializing heap keyed state backend with stream 
factory.");
        }
 
        public HeapKeyedStateBackend(TaskKvStateRegistry kvStateRegistry,
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        List<KeyGroupsStateHandle> restoredState) throws 
Exception {
-               super(kvStateRegistry, keySerializer, keyGroupAssigner, 
keyGroupRange);
+               super(kvStateRegistry, keySerializer, numberOfKeyGroups, 
keyGroupRange);
 
                LOG.info("Initializing heap keyed state backend from 
snapshot.");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 4c65c25..9552325 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.util.Preconditions;
 
@@ -119,7 +120,7 @@ public class HeapListState<K, N, V>
                Preconditions.checkState(key != null, "No key given.");
 
                Map<N, Map<K, ArrayList<V>>> namespaceMap =
-                               
stateTable.get(backend.getKeyGroupAssigner().getKeyGroupIndex(key));
+                               
stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, 
backend.getNumberOfKeyGroups()));
 
                if (namespaceMap == null) {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 654c367..179dfe7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state.memory;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -80,14 +79,14 @@ public class MemoryStateBackend extends 
AbstractStateBackend {
        public <K> KeyedStateBackend<K> createKeyedStateBackend(
                        Environment env, JobID jobID,
                        String operatorIdentifier, TypeSerializer<K> 
keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry) throws IOException 
{
 
                return new HeapKeyedStateBackend<>(
                                kvStateRegistry,
                                keySerializer,
-                               keyGroupAssigner,
+                               numberOfKeyGroups,
                                keyGroupRange);
        }
 
@@ -96,7 +95,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
                        Environment env, JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        List<KeyGroupsStateHandle> restoredState,
                        TaskKvStateRegistry kvStateRegistry) throws Exception {
@@ -104,7 +103,7 @@ public class MemoryStateBackend extends 
AbstractStateBackend {
                return new HeapKeyedStateBackend<>(
                                kvStateRegistry,
                                keySerializer,
-                               keyGroupAssigner,
+                               numberOfKeyGroups,
                                keyGroupRange,
                                restoredState);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 495dced..4972c51 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -2458,7 +2459,7 @@ public class CheckpointCoordinatorTest {
        private void testCreateKeyGroupPartitions(int maxParallelism, int 
parallelism) {
                List<KeyGroupRange> ranges = 
CheckpointCoordinator.createKeyGroupPartitions(maxParallelism, parallelism);
                for (int i = 0; i < maxParallelism; ++i) {
-                       KeyGroupRange range = 
ranges.get(KeyGroupRange.computeOperatorIndexForKeyGroup(maxParallelism, 
parallelism, i));
+                       KeyGroupRange range = 
ranges.get(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism,
 parallelism, i));
                        if (!range.contains(i)) {
                                Assert.fail("Could not find expected key-group 
" + i + " in range " + range);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 3380907..405f962 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.query.netty.KvStateClient;
 import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.query.netty.UnknownKvStateID;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
@@ -232,6 +231,7 @@ public class QueryableStateClientTest {
                // Config
                int numServers = 2;
                int numKeys = 1024;
+               int numKeyGroups = 1;
 
                JobID jobId = new JobID();
                JobVertexID jobVertexId = new JobVertexID();
@@ -250,7 +250,7 @@ public class QueryableStateClientTest {
                                new JobID(),
                                "test_op",
                                IntSerializer.INSTANCE,
-                               new HashKeyGroupAssigner<Integer>(1),
+                               numKeyGroups,
                                new KeyGroupRange(0, 0),
                                new KvStateRegistry().createTaskRegistry(new 
JobID(), new JobVertexID()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index 796481c..f785174 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -42,7 +42,6 @@ import 
org.apache.flink.runtime.query.netty.message.KvStateRequest;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KvState;
@@ -533,6 +532,8 @@ public class KvStateClientTest {
 
                final int batchSize = 16;
 
+               final int numKeyGroups = 1;
+
                AbstractStateBackend abstractBackend = new MemoryStateBackend();
                KvStateRegistry dummyRegistry = new KvStateRegistry();
                DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
@@ -542,7 +543,7 @@ public class KvStateClientTest {
                                new JobID(),
                                "test_op",
                                IntSerializer.INSTANCE,
-                               new HashKeyGroupAssigner<Integer>(1),
+                               numKeyGroups,
                                new KeyGroupRange(0, 0),
                                dummyRegistry.createTaskRegistry(new JobID(), 
new JobVertexID()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index 3d2e8b5..52c807f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -39,7 +39,6 @@ import 
org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KvState;
@@ -89,6 +88,7 @@ public class KvStateServerHandlerTest {
                ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
                desc.setQueryable("vanilla");
 
+               int numKeyGroups =1;
                AbstractStateBackend abstractBackend = new MemoryStateBackend();
                DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
                dummyEnv.setKvStateRegistry(registry);
@@ -97,7 +97,7 @@ public class KvStateServerHandlerTest {
                                new JobID(),
                                "test_op",
                                IntSerializer.INSTANCE,
-                               new HashKeyGroupAssigner<Integer>(1),
+                               numKeyGroups,
                                new KeyGroupRange(0, 0),
                                
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
 
@@ -200,6 +200,7 @@ public class KvStateServerHandlerTest {
                KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
+               int numKeyGroups = 1;
                AbstractStateBackend abstractBackend = new MemoryStateBackend();
                DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
                dummyEnv.setKvStateRegistry(registry);
@@ -208,7 +209,7 @@ public class KvStateServerHandlerTest {
                                new JobID(),
                                "test_op",
                                IntSerializer.INSTANCE,
-                               new HashKeyGroupAssigner<Integer>(1),
+                               numKeyGroups,
                                new KeyGroupRange(0, 0),
                                
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
 
@@ -346,6 +347,7 @@ public class KvStateServerHandlerTest {
                KvStateServerHandler handler = new 
KvStateServerHandler(registry, closedExecutor, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
+               int numKeyGroups = 1;
                AbstractStateBackend abstractBackend = new MemoryStateBackend();
                DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
                dummyEnv.setKvStateRegistry(registry);
@@ -354,7 +356,7 @@ public class KvStateServerHandlerTest {
                                new JobID(),
                                "test_op",
                                IntSerializer.INSTANCE,
-                               new HashKeyGroupAssigner<Integer>(1),
+                               numKeyGroups,
                                new KeyGroupRange(0, 0),
                                
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
 
@@ -484,6 +486,7 @@ public class KvStateServerHandlerTest {
                KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
+               int numKeyGroups = 1;
                AbstractStateBackend abstractBackend = new MemoryStateBackend();
                DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
                dummyEnv.setKvStateRegistry(registry);
@@ -492,7 +495,7 @@ public class KvStateServerHandlerTest {
                                new JobID(),
                                "test_op",
                                IntSerializer.INSTANCE,
-                               new HashKeyGroupAssigner<Integer>(1),
+                               numKeyGroups,
                                new KeyGroupRange(0, 0),
                                
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
 
@@ -579,6 +582,7 @@ public class KvStateServerHandlerTest {
                KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
+               int numKeyGroups = 1;
                AbstractStateBackend abstractBackend = new MemoryStateBackend();
                DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
                dummyEnv.setKvStateRegistry(registry);
@@ -587,7 +591,7 @@ public class KvStateServerHandlerTest {
                                new JobID(),
                                "test_op",
                                IntSerializer.INSTANCE,
-                               new HashKeyGroupAssigner<Integer>(1),
+                               numKeyGroups,
                                new KeyGroupRange(0, 0),
                                
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
index 30d91b6..e92fb10 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
@@ -42,7 +42,6 @@ import 
org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
@@ -80,7 +79,6 @@ public class KvStateServerTest {
        public void testSimpleRequest() throws Exception {
                KvStateServer server = null;
                Bootstrap bootstrap = null;
-
                try {
                        KvStateRegistry registry = new KvStateRegistry();
                        KvStateRequestStats stats = new 
AtomicKvStateRequestStats();
@@ -89,7 +87,7 @@ public class KvStateServerTest {
                        server.start();
 
                        KvStateServerAddress serverAddress = 
server.getAddress();
-
+                       int numKeyGroups = 1;
                        AbstractStateBackend abstractBackend = new 
MemoryStateBackend();
                        DummyEnvironment dummyEnv = new 
DummyEnvironment("test", 1, 0);
                        dummyEnv.setKvStateRegistry(registry);
@@ -98,7 +96,7 @@ public class KvStateServerTest {
                                        new JobID(),
                                        "test_op",
                                        IntSerializer.INSTANCE,
-                                       new HashKeyGroupAssigner<Integer>(1),
+                                       numKeyGroups,
                                        new KeyGroupRange(0, 0),
                                        registry.createTaskRegistry(new 
JobID(), new JobVertexID()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index f094bd5..5984aca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -41,7 +40,6 @@ import 
org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateRegistry;
@@ -56,7 +54,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
 
 import static org.junit.Assert.assertEquals;
@@ -90,14 +87,14 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
        protected <K> KeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> 
keySerializer, Environment env) throws Exception {
                return createKeyedBackend(
                                keySerializer,
-                               new HashKeyGroupAssigner<K>(10),
+                               10,
                                new KeyGroupRange(0, 9),
                                env);
        }
 
        protected <K> KeyedStateBackend<K> createKeyedBackend(
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        Environment env) throws Exception {
                return getStateBackend().createKeyedStateBackend(
@@ -105,7 +102,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                                new JobID(),
                                "test_op",
                                keySerializer,
-                               keyGroupAssigner,
+                               numberOfKeyGroups,
                                keyGroupRange,
                                env.getTaskKvStateRegistry());
        }
@@ -120,7 +117,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        Environment env) throws Exception {
                return restoreKeyedBackend(
                                keySerializer,
-                               new HashKeyGroupAssigner<K>(10),
+                               10,
                                new KeyGroupRange(0, 9),
                                Collections.singletonList(state),
                                env);
@@ -128,7 +125,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
        protected <K> KeyedStateBackend<K> restoreKeyedBackend(
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        List<KeyGroupsStateHandle> state,
                        Environment env) throws Exception {
@@ -137,7 +134,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                                new JobID(),
                                "test_op",
                                keySerializer,
-                               keyGroupAssigner,
+                               numberOfKeyGroups,
                                keyGroupRange,
                                state,
                                env.getTaskKvStateRegistry());
@@ -243,7 +240,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                KeyedStateBackend<Integer> backend = createKeyedBackend(
                                IntSerializer.INSTANCE,
-                               new HashKeyGroupAssigner<Integer>(1),
+                               1,
                                new KeyGroupRange(0, 0),
                                new DummyEnvironment("test_op", 1, 0));
 
@@ -277,7 +274,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                backend.close();
                backend = restoreKeyedBackend(
                                IntSerializer.INSTANCE,
-                               new HashKeyGroupAssigner<Integer>(1),
+                               1,
                                new KeyGroupRange(0, 0),
                                Collections.singletonList(snapshot1),
                                new DummyEnvironment("test_op", 1, 0));
@@ -675,12 +672,9 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                final int MAX_PARALLELISM = 10;
 
                CheckpointStreamFactory streamFactory = createStreamFactory();
-
-               HashKeyGroupAssigner<Integer> keyGroupAssigner = new 
HashKeyGroupAssigner<>(10);
-
                KeyedStateBackend<Integer> backend = createKeyedBackend(
                                IntSerializer.INSTANCE,
-                               keyGroupAssigner,
+                               MAX_PARALLELISM,
                                new KeyGroupRange(0, MAX_PARALLELISM - 1),
                                new DummyEnvironment("test", 1, 0));
 
@@ -695,12 +689,12 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                Random rand = new Random(0);
 
                // for each key, determine into which half of the key-group 
space they fall
-               int firstKeyHalf = 
keyGroupAssigner.getKeyGroupIndex(keyInFirstHalf) * 2 / MAX_PARALLELISM;
-               int secondKeyHalf = 
keyGroupAssigner.getKeyGroupIndex(keyInSecondHalf) * 2 / MAX_PARALLELISM;
+               int firstKeyHalf = 
KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInFirstHalf, 
MAX_PARALLELISM, 2);
+               int secondKeyHalf = 
KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInFirstHalf, 
MAX_PARALLELISM, 2);
 
                while (firstKeyHalf == secondKeyHalf) {
                        keyInSecondHalf = rand.nextInt();
-                       secondKeyHalf = 
keyGroupAssigner.getKeyGroupIndex(keyInSecondHalf) * 2 / MAX_PARALLELISM;
+                       secondKeyHalf = 
KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInSecondHalf, 
MAX_PARALLELISM, 2);
                }
 
                backend.setCurrentKey(keyInFirstHalf);
@@ -714,18 +708,18 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                List<KeyGroupsStateHandle> firstHalfKeyGroupStates = 
CheckpointCoordinator.getKeyGroupsStateHandles(
                                Collections.singletonList(snapshot),
-                               new KeyGroupRange(0, 4));
+                               
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(MAX_PARALLELISM, 
2, 0));
 
                List<KeyGroupsStateHandle> secondHalfKeyGroupStates = 
CheckpointCoordinator.getKeyGroupsStateHandles(
                                Collections.singletonList(snapshot),
-                               new KeyGroupRange(5, 9));
+                               
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(MAX_PARALLELISM, 
2, 1));
 
                backend.close();
 
                // backend for the first half of the key group range
                KeyedStateBackend<Integer> firstHalfBackend = 
restoreKeyedBackend(
                                IntSerializer.INSTANCE,
-                               keyGroupAssigner,
+                               MAX_PARALLELISM,
                                new KeyGroupRange(0, 4),
                                firstHalfKeyGroupStates,
                                new DummyEnvironment("test", 1, 0));
@@ -733,7 +727,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                // backend for the second half of the key group range
                KeyedStateBackend<Integer> secondHalfBackend = 
restoreKeyedBackend(
                                IntSerializer.INSTANCE,
-                               keyGroupAssigner,
+                               MAX_PARALLELISM,
                                new KeyGroupRange(5, 9),
                                secondHalfKeyGroupStates,
                                new DummyEnvironment("test", 1, 0));
@@ -978,9 +972,10 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
         */
        @SuppressWarnings("unchecked")
        protected void testConcurrentMapIfQueryable() throws Exception {
+               final int numberOfKeyGroups = 1;
                KeyedStateBackend<Integer> backend = createKeyedBackend(
                                IntSerializer.INSTANCE,
-                               new HashKeyGroupAssigner<Integer>(1),
+                               numberOfKeyGroups,
                                new KeyGroupRange(0, 0),
                                new DummyEnvironment("test_op", 1, 0));
 
@@ -1005,7 +1000,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        backend.setCurrentKey(1);
                        state.update(121818273);
 
-                       int keyGroupIndex = new 
HashKeyGroupAssigner<>(1).getKeyGroupIndex(1);
+                       int keyGroupIndex = 
KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
                        StateTable stateTable = ((AbstractHeapState) 
kvState).getStateTable();
                        assertNotNull("State not set", 
stateTable.get(keyGroupIndex));
                        assertTrue(stateTable.get(keyGroupIndex) instanceof 
ConcurrentHashMap);
@@ -1031,7 +1026,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        backend.setCurrentKey(1);
                        state.add(121818273);
 
-                       int keyGroupIndex = new 
HashKeyGroupAssigner<>(1).getKeyGroupIndex(1);
+                       int keyGroupIndex = 
KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
                        StateTable stateTable = ((AbstractHeapState) 
kvState).getStateTable();
                        assertNotNull("State not set", 
stateTable.get(keyGroupIndex));
                        assertTrue(stateTable.get(keyGroupIndex) instanceof 
ConcurrentHashMap);
@@ -1062,7 +1057,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        backend.setCurrentKey(1);
                        state.add(121818273);
 
-                       int keyGroupIndex = new 
HashKeyGroupAssigner<>(1).getKeyGroupIndex(1);
+                       int keyGroupIndex = 
KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
                        StateTable stateTable = ((AbstractHeapState) 
kvState).getStateTable();
                        assertNotNull("State not set", 
stateTable.get(keyGroupIndex));
                        assertTrue(stateTable.get(keyGroupIndex) instanceof 
ConcurrentHashMap);
@@ -1093,7 +1088,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        backend.setCurrentKey(1);
                        state.add(121818273);
 
-                       int keyGroupIndex = new 
HashKeyGroupAssigner<>(1).getKeyGroupIndex(1);
+                       int keyGroupIndex = 
KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
                        StateTable stateTable = ((AbstractHeapState) 
kvState).getStateTable();
                        assertNotNull("State not set", 
stateTable.get(keyGroupIndex));
                        assertTrue(stateTable.get(keyGroupIndex) instanceof 
ConcurrentHashMap);

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 1fb34b8..af907e3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -111,8 +111,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
                        new PartitionTransformation<>(
                                dataStream.getTransformation(),
                                new KeyGroupStreamPartitioner<>(
-                                       keySelector,
-                                       new HashKeyGroupAssigner<KEY>())));
+                                       keySelector, 
KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM)));
                this.keySelector = keySelector;
                this.keyType = keyType;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 1a92ba4..8e807db 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
@@ -76,8 +75,7 @@ public class StreamConfig implements Serializable {
        private static final String STATE_BACKEND = "statebackend";
        private static final String STATE_PARTITIONER = "statePartitioner";
 
-       /** key for the {@link KeyGroupAssigner} for key to key group index 
mappings */
-       private static final String KEY_GROUP_ASSIGNER = "keyGroupAssigner";
+       private static final String NUMBER_OF_KEY_GROUPS = "numberOfKeyGroups";
 
        private static final String STATE_KEY_SERIALIZER = "statekeyser";
        
@@ -445,28 +443,26 @@ public class StreamConfig implements Serializable {
        }
 
        /**
-        * Sets the {@link KeyGroupAssigner} to be used for the current {@link 
StreamOperator}.
+        * Sets the number of key-groups to be used for the current {@link 
StreamOperator}.
         *
-        * @param keyGroupAssigner Key group assigner to be used
+        * @param numberOfKeyGroups Number of key-groups to be used
         */
-       public void setKeyGroupAssigner(KeyGroupAssigner<?> keyGroupAssigner) {
+       public void setNumberOfKeyGroups(int numberOfKeyGroups) {
                try {
-                       InstantiationUtil.writeObjectToConfig(keyGroupAssigner, 
this.config, KEY_GROUP_ASSIGNER);
+                       
InstantiationUtil.writeObjectToConfig(numberOfKeyGroups, this.config, 
NUMBER_OF_KEY_GROUPS);
                } catch (Exception e) {
                        throw new StreamTaskException("Could not serialize 
virtual state partitioner.", e);
                }
        }
 
        /**
-        * Gets the {@link KeyGroupAssigner} for the {@link StreamOperator}.
+        * Gets the number of key-groups for the {@link StreamOperator}.
         *
-        * @param classLoader Classloader to be used for the deserialization
-        * @param <K> Type of the keys to be assigned to key groups
-        * @return Key group assigner
+        * @return the number of key-groups
         */
-       public <K> KeyGroupAssigner<K> getKeyGroupAssigner(ClassLoader 
classLoader) {
+       public Integer getNumberOfKeyGroups(ClassLoader cl) {
                try {
-                       return 
InstantiationUtil.readObjectFromConfig(this.config, KEY_GROUP_ASSIGNER, 
classLoader);
+                       return 
InstantiationUtil.readObjectFromConfig(this.config, NUMBER_OF_KEY_GROUPS, cl);
                } catch (Exception e) {
                        throw new StreamTaskException("Could not instantiate 
virtual state partitioner.", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index f9f26e9..506b664 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
@@ -33,6 +34,7 @@ import 
org.apache.flink.streaming.api.transformations.SplitTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,24 +147,24 @@ public class StreamGraphGenerator {
                LOG.debug("Transforming " + transform);
 
                if (transform.getMaxParallelism() <= 0) {
+
                        // if the max parallelism hasn't been set, then first 
use the job wide max parallelism
                        // from theExecutionConfig. If this value has not been 
specified either, then use the
                        // parallelism of the operator.
                        int maxParallelism = 
env.getConfig().getMaxParallelism();
 
                        if (maxParallelism <= 0) {
-                               maxParallelism = transform.getParallelism();
-                               /**
-                                * TODO: Remove once the parallelism settings 
works properly in Flink (FLINK-3885)
-                                * Currently, the parallelism will be set to 1 
on the JobManager iff it encounters
-                                * a negative parallelism value. We need to 
know this for the
-                                * KeyGroupStreamPartitioner on the 
client-side. Thus, we already set the value to
-                                * 1 here.
-                                */
-                               if (maxParallelism <= 0) {
-                                       transform.setParallelism(1);
-                                       maxParallelism = 1;
+
+                               int parallelism = transform.getParallelism();
+
+                               if(parallelism <= 0) {
+                                       parallelism = 1;
+                                       transform.setParallelism(parallelism);
                                }
+
+                               maxParallelism = Math.max(
+                                               
MathUtils.roundUpToPowerOfTwo(parallelism + (parallelism / 2)),
+                                               
KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM);
                        }
 
                        transform.setMaxParallelism(maxParallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 76fdaca..a024895 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -41,7 +40,6 @@ import 
org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -357,12 +355,9 @@ public class StreamingJobGraphGenerator {
                config.setStatePartitioner(1, vertex.getStatePartitioner2());
                config.setStateKeySerializer(vertex.getStateKeySerializer());
 
-               // only set the key group assigner if the vertex uses 
partitioned state (= KeyedStream).
+               // only set the max parallelism if the vertex uses partitioned 
state (= KeyedStream).
                if (vertex.getStatePartitioner1() != null) {
-                       // the key group assigner has to know the number of key 
groups (= maxParallelism)
-                       KeyGroupAssigner<Object> keyGroupAssigner = new 
HashKeyGroupAssigner<Object>(vertex.getMaxParallelism());
-
-                       config.setKeyGroupAssigner(keyGroupAssigner);
+                       config.setNumberOfKeyGroups(vertex.getMaxParallelism());
                }
 
                Class<? extends AbstractInvokable> vertexClass = 
vertex.getJobVertexClass();

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 718c0c7..71296e3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -133,14 +134,14 @@ public abstract class AbstractStreamOperator<OUT>
                        if (null != keySerializer) {
                                ExecutionConfig execConf = 
container.getEnvironment().getExecutionConfig();;
 
-                               KeyGroupRange subTaskKeyGroupRange = 
KeyGroupRange.computeKeyGroupRangeForOperatorIndex(
+                               KeyGroupRange subTaskKeyGroupRange = 
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                                                
container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),
                                                
container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
                                                
container.getIndexInSubtaskGroup());
 
                                keyedStateBackend = 
container.createKeyedStateBackend(
                                                keySerializer,
-                                               
container.getConfiguration().getKeyGroupAssigner(getUserCodeClassloader()),
+                                               
container.getConfiguration().getNumberOfKeyGroups(getUserCodeClassloader()),
                                                subTaskKeyGroupRange);
                        }
                } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
index 108a3ae..256fee1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
@@ -18,16 +18,14 @@
 package org.apache.flink.streaming.runtime.partitioner;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 /**
- * Partitioner selects the target channel based on the key group index. The 
key group
- * index is derived from the key of the elements using the {@link 
KeyGroupAssigner}.
+ * Partitioner selects the target channel based on the key group index.
  *
  * @param <T> Type of the elements in the Stream being partitioned
  */
@@ -39,15 +37,16 @@ public class KeyGroupStreamPartitioner<T, K> extends 
StreamPartitioner<T> implem
 
        private final KeySelector<T, K> keySelector;
 
-       private final KeyGroupAssigner<K> keyGroupAssigner;
+       private int maxParallelism;
 
-       public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, 
KeyGroupAssigner<K> keyGroupAssigner) {
+       public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, int 
maxParallelism) {
+               Preconditions.checkArgument(maxParallelism > 0, "Number of 
key-groups must be > 0!");
                this.keySelector = Preconditions.checkNotNull(keySelector);
-               this.keyGroupAssigner = 
Preconditions.checkNotNull(keyGroupAssigner);
+               this.maxParallelism = maxParallelism;
        }
 
-       public KeyGroupAssigner<K> getKeyGroupAssigner() {
-               return keyGroupAssigner;
+       public int getMaxParallelism() {
+               return maxParallelism;
        }
 
        @Override
@@ -61,10 +60,7 @@ public class KeyGroupStreamPartitioner<T, K> extends 
StreamPartitioner<T> implem
                } catch (Exception e) {
                        throw new RuntimeException("Could not extract key from 
" + record.getInstance().getValue(), e);
                }
-               returnArray[0] = KeyGroupRange.computeOperatorIndexForKeyGroup(
-                               keyGroupAssigner.getNumberKeyGroups(),
-                               numberOfOutputChannels,
-                               keyGroupAssigner.getKeyGroupIndex(key));
+               returnArray[0] = 
KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, 
numberOfOutputChannels);
                return returnArray;
        }
 
@@ -80,6 +76,6 @@ public class KeyGroupStreamPartitioner<T, K> extends 
StreamPartitioner<T> implem
 
        @Override
        public void configure(int maxParallelism) {
-               keyGroupAssigner.setup(maxParallelism);
+               this.maxParallelism = maxParallelism;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 13f650c..701281b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -762,7 +761,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
 
        public <K> KeyedStateBackend<K> createKeyedStateBackend(
                        TypeSerializer<K> keySerializer,
-                       KeyGroupAssigner<K> keyGroupAssigner,
+                       int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange) throws Exception {
 
                if (keyedStateBackend != null) {
@@ -779,7 +778,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                        getEnvironment().getJobID(),
                                        operatorIdentifier,
                                        keySerializer,
-                                       keyGroupAssigner,
+                                       numberOfKeyGroups,
                                        keyGroupRange,
                                        lazyRestoreKeyGroupStates,
                                        
getEnvironment().getTaskKvStateRegistry());
@@ -791,7 +790,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                        getEnvironment().getJobID(),
                                        operatorIdentifier,
                                        keySerializer,
-                                       keyGroupAssigner,
+                                       numberOfKeyGroups,
                                        keyGroupRange,
                                        
getEnvironment().getTaskKvStateRegistry());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 06d381f..874274f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -19,11 +19,9 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -270,10 +268,6 @@ public class StreamGraphGeneratorTest {
                StreamNode keyedResultNode = 
graph.getStreamNode(keyedResult.getId());
 
                StreamPartitioner<?> streamPartitioner = 
keyedResultNode.getInEdges().get(0).getPartitioner();
-
-               HashKeyGroupAssigner<?> hashKeyGroupAssigner = 
extractHashKeyGroupAssigner(streamPartitioner);
-
-               assertEquals(maxParallelism, 
hashKeyGroupAssigner.getNumberKeyGroups());
        }
 
        /**
@@ -384,7 +378,7 @@ public class StreamGraphGeneratorTest {
        }
 
        /**
-        * Tests that the max parallelism and the key group partitioner is 
properly set for connected
+        * Tests that the max parallelism is properly set for connected
         * streams.
         */
        @Test
@@ -423,24 +417,6 @@ public class StreamGraphGeneratorTest {
 
                StreamPartitioner<?> streamPartitioner1 = 
keyedResultNode.getInEdges().get(0).getPartitioner();
                StreamPartitioner<?> streamPartitioner2 = 
keyedResultNode.getInEdges().get(1).getPartitioner();
-
-               HashKeyGroupAssigner<?> hashKeyGroupAssigner1 = 
extractHashKeyGroupAssigner(streamPartitioner1);
-               assertEquals(maxParallelism, 
hashKeyGroupAssigner1.getNumberKeyGroups());
-
-               HashKeyGroupAssigner<?> hashKeyGroupAssigner2 = 
extractHashKeyGroupAssigner(streamPartitioner2);
-               assertEquals(maxParallelism, 
hashKeyGroupAssigner2.getNumberKeyGroups());
-       }
-
-       private HashKeyGroupAssigner<?> 
extractHashKeyGroupAssigner(StreamPartitioner<?> streamPartitioner) {
-               assertTrue(streamPartitioner instanceof 
KeyGroupStreamPartitioner);
-
-               KeyGroupStreamPartitioner<?, ?> keyGroupStreamPartitioner = 
(KeyGroupStreamPartitioner<?, ?>) streamPartitioner;
-
-               KeyGroupAssigner<?> keyGroupAssigner = 
keyGroupStreamPartitioner.getKeyGroupAssigner();
-
-               assertTrue(keyGroupAssigner instanceof HashKeyGroupAssigner);
-
-               return (HashKeyGroupAssigner<?>) keyGroupAssigner;
        }
 
        private static class OutputTypeConfigurableOperationWithTwoInputs

Reply via email to