[FLINK-4380] Remove KeyGroupAssigner in favor of static method/Have default max. parallelism at 128
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d430618 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d430618 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d430618 Branch: refs/heads/master Commit: 6d4306186e09be6f2557600ed7a853c33d3ae6bd Parents: 2b7a8d6 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Mon Aug 29 11:53:22 2016 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed Aug 31 19:10:01 2016 +0200 ---------------------------------------------------------------------- .../streaming/state/AbstractRocksDBState.java | 3 +- .../state/RocksDBKeyedStateBackend.java | 9 +- .../streaming/state/RocksDBStateBackend.java | 9 +- .../state/RocksDBStateBackendConfigTest.java | 9 +- .../api/common/state/KeyGroupAssigner.java | 53 ----- .../java/org/apache/flink/util/MathUtils.java | 15 ++ .../org/apache/flink/util/MathUtilTest.java | 31 +++ .../checkpoint/CheckpointCoordinator.java | 3 +- .../flink/runtime/jobgraph/JobVertex.java | 3 +- .../runtime/state/AbstractStateBackend.java | 5 +- .../runtime/state/HashKeyGroupAssigner.java | 66 ------ .../flink/runtime/state/KeyGroupRange.java | 42 ---- .../runtime/state/KeyGroupRangeAssignment.java | 97 +++++++++ .../flink/runtime/state/KeyedStateBackend.java | 17 +- .../state/filesystem/FsStateBackend.java | 9 +- .../runtime/state/heap/AbstractHeapState.java | 4 +- .../state/heap/HeapKeyedStateBackend.java | 9 +- .../flink/runtime/state/heap/HeapListState.java | 3 +- .../state/memory/MemoryStateBackend.java | 9 +- .../checkpoint/CheckpointCoordinatorTest.java | 3 +- .../runtime/query/QueryableStateClientTest.java | 4 +- .../runtime/query/netty/KvStateClientTest.java | 5 +- .../query/netty/KvStateServerHandlerTest.java | 16 +- .../runtime/query/netty/KvStateServerTest.java | 6 +- .../runtime/state/StateBackendTestBase.java | 49 ++--- .../streaming/api/datastream/KeyedStream.java | 5 +- 
.../flink/streaming/api/graph/StreamConfig.java | 22 +- .../api/graph/StreamGraphGenerator.java | 24 ++- .../api/graph/StreamingJobGraphGenerator.java | 9 +- .../api/operators/AbstractStreamOperator.java | 5 +- .../partitioner/KeyGroupStreamPartitioner.java | 24 +-- .../streaming/runtime/tasks/StreamTask.java | 7 +- .../api/graph/StreamGraphGeneratorTest.java | 26 +-- .../graph/StreamingJobGraphGeneratorTest.java | 200 ------------------- .../operators/StreamingRuntimeContextTest.java | 5 +- ...AlignedProcessingTimeWindowOperatorTest.java | 19 +- .../KeyGroupStreamPartitionerTest.java | 3 +- .../tasks/OneInputStreamTaskTestHarness.java | 3 +- .../KeyedOneInputStreamOperatorTestHarness.java | 16 +- .../util/OneInputStreamOperatorTestHarness.java | 4 - .../test/checkpointing/RescalingITCase.java | 27 +-- .../streaming/runtime/StateBackendITCase.java | 5 +- 42 files changed, 297 insertions(+), 586 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index cbc2757..e878ad5 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -25,6 +25,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KvState; import org.apache.flink.util.Preconditions; import org.rocksdb.ColumnFamilyHandle; @@ -130,7 +131,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta backend.getKeySerializer(), namespaceSerializer); - int keyGroup = backend.getKeyGroupAssigner().getKeyGroupIndex(des.f0); + int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups()); writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1); return backend.db.get(columnFamily, keySerializationStream.toByteArray()); http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index a1634b2..177c09f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -21,7 +21,6 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -131,11 
+130,11 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { ColumnFamilyOptions columnFamilyOptions, TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange ) throws Exception { - super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange); + super(kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange); this.operatorIdentifier = operatorIdentifier; this.jobId = jobId; @@ -183,7 +182,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { ColumnFamilyOptions columnFamilyOptions, TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyGroupsStateHandle> restoreState ) throws Exception { @@ -195,7 +194,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { columnFamilyOptions, kvStateRegistry, keySerializer, - keyGroupAssigner, + numberOfKeyGroups, keyGroupRange); LOG.info("Initializing RocksDB keyed state backend from snapshot."); http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index f950751..0fdbd5f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -18,7 +18,6 @@ package 
org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.state.StateBackend; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.Path; @@ -230,7 +229,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws Exception { @@ -246,7 +245,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { getColumnOptions(), kvStateRegistry, keySerializer, - keyGroupAssigner, + numberOfKeyGroups, keyGroupRange); } @@ -254,7 +253,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyGroupsStateHandle> restoredState, TaskKvStateRegistry kvStateRegistry) throws Exception { @@ -270,7 +269,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { getColumnOptions(), kvStateRegistry, keySerializer, - keyGroupAssigner, + numberOfKeyGroups, keyGroupRange, restoredState); } http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index acf6cb8..3b851be 
100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.HashKeyGroupAssigner; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.util.OperatingSystem; import org.junit.Assume; @@ -93,7 +92,7 @@ public class RocksDBStateBackendConfigTest { env.getJobID(), "test_op", IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + 1, new KeyGroupRange(0, 0), env.getTaskKvStateRegistry()); @@ -147,7 +146,7 @@ public class RocksDBStateBackendConfigTest { env.getJobID(), "test_op", IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + 1, new KeyGroupRange(0, 0), env.getTaskKvStateRegistry()); @@ -182,7 +181,7 @@ public class RocksDBStateBackendConfigTest { env.getJobID(), "foobar", IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + 1, new KeyGroupRange(0, 0), new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID())); } @@ -224,7 +223,7 @@ public class RocksDBStateBackendConfigTest { env.getJobID(), "foobar", IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + 1, new KeyGroupRange(0, 0), new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID())); } http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java deleted file mode 100644 index bb0691e..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.state; - -import org.apache.flink.annotation.Internal; - -import java.io.Serializable; - -/** - * Assigns a key to a key group index. A key group is the smallest unit of partitioned state - * which is assigned to an operator. An operator can be assigned multiple key groups. - * - * @param <K> Type of the key - */ -@Internal -public interface KeyGroupAssigner<K> extends Serializable { - /** - * Calculates the key group index for the given key. - * - * @param key Key to be used - * @return Key group index for the given key - */ - int getKeyGroupIndex(K key); - - /** - * Setups the key group assigner with the maximum parallelism (= number of key groups). 
- * - * @param numberOfKeygroups Maximum parallelism (= number of key groups) - */ - void setup(int numberOfKeygroups); - - /** - * - * @return configured maximum parallelism - */ - int getNumberKeyGroups(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-core/src/main/java/org/apache/flink/util/MathUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java index f40c83a..49056cc 100644 --- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java @@ -155,6 +155,21 @@ public final class MathUtils { } } + /** + * Round the given number to the next power of two + * @param x number to round + * @return x rounded up to the next power of two + */ + public static int roundUpToPowerOfTwo(int x) { + x = x - 1; + x |= x >> 1; + x |= x >> 2; + x |= x >> 4; + x |= x >> 8; + x |= x >> 16; + return x + 1; + } + // ============================================================================================ /** http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java index 7917a7b..c98b7fc 100644 --- a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java @@ -81,6 +81,37 @@ public class MathUtilTest { } @Test + public void testRoundUpToPowerOf2() { + assertEquals(0, MathUtils.roundUpToPowerOfTwo(0)); + assertEquals(1, MathUtils.roundUpToPowerOfTwo(1)); + assertEquals(2, MathUtils.roundUpToPowerOfTwo(2)); + assertEquals(4, MathUtils.roundUpToPowerOfTwo(3)); + 
assertEquals(4, MathUtils.roundUpToPowerOfTwo(4)); + assertEquals(8, MathUtils.roundUpToPowerOfTwo(5)); + assertEquals(8, MathUtils.roundUpToPowerOfTwo(6)); + assertEquals(8, MathUtils.roundUpToPowerOfTwo(7)); + assertEquals(8, MathUtils.roundUpToPowerOfTwo(8)); + assertEquals(16, MathUtils.roundUpToPowerOfTwo(9)); + assertEquals(16, MathUtils.roundUpToPowerOfTwo(15)); + assertEquals(16, MathUtils.roundUpToPowerOfTwo(16)); + assertEquals(32, MathUtils.roundUpToPowerOfTwo(17)); + assertEquals(32, MathUtils.roundUpToPowerOfTwo(31)); + assertEquals(32, MathUtils.roundUpToPowerOfTwo(32)); + assertEquals(64, MathUtils.roundUpToPowerOfTwo(33)); + assertEquals(64, MathUtils.roundUpToPowerOfTwo(42)); + assertEquals(64, MathUtils.roundUpToPowerOfTwo(63)); + assertEquals(64, MathUtils.roundUpToPowerOfTwo(64)); + assertEquals(128, MathUtils.roundUpToPowerOfTwo(125)); + assertEquals(32768, MathUtils.roundUpToPowerOfTwo(25654)); + assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(34366363)); + assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(67108863)); + assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(67108864)); + assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x3FFFFFFE)); + assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x3FFFFFFF)); + assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x40000000)); + } + + @Test public void testPowerOfTwo() { assertTrue(MathUtils.isPowerOf2(1)); assertTrue(MathUtils.isPowerOf2(2)); http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index e751e08..52f6d9a 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.Preconditions; @@ -886,7 +887,7 @@ public class CheckpointCoordinator { List<KeyGroupRange> result = new ArrayList<>(parallelism); int start = 0; for (int i = 0; i < parallelism; ++i) { - result.add(KeyGroupRange.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i)); + result.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i)); } return result; } http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index a623295..8ddc9f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -253,7 +253,8 @@ public class JobVertex implements java.io.Serializable { */ public void setMaxParallelism(int maxParallelism) { org.apache.flink.util.Preconditions.checkArgument( - maxParallelism > 0 && maxParallelism <= Short.MAX_VALUE, "The max parallelism must be at least 1."); + maxParallelism > 0 && maxParallelism <= (1 << 
15), + "The max parallelism must be at least 1 and smaller than 2^15."); this.maxParallelism = maxParallelism; } http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index e6093a8..0d2bf45 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -53,7 +52,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable { JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws Exception; @@ -66,7 +65,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable { JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyGroupsStateHandle> restoredState, TaskKvStateRegistry kvStateRegistry) throws Exception; http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java deleted file mode 100644 index 9ee4b90..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.api.common.state.KeyGroupAssigner; -import org.apache.flink.util.MathUtils; -import org.apache.flink.util.Preconditions; - -/** - * Hash based key group assigner. The assigner assigns each key to a key group using the hash value - * of the key. 
- * - * @param <K> Type of the key - */ -public class HashKeyGroupAssigner<K> implements KeyGroupAssigner<K> { - private static final long serialVersionUID = -6319826921798945448L; - - private static final int UNDEFINED_NUMBER_KEY_GROUPS = Integer.MIN_VALUE; - - private int numberKeyGroups; - - public HashKeyGroupAssigner() { - this(UNDEFINED_NUMBER_KEY_GROUPS); - } - - public HashKeyGroupAssigner(int numberKeyGroups) { - Preconditions.checkArgument(numberKeyGroups > 0 || numberKeyGroups == UNDEFINED_NUMBER_KEY_GROUPS, - "The number of key groups has to be greater than 0 or undefined. Use " + - "setMaxParallelism() to specify the number of key groups."); - this.numberKeyGroups = numberKeyGroups; - } - - public int getNumberKeyGroups() { - return numberKeyGroups; - } - - @Override - public int getKeyGroupIndex(K key) { - return MathUtils.murmurHash(key.hashCode()) % numberKeyGroups; - } - - @Override - public void setup(int numberOfKeygroups) { - Preconditions.checkArgument(numberOfKeygroups > 0, "The number of key groups has to be " + - "greater than 0. Use setMaxParallelism() to specify the number of key " + - "groups."); - - this.numberKeyGroups = numberOfKeygroups; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java index 9e74036..3a9d3d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java @@ -175,46 +175,4 @@ public class KeyGroupRange implements Iterable<Integer>, Serializable { return startKeyGroup <= endKeyGroup ? 
new KeyGroupRange(startKeyGroup, endKeyGroup) : EMPTY_KEY_GROUP; } - /** - * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum - * parallelism. - * - * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want - * to go beyond this boundary, this method must perform arithmetic on long values. - * - * @param maxParallelism Maximal parallelism that the job was initially created with. - * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism. - * @param operatorIndex Id of a key-group. 0 <= keyGroupID < maxParallelism. - * @return - */ - public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( - int maxParallelism, - int parallelism, - int operatorIndex) { - Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero."); - Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism."); - Preconditions.checkArgument(maxParallelism <= Short.MAX_VALUE, "Maximum parallelism must be smaller than Short.MAX_VALUE."); - - int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1; - int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism; - return new KeyGroupRange(start, end); - } - - /** - * Computes the index of the operator to which a key-group belongs under the given parallelism and maximum - * parallelism. - * - * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want - * to go beyond this boundary, this method must perform arithmetic on long values. - * - * @param maxParallelism Maximal parallelism that the job was initially created with. - * 0 < parallelism <= maxParallelism <= Short.MAX_VALUE must hold. - * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism. 
- * @param keyGroupId Id of a key-group. 0 <= keyGroupID < maxParallelism. - * @return The index of the operator to which elements from the given key-group should be routed under the given - * parallelism and maxParallelism. - */ - public static final int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) { - return keyGroupId * parallelism / maxParallelism; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java new file mode 100644 index 0000000..eceb6f4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Preconditions; + +public final class KeyGroupRangeAssignment { + + public static final int DEFAULT_MAX_PARALLELISM = 128; + + private KeyGroupRangeAssignment() { + throw new AssertionError(); + } + + /** + * Assigns the given key to a parallel operator index. + * + * @param key the key to assign + * @param maxParallelism the maximum supported parallelism, aka the number of key-groups. + * @param parallelism the current parallelism of the operator + * @return the index of the parallel operator to which the given key should be routed. + */ + public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) { + return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); + } + + /** + * Assigns the given key to a key-group index. + * + * @param key the key to assign + * @param maxParallelism the maximum supported parallelism, aka the number of key-groups. + * @return the key-group to which the given key is assigned + */ + public static final int assignToKeyGroup(Object key, int maxParallelism) { + return MathUtils.murmurHash(key.hashCode()) % maxParallelism; + } + + /** + * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum + * parallelism. + * + * IMPORTANT: maxParallelism must be <= 2^15 (1 << 15) to avoid int overflow in this method. If we ever want + * to go beyond this boundary, this method must perform arithmetic on long values. + * + * @param maxParallelism Maximal parallelism that the job was initially created with. + * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism. + * @param operatorIndex Index of the parallel operator instance. 0 <= operatorIndex < parallelism. 
+ * @return The range of key-groups assigned to the operator with the given index. + */ + public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( + int maxParallelism, + int parallelism, + int operatorIndex) { + Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero."); + Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism."); + Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15."); + + int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1; + int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism; + return new KeyGroupRange(start, end); + } + + /** + * Computes the index of the operator to which a key-group belongs under the given parallelism and maximum + * parallelism. + * + * IMPORTANT: maxParallelism must be <= 2^15 (1 << 15) to avoid int overflow in this method. If we ever want + * to go beyond this boundary, this method must perform arithmetic on long values. + * + * @param maxParallelism Maximal parallelism that the job was initially created with. + * 0 < parallelism <= maxParallelism <= 2^15 must hold. + * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism. + * @param keyGroupId Id of a key-group. 0 <= keyGroupID < maxParallelism. + * @return The index of the operator to which elements from the given key-group should be routed under the given + * parallelism and maxParallelism. 
+ */ + public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) { + return keyGroupId * parallelism / maxParallelism; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java index 2d1d25c..bf9018e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MergingState; @@ -69,8 +68,8 @@ public abstract class KeyedStateBackend<K> { @SuppressWarnings("rawtypes") private KvState lastState; - /** KeyGroupAssigner which determines the key group for each keys */ - protected final KeyGroupAssigner<K> keyGroupAssigner; + /** The number of key-groups aka max parallelism */ + protected final int numberOfKeyGroups; /** Range of key-groups for which this backend is responsible */ protected final KeyGroupRange keyGroupRange; @@ -81,12 +80,12 @@ public abstract class KeyedStateBackend<K> { public KeyedStateBackend( TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange) { this.kvStateRegistry = 
Preconditions.checkNotNull(kvStateRegistry); this.keySerializer = Preconditions.checkNotNull(keySerializer); - this.keyGroupAssigner = Preconditions.checkNotNull(keyGroupAssigner); + this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups); this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); } @@ -157,7 +156,7 @@ public abstract class KeyedStateBackend<K> { */ public void setCurrentKey(K newKey) { this.currentKey = newKey; - this.currentKeyGroup = keyGroupAssigner.getKeyGroupIndex(newKey); + this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups); } /** @@ -179,11 +178,7 @@ public abstract class KeyedStateBackend<K> { } public int getNumberOfKeyGroups() { - return keyGroupAssigner.getNumberKeyGroups(); - } - - public KeyGroupAssigner<K> getKeyGroupAssigner() { - return keyGroupAssigner; + return numberOfKeyGroups; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 5495244..6d92a4d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.state.filesystem; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -181,13 +180,13 @@ public class FsStateBackend extends AbstractStateBackend { JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, - 
KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws Exception { return new HeapKeyedStateBackend<>( kvStateRegistry, keySerializer, - keyGroupAssigner, + numberOfKeyGroups, keyGroupRange); } @@ -197,14 +196,14 @@ public class FsStateBackend extends AbstractStateBackend { JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyGroupsStateHandle> restoredState, TaskKvStateRegistry kvStateRegistry) throws Exception { return new HeapKeyedStateBackend<>( kvStateRegistry, keySerializer, - keyGroupAssigner, + numberOfKeyGroups, keyGroupRange, restoredState); } http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java index 9863c93..18d1bc7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java @@ -24,9 +24,9 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.heap.StateTable; import org.apache.flink.util.Preconditions; import java.util.HashMap; @@ -138,7 +138,7 @@ public abstract class 
AbstractHeapState<K, N, SV, S extends State, SD extends St Preconditions.checkState(key != null, "No key given."); Map<N, Map<K, SV>> namespaceMap = - stateTable.get(backend.getKeyGroupAssigner().getKeyGroupIndex(key)); + stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups())); if (namespaceMap == null) { return null; http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index fcb4bef..8d13941 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -76,20 +75,20 @@ public class HeapKeyedStateBackend<K> extends KeyedStateBackend<K> { public HeapKeyedStateBackend( TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange) { - super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange); + super(kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange); LOG.info("Initializing heap keyed state backend with stream factory."); } public HeapKeyedStateBackend(TaskKvStateRegistry kvStateRegistry, 
TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyGroupsStateHandle> restoredState) throws Exception { - super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange); + super(kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange); LOG.info("Initializing heap keyed state backend from snapshot."); http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java index 4c65c25..9552325 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.util.Preconditions; @@ -119,7 +120,7 @@ public class HeapListState<K, N, V> Preconditions.checkState(key != null, "No key given."); Map<N, Map<K, ArrayList<V>>> namespaceMap = - stateTable.get(backend.getKeyGroupAssigner().getKeyGroupIndex(key)); + stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups())); if (namespaceMap == null) { return null; http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index 654c367..179dfe7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.state.memory; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -80,14 +79,14 @@ public class MemoryStateBackend extends AbstractStateBackend { public <K> KeyedStateBackend<K> createKeyedStateBackend( Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws IOException { return new HeapKeyedStateBackend<>( kvStateRegistry, keySerializer, - keyGroupAssigner, + numberOfKeyGroups, keyGroupRange); } @@ -96,7 +95,7 @@ public class MemoryStateBackend extends AbstractStateBackend { Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyGroupsStateHandle> restoredState, TaskKvStateRegistry kvStateRegistry) throws Exception { @@ -104,7 +103,7 @@ public class MemoryStateBackend extends AbstractStateBackend { return new HeapKeyedStateBackend<>( kvStateRegistry, keySerializer, - keyGroupAssigner, + numberOfKeyGroups, keyGroupRange, restoredState); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 495dced..4972c51 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; @@ -2458,7 +2459,7 @@ public class CheckpointCoordinatorTest { private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) { List<KeyGroupRange> ranges = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism, parallelism); for (int i = 0; i < maxParallelism; ++i) { - KeyGroupRange range = ranges.get(KeyGroupRange.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, i)); + KeyGroupRange range = ranges.get(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, i)); if (!range.contains(i)) { Assert.fail("Could not find expected key-group " + i + " in range " + range); } http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java index 3380907..405f962 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.query.netty.KvStateClient; import org.apache.flink.runtime.query.netty.KvStateServer; import org.apache.flink.runtime.query.netty.UnknownKvStateID; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.state.HashKeyGroupAssigner; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.VoidNamespace; @@ -232,6 +231,7 @@ public class QueryableStateClientTest { // Config int numServers = 2; int numKeys = 1024; + int numKeyGroups = 1; JobID jobId = new JobID(); JobVertexID jobVertexId = new JobVertexID(); @@ -250,7 +250,7 @@ public class QueryableStateClientTest { new JobID(), "test_op", IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + numKeyGroups, new KeyGroupRange(0, 0), new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java index 796481c..f785174 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java @@ -42,7 +42,6 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequest; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.query.netty.message.KvStateRequestType; import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.HashKeyGroupAssigner; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KvState; @@ -533,6 +532,8 @@ public class KvStateClientTest { final int batchSize = 16; + final int numKeyGroups = 1; + AbstractStateBackend abstractBackend = new MemoryStateBackend(); KvStateRegistry dummyRegistry = new KvStateRegistry(); DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); @@ -542,7 +543,7 @@ public class KvStateClientTest { new JobID(), "test_op", IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + numKeyGroups, new KeyGroupRange(0, 0), dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID())); http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java index 3d2e8b5..52c807f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java @@ -39,7 +39,6 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestResult; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import 
org.apache.flink.runtime.query.netty.message.KvStateRequestType; import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.HashKeyGroupAssigner; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KvState; @@ -89,6 +88,7 @@ public class KvStateServerHandlerTest { ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null); desc.setQueryable("vanilla"); + int numKeyGroups =1; AbstractStateBackend abstractBackend = new MemoryStateBackend(); DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); dummyEnv.setKvStateRegistry(registry); @@ -97,7 +97,7 @@ public class KvStateServerHandlerTest { new JobID(), "test_op", IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); @@ -200,6 +200,7 @@ public class KvStateServerHandlerTest { KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + int numKeyGroups = 1; AbstractStateBackend abstractBackend = new MemoryStateBackend(); DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); dummyEnv.setKvStateRegistry(registry); @@ -208,7 +209,7 @@ public class KvStateServerHandlerTest { new JobID(), "test_op", IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); @@ -346,6 +347,7 @@ public class KvStateServerHandlerTest { KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + int numKeyGroups = 1; AbstractStateBackend abstractBackend = new MemoryStateBackend(); 
DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); dummyEnv.setKvStateRegistry(registry); @@ -354,7 +356,7 @@ public class KvStateServerHandlerTest { new JobID(), "test_op", IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); @@ -484,6 +486,7 @@ public class KvStateServerHandlerTest { KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + int numKeyGroups = 1; AbstractStateBackend abstractBackend = new MemoryStateBackend(); DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); dummyEnv.setKvStateRegistry(registry); @@ -492,7 +495,7 @@ public class KvStateServerHandlerTest { new JobID(), "test_op", IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); @@ -579,6 +582,7 @@ public class KvStateServerHandlerTest { KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + int numKeyGroups = 1; AbstractStateBackend abstractBackend = new MemoryStateBackend(); DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); dummyEnv.setKvStateRegistry(registry); @@ -587,7 +591,7 @@ public class KvStateServerHandlerTest { new JobID(), "test_op", IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java index 30d91b6..e92fb10 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java @@ -42,7 +42,6 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestResult; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.query.netty.message.KvStateRequestType; import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.HashKeyGroupAssigner; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.VoidNamespace; @@ -80,7 +79,6 @@ public class KvStateServerTest { public void testSimpleRequest() throws Exception { KvStateServer server = null; Bootstrap bootstrap = null; - try { KvStateRegistry registry = new KvStateRegistry(); KvStateRequestStats stats = new AtomicKvStateRequestStats(); @@ -89,7 +87,7 @@ public class KvStateServerTest { server.start(); KvStateServerAddress serverAddress = server.getAddress(); - + int numKeyGroups = 1; AbstractStateBackend abstractBackend = new MemoryStateBackend(); DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); dummyEnv.setKvStateRegistry(registry); @@ -98,7 +96,7 @@ public class KvStateServerTest { new JobID(), "test_op", IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(new JobID(), new JobVertexID())); http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index f094bd5..5984aca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -41,7 +40,6 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateRegistry; @@ -56,7 +54,6 @@ import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; import static org.junit.Assert.assertEquals; @@ -90,14 +87,14 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { protected <K> KeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws Exception { return createKeyedBackend( keySerializer, - new HashKeyGroupAssigner<K>(10), + 10, new KeyGroupRange(0, 9), env); } protected <K> 
KeyedStateBackend<K> createKeyedBackend( TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange, Environment env) throws Exception { return getStateBackend().createKeyedStateBackend( @@ -105,7 +102,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { new JobID(), "test_op", keySerializer, - keyGroupAssigner, + numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry()); } @@ -120,7 +117,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { Environment env) throws Exception { return restoreKeyedBackend( keySerializer, - new HashKeyGroupAssigner<K>(10), + 10, new KeyGroupRange(0, 9), Collections.singletonList(state), env); @@ -128,7 +125,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { protected <K> KeyedStateBackend<K> restoreKeyedBackend( TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyGroupsStateHandle> state, Environment env) throws Exception { @@ -137,7 +134,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { new JobID(), "test_op", keySerializer, - keyGroupAssigner, + numberOfKeyGroups, keyGroupRange, state, env.getTaskKvStateRegistry()); @@ -243,7 +240,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { KeyedStateBackend<Integer> backend = createKeyedBackend( IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + 1, new KeyGroupRange(0, 0), new DummyEnvironment("test_op", 1, 0)); @@ -277,7 +274,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { backend.close(); backend = restoreKeyedBackend( IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + 1, new KeyGroupRange(0, 0), Collections.singletonList(snapshot1), new DummyEnvironment("test_op", 1, 0)); @@ -675,12 +672,9 @@ public abstract class 
StateBackendTestBase<B extends AbstractStateBackend> { final int MAX_PARALLELISM = 10; CheckpointStreamFactory streamFactory = createStreamFactory(); - - HashKeyGroupAssigner<Integer> keyGroupAssigner = new HashKeyGroupAssigner<>(10); - KeyedStateBackend<Integer> backend = createKeyedBackend( IntSerializer.INSTANCE, - keyGroupAssigner, + MAX_PARALLELISM, new KeyGroupRange(0, MAX_PARALLELISM - 1), new DummyEnvironment("test", 1, 0)); @@ -695,12 +689,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { Random rand = new Random(0); // for each key, determine into which half of the key-group space they fall - int firstKeyHalf = keyGroupAssigner.getKeyGroupIndex(keyInFirstHalf) * 2 / MAX_PARALLELISM; - int secondKeyHalf = keyGroupAssigner.getKeyGroupIndex(keyInSecondHalf) * 2 / MAX_PARALLELISM; + int firstKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInFirstHalf, MAX_PARALLELISM, 2); + int secondKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInFirstHalf, MAX_PARALLELISM, 2); while (firstKeyHalf == secondKeyHalf) { keyInSecondHalf = rand.nextInt(); - secondKeyHalf = keyGroupAssigner.getKeyGroupIndex(keyInSecondHalf) * 2 / MAX_PARALLELISM; + secondKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInSecondHalf, MAX_PARALLELISM, 2); } backend.setCurrentKey(keyInFirstHalf); @@ -714,18 +708,18 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { List<KeyGroupsStateHandle> firstHalfKeyGroupStates = CheckpointCoordinator.getKeyGroupsStateHandles( Collections.singletonList(snapshot), - new KeyGroupRange(0, 4)); + KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(MAX_PARALLELISM, 2, 0)); List<KeyGroupsStateHandle> secondHalfKeyGroupStates = CheckpointCoordinator.getKeyGroupsStateHandles( Collections.singletonList(snapshot), - new KeyGroupRange(5, 9)); + KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(MAX_PARALLELISM, 2, 1)); backend.close(); // 
backend for the first half of the key group range KeyedStateBackend<Integer> firstHalfBackend = restoreKeyedBackend( IntSerializer.INSTANCE, - keyGroupAssigner, + MAX_PARALLELISM, new KeyGroupRange(0, 4), firstHalfKeyGroupStates, new DummyEnvironment("test", 1, 0)); @@ -733,7 +727,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // backend for the second half of the key group range KeyedStateBackend<Integer> secondHalfBackend = restoreKeyedBackend( IntSerializer.INSTANCE, - keyGroupAssigner, + MAX_PARALLELISM, new KeyGroupRange(5, 9), secondHalfKeyGroupStates, new DummyEnvironment("test", 1, 0)); @@ -978,9 +972,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { */ @SuppressWarnings("unchecked") protected void testConcurrentMapIfQueryable() throws Exception { + final int numberOfKeyGroups = 1; KeyedStateBackend<Integer> backend = createKeyedBackend( IntSerializer.INSTANCE, - new HashKeyGroupAssigner<Integer>(1), + numberOfKeyGroups, new KeyGroupRange(0, 0), new DummyEnvironment("test_op", 1, 0)); @@ -1005,7 +1000,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { backend.setCurrentKey(1); state.update(121818273); - int keyGroupIndex = new HashKeyGroupAssigner<>(1).getKeyGroupIndex(1); + int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups); StateTable stateTable = ((AbstractHeapState) kvState).getStateTable(); assertNotNull("State not set", stateTable.get(keyGroupIndex)); assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap); @@ -1031,7 +1026,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { backend.setCurrentKey(1); state.add(121818273); - int keyGroupIndex = new HashKeyGroupAssigner<>(1).getKeyGroupIndex(1); + int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups); StateTable stateTable = ((AbstractHeapState) kvState).getStateTable(); assertNotNull("State not set", 
stateTable.get(keyGroupIndex)); assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap); @@ -1062,7 +1057,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { backend.setCurrentKey(1); state.add(121818273); - int keyGroupIndex = new HashKeyGroupAssigner<>(1).getKeyGroupIndex(1); + int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups); StateTable stateTable = ((AbstractHeapState) kvState).getStateTable(); assertNotNull("State not set", stateTable.get(keyGroupIndex)); assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap); @@ -1093,7 +1088,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { backend.setCurrentKey(1); state.add(121818273); - int keyGroupIndex = new HashKeyGroupAssigner<>(1).getKeyGroupIndex(1); + int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups); StateTable stateTable = ((AbstractHeapState) kvState).getStateTable(); assertNotNull("State not set", stateTable.get(keyGroupIndex)); assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap); http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 1fb34b8..af907e3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -30,7 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import 
org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.runtime.state.HashKeyGroupAssigner; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; @@ -111,8 +111,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> { new PartitionTransformation<>( dataStream.getTransformation(), new KeyGroupStreamPartitioner<>( - keySelector, - new HashKeyGroupAssigner<KEY>()))); + keySelector, KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM))); this.keySelector = keySelector; this.keyType = keyType; } http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 1a92ba4..8e807db 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; @@ -76,8 +75,7 @@ public class StreamConfig implements Serializable { private static final String STATE_BACKEND = "statebackend"; private static final String STATE_PARTITIONER = "statePartitioner"; - /** key for the {@link KeyGroupAssigner} for key to key group index mappings 
*/ - private static final String KEY_GROUP_ASSIGNER = "keyGroupAssigner"; + private static final String NUMBER_OF_KEY_GROUPS = "numberOfKeyGroups"; private static final String STATE_KEY_SERIALIZER = "statekeyser"; @@ -445,28 +443,26 @@ public class StreamConfig implements Serializable { } /** - * Sets the {@link KeyGroupAssigner} to be used for the current {@link StreamOperator}. + * Sets the number of key-groups to be used for the current {@link StreamOperator}. * - * @param keyGroupAssigner Key group assigner to be used + * @param numberOfKeyGroups Number of key-groups to be used */ - public void setKeyGroupAssigner(KeyGroupAssigner<?> keyGroupAssigner) { + public void setNumberOfKeyGroups(int numberOfKeyGroups) { try { - InstantiationUtil.writeObjectToConfig(keyGroupAssigner, this.config, KEY_GROUP_ASSIGNER); + InstantiationUtil.writeObjectToConfig(numberOfKeyGroups, this.config, NUMBER_OF_KEY_GROUPS); } catch (Exception e) { throw new StreamTaskException("Could not serialize virtual state partitioner.", e); } } /** - * Gets the {@link KeyGroupAssigner} for the {@link StreamOperator}. + * Gets the number of key-groups for the {@link StreamOperator}. 
* - * @param classLoader Classloader to be used for the deserialization - * @param <K> Type of the keys to be assigned to key groups - * @return Key group assigner + * @return the number of key-groups */ - public <K> KeyGroupAssigner<K> getKeyGroupAssigner(ClassLoader classLoader) { + public Integer getNumberOfKeyGroups(ClassLoader cl) { try { - return InstantiationUtil.readObjectFromConfig(this.config, KEY_GROUP_ASSIGNER, classLoader); + return InstantiationUtil.readObjectFromConfig(this.config, NUMBER_OF_KEY_GROUPS, cl); } catch (Exception e) { throw new StreamTaskException("Could not instantiate virtual state partitioner.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index f9f26e9..506b664 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation; @@ -33,6 +34,7 @@ import org.apache.flink.streaming.api.transformations.SplitTransformation; import 
org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.streaming.api.transformations.UnionTransformation; +import org.apache.flink.util.MathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,24 +147,24 @@ public class StreamGraphGenerator { LOG.debug("Transforming " + transform); if (transform.getMaxParallelism() <= 0) { + // if the max parallelism hasn't been set, then first use the job wide max parallelism // from theExecutionConfig. If this value has not been specified either, then use the // parallelism of the operator. int maxParallelism = env.getConfig().getMaxParallelism(); if (maxParallelism <= 0) { - maxParallelism = transform.getParallelism(); - /** - * TODO: Remove once the parallelism settings works properly in Flink (FLINK-3885) - * Currently, the parallelism will be set to 1 on the JobManager iff it encounters - * a negative parallelism value. We need to know this for the - * KeyGroupStreamPartitioner on the client-side. Thus, we already set the value to - * 1 here. 
- */ - if (maxParallelism <= 0) { - transform.setParallelism(1); - maxParallelism = 1; + + int parallelism = transform.getParallelism(); + + if(parallelism <= 0) { + parallelism = 1; + transform.setParallelism(parallelism); } + + maxParallelism = Math.max( + MathUtils.roundUpToPowerOfTwo(parallelism + (parallelism / 2)), + KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM); } transform.setMaxParallelism(maxParallelism); http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 76fdaca..a024895 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -26,7 +26,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -41,7 +40,6 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.runtime.state.HashKeyGroupAssigner; import 
org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; @@ -357,12 +355,9 @@ public class StreamingJobGraphGenerator { config.setStatePartitioner(1, vertex.getStatePartitioner2()); config.setStateKeySerializer(vertex.getStateKeySerializer()); - // only set the key group assigner if the vertex uses partitioned state (= KeyedStream). + // only set the max parallelism if the vertex uses partitioned state (= KeyedStream). if (vertex.getStatePartitioner1() != null) { - // the key group assigner has to know the number of key groups (= maxParallelism) - KeyGroupAssigner<Object> keyGroupAssigner = new HashKeyGroupAssigner<Object>(vertex.getMaxParallelism()); - - config.setKeyGroupAssigner(keyGroupAssigner); + config.setNumberOfKeyGroups(vertex.getMaxParallelism()); } Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass(); http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 718c0c7..71296e3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import 
org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; @@ -133,14 +134,14 @@ public abstract class AbstractStreamOperator<OUT> if (null != keySerializer) { ExecutionConfig execConf = container.getEnvironment().getExecutionConfig();; - KeyGroupRange subTaskKeyGroupRange = KeyGroupRange.computeKeyGroupRangeForOperatorIndex( + KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(), container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(), container.getIndexInSubtaskGroup()); keyedStateBackend = container.createKeyedStateBackend( keySerializer, - container.getConfiguration().getKeyGroupAssigner(getUserCodeClassloader()), + container.getConfiguration().getNumberOfKeyGroups(getUserCodeClassloader()), subTaskKeyGroupRange); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java index 108a3ae..256fee1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java @@ -18,16 +18,14 @@ package org.apache.flink.streaming.runtime.partitioner; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.java.functions.KeySelector; import 
org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; /** - * Partitioner selects the target channel based on the key group index. The key group - * index is derived from the key of the elements using the {@link KeyGroupAssigner}. + * Partitioner selects the target channel based on the key group index. * * @param <T> Type of the elements in the Stream being partitioned */ @@ -39,15 +37,16 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem private final KeySelector<T, K> keySelector; - private final KeyGroupAssigner<K> keyGroupAssigner; + private int maxParallelism; - public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, KeyGroupAssigner<K> keyGroupAssigner) { + public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) { + Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!"); this.keySelector = Preconditions.checkNotNull(keySelector); - this.keyGroupAssigner = Preconditions.checkNotNull(keyGroupAssigner); + this.maxParallelism = maxParallelism; } - public KeyGroupAssigner<K> getKeyGroupAssigner() { - return keyGroupAssigner; + public int getMaxParallelism() { + return maxParallelism; } @Override @@ -61,10 +60,7 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem } catch (Exception e) { throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e); } - returnArray[0] = KeyGroupRange.computeOperatorIndexForKeyGroup( - keyGroupAssigner.getNumberKeyGroups(), - numberOfOutputChannels, - keyGroupAssigner.getKeyGroupIndex(key)); + returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels); return returnArray; } @@ -80,6 +76,6 
@@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem @Override public void configure(int maxParallelism) { - keyGroupAssigner.setup(maxParallelism); + this.maxParallelism = maxParallelism; } } http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 13f650c..701281b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -762,7 +761,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> public <K> KeyedStateBackend<K> createKeyedStateBackend( TypeSerializer<K> keySerializer, - KeyGroupAssigner<K> keyGroupAssigner, + int numberOfKeyGroups, KeyGroupRange keyGroupRange) throws Exception { if (keyedStateBackend != null) { @@ -779,7 +778,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> getEnvironment().getJobID(), operatorIdentifier, keySerializer, - keyGroupAssigner, + numberOfKeyGroups, keyGroupRange, lazyRestoreKeyGroupStates, getEnvironment().getTaskKvStateRegistry()); @@ -791,7 +790,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> 
getEnvironment().getJobID(), operatorIdentifier, keySerializer, - keyGroupAssigner, + numberOfKeyGroups, keyGroupRange, getEnvironment().getTaskKvStateRegistry()); } http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 06d381f..874274f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -19,11 +19,9 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.runtime.state.HashKeyGroupAssigner; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -270,10 +268,6 @@ public class StreamGraphGeneratorTest { StreamNode keyedResultNode = graph.getStreamNode(keyedResult.getId()); StreamPartitioner<?> streamPartitioner = keyedResultNode.getInEdges().get(0).getPartitioner(); - - HashKeyGroupAssigner<?> hashKeyGroupAssigner = extractHashKeyGroupAssigner(streamPartitioner); - - assertEquals(maxParallelism, hashKeyGroupAssigner.getNumberKeyGroups()); } /** @@ -384,7 +378,7 @@ public class StreamGraphGeneratorTest { } /** - * Tests that the max parallelism and the 
key group partitioner is properly set for connected + * Tests that the max parallelism is properly set for connected * streams. */ @Test @@ -423,24 +417,6 @@ public class StreamGraphGeneratorTest { StreamPartitioner<?> streamPartitioner1 = keyedResultNode.getInEdges().get(0).getPartitioner(); StreamPartitioner<?> streamPartitioner2 = keyedResultNode.getInEdges().get(1).getPartitioner(); - - HashKeyGroupAssigner<?> hashKeyGroupAssigner1 = extractHashKeyGroupAssigner(streamPartitioner1); - assertEquals(maxParallelism, hashKeyGroupAssigner1.getNumberKeyGroups()); - - HashKeyGroupAssigner<?> hashKeyGroupAssigner2 = extractHashKeyGroupAssigner(streamPartitioner2); - assertEquals(maxParallelism, hashKeyGroupAssigner2.getNumberKeyGroups()); - } - - private HashKeyGroupAssigner<?> extractHashKeyGroupAssigner(StreamPartitioner<?> streamPartitioner) { - assertTrue(streamPartitioner instanceof KeyGroupStreamPartitioner); - - KeyGroupStreamPartitioner<?, ?> keyGroupStreamPartitioner = (KeyGroupStreamPartitioner<?, ?>) streamPartitioner; - - KeyGroupAssigner<?> keyGroupAssigner = keyGroupStreamPartitioner.getKeyGroupAssigner(); - - assertTrue(keyGroupAssigner instanceof HashKeyGroupAssigner); - - return (HashKeyGroupAssigner<?>) keyGroupAssigner; } private static class OutputTypeConfigurableOperationWithTwoInputs