This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 13f2f7b8259b5d7dec9349eacf8c80af207bf9e2
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Fri Oct 23 19:39:14 2020 +0800

    [FLINK-19748] Skip key groups that don't have a defined stream offset
---
 .../api/operators/StreamTaskStateInitializerImpl.java      | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index 40fb3a3..6a08458 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -62,6 +63,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.stream.StreamSupport;
 
 /**
  * This class is the main implementation of a {@link StreamTaskStateInitializer}. This class obtains the state to create
@@ -432,15 +434,23 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
                        while (stateHandleIterator.hasNext()) {
                                currentStateHandle = stateHandleIterator.next();
                                if (currentStateHandle.getKeyGroupRange().getNumberOfKeyGroups() > 0) {
-                                       currentOffsetsIterator = currentStateHandle.getGroupRangeOffsets().iterator();
+                                       currentOffsetsIterator = unsetOffsetsSkippingIterator(currentStateHandle);
 
-                                       return true;
+                                       if (currentOffsetsIterator.hasNext()) {
+                                               return true;
+                                       }
                                }
                        }
 
                        return false;
                }
 
+               private static Iterator<Tuple2<Integer, Long>> unsetOffsetsSkippingIterator(KeyGroupsStateHandle keyGroupsStateHandle) {
+                       return StreamSupport.stream(keyGroupsStateHandle.getGroupRangeOffsets().spliterator(), false)
+                               .filter(keyGroupIdAndOffset -> keyGroupIdAndOffset.f1 != KeyedStateCheckpointOutputStream.NO_OFFSET_SET)
+                               .iterator();
+               }
+
                @Override
                public KeyGroupStatePartitionStreamProvider next() {
 

Reply via email to