[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946381#comment-15946381
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/3531


> Add KeyedStateHandle for the snapshots in keyed streams
> ---
>
> Key: FLINK-6034
> URL: https://issues.apache.org/jira/browse/FLINK-6034
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
> Fix For: 1.3.0
>
>
> Currently, the only type of snapshot in keyed streams is 
> {{KeyGroupsStateHandle}}, which is a full snapshot that stores the states one key 
> group after another. With the introduction of incremental checkpointing, we need a 
> higher-level abstraction of keyed snapshots to allow flexible snapshot formats. 
> Implementations of {{KeyedStateHandle}} may vary a lot across backends. The only 
> information needed in a {{KeyedStateHandle}} is its key group range. When 
> recovering the job with a different degree of parallelism, each 
> {{KeyedStateHandle}} will be assigned to those subtasks whose key group ranges 
> overlap with its range.
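To make the last point concrete, here is a minimal Java sketch, assuming each subtask owns a contiguous, inclusive range of key groups obtained by splitting [0, maxParallelism) as evenly as possible. The class and method names (`RescaleAssignmentSketch`, `subtaskRange`, `receivingSubtasks`) are hypothetical and are not Flink APIs; the sketch only shows that a handle's key group range is enough to decide which subtasks must receive it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Flink API: key group ranges are inclusive [start, end].
final class RescaleAssignmentSketch {

    // Assumption: subtask `index` of `parallelism` owns a contiguous, near-even slice of [0, maxParallelism).
    static int[] subtaskRange(int maxParallelism, int parallelism, int index) {
        int start = (index * maxParallelism + parallelism - 1) / parallelism;
        int end = ((index + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Two inclusive ranges overlap if the larger start is not past the smaller end.
    static boolean overlaps(int aStart, int aEnd, int bStart, int bEnd) {
        return Math.max(aStart, bStart) <= Math.min(aEnd, bEnd);
    }

    // Indices of the subtasks that must receive a handle covering [handleStart, handleEnd].
    static List<Integer> receivingSubtasks(int maxParallelism, int parallelism, int handleStart, int handleEnd) {
        List<Integer> receivers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            int[] range = subtaskRange(maxParallelism, parallelism, i);
            if (overlaps(range[0], range[1], handleStart, handleEnd)) {
                receivers.add(i);
            }
        }
        return receivers;
    }
}
```

For example, with a maximum parallelism of 8, a handle written by the first of two subtasks covers key groups 0 to 3; after rescaling to three subtasks (ranges [0, 2], [3, 5] and [6, 7]) it overlaps, and is therefore sent to, subtasks 0 and 1.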



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946380#comment-15946380
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3531
  
@StefanRRichter Thanks for your work. I will close the PR.




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945649#comment-15945649
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3531
  
Merged in cd5527417a1cae57073a8855c6c3b88c88c780aa. @shixiaogang can you 
please close the PR?




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945473#comment-15945473
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3531
  
Thanks for this very nice contribution @shixiaogang! I will merge this now.




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944944#comment-15944944
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3531
  
@StefanRRichter I updated the PR as suggested. Much appreciated, thanks for your 
hard work.




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940152#comment-15940152
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3531
  
@shixiaogang I had a few more comments on the updated PR. When they are 
resolved, I think this can be merged immediately.




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940151#comment-15940151
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107877948
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
 ---
@@ -30,7 +31,7 @@
  */
 public class OperatorSnapshotResult {
 
-   private RunnableFuture keyedStateManagedFuture;
+   private RunnableFuture keyedStateManagedFuture;
private RunnableFuture keyedStateRawFuture;
--- End diff --

I think this generic type can also be changed to the higher-level interface 
`RunnableFuture`.




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940145#comment-15940145
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107876739
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -57,6 +58,7 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import javax.swing.plaf.basic.BasicSplitPaneUI;
--- End diff --

This import should not be here.




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940131#comment-15940131
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107874660
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/**
+ * Base for the handles of the checkpointed states in keyed streams. When
+ * recovering from failures, the handle will be passed to all tasks whose key
+ * group ranges overlap with it.
+ */
+public interface KeyedStateHandle extends StateObject {
+
+   /**
+* Returns the range of the key groups contained in the state.
+*/
+   KeyGroupRange getKeyGroupRange();
+
+   /**
+* Returns a keyed state handle which contains the states for the given
--- End diff --

This comment is not correct. It does not necessarily contain the whole 
given range, but the intersection of the given range with the range of the 
handle. You can just copy-paste it from the method in `KeyGroupsStateHandle`.
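A small sketch of the semantics described above, using a hypothetical inclusive range class (`RangeSketch`, not Flink's `KeyGroupRange`): the result of an intersection can be smaller than the requested range, and can even be empty.

```java
// Hypothetical inclusive key group range, used only to illustrate the intersection semantics.
final class RangeSketch {
    final int start;
    final int end; // inclusive; start > end means the range is empty

    RangeSketch(int start, int end) {
        this.start = start;
        this.end = end;
    }

    int size() {
        return Math.max(0, end - start + 1);
    }

    // The result covers only key groups present in BOTH ranges, so it may be
    // smaller than `requested` (or empty), and is never larger than either input.
    RangeSketch intersect(RangeSketch requested) {
        return new RangeSketch(Math.max(start, requested.start), Math.min(end, requested.end));
    }
}
```

Here a handle covering key groups 0 to 9, asked for 5 to 14, yields only 5 to 9, which is why the Javadoc should not promise the whole given range.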




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940129#comment-15940129
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107874353
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -761,6 +769,13 @@ private void restoreKVStateMetaData() throws IOException, ClassNotFoundException
private void restoreKVStateData() throws IOException, RocksDBException {
//for all key-groups in the current state handle...
for (Tuple2 keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
+   int keyGroup = keyGroupOffset.f0;
+
+   // Skip those key groups that do not belong to the backend
+   if (!rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup)) {
--- End diff --

The same comment I had on the `HeapKeyedStateBackend` also applies here: I 
think the post-filter is no longer required after the change I suggested for 
`StateAssignmentOperation`.




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940124#comment-15940124
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107873583
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -412,9 +421,15 @@ private void restorePartitionedState(Collection state) thr
}
}
 
-   for (Tuple2 groupOffset : keyGroupsHandle.getGroupRangeOffsets()) {
+   for (Tuple2 groupOffset : keyGroupsStateHandle.getGroupRangeOffsets()) {
int keyGroupIndex = groupOffset.f0;
long offset = groupOffset.f1;
+
+   // Skip those key groups that don't belong to the backend.
+   if (!keyGroupRange.contains(keyGroupIndex)) {
--- End diff --

I think this is no longer required, because we are now "cutting out" the 
right key-groups from each state handle in the `StateAssignmentOperation`, 
using the `KeyedStateHandle::getIntersection(...)` method. In fact, I think 
that receiving a key-group that is not in the backend's range could now be 
considered as an error. We could rewrite this as a precondition check, or an 
assert.




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940108#comment-15940108
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107870871
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -290,19 +291,19 @@ private static void assignTaskStatesToOperatorInstances(
 * 
 * This is publicly visible to be used in tests.
 */
-   public static List getKeyGroupsStateHandles(
-   Collection allKeyGroupsHandles,
-   KeyGroupRange subtaskKeyGroupIds) {
+   public static List getKeyedStateHandles(
+   Collection keyedStateHandles,
+   KeyGroupRange subtaskKeyGroupRange) {
 
-   List subtaskKeyGroupStates = new ArrayList<>();
+   List subtaskKeyedStateHandles = new ArrayList<>();
 
-   for (KeyGroupsStateHandle storedKeyGroup : allKeyGroupsHandles) {
-   KeyGroupsStateHandle intersection = storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds);
-   if (intersection.getNumberOfKeyGroups() > 0) {
-   subtaskKeyGroupStates.add(intersection);
-   }
+   for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
+   KeyedStateHandle intersectedKeyedStateHandle = keyedStateHandle.getIntersection(subtaskKeyGroupRange);
+
+   subtaskKeyedStateHandles.add(intersectedKeyedStateHandle);
--- End diff --

I think we should only add a state handle if the key group range from the 
intersection is non-empty. Even though those empty handles are probably ignored 
later, I think it is cleaner.
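The suggested filtering can be sketched as follows. `FilterHandleSketch` and `FilterAssignmentSketch` are hypothetical stand-ins for `KeyedStateHandle` and the assignment code; only the key group range of each handle is modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a keyed state handle; only its inclusive key group range matters here.
final class FilterHandleSketch {
    final int start;
    final int end; // inclusive; start > end means no key groups

    FilterHandleSketch(int start, int end) {
        this.start = start;
        this.end = end;
    }

    boolean isEmpty() {
        return start > end;
    }

    FilterHandleSketch getIntersection(int otherStart, int otherEnd) {
        return new FilterHandleSketch(Math.max(start, otherStart), Math.min(end, otherEnd));
    }
}

final class FilterAssignmentSketch {
    // Keeps only intersections that still contain at least one key group.
    static List<FilterHandleSketch> getKeyedStateHandles(
            List<FilterHandleSketch> handles, int subtaskStart, int subtaskEnd) {
        List<FilterHandleSketch> subtaskHandles = new ArrayList<>();
        for (FilterHandleSketch handle : handles) {
            FilterHandleSketch intersected = handle.getIntersection(subtaskStart, subtaskEnd);
            if (!intersected.isEmpty()) {
                subtaskHandles.add(intersected);
            }
        }
        return subtaskHandles;
    }
}
```

With handles covering [0, 3] and [4, 7], a subtask owning [0, 2] receives a single handle for [0, 2]; the empty intersection with [4, 7] is dropped.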




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938601#comment-15938601
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107704681
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
 ---
@@ -91,10 +98,10 @@ public KeyGroupsStateHandle getKeyGroupIntersection(KeyGroupRange keyGroupRange)
 
/**
 *
-* @return the internal key-group range to offsets metadata
+* @return the start key group in the key-group range of this handle
 */
-   public KeyGroupRangeOffsets getGroupRangeOffsets() {
-   return groupRangeOffsets;
+   public int getStartKeyGroup() {
--- End diff --

I have removed all pass-through methods except `getGroupRangeOffsets()` 
because `StateInitializationContextImpl$KeyGroupStreamIterator` is using it to 
get the iterator for key groups and their offsets.




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938588#comment-15938588
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3531
  
@StefanRRichter Thanks a lot for your comments. I have updated the pull 
request as suggested, changing the type of raw keyed states to 
`KeyedStateHandle` as well.




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938146#comment-15938146
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107646547
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
}
 
/**
+* Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
+* key group index for the given subtask {@link KeyGroupRange}.
+* 
+* This is publicly visible to be used in tests.
+*/
+   public static List getKeyedStateHandles(
--- End diff --

+1. Will update the PR as suggested.




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938144#comment-15938144
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107646429
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
}
 
/**
+* Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
+* key group index for the given subtask {@link KeyGroupRange}.
+* 
+* This is publicly visible to be used in tests.
+*/
+   public static List getKeyedStateHandles(
+   Collection keyedStateHandles,
+   KeyGroupRange subtaskKeyGroupRange) {
+
+   List subtaskKeyedStateHandles = new ArrayList<>();
+
+   for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
+   KeyGroupRange intersection = keyedStateHandle.getKeyGroupRange().getIntersection(subtaskKeyGroupRange);
--- End diff --

The idea is great! It does make sense to allow a `KeyedStateHandle` to 
create a new `KeyedStateHandle` for the states of the sub-range. I will update 
the PR as suggested.
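One way a full-snapshot handle could create such a sub-range handle, sketched under assumptions: the handle keeps one stream offset per key group, and the intersected handle points at the same underlying stream with only the offsets of the shared key groups. `OffsetsHandleSketch` and its fields are hypothetical; the real `KeyGroupsStateHandle` may store this information differently.

```java
import java.util.Arrays;

// Hypothetical full-snapshot handle: one offset per key group in a single shared stream.
final class OffsetsHandleSketch {
    final String streamPath;  // hypothetical reference to the shared snapshot stream
    final int startKeyGroup;
    final long[] offsets;     // offsets[i] is the stream offset of key group startKeyGroup + i

    OffsetsHandleSketch(String streamPath, int startKeyGroup, long[] offsets) {
        this.streamPath = streamPath;
        this.startKeyGroup = startKeyGroup;
        this.offsets = offsets;
    }

    int endKeyGroup() {
        return startKeyGroup + offsets.length - 1;
    }

    // New handle over the SAME stream, restricted to key groups in both this handle and [from, to].
    OffsetsHandleSketch getIntersection(int from, int to) {
        int start = Math.max(startKeyGroup, from);
        int end = Math.min(endKeyGroup(), to);
        if (start > end) {
            return new OffsetsHandleSketch(streamPath, start, new long[0]); // no shared key groups
        }
        long[] subOffsets = Arrays.copyOfRange(offsets, start - startKeyGroup, end - startKeyGroup + 1);
        return new OffsetsHandleSketch(streamPath, start, subOffsets);
    }
}
```

The design choice here is that no state data is copied: the sub-range handle only narrows which offsets a restoring subtask will read.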




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15936306#comment-15936306
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107409859
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
 ---
@@ -91,10 +98,10 @@ public KeyGroupsStateHandle getKeyGroupIntersection(KeyGroupRange keyGroupRange)
 
/**
 *
-* @return the internal key-group range to offsets metadata
+* @return the start key group in the key-group range of this handle
 */
-   public KeyGroupRangeOffsets getGroupRangeOffsets() {
-   return groupRangeOffsets;
+   public int getStartKeyGroup() {
--- End diff --

I suggest removing this pass-through method, as it is at a very low 
abstraction level. This information can already be obtained from the result of 
`getKeyGroupRange()`. Now that we expose the `KeyGroupRange`, 
`getNumberOfKeyGroups()` could also be removed, as we can ask the 
`KeyGroupRange` directly.
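To make the suggestion concrete, here is a minimal Java sketch. It uses simplified stand-ins for `KeyGroupRange` and `KeyGroupsStateHandle`, not the real Flink classes. The `long[]` offsets array and the `getOffsetForKeyGroup` helper are hypothetical simplifications of the `KeyGroupRangeOffsets` field shown in the diff. Callers read the start key group and the number of key groups from `getKeyGroupRange()` instead of through pass-through getters on the handle.

```java
/** Hypothetical stand-in for Flink's KeyGroupRange: the inclusive range [start, end]. */
final class KeyGroupRange {
    private final int start;
    private final int end;

    KeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    int getStartKeyGroup() {
        return start;
    }

    int getEndKeyGroup() {
        return end;
    }

    int getNumberOfKeyGroups() {
        return end - start + 1;
    }
}

/** Simplified sketch: the handle exposes its range once, with no pass-through getters. */
final class KeyGroupsStateHandle {
    private final KeyGroupRange keyGroupRange;
    // offsets[i] is the stream offset of key group (start + i); a simplification
    // of the KeyGroupRangeOffsets held by the real class.
    private final long[] offsets;

    KeyGroupsStateHandle(KeyGroupRange keyGroupRange, long[] offsets) {
        if (offsets.length != keyGroupRange.getNumberOfKeyGroups()) {
            throw new IllegalArgumentException("expected one offset per key group");
        }
        this.keyGroupRange = keyGroupRange;
        this.offsets = offsets.clone();
    }

    KeyGroupRange getKeyGroupRange() {
        return keyGroupRange;
    }

    long getOffsetForKeyGroup(int keyGroup) {
        return offsets[keyGroup - keyGroupRange.getStartKeyGroup()];
    }
}

public class PassThroughRemovalSketch {
    public static void main(String[] args) {
        KeyGroupsStateHandle handle = new KeyGroupsStateHandle(
                new KeyGroupRange(4, 7), new long[] {0L, 128L, 256L, 512L});

        // Instead of handle.getStartKeyGroup() and handle.getNumberOfKeyGroups(),
        // callers ask the range directly.
        KeyGroupRange range = handle.getKeyGroupRange();
        System.out.println(range.getStartKeyGroup() + " " + range.getNumberOfKeyGroups()); // prints "4 4"
        System.out.println(handle.getOffsetForKeyGroup(6)); // prints "256"
    }
}
```

With this design the handle has a single accessor for range metadata, so any future range query only needs to be added to `KeyGroupRange`.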




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15936305#comment-15936305
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107410978
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
@@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
}
 
/**
+* Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
+* key group index for the given subtask {@link KeyGroupRange}.
+* 
+* This is publicly visible to be used in tests.
+*/
+   public static List getKeyedStateHandles(
+   Collection keyedStateHandles,
+   KeyGroupRange subtaskKeyGroupRange) {
+
+   List subtaskKeyedStateHandles = new ArrayList<>();
+
+   for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
+   KeyGroupRange intersection = keyedStateHandle.getKeyGroupRange().getIntersection(subtaskKeyGroupRange);
--- End diff --

I wonder if we could somehow introduce a 
`KeyedStateHandle::intersect(KeyGroupRange)` that again returns a 
`KeyedStateHandle` with a `KeyGroupRange` that is the intersection of the 
original range and the argument. Basically a higher-level version of what 
`KeyGroupsStateHandle` can do, where the concrete implementations (like 
`KeyGroupsStateHandle`) know how to virtually split themselves up into a 
sub-range. This would also transfer less data in the RPC (fewer offsets) and 
save the post-filtering in the backend. 

Otherwise, we could have a boolean method that only checks for intersection, 
because there is no need to create `KeyGroupRange` objects anymore: we 
do not actually use them.
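Here is a rough Java sketch of the proposed `intersect(KeyGroupRange)`. All classes are hypothetical, simplified stand-ins, not Flink's real types. The handle narrows itself to the overlapping sub-range. It keeps only that sub-range's offsets and the same stream pointer, and it returns `null` when the ranges are disjoint. The `null` convention is an assumption. `KeyGroupRange.intersects` is the allocation-free boolean alternative mentioned above, and `streamPath` is an illustrative placeholder.

```java
import java.util.Arrays;

/** Hypothetical stand-in for Flink's KeyGroupRange: inclusive bounds [start, end]. */
final class KeyGroupRange {
    private final int start;
    private final int end;

    KeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    int getStartKeyGroup() {
        return start;
    }

    int getEndKeyGroup() {
        return end;
    }

    int getNumberOfKeyGroups() {
        return Math.max(0, end - start + 1);
    }

    /** Boolean overlap check that allocates no intersection object. */
    boolean intersects(KeyGroupRange other) {
        return Math.max(start, other.start) <= Math.min(end, other.end);
    }

    /** Only meaningful when intersects(other) is true. */
    KeyGroupRange getIntersection(KeyGroupRange other) {
        return new KeyGroupRange(Math.max(start, other.start), Math.min(end, other.end));
    }
}

/** Sketch of the proposed abstraction: a handle that can narrow itself to a sub-range. */
interface KeyedStateHandle {
    KeyGroupRange getKeyGroupRange();

    /** Returns a handle covering only the overlap with {@code range}, or null if disjoint. */
    KeyedStateHandle intersect(KeyGroupRange range);
}

/** Full snapshot written to one stream, with one offset per key group (simplified). */
final class KeyGroupsStateHandle implements KeyedStateHandle {
    private final KeyGroupRange range;
    private final long[] offsets;    // offsets[i] belongs to key group range.getStartKeyGroup() + i
    private final String streamPath; // hypothetical pointer to the serialized data

    KeyGroupsStateHandle(KeyGroupRange range, long[] offsets, String streamPath) {
        this.range = range;
        this.offsets = offsets.clone();
        this.streamPath = streamPath;
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return range;
    }

    long[] getOffsets() {
        return offsets.clone();
    }

    @Override
    public KeyedStateHandle intersect(KeyGroupRange other) {
        if (!range.intersects(other)) {
            return null;
        }
        KeyGroupRange sub = range.getIntersection(other);
        int from = sub.getStartKeyGroup() - range.getStartKeyGroup();
        // Keep only the sub-range's offsets, so less metadata would travel in the RPC.
        long[] subOffsets = Arrays.copyOfRange(offsets, from, from + sub.getNumberOfKeyGroups());
        // Same stream path: the split is virtual and no state data is copied.
        return new KeyGroupsStateHandle(sub, subOffsets, streamPath);
    }
}

public class IntersectSketch {
    public static void main(String[] args) {
        long[] offsets = new long[10];
        for (int i = 0; i < offsets.length; i++) {
            offsets[i] = i * 100L;
        }
        KeyGroupsStateHandle handle =
                new KeyGroupsStateHandle(new KeyGroupRange(0, 9), offsets, "file:///tmp/chk-1/state");

        KeyGroupsStateHandle sub = (KeyGroupsStateHandle) handle.intersect(new KeyGroupRange(5, 14));
        // prints "5..9 [500, 600, 700, 800, 900]"
        System.out.println(sub.getKeyGroupRange().getStartKeyGroup() + ".."
                + sub.getKeyGroupRange().getEndKeyGroup() + " " + Arrays.toString(sub.getOffsets()));
        System.out.println(handle.intersect(new KeyGroupRange(10, 19))); // prints "null"
    }
}
```

Because each implementation decides how to split itself, an incremental-checkpoint handle could apply a different narrowing rule without changing the assignment code.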




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15936307#comment-15936307
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107409091
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/**
+ * A snapshot in keyed streams. Each {@code KeyedStateHandle} contains the
--- End diff --

A state handle typically means a pointer to some state. It will mostly be 
used for snapshots of state backends, but conceptually this is a different level 
of abstraction. I suggest slightly modifying this comment, moving away from the 
concrete implementations of snapshots toward the concept of a pointer to 
serialized keyed state. 
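Here is one possible wording, sketched as a hypothetical minimal interface with an illustrative file-based implementation. `FileKeyedStateHandle` and its path are invented for the example, and `KeyGroupRange` is a simplified stand-in. The Javadoc describes a pointer to serialized keyed state rather than a particular snapshot format.

```java
/** Hypothetical stand-in for Flink's KeyGroupRange: inclusive bounds [start, end]. */
final class KeyGroupRange {
    private final int start;
    private final int end;

    KeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    int getStartKeyGroup() {
        return start;
    }

    int getEndKeyGroup() {
        return end;
    }
}

/**
 * Handle to serialized keyed state. A handle does not contain the state itself; it
 * points to where the serialized state lives and records the range of key groups it
 * covers. Snapshots of state backends are the main use case, but the abstraction does
 * not assume any particular snapshot format.
 */
interface KeyedStateHandle {
    /** Returns the range of key groups contained in the state this handle points to. */
    KeyGroupRange getKeyGroupRange();
}

/** Hypothetical minimal implementation: a file location plus the covered key groups. */
final class FileKeyedStateHandle implements KeyedStateHandle {
    private final String filePath;
    private final KeyGroupRange keyGroupRange;

    FileKeyedStateHandle(String filePath, KeyGroupRange keyGroupRange) {
        this.filePath = filePath;
        this.keyGroupRange = keyGroupRange;
    }

    /** Location of the serialized state; the handle never holds the state bytes. */
    String getFilePath() {
        return filePath;
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return keyGroupRange;
    }
}

public class KeyedStateHandleDocSketch {
    public static void main(String[] args) {
        FileKeyedStateHandle handle =
                new FileKeyedStateHandle("hdfs:///checkpoints/chk-1/kg-0-63", new KeyGroupRange(0, 63));
        KeyGroupRange range = handle.getKeyGroupRange();
        // prints "hdfs:///checkpoints/chk-1/kg-0-63 0..63"
        System.out.println(handle.getFilePath() + " " + range.getStartKeyGroup() + ".." + range.getEndKeyGroup());
    }
}
```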




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15936308#comment-15936308
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3531#discussion_r107411558
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
@@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
}
 
/**
+* Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
+* key group index for the given subtask {@link KeyGroupRange}.
+* 
+* This is publicly visible to be used in tests.
+*/
+   public static List getKeyedStateHandles(
--- End diff --

Could we completely remove `getKeyGroupsStateHandles()` and do 
everything with this method, which works on `KeyedStateHandle`? This includes 
also using only `KeyedStateHandle` for the raw keyed state that is checkpointed 
into streams (without a backend). Otherwise, this is almost duplicated code. What 
do you think?
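Here is a sketch of the unified version, assuming simplified stand-ins for the Flink types. The method name and parameter names follow the diff above. The generic type arguments were dropped in this archive, so `Collection<? extends KeyedStateHandle>` and `List<KeyedStateHandle>` are assumptions. `NamedKeyedStateHandle` is hypothetical. The same filter serves both managed keyed state from a backend and raw keyed state written into streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for Flink's KeyGroupRange: inclusive bounds [start, end]. */
final class KeyGroupRange {
    private final int start;
    private final int end;

    KeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    boolean intersects(KeyGroupRange other) {
        return Math.max(start, other.start) <= Math.min(end, other.end);
    }
}

interface KeyedStateHandle {
    KeyGroupRange getKeyGroupRange();
}

/** Hypothetical handle type used for both managed and raw keyed state in this sketch. */
final class NamedKeyedStateHandle implements KeyedStateHandle {
    final String name;
    private final KeyGroupRange range;

    NamedKeyedStateHandle(String name, KeyGroupRange range) {
        this.name = name;
        this.range = range;
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return range;
    }

    @Override
    public String toString() {
        return name;
    }
}

public class UnifiedAssignmentSketch {
    /** One filter for every kind of keyed state: keep handles overlapping the subtask's range. */
    static List<KeyedStateHandle> getKeyedStateHandles(
            Collection<? extends KeyedStateHandle> keyedStateHandles,
            KeyGroupRange subtaskKeyGroupRange) {
        List<KeyedStateHandle> result = new ArrayList<>();
        for (KeyedStateHandle handle : keyedStateHandles) {
            if (handle.getKeyGroupRange().intersects(subtaskKeyGroupRange)) {
                result.add(handle);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<NamedKeyedStateHandle> managed = Arrays.asList(
                new NamedKeyedStateHandle("managed-0", new KeyGroupRange(0, 4)),
                new NamedKeyedStateHandle("managed-1", new KeyGroupRange(5, 9)));
        List<NamedKeyedStateHandle> raw = Arrays.asList(
                new NamedKeyedStateHandle("raw-0", new KeyGroupRange(0, 9)));

        KeyGroupRange subtaskRange = new KeyGroupRange(0, 4);
        // The same method serves managed (backend) and raw (stream) keyed state.
        System.out.println(getKeyedStateHandles(managed, subtaskRange)); // prints "[managed-0]"
        System.out.println(getKeyedStateHandles(raw, subtaskRange));     // prints "[raw-0]"
    }
}
```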




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923996#comment-15923996
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/3531

[FLINK-6034][checkpoint] Add KeyedStateHandle for the snapshots in keyed streams

## Changes
- Add `KeyedStateHandle` for the snapshots in keyed streams. 
`KeyGroupsStateHandle` is now one of its implementations.
- Distribute `KeyedStateHandle`s to subtasks according to their key group ranges. A 
`KeyedStateHandle` will be assigned to all subtasks whose key group ranges 
overlap with its range.
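The redistribution rule can be sketched in Java as follows. The even, contiguous split of key groups across subtasks in `rangeForSubtask` is an assumption made for illustration. `SnapshotHandle` is a hypothetical stand-in for `KeyedStateHandle`. In the example, two handles are written at parallelism 2 and then restored at parallelism 3. The middle subtask overlaps both handles, so it receives both.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for Flink's KeyGroupRange: inclusive bounds [start, end]. */
final class KeyGroupRange {
    final int start;
    final int end;

    KeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    boolean intersects(KeyGroupRange other) {
        return Math.max(start, other.start) <= Math.min(end, other.end);
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + "]";
    }
}

/** Hypothetical snapshot handle: a name plus the key groups it covers. */
final class SnapshotHandle {
    final String name;
    final KeyGroupRange range;

    SnapshotHandle(String name, KeyGroupRange range) {
        this.name = name;
        this.range = range;
    }

    @Override
    public String toString() {
        return name;
    }
}

public class RescaleAssignmentSketch {
    /** Assumed even, contiguous split of maxParallelism key groups across parallelism subtasks. */
    static KeyGroupRange rangeForSubtask(int maxParallelism, int parallelism, int subtask) {
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtask + 1) * maxParallelism - 1) / parallelism;
        return new KeyGroupRange(start, end);
    }

    /** Gives every new subtask all handles whose key group range overlaps its own. */
    static List<List<SnapshotHandle>> assign(
            List<SnapshotHandle> handles, int maxParallelism, int newParallelism) {
        List<List<SnapshotHandle>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            KeyGroupRange subtaskRange = rangeForSubtask(maxParallelism, newParallelism, i);
            List<SnapshotHandle> assigned = new ArrayList<>();
            for (SnapshotHandle handle : handles) {
                if (handle.range.intersects(subtaskRange)) {
                    assigned.add(handle);
                }
            }
            perSubtask.add(assigned);
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // Two handles written at parallelism 2 over 10 key groups, restored at parallelism 3.
        List<SnapshotHandle> handles = Arrays.asList(
                new SnapshotHandle("h0", new KeyGroupRange(0, 4)),
                new SnapshotHandle("h1", new KeyGroupRange(5, 9)));
        List<List<SnapshotHandle>> result = assign(handles, 10, 3);
        // prints:
        // subtask 0 [0, 3] -> [h0]
        // subtask 1 [4, 6] -> [h0, h1]
        // subtask 2 [7, 9] -> [h1]
        for (int i = 0; i < result.size(); i++) {
            System.out.println("subtask " + i + " " + rangeForSubtask(10, 3, i) + " -> " + result.get(i));
        }
    }
}
```

Handing a whole overlapping handle to a subtask means the subtask still has to skip key groups outside its own range. The `intersect` suggestion in the review would move that narrowing into the handle itself.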
 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shixiaogang/flink flink-6034

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3531.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3531


commit 9637dcc40d66a2702f5227b7bbe3ae66fca89adf
Author: xiaogang.sxg 
Date:   2017-03-14T11:04:37Z

Add KeyedStateHandle for the snapshots in keyed streams



