[GitHub] [flink-statefun] tzulitai commented on a change in pull request #168: [FLINK-19692] Write all assigned key groups into the keyed raw stream with a Header.

2020-10-27 Thread GitBox


tzulitai commented on a change in pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168#discussion_r513184588



##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
##
@@ -99,11 +98,19 @@ private void flushToKeyedStateOutputStream() throws IOException {
 checkState(keyedStateOutputStream != null, "Trying to flush envelopes not in a logging state");
 
 final DataOutputView target = new DataOutputViewStreamWrapper(keyedStateOutputStream);
-for (Entry> entry : keyGroupStreams.entrySet()) {
-  checkpointedStreamOperations.startNewKeyGroup(keyedStateOutputStream, entry.getKey());
+final Iterable assignedKeyGroupIds =
+checkpointedStreamOperations.keyGroupList(keyedStateOutputStream);
+// the underlying checkpointed raw stream, requires that all key groups assigned
+// to this operator must be written to the underlying stream.

Review comment:
   nit: I'm wondering if it makes sense to add a TODO here to help remind 
us in the future that after FLINK-19748 (allow skipping key groups) is merged, 
we may choose to revert writing empty key groups?
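The change under review loops over every key group assigned to the operator (obtained via `keyGroupList`) instead of only the key groups that have buffered envelopes, so empty key groups still get written. Below is a minimal, self-contained sketch of that idea. `KeyGroupFlushSketch`, `RawKeyedStream`, `flush`, and the count-then-envelopes layout are hypothetical stand-ins. They are not the actual `UnboundedFeedbackLogger` code or Flink's raw keyed state API.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyGroupFlushSketch {

  /** Hypothetical stand-in for the checkpointed raw keyed stream; records started key groups. */
  static final class RawKeyedStream {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    final DataOutputStream out = new DataOutputStream(bytes);
    final List<Integer> startedKeyGroups = new ArrayList<>();

    void startNewKeyGroup(int keyGroupId) {
      startedKeyGroups.add(keyGroupId);
    }
  }

  /**
   * Writes every assigned key group, including ones with nothing buffered,
   * as a count followed by that many envelopes (count 0 marks an empty key group).
   */
  static void flush(
      RawKeyedStream stream,
      Iterable<Integer> assignedKeyGroupIds,
      Map<Integer, List<String>> bufferedEnvelopes) {
    try {
      for (int keyGroupId : assignedKeyGroupIds) {
        // started unconditionally: the stream expects all assigned key groups
        stream.startNewKeyGroup(keyGroupId);
        List<String> envelopes =
            bufferedEnvelopes.getOrDefault(keyGroupId, Collections.<String>emptyList());
        stream.out.writeInt(envelopes.size());
        for (String envelope : envelopes) {
          stream.out.writeUTF(envelope);
        }
      }
      stream.out.flush();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> buffered = new HashMap<>();
    buffered.put(1, Arrays.asList("a"));
    buffered.put(3, Arrays.asList("b", "c"));

    RawKeyedStream stream = new RawKeyedStream();
    flush(stream, Arrays.asList(0, 1, 2, 3), buffered);
    System.out.println(stream.startedKeyGroups); // prints [0, 1, 2, 3]
  }
}
```

Key groups 0 and 2 have nothing buffered but are still started and written with a count of 0. If skipping key groups becomes possible later, the `getOrDefault` fallback is the natural place where empty key groups could be dropped instead.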

##
File path: 
statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
##
@@ -79,6 +81,21 @@ public void roundTripWithSpill() throws Exception {
 roundTrip(1_000_000, 0);
   }
 
+  @Test
+  public void testHeader() throws IOException {

Review comment:
   As I understand it, this test verifies the header serde round trip, in 
the case that the header was written.
   
   As a counterpart, could you add a test that verifies 
`Header.skipHeaderSilently` is effectively a no-op if the header was missing in 
the input stream?
   
   i.e., another variant of this test, with the line
`UnboundedFeedbackLogger.Header.writeHeader(out);` removed, should pass as well.
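The requested counterpart test checks that skipping the header does nothing harmful when no header was written. The sketch below assumes a header that is just a fixed 4-byte magic number, which it detects with `BufferedInputStream` mark/reset. `HeaderSketch`, `HEADER_MAGIC`, and `roundTrip` are hypothetical, and the real `UnboundedFeedbackLogger.Header` format may differ.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class HeaderSketch {

  /** Hypothetical magic number; the actual header layout in PR #168 is not shown here. */
  static final int HEADER_MAGIC = 0x1F2E3D4C;

  static void writeHeader(DataOutputStream out) throws IOException {
    out.writeInt(HEADER_MAGIC);
  }

  /**
   * Consumes the header if the stream starts with it; otherwise rewinds so the
   * caller reads the payload from the first byte (a no-op for headerless input).
   */
  static InputStream skipHeaderSilently(InputStream in) throws IOException {
    BufferedInputStream buffered = new BufferedInputStream(in);
    buffered.mark(Integer.BYTES);
    byte[] prefix = new byte[Integer.BYTES];
    int read = 0;
    while (read < prefix.length) {
      int n = buffered.read(prefix, read, prefix.length - read);
      if (n < 0) {
        break; // stream is shorter than a header
      }
      read += n;
    }
    boolean hasHeader =
        read == prefix.length
            && new DataInputStream(new ByteArrayInputStream(prefix)).readInt() == HEADER_MAGIC;
    if (!hasHeader) {
      buffered.reset(); // no header: rewind to the start of the payload
    }
    return buffered;
  }

  /** Writes an optional header plus one int payload, then reads the payload back. */
  static int roundTrip(boolean withHeader, int payload) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      if (withHeader) {
        writeHeader(out);
      }
      out.writeInt(payload);
      out.flush();

      InputStream in = skipHeaderSilently(new ByteArrayInputStream(bytes.toByteArray()));
      return new DataInputStream(in).readInt();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(roundTrip(true, 42));  // prints 42 (header written and skipped)
    System.out.println(roundTrip(false, 42)); // prints 42 (no header, skip is a no-op)
  }
}
```

One trade-off of a plain magic-number header: a headerless stream whose first four bytes happen to equal the magic value would be misread, so the value should be chosen to be unlikely as a leading payload value.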





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] tzulitai commented on a change in pull request #168: [FLINK-19692] Write all assigned key groups into the keyed raw stream with a Header.

2020-10-28 Thread GitBox


tzulitai commented on a change in pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168#discussion_r513379873



##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
##
@@ -99,11 +98,19 @@ private void flushToKeyedStateOutputStream() throws IOException {
 checkState(keyedStateOutputStream != null, "Trying to flush envelopes not in a logging state");
 
 final DataOutputView target = new DataOutputViewStreamWrapper(keyedStateOutputStream);
-for (Entry> entry : keyGroupStreams.entrySet()) {
-  checkpointedStreamOperations.startNewKeyGroup(keyedStateOutputStream, entry.getKey());
+final Iterable assignedKeyGroupIds =
+checkpointedStreamOperations.keyGroupList(keyedStateOutputStream);
+// the underlying checkpointed raw stream, requires that all key groups assigned
+// to this operator must be written to the underlying stream.

Review comment:
   I don’t have a strong opinion on whether the empty key groups should stay
there in the long term, so I'm fine with keeping this as is, without a TODO
comment to revisit it 👍 
   




