[ https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15471287#comment-15471287 ]
ASF GitHub Bot commented on FLINK-3755:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2440#discussion_r77870173

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +public final class KeyGroupRangeAssignment {
    +
    +	public static final int DEFAULT_MAX_PARALLELISM = 128;
    +
    +	private KeyGroupRangeAssignment() {
    +		throw new AssertionError();
    +	}
    +
    +	/**
    +	 * Assigns the given key to a parallel operator index.
    +	 *
    +	 * @param key the key to assign
    +	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
    +	 * @param parallelism the current parallelism of the operator
    +	 * @return the index of the parallel operator to which the given key should be routed.
    +	 */
    +	public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    +		return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    +	}
    +
    +	/**
    +	 * Assigns the given key to a key-group index.
    +	 *
    +	 * @param key the key to assign
    +	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
    +	 * @return the key-group to which the given key is assigned
    +	 */
    +	public static final int assignToKeyGroup(Object key, int maxParallelism) {
    +		return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
    +	}
    +
    +	/**
    +	 * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
    +	 * parallelism.
    +	 *
    +	 * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
    +	 * to go beyond this boundary, this method must perform arithmetic on long values.
    +	 *
    +	 * @param maxParallelism Maximal parallelism that the job was initially created with.
    +	 * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism.
    +	 * @param operatorIndex Id of a key-group. 0 <= keyGroupID < maxParallelism.
    +	 * @return
    +	 */
    +	public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
    +		int maxParallelism,
    +		int parallelism,
    +		int operatorIndex) {
    +		Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero.");
    +		Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
    +		Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15.");
    +
    +		int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
    +		int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    --- End diff --

    I don't think that this is giving us the correct inverse for computeOperatorIndexForKeyGroup(...).
    Our test CheckpointCoordinatorTest::testCreateKeyGroupPartitions() generates counter-examples.


> Introduce key groups for key-value state to support dynamic scaling
> -------------------------------------------------------------------
>
>                 Key: FLINK-3755
>                 URL: https://issues.apache.org/jira/browse/FLINK-3755
>             Project: Flink
>          Issue Type: New Feature
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.2.0
>
>
> In order to support dynamic scaling, it is necessary to sub-partition the
> key-value states of each operator. This sub-partitioning, which produces a
> set of key groups, makes it easy to scale Flink jobs in and out by simply
> reassigning the different key groups to the new set of subtasks. The idea of
> key groups is described in this design document [1].
> [1] https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
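The review comment hinges on whether the two range formulas in the diff are an exact inverse of computeOperatorIndexForKeyGroup(...), whose body is not part of the quoted diff. A brute-force check over small parallelism values makes the question concrete. The Java sketch below is hypothetical: the class KeyGroupInverseCheck, the OperatorIndexMapping interface, and both sample mappings (contiguous blocks, keyGroup * parallelism / maxParallelism, and round-robin, keyGroup % parallelism) are illustrative assumptions, not code from the PR. Only rangeStart and rangeEnd reproduce the formulas from the diff.

```java
// Hypothetical helper, not part of the PR: brute-force check of whether the key-group
// range formulas quoted in the diff invert a given key-group -> operator-index mapping.
public final class KeyGroupInverseCheck {

    /** Maps a key group to an operator index; which mapping the PR uses is an assumption here. */
    @FunctionalInterface
    public interface OperatorIndexMapping {
        int operatorIndexFor(int maxParallelism, int parallelism, int keyGroup);
    }

    /** Assumed mapping: contiguous blocks, floor(keyGroup * parallelism / maxParallelism). */
    public static final OperatorIndexMapping CONTIGUOUS =
            (maxParallelism, parallelism, keyGroup) -> keyGroup * parallelism / maxParallelism;

    /** Alternative mapping for comparison: round-robin, keyGroup % parallelism. */
    public static final OperatorIndexMapping ROUND_ROBIN =
            (maxParallelism, parallelism, keyGroup) -> keyGroup % parallelism;

    /** First key group of the operator's range (inclusive), same formula as the diff. */
    public static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
    }

    /** Last key group of the operator's range (inclusive), same formula as the diff. */
    public static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /**
     * Checks both directions for every 1 <= parallelism <= maxParallelism <= limit:
     * (a) each key group lies in the range of the operator it maps to, and
     * (b) each key group inside an operator's range maps back to that operator.
     * Returns a description of the first violation, or null if none is found.
     */
    public static String findCounterExample(OperatorIndexMapping mapping, int limit) {
        for (int maxP = 1; maxP <= limit; maxP++) {
            for (int p = 1; p <= maxP; p++) {
                // (a) mapping -> range
                for (int kg = 0; kg < maxP; kg++) {
                    int op = mapping.operatorIndexFor(maxP, p, kg);
                    if (op < 0 || op >= p || kg < rangeStart(maxP, p, op) || kg > rangeEnd(maxP, p, op)) {
                        return String.format("maxParallelism=%d, parallelism=%d: key group %d maps to operator %d",
                                maxP, p, kg, op);
                    }
                }
                // (b) range -> mapping
                for (int op = 0; op < p; op++) {
                    for (int kg = rangeStart(maxP, p, op); kg <= rangeEnd(maxP, p, op); kg++) {
                        if (mapping.operatorIndexFor(maxP, p, kg) != op) {
                            return String.format("maxParallelism=%d, parallelism=%d: key group %d is in the range of operator %d",
                                    maxP, p, kg, op);
                        }
                    }
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println("contiguous:  " + findCounterExample(CONTIGUOUS, 128));
        System.out.println("round-robin: " + findCounterExample(ROUND_ROBIN, 128));
    }
}
```

Under the contiguous-block assumption the check finds no violation: for operatorIndex > 0, the diff's start ((operatorIndex * maxParallelism - 1) / parallelism) + 1 equals ceil(operatorIndex * maxParallelism / parallelism), the first key group that maps to that operator. The round-robin mapping does produce counter-examples (for instance maxParallelism = 3, parallelism = 2). Whether the formulas are a correct inverse therefore depends on how computeOperatorIndexForKeyGroup(...) is actually implemented in the PR.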