This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 3ba589e705a6ecf95132512a3aacc781cd7641ff Author: Igal Shilman <[email protected]> AuthorDate: Wed Oct 21 15:28:39 2020 +0200 [FLINK-19692][core] Write all the assigned KeyGroups into the raw keyed stream --- .../statefun/flink/core/logger/KeyGroupStream.java | 4 ++++ .../flink/core/logger/UnboundedFeedbackLogger.java | 17 ++++++++++++----- .../flink/core/logger/UnboundedFeedbackLoggerTest.java | 3 ++- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java index 6628225..5b22864 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java @@ -94,4 +94,8 @@ final class KeyGroupStream<T> { memoryPool.release(segment); } } + + public static void writeEmptyTo(DataOutputView target) throws IOException { + target.writeInt(0); + } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java index 409f714..51e8e7b 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.TreeMap; import java.util.function.Supplier; @@ -99,11 +98,19 @@ public final class UnboundedFeedbackLogger<T> implements FeedbackLogger<T> { checkState(keyedStateOutputStream != null, "Trying to flush envelopes not in a logging state"); final DataOutputView target = new DataOutputViewStreamWrapper(keyedStateOutputStream); - for (Entry<Integer, KeyGroupStream<T>> entry : keyGroupStreams.entrySet()) { - checkpointedStreamOperations.startNewKeyGroup(keyedStateOutputStream, entry.getKey()); + final Iterable<Integer> assignedKeyGroupIds = + checkpointedStreamOperations.keyGroupList(keyedStateOutputStream); + // the underlying checkpointed raw stream, requires that all key groups assigned + // to this operator must be written to the underlying stream. + for (Integer keyGroupId : assignedKeyGroupIds) { + checkpointedStreamOperations.startNewKeyGroup(keyedStateOutputStream, keyGroupId); - KeyGroupStream stream = entry.getValue(); - stream.writeTo(target); + @Nullable KeyGroupStream<T> stream = keyGroupStreams.get(keyGroupId); + if (stream == null) { + KeyGroupStream.writeEmptyTo(target); + } else { + stream.writeTo(target); + } } } diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java index 08600ef..ac7efdd 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java @@ -27,6 +27,7 @@ import java.util.stream.IntStream; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.statefun.flink.core.di.ObjectContainer; +import org.hamcrest.Matchers; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; @@ -57,7 +58,7 @@ public class UnboundedFeedbackLoggerTest { logger.startLogging(output); logger.commit(); - assertThat(output.size(), is(0)); + assertThat(output.size(), Matchers.greaterThan(0)); } @Test(expected = IllegalStateException.class)
