This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 3ba589e705a6ecf95132512a3aacc781cd7641ff
Author: Igal Shilman <[email protected]>
AuthorDate: Wed Oct 21 15:28:39 2020 +0200

    [FLINK-19692][core] Write all the assigned KeyGroups into the raw keyed 
stream
---
 .../statefun/flink/core/logger/KeyGroupStream.java      |  4 ++++
 .../flink/core/logger/UnboundedFeedbackLogger.java      | 17 ++++++++++++-----
 .../flink/core/logger/UnboundedFeedbackLoggerTest.java  |  3 ++-
 3 files changed, 18 insertions(+), 6 deletions(-)

diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java
index 6628225..5b22864 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java
@@ -94,4 +94,8 @@ final class KeyGroupStream<T> {
       memoryPool.release(segment);
     }
   }
+
+  public static void writeEmptyTo(DataOutputView target) throws IOException {
+    target.writeInt(0);
+  }
 }
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
index 409f714..51e8e7b 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.TreeMap;
 import java.util.function.Supplier;
@@ -99,11 +98,19 @@ public final class UnboundedFeedbackLogger<T> implements 
FeedbackLogger<T> {
     checkState(keyedStateOutputStream != null, "Trying to flush envelopes not 
in a logging state");
 
     final DataOutputView target = new 
DataOutputViewStreamWrapper(keyedStateOutputStream);
-    for (Entry<Integer, KeyGroupStream<T>> entry : keyGroupStreams.entrySet()) 
{
-      checkpointedStreamOperations.startNewKeyGroup(keyedStateOutputStream, 
entry.getKey());
+    final Iterable<Integer> assignedKeyGroupIds =
+        checkpointedStreamOperations.keyGroupList(keyedStateOutputStream);
+    // the underlying checkpointed raw stream, requires that all key groups 
assigned
+    // to this operator must be written to the underlying stream.
+    for (Integer keyGroupId : assignedKeyGroupIds) {
+      checkpointedStreamOperations.startNewKeyGroup(keyedStateOutputStream, 
keyGroupId);
 
-      KeyGroupStream stream = entry.getValue();
-      stream.writeTo(target);
+      @Nullable KeyGroupStream<T> stream = keyGroupStreams.get(keyGroupId);
+      if (stream == null) {
+        KeyGroupStream.writeEmptyTo(target);
+      } else {
+        stream.writeTo(target);
+      }
     }
   }
 
diff --git 
a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
 
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
index 08600ef..ac7efdd 100644
--- 
a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
+++ 
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
@@ -27,6 +27,7 @@ import java.util.stream.IntStream;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.statefun.flink.core.di.ObjectContainer;
+import org.hamcrest.Matchers;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -57,7 +58,7 @@ public class UnboundedFeedbackLoggerTest {
     logger.startLogging(output);
     logger.commit();
 
-    assertThat(output.size(), is(0));
+    assertThat(output.size(), Matchers.greaterThan(0));
   }
 
   @Test(expected = IllegalStateException.class)

Reply via email to