[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668574#comment-15668574
 ] 

ASF GitHub Bot commented on FLINK-5056:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2797#discussion_r88124075
  
    --- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 ---
    @@ -118,48 +150,290 @@ public static void destroyHDFS() {
        }
     
        @Test
    -   public void testCheckpointWithoutNotify() throws Exception {
    -           File dataDir = tempFolder.newFolder();
    +   public void testInactivityPeriodWithLateNotify() throws Exception {
    +           final File outDir = tempFolder.newFolder();
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createRescalingTestSink(outDir, 1, 0, 100);
    +           testHarness.setup();
    +           testHarness.open();
    +
    +           testHarness.setProcessingTime(0L);
    +
    +           testHarness.processElement(new StreamRecord<>("test1", 1L));
    +           testHarness.processElement(new StreamRecord<>("test2", 1L));
    +           checkFs(outDir, 2, 0 ,0, 0);
    +
    +           testHarness.setProcessingTime(101L);    // put some in pending
    +           checkFs(outDir, 0, 2, 0, 0);
    +
    +           testHarness.snapshot(0, 0);                             // put 
them in pending for 0
    +           checkFs(outDir, 0, 2, 0, 0);
     
    -           OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(dataDir);
    +           testHarness.processElement(new StreamRecord<>("test3", 1L));
    +           testHarness.processElement(new StreamRecord<>("test4", 1L));
     
    +           testHarness.setProcessingTime(202L);    // put some in pending
    +
    +           testHarness.snapshot(1, 0);                             // put 
them in pending for 1
    +           checkFs(outDir, 0, 4, 0, 0);
    +
    +           testHarness.notifyOfCompletedCheckpoint(0);     // put the 
pending for 0 to the "committed" state
    +           checkFs(outDir, 0, 2, 2, 0);
    +
    +           testHarness.notifyOfCompletedCheckpoint(1); // put the pending 
for 1 to the "committed" state
    +           checkFs(outDir, 0, 0, 4, 0);
    +   }
    +
    +   @Test
    +   public void testBucketStateTransitions() throws Exception {
    +           final File outDir = tempFolder.newFolder();
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createRescalingTestSink(outDir, 1, 0, 100);
                testHarness.setup();
                testHarness.open();
     
    -           testHarness.processElement(new StreamRecord<>("Hello"));
    -           testHarness.processElement(new StreamRecord<>("Hello"));
    -           testHarness.processElement(new StreamRecord<>("Hello"));
    +           testHarness.setProcessingTime(0L);
    +
    +           testHarness.processElement(new StreamRecord<>("test1", 1L));
    +           testHarness.processElement(new StreamRecord<>("test2", 1L));
    +           checkFs(outDir, 2, 0 ,0, 0);
    +
    +           // this is to check the inactivity threshold
    +           testHarness.setProcessingTime(101L);
    +           checkFs(outDir, 0, 2, 0, 0);
    +
    +           testHarness.processElement(new StreamRecord<>("test3", 1L));
    +           checkFs(outDir, 1, 2, 0, 0);
    +
    +           testHarness.snapshot(0, 0);
    +           checkFs(outDir, 1, 2, 0, 0);
     
    -           testHarness.setProcessingTime(10000L);
    +           testHarness.notifyOfCompletedCheckpoint(0);
    +           checkFs(outDir, 1, 0, 2, 0);
     
    -           // snapshot but don't call notify to simulate a notify that 
never
    -           // arrives, the sink should move pending files in restore() in 
that case
    -           StreamStateHandle snapshot1 = testHarness.snapshotLegacy(0, 0);
    +           OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
     
    -           testHarness = createTestSink(dataDir);
    +           testHarness.close();
    +           checkFs(outDir, 0, 1, 2, 0);
    +
    +           testHarness = createRescalingTestSink(outDir, 1, 0, 100);
    +           testHarness.setup();
    +           testHarness.initializeState(snapshot);
    +           testHarness.open();
    +           checkFs(outDir, 0, 0, 3, 1);
    +
    +           snapshot = testHarness.snapshot(2, 0);
    +
    +           testHarness.processElement(new StreamRecord<>("test4", 10));
    +           checkFs(outDir, 1, 0, 3, 1);
    +
    +           testHarness = createRescalingTestSink(outDir, 1, 0, 100);
                testHarness.setup();
    -           testHarness.restore(snapshot1);
    +           testHarness.initializeState(snapshot);
                testHarness.open();
     
    -           testHarness.processElement(new StreamRecord<>("Hello"));
    +           // the in-progress file remains as we do not clean up now
    +           checkFs(outDir, 1, 0, 3, 1);
     
                testHarness.close();
     
    -           int numComplete = 0;
    -           int numPending = 0;
    -           for (File file: FileUtils.listFiles(dataDir, null, true)) {
    +           // at close it is not moved to final because it is not part
    +           // of the current task's state, it was just a not cleaned up 
leftover.
    +           checkFs(outDir, 1, 0, 3, 1);
    +   }
    +
    +   @Test
    +   public void testSameParallelismWithShufflingStates() throws Exception {
    +           final File outDir = tempFolder.newFolder();
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness1 
= createRescalingTestSink(outDir, 2, 0, 100);
    +           testHarness1.setup();
    +           testHarness1.open();
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness2 
= createRescalingTestSink(outDir, 2, 1, 100);
    +           testHarness2.setup();
    +           testHarness2.open();
    +
    +           testHarness1.processElement(new StreamRecord<>("test1", 0L));
    +           checkFs(outDir, 1, 0, 0, 0);
    +
    +           testHarness2.processElement(new StreamRecord<>("test2", 0L));
    +           checkFs(outDir, 2, 0, 0, 0);
    +
    +           // intentionally we snapshot them in the reverse order so that 
the states are shuffled
    +           OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
    +                   testHarness2.snapshot(0, 0),
    +                   testHarness1.snapshot(0, 0)
    +           );
    +
    +           checkFs(outDir, 2, 0, 0, 0);
    +
    +           // this will not be included in any checkpoint so it can be 
cleaned up (although we do not)
    +           testHarness2.processElement(new StreamRecord<>("test3", 0L));
    +           checkFs(outDir, 3, 0, 0, 0);
    +
    +           testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
    +           testHarness1.setup();
    +           testHarness1.initializeState(mergedSnapshot);
    +           testHarness1.open();
    +
    +           // the one in-progress will be the one assigned to the next 
instance,
    +           // the other is the test3 which is just not cleaned up
    +           checkFs(outDir, 2, 0, 1, 1);
    +
    +           testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
    +           testHarness2.setup();
    +           testHarness2.initializeState(mergedSnapshot);
    +           testHarness2.open();
    +
    +           checkFs(outDir, 1, 0, 2, 2);
    +
    +           testHarness1.close();
    +           testHarness2.close();
    +
    +           // the 1 in-progress can be discarded.
    +           checkFs(outDir, 1, 0, 2, 2);
    +   }
    +
    +   @Test
    +   public void testScalingDown() throws Exception {
    +           final File outDir = tempFolder.newFolder();
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness1 
= createRescalingTestSink(outDir, 3, 0, 100);
    +           testHarness1.setup();
    +           testHarness1.open();
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness2 
= createRescalingTestSink(outDir, 3, 1, 100);
    +           testHarness2.setup();
    +           testHarness2.open();
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness3 
= createRescalingTestSink(outDir, 3, 2, 100);
    +           testHarness3.setup();
    +           testHarness3.open();
    +
    +           testHarness1.processElement(new StreamRecord<>("test1", 0L));
    +           checkFs(outDir, 1, 0, 0, 0);
    +
    +           testHarness2.processElement(new StreamRecord<>("test2", 0L));
    +           checkFs(outDir, 2, 0, 0, 0);
    +
    +           testHarness3.processElement(new StreamRecord<>("test3", 0L));
    +           testHarness3.processElement(new StreamRecord<>("test4", 0L));
    +           checkFs(outDir, 4, 0, 0, 0);
    +
    +           // intentionally we snapshot them in the reverse order so that 
the states are shuffled
    +           OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
    +                   testHarness3.snapshot(0, 0),
    +                   testHarness1.snapshot(0, 0),
    +                   testHarness2.snapshot(0, 0)
    +           );
    +
    +           testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
    +           testHarness1.setup();
    +           testHarness1.initializeState(mergedSnapshot);
    +           testHarness1.open();
    +
    +           checkFs(outDir, 1, 0, 3, 3);
    +
    +           testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
    +           testHarness2.setup();
    +           testHarness2.initializeState(mergedSnapshot);
    +           testHarness2.open();
    +
    +           checkFs(outDir, 0, 0, 4, 4);
    +   }
    +
    +   @Test
    +   public void testScalingUp() throws Exception {
    +           final File outDir = tempFolder.newFolder();
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness1 
= createRescalingTestSink(outDir, 2, 0, 100);
    +           testHarness1.setup();
    +           testHarness1.open();
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness2 
= createRescalingTestSink(outDir, 2, 0, 100);
    +           testHarness2.setup();
    +           testHarness2.open();
    +
    +           testHarness1.processElement(new StreamRecord<>("test1", 1L));
    +           testHarness1.processElement(new StreamRecord<>("test2", 1L));
    +
    +           checkFs(outDir, 2, 0, 0, 0);
    +
    +           testHarness2.processElement(new StreamRecord<>("test3", 1L));
    +           testHarness2.processElement(new StreamRecord<>("test4", 1L));
    +           testHarness2.processElement(new StreamRecord<>("test5", 1L));
    +
    +           checkFs(outDir, 5, 0, 0, 0);
    +
    +           // intentionally we snapshot them in the reverse order so that 
the states are shuffled
    +           OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
    +                   testHarness2.snapshot(0, 0),
    +                   testHarness1.snapshot(0, 0)
    +           );
    +
    +           testHarness1 = createRescalingTestSink(outDir, 3, 0, 100);
    +           testHarness1.setup();
    +           testHarness1.initializeState(mergedSnapshot);
    +           testHarness1.open();
    +
    +           checkFs(outDir, 2, 0, 3, 3);
    +
    +           testHarness2 = createRescalingTestSink(outDir, 3, 1, 100);
    +           testHarness2.setup();
    +           testHarness2.initializeState(mergedSnapshot);
    +           testHarness2.open();
    +
    +           checkFs(outDir, 0, 0, 5, 5);
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness3 
= createRescalingTestSink(outDir, 3, 2, 100);
    +           testHarness3.setup();
    +           testHarness3.initializeState(mergedSnapshot);
    +           testHarness3.open();
    +
    --- End diff --
    
    Let's add a separate check here to make sure that nothing was changed just 
by the initialization.


> BucketingSink deletes valid data when checkpoint notification is slow.
> ----------------------------------------------------------------------
>
>                 Key: FLINK-5056
>                 URL: https://issues.apache.org/jira/browse/FLINK-5056
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.1.3
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for which
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last notification will clear the state of the sink without committing as 
> final the data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to