tzulitai commented on a change in pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168#discussion_r513379873



##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
##########
@@ -99,11 +98,19 @@ private void flushToKeyedStateOutputStream() throws IOException {
     checkState(keyedStateOutputStream != null, "Trying to flush envelopes not in a logging state");
 
     final DataOutputView target = new DataOutputViewStreamWrapper(keyedStateOutputStream);
-    for (Entry<Integer, KeyGroupStream<T>> entry : keyGroupStreams.entrySet()) {
-      checkpointedStreamOperations.startNewKeyGroup(keyedStateOutputStream, entry.getKey());
+    final Iterable<Integer> assignedKeyGroupIds =
+        checkpointedStreamOperations.keyGroupList(keyedStateOutputStream);
+    // the underlying checkpointed raw stream, requires that all key groups assigned
+    // to this operator must be written to the underlying stream.

Review comment:
       I don’t have a strong opinion on whether or not the empty key groups 
should stay there in the long term, so fine by me to keep this as is without 
the TODO comment to revisit 👍 
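
       For anyone reading along, here is a minimal sketch of the behaviour the new comment describes: iterate over every assigned key group and start a section for each one, even when nothing is buffered for it. This is only an illustration under stated assumptions. `KeyGroupFlushSketch`, `flush`, and the length-prefixed layout are hypothetical stand-ins, not the Flink or StateFun API, and the sketch does not touch the real raw keyed state stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the flush loop; none of these names come from Flink or StateFun.
public class KeyGroupFlushSketch {

  /**
   * Writes one section per assigned key group. Key groups with no buffered
   * data still get a section, with a zero-length payload.
   */
  public static List<Integer> flush(
      Iterable<Integer> assignedKeyGroupIds, Map<Integer, byte[]> buffered, DataOutputStream out)
      throws IOException {
    List<Integer> started = new ArrayList<>();
    for (int keyGroupId : assignedKeyGroupIds) {
      // Assumed layout: key group id, then payload length, then payload bytes.
      out.writeInt(keyGroupId);
      started.add(keyGroupId);
      byte[] payload = buffered.getOrDefault(keyGroupId, new byte[0]);
      out.writeInt(payload.length);
      out.write(payload);
    }
    return started;
  }

  public static void main(String[] args) throws IOException {
    Map<Integer, byte[]> buffered = new HashMap<>();
    buffered.put(1, new byte[] {7, 8, 9}); // only key group 1 has data

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    List<Integer> started =
        flush(Arrays.asList(0, 1, 2, 3), buffered, new DataOutputStream(bytes));
    System.out.println(started + " -> " + bytes.size() + " bytes");
  }
}
```

       The point of the sketch is the loop source: it walks the assigned key group ids rather than the entries of the in-memory map, so the empty key groups 0, 2 and 3 are written as well.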
   





