This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 13f2f7b8259b5d7dec9349eacf8c80af207bf9e2 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Fri Oct 23 19:39:14 2020 +0800 [FLINK-19748] Skip key groups that don't have a defined stream offset --- .../api/operators/StreamTaskStateInitializerImpl.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 40fb3a3..6a08458 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; @@ -62,6 +63,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.stream.StreamSupport; /** * This class is the main implementation of a {@link StreamTaskStateInitializer}. This class obtains the state to create @@ -432,15 +434,23 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize while (stateHandleIterator.hasNext()) { currentStateHandle = stateHandleIterator.next(); if (currentStateHandle.getKeyGroupRange().getNumberOfKeyGroups() > 0) { - currentOffsetsIterator = currentStateHandle.getGroupRangeOffsets().iterator(); + currentOffsetsIterator = unsetOffsetsSkippingIterator(currentStateHandle); - return true; + if (currentOffsetsIterator.hasNext()) { + return true; + } } } return false; } + private static Iterator<Tuple2<Integer, Long>> unsetOffsetsSkippingIterator(KeyGroupsStateHandle keyGroupsStateHandle) { + return StreamSupport.stream(keyGroupsStateHandle.getGroupRangeOffsets().spliterator(), false) + .filter(keyGroupIdAndOffset -> keyGroupIdAndOffset.f1 != KeyedStateCheckpointOutputStream.NO_OFFSET_SET) + .iterator(); + } + @Override public KeyGroupStatePartitionStreamProvider next() {
