[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946381#comment-15946381 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user shixiaogang closed the pull request at:

    https://github.com/apache/flink/pull/3531

> Add KeyedStateHandle for the snapshots in keyed streams
> -------------------------------------------------------
>
>                 Key: FLINK-6034
>                 URL: https://issues.apache.org/jira/browse/FLINK-6034
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>             Fix For: 1.3.0
>
>
> Currently, the only type of snapshot in keyed streams is
> {{KeyGroupsStateHandle}}, which is full and stores the states one group after
> another. With the introduction of incremental checkpointing, we need a
> higher-level abstraction of keyed snapshots to allow flexible snapshot formats.
> The implementations of {{KeyedStateHandle}} may vary a lot across different
> backends. The only information needed from a {{KeyedStateHandle}} is its key
> group range. When recovering the job with a different degree of parallelism,
> {{KeyedStateHandle}}s will be assigned to those subtasks whose key group
> ranges overlap with their ranges.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946380#comment-15946380 ] ASF GitHub Bot commented on FLINK-6034: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3531 @StefanRRichter Thanks for your work. I will close the PR.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945649#comment-15945649 ] ASF GitHub Bot commented on FLINK-6034: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3531 Merged in cd5527417a1cae57073a8855c6c3b88c88c780aa. @shixiaogang can you please close the PR?
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945473#comment-15945473 ] ASF GitHub Bot commented on FLINK-6034: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3531 Thanks for this very nice contribution @shixiaogang! I will merge this now.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944944#comment-15944944 ] ASF GitHub Bot commented on FLINK-6034: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3531 @StefanRRichter I updated the PR as suggested. I very much appreciate your hard work.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940152#comment-15940152 ] ASF GitHub Bot commented on FLINK-6034: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3531 @shixiaogang I had a few more comments on the updated PR. When they are resolved, I think this can be merged immediately.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940151#comment-15940151 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107877948

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java ---
    @@ -30,7 +31,7 @@
      */
     public class OperatorSnapshotResult {

    -    private RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture;
    +    private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture;
         private RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture;
    --- End diff --

    I think this generic type can also be changed to the higher-level interface `RunnableFuture<KeyedStateHandle>`.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940145#comment-15940145 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107876739

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---
    @@ -57,6 +58,7 @@
     import org.mockito.invocation.InvocationOnMock;
     import org.mockito.stubbing.Answer;

    +import javax.swing.plaf.basic.BasicSplitPaneUI;
    --- End diff --

    This import should not be here.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940131#comment-15940131 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107874660

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +/**
    + * Base for the handles of the checkpointed states in keyed streams. When
    + * recovering from failures, the handle will be passed to all tasks whose key
    + * group ranges overlap with it.
    + */
    +public interface KeyedStateHandle extends StateObject {
    +
    +    /**
    +     * Returns the range of the key groups contained in the state.
    +     */
    +    KeyGroupRange getKeyGroupRange();
    +
    +    /**
    +     * Returns a keyed state handle which contains the states for the given
    --- End diff --

    This comment is not correct. It does not necessarily contain the whole given range, but the intersection of the given range with the range of the handle. You can just copy-paste it from the method in `KeyGroupsStateHandle`.
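The semantics asked for in the review comment above can be sketched as follows. This is only an illustration: `SimpleKeyGroupRange` is a hypothetical stand-in with inclusive bounds, not Flink's `KeyGroupRange`. It shows that intersecting a handle's range with a requested range yields the overlap, which can be smaller than the requested range or empty.

```java
/** Hypothetical stand-in for a key-group range with inclusive bounds; not Flink's KeyGroupRange. */
final class SimpleKeyGroupRange {
    final int start; // first key group, inclusive
    final int end;   // last key group, inclusive; end < start encodes the empty range

    SimpleKeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /** Number of key groups in the range; 0 for the empty range. */
    int size() {
        return Math.max(0, end - start + 1);
    }

    /** Returns the overlap of this range and the other range, which may be empty. */
    SimpleKeyGroupRange intersect(SimpleKeyGroupRange other) {
        int s = Math.max(start, other.start);
        int e = Math.min(end, other.end);
        return s <= e ? new SimpleKeyGroupRange(s, e) : new SimpleKeyGroupRange(0, -1);
    }
}
```

For a handle covering key groups 0 to 9 and a requested range of 5 to 14, the result covers only 5 to 9, which is why the Javadoc should speak of the intersection rather than of "the given range".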
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940129#comment-15940129 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107874353

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -761,6 +769,13 @@ private void restoreKVStateMetaData() throws IOException, ClassNotFoundException
         private void restoreKVStateData() throws IOException, RocksDBException {
             //for all key-groups in the current state handle...
             for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
    +            int keyGroup = keyGroupOffset.f0;
    +
    +            // Skip those key groups that do not belong to the backend
    +            if (!rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup)) {
    --- End diff --

    The same comment I had on the `HeapKeyedStateBackend` also applies here: I think the post-filter is no longer required after the change I suggested for `StateAssignmentOperation`.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940124#comment-15940124 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107873583

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -412,9 +421,15 @@ private void restorePartitionedState(Collection state) thr
                 }
             }

    -        for (Tuple2<Integer, Long> groupOffset : keyGroupsHandle.getGroupRangeOffsets()) {
    +        for (Tuple2<Integer, Long> groupOffset : keyGroupsStateHandle.getGroupRangeOffsets()) {
                 int keyGroupIndex = groupOffset.f0;
                 long offset = groupOffset.f1;
    +
    +            // Skip those key groups that don't belong to the backend.
    +            if (!keyGroupRange.contains(keyGroupIndex)) {
    --- End diff --

    I think this is no longer required, because we are now "cutting out" the right key-groups from each state handle in the `StateAssignmentOperation`, using the `KeyedStateHandle::getIntersection(...)` method. In fact, I think that receiving a key-group that is not in the backend's range could now be considered an error. We could rewrite this as a precondition check or an assert.
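The precondition check suggested in the comment above might look roughly like the following sketch. It is not the code from the pull request: `RestoreCheckSketch`, `keyGroupsToRestore`, and the plain `int` bounds are hypothetical names chosen for illustration, and the backend's key-group range is assumed to be inclusive on both ends.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of replacing a silent post-filter with a fail-fast check. */
final class RestoreCheckSketch {

    /**
     * Collects the key groups to restore and fails fast if one of them lies
     * outside the backend's inclusive range [rangeStart, rangeEnd], instead of
     * silently skipping it.
     */
    static List<Integer> keyGroupsToRestore(int[] handleKeyGroups, int rangeStart, int rangeEnd) {
        List<Integer> restored = new ArrayList<>();
        for (int keyGroup : handleKeyGroups) {
            if (keyGroup < rangeStart || keyGroup > rangeEnd) {
                // After the handles are intersected during assignment, this would indicate a bug.
                throw new IllegalStateException(
                    "Key group " + keyGroup + " is outside the backend's range ["
                        + rangeStart + ", " + rangeEnd + "]");
            }
            restored.add(keyGroup);
        }
        return restored;
    }
}
```

The design point is that a skipped key group hides an assignment bug, whereas an exception surfaces it at restore time.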
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940108#comment-15940108 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107870871

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -290,19 +291,19 @@ private static void assignTaskStatesToOperatorInstances(
          *
          * This is publicly visible to be used in tests.
          */
    -    public static List<KeyGroupsStateHandle> getKeyGroupsStateHandles(
    -            Collection<KeyGroupsStateHandle> allKeyGroupsHandles,
    -            KeyGroupRange subtaskKeyGroupIds) {
    +    public static List<KeyedStateHandle> getKeyedStateHandles(
    +            Collection<? extends KeyedStateHandle> keyedStateHandles,
    +            KeyGroupRange subtaskKeyGroupRange) {

    -        List<KeyGroupsStateHandle> subtaskKeyGroupStates = new ArrayList<>();
    +        List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();

    -        for (KeyGroupsStateHandle storedKeyGroup : allKeyGroupsHandles) {
    -            KeyGroupsStateHandle intersection = storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds);
    -            if (intersection.getNumberOfKeyGroups() > 0) {
    -                subtaskKeyGroupStates.add(intersection);
    -            }
    +        for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
    +            KeyedStateHandle intersectedKeyedStateHandle = keyedStateHandle.getIntersection(subtaskKeyGroupRange);
    +
    +            subtaskKeyedStateHandles.add(intersectedKeyedStateHandle);
    --- End diff --

    I think we should only add a state handle if the key group range from the intersection is non-empty. Even though those empty handles are probably ignored later, I think it is cleaner.
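A minimal sketch of the filtering proposed in the comment above, under stated assumptions: `AssignmentSketch`, `RangeHandle`, and `handlesForSubtask` are hypothetical names, and a handle is reduced to its inclusive key-group range. The actual method in the pull request operates on `KeyedStateHandle` and `KeyGroupRange`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of assigning only non-empty intersections to a subtask. */
final class AssignmentSketch {

    /** Hypothetical handle that only knows its inclusive key-group range. */
    static final class RangeHandle {
        final int start;
        final int end;

        RangeHandle(int start, int end) {
            this.start = start;
            this.end = end;
        }

        /** A handle is empty when its end lies before its start. */
        boolean isEmpty() {
            return end < start;
        }

        /** Returns a handle restricted to the overlap with [otherStart, otherEnd]. */
        RangeHandle intersect(int otherStart, int otherEnd) {
            return new RangeHandle(Math.max(start, otherStart), Math.min(end, otherEnd));
        }
    }

    /** Keeps only the handles whose intersection with the subtask's range is non-empty. */
    static List<RangeHandle> handlesForSubtask(List<RangeHandle> all, int subtaskStart, int subtaskEnd) {
        List<RangeHandle> result = new ArrayList<>();
        for (RangeHandle handle : all) {
            RangeHandle intersection = handle.intersect(subtaskStart, subtaskEnd);
            if (!intersection.isEmpty()) {
                result.add(intersection);
            }
        }
        return result;
    }
}
```

With two stored handles covering key groups 0 to 63 and 64 to 127, a subtask responsible for 32 to 95 receives two trimmed handles, while a subtask responsible for 0 to 31 receives only one and never sees an empty handle.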
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938601#comment-15938601 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107704681

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java ---
    @@ -91,10 +98,10 @@ public KeyGroupsStateHandle getKeyGroupIntersection(KeyGroupRange keyGroupRange)
         /**
          *
    -     * @return the internal key-group range to offsets metadata
    +     * @return the start key group in the key-group range of this handle
          */
    -    public KeyGroupRangeOffsets getGroupRangeOffsets() {
    -        return groupRangeOffsets;
    +    public int getStartKeyGroup() {
    --- End diff --

    I have removed all pass-through methods except `getGroupRangeOffsets()` because `StateInitializationContextImpl$KeyGroupStreamIterator` is using it to get the iterator for key groups and their offsets.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938588#comment-15938588 ] ASF GitHub Bot commented on FLINK-6034: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3531 @StefanRRichter Thanks a lot for your comments. I have updated the pull request as suggested, making the type of raw keyed states to be `KeyedStateHandle` as well.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938146#comment-15938146 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107646547

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
         }

         /**
    +     * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
    +     * key group index for the given subtask {@link KeyGroupRange}.
    +     *
    +     * This is publicly visible to be used in tests.
    +     */
    +    public static List<KeyedStateHandle> getKeyedStateHandles(
    --- End diff --

    +1. Will update the PR as suggested.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938144#comment-15938144 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107646429

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
         }

         /**
    +     * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
    +     * key group index for the given subtask {@link KeyGroupRange}.
    +     *
    +     * This is publicly visible to be used in tests.
    +     */
    +    public static List<KeyedStateHandle> getKeyedStateHandles(
    +            Collection<? extends KeyedStateHandle> keyedStateHandles,
    +            KeyGroupRange subtaskKeyGroupRange) {
    +
    +        List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
    +
    +        for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
    +            KeyGroupRange intersection = keyedStateHandle.getKeyGroupRange().getIntersection(subtaskKeyGroupRange);
    --- End diff --

    The idea is great! It does make sense to allow a `KeyedStateHandle` to create a new `KeyedStateHandle` for the states of the sub-range. I will update the PR as suggested.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15936306#comment-15936306 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107409859

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java ---
    @@ -91,10 +98,10 @@ public KeyGroupsStateHandle getKeyGroupIntersection(KeyGroupRange keyGroupRange)
         /**
          *
    -     * @return the internal key-group range to offsets metadata
    +     * @return the start key group in the key-group range of this handle
          */
    -    public KeyGroupRangeOffsets getGroupRangeOffsets() {
    -        return groupRangeOffsets;
    +    public int getStartKeyGroup() {
    --- End diff --

    I suggest removing this pass-through method, as it is at a very low abstraction level. This information can already be obtained from the result of `getKeyGroupRange()`. Now that we expose the `KeyGroupRange`, `getNumberOfKeyGroups()` could also be removed, as we can ask the `KeyGroupRange` directly.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15936305#comment-15936305 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107410978

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
         }

         /**
    +     * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
    +     * key group index for the given subtask {@link KeyGroupRange}.
    +     *
    +     * This is publicly visible to be used in tests.
    +     */
    +    public static List getKeyedStateHandles(
    +            Collection keyedStateHandles,
    +            KeyGroupRange subtaskKeyGroupRange) {
    +
    +        List subtaskKeyedStateHandles = new ArrayList<>();
    +
    +        for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
    +            KeyGroupRange intersection = keyedStateHandle.getKeyGroupRange().getIntersection(subtaskKeyGroupRange);
    --- End diff --

    I wonder if we could somehow introduce a `KeyedStateHandle::intersect(KeyGroupRange)` that again returns a `KeyedStateHandle` with a `KeyGroupRange` that is the intersection of the original range and the argument. Basically, this would be a higher-level version of what the `KeyGroupsStateHandle` can do, where the concrete implementations (like `KeyGroupsStateHandle`) know how to virtually split themselves up into a sub-range. This would also transfer less data in the RPC (fewer offsets) and save the post-filtering in the backend. Otherwise, we could have a boolean method for just checking intersection, because there is no need to create `KeyGroupRange` objects anymore, since we do not actually use them.
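The `intersect` idea from the review comment could look roughly like the sketch below. It is only an illustration under stated assumptions: `Range`, `Handle`, and `OffsetsHandle` are hypothetical simplified types made up for this note, and the per-key-group `long[]` offsets stand in for whatever metadata a concrete handle carries. The point shown is that the handle narrows itself, so only the offsets inside the requested sub-range travel further.

```java
import java.util.Arrays;

// Hypothetical inclusive key-group range; not the Flink KeyGroupRange.
final class Range {
    final int start;
    final int end;

    Range(int start, int end) {
        this.start = start;
        this.end = end;
    }

    int size() { return end - start + 1; }

    // Returns the overlapping range, or null if the two ranges are disjoint.
    Range intersect(Range other) {
        int s = Math.max(start, other.start);
        int e = Math.min(end, other.end);
        return s <= e ? new Range(s, e) : null;
    }
}

// Hypothetical higher-level handle: every implementation can narrow itself.
interface Handle {
    Range getKeyGroupRange();

    // Returns a handle restricted to the overlap, or null if there is none.
    Handle intersect(Range range);
}

// Stand-in for a handle that stores one stream offset per key group.
final class OffsetsHandle implements Handle {
    private final Range range;
    private final long[] offsets; // offsets[i] belongs to key group range.start + i

    OffsetsHandle(Range range, long[] offsets) {
        if (offsets.length != range.size()) {
            throw new IllegalArgumentException("one offset per key group expected");
        }
        this.range = range;
        this.offsets = offsets.clone();
    }

    public Range getKeyGroupRange() { return range; }

    long[] offsets() { return offsets.clone(); }

    public Handle intersect(Range other) {
        Range common = range.intersect(other);
        if (common == null) {
            return null;
        }
        // Keep only the offsets of the key groups inside the overlap.
        int from = common.start - range.start;
        return new OffsetsHandle(common, Arrays.copyOfRange(offsets, from, from + common.size()));
    }
}

class IntersectDemo {
    public static void main(String[] args) {
        OffsetsHandle full = new OffsetsHandle(new Range(0, 3), new long[] {10L, 20L, 30L, 40L});
        OffsetsHandle part = (OffsetsHandle) full.intersect(new Range(2, 5));
        System.out.println(part.getKeyGroupRange().start + ".." + part.getKeyGroupRange().end
                + " " + Arrays.toString(part.offsets()));
    }
}
```

Returning `null` for a disjoint range is a design choice of this sketch; an implementation could equally return an empty handle or pair `intersect` with a separate boolean overlap check.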
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15936307#comment-15936307 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107409091

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +/**
    + * A snapshot in keyed streams. Each {@code KeyedStateHandle} contains the
    --- End diff --

    A state handle typically means a pointer to some state. It will mostly be used for snapshots of state backends, but conceptually this is a different level of abstraction. I suggest slightly modifying this comment, moving away from the concrete implementations of snapshots and toward the concept of having a pointer to serialized keyed state.
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15936308#comment-15936308 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107411558

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
         }

         /**
    +     * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
    +     * key group index for the given subtask {@link KeyGroupRange}.
    +     *
    +     * This is publicly visible to be used in tests.
    +     */
    +    public static List getKeyedStateHandles(
    --- End diff --

    Could we completely remove `getKeyGroupsStateHandles()` and replace everything with this method that works on `KeyedStateHandle`? This includes also using only `KeyedStateHandle` for the raw keyed state that is checkpointed into streams (without a backend). Otherwise, this is almost duplicated code. What do you think?
[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923996#comment-15923996 ]

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

GitHub user shixiaogang opened a pull request:

    https://github.com/apache/flink/pull/3531

    [FLINK-6034][checkpoint] Add KeyedStateHandle for the snapshots in keyed streams

    ## Changes
    - Add `KeyedStateHandle` for the snapshots in keyed streams. `KeyGroupsStateHandle` is now one of its implementations.
    - Distribute `KeyedStateHandle`s to subtasks according to their key group ranges. A `KeyedStateHandle` will be assigned to all subtasks whose key group ranges overlap with its range.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shixiaogang/flink flink-6034

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3531.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3531

commit 9637dcc40d66a2702f5227b7bbe3ae66fca89adf
Author: xiaogang.sxg
Date:   2017-03-14T11:04:37Z

    Add KeyedStateHandle for the snapshots in keyed streams
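The distribution rule in the pull request description (a handle goes to every subtask whose key group range overlaps its own) can be sketched as follows. This is an illustrative toy, not the code from the pull request: ranges are plain inclusive `int[] {start, end}` pairs, the class and method names are hypothetical, and the example ranges (8 key groups, parallelism rescaled from 2 to 3) are assumed for the demonstration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of overlap-based assignment when rescaling.
final class RescaleSketch {

    // True if the two inclusive ranges share at least one key group.
    static boolean overlaps(int[] a, int[] b) {
        return a[0] <= b[1] && b[0] <= a[1];
    }

    // Returns the indices of all handles whose range overlaps the subtask's range.
    static List<Integer> handlesForSubtask(List<int[]> handleRanges, int[] subtaskRange) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < handleRanges.size(); i++) {
            if (overlaps(handleRanges.get(i), subtaskRange)) {
                assigned.add(i);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Handles written by 2 old subtasks over 8 key groups (assumed split).
        List<int[]> handles = Arrays.asList(new int[] {0, 3}, new int[] {4, 7});
        // Ranges of 3 new subtasks after rescaling (assumed split).
        List<int[]> subtasks = Arrays.asList(new int[] {0, 2}, new int[] {3, 5}, new int[] {6, 7});

        for (int[] subtask : subtasks) {
            System.out.println(Arrays.toString(subtask) + " <- handles "
                    + handlesForSubtask(handles, subtask));
        }
    }
}
```

In this example the middle subtask receives both handles, since its range straddles the old boundary; it would then read only the key groups it owns from each of them.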