[ https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667330#comment-15667330 ]
ASF GitHub Bot commented on FLINK-5056: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2797#discussion_r88019498 --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java --- @@ -118,48 +146,290 @@ public static void destroyHDFS() { } @Test - public void testCheckpointWithoutNotify() throws Exception { - File dataDir = tempFolder.newFolder(); + public void testInactivityPeriodWithLateNotify() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0, 100); + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(0L); + + testHarness.processElement(new StreamRecord<>("test1", 1L)); + testHarness.processElement(new StreamRecord<>("test2", 1L)); + checkFs(outDir, 2, 0 ,0, 0); + + testHarness.setProcessingTime(101L); // put some in pending + checkFs(outDir, 0, 2, 0, 0); + + testHarness.snapshot(0, 0); // put them in pending for 0 + checkFs(outDir, 0, 2, 0, 0); + + testHarness.processElement(new StreamRecord<>("test3", 1L)); + testHarness.processElement(new StreamRecord<>("test4", 1L)); - OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir); + testHarness.setProcessingTime(202L); // put some in pending + testHarness.snapshot(1, 0); // put them in pending for 1 + checkFs(outDir, 0, 4, 0, 0); + + testHarness.notifyOfCompletedCheckpoint(0); // put the pending for 0 to the "committed" state + checkFs(outDir, 0, 2, 2, 0); + + testHarness.notifyOfCompletedCheckpoint(1); // put the pending for 1 to the "committed" state + checkFs(outDir, 0, 0, 4, 0); + } + + @Test + public void testBucketStateTransitions() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> 
testHarness = createRescalingTestSink(outDir, 1, 0, 100); testHarness.setup(); testHarness.open(); - testHarness.processElement(new StreamRecord<>("Hello")); - testHarness.processElement(new StreamRecord<>("Hello")); - testHarness.processElement(new StreamRecord<>("Hello")); + testHarness.setProcessingTime(0L); + + testHarness.processElement(new StreamRecord<>("test1", 1L)); + testHarness.processElement(new StreamRecord<>("test2", 1L)); + checkFs(outDir, 2, 0 ,0, 0); + + // this is to check the inactivity threshold + testHarness.setProcessingTime(101L); + checkFs(outDir, 0, 2, 0, 0); + + testHarness.processElement(new StreamRecord<>("test3", 1L)); + checkFs(outDir, 1, 2, 0, 0); + + testHarness.snapshot(0, 0); + checkFs(outDir, 1, 2, 0, 0); + + testHarness.notifyOfCompletedCheckpoint(0); + checkFs(outDir, 1, 0, 2, 0); - testHarness.setProcessingTime(10000L); + OperatorStateHandles snapshot = testHarness.snapshot(1, 0); - // snapshot but don't call notify to simulate a notify that never - // arrives, the sink should move pending files in restore() in that case - StreamStateHandle snapshot1 = testHarness.snapshotLegacy(0, 0); + testHarness.close(); + checkFs(outDir, 0, 1, 2, 0); + + testHarness = createRescalingTestSink(outDir, 1, 0, 100); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + checkFs(outDir, 0, 0, 3, 1); + + snapshot = testHarness.snapshot(2, 0); + + testHarness.processElement(new StreamRecord<>("test4", 10)); + checkFs(outDir, 1, 0, 3, 1); - testHarness = createTestSink(dataDir); + testHarness = createRescalingTestSink(outDir, 1, 0, 100); testHarness.setup(); - testHarness.restore(snapshot1); + testHarness.initializeState(snapshot); testHarness.open(); - testHarness.processElement(new StreamRecord<>("Hello")); + // the in-progress file remains as we do not clean up now + checkFs(outDir, 1, 0, 3, 1); testHarness.close(); - int numComplete = 0; - int numPending = 0; - for (File file: FileUtils.listFiles(dataDir, null, 
true)) { + // at close it is not moved to final because it is not part + // of the current task's state, it was just a not cleaned up leftover. + checkFs(outDir, 1, 0, 3, 1); + } + + @Test + public void testSameParallelismWithShufflingStates() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0, 100); + testHarness1.setup(); + testHarness1.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 2, 1, 100); + testHarness2.setup(); + testHarness2.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 0L)); + checkFs(outDir, 1, 0, 0, 0); + + testHarness2.processElement(new StreamRecord<>("test2", 0L)); + checkFs(outDir, 2, 0, 0, 0); + + // intentionally we snapshot them in the reverse order so that the states are shuffled + OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + testHarness2.snapshot(0, 0), + testHarness1.snapshot(0, 0) + ); + + checkFs(outDir, 2, 0, 0, 0); + + // this will not be included in any checkpoint so it can be cleaned up (although we do not) + testHarness2.processElement(new StreamRecord<>("test3", 0L)); + checkFs(outDir, 3, 0, 0, 0); + + testHarness1 = createRescalingTestSink(outDir, 2, 0, 100); + testHarness1.setup(); + testHarness1.initializeState(mergedSnapshot); + testHarness1.open(); + + // the one in-progress will be the one assigned to the next instance, + // the other is the test3 which is just not cleaned up + checkFs(outDir, 2, 0, 1, 1); + + testHarness2 = createRescalingTestSink(outDir, 2, 1, 100); + testHarness2.setup(); + testHarness2.initializeState(mergedSnapshot); + testHarness2.open(); + + checkFs(outDir, 1, 0, 2, 2); + + testHarness1.close(); + testHarness2.close(); + + // the 1 in-progress can be discarded. 
+ checkFs(outDir, 1, 0, 2, 2); + } + + @Test + public void testScalingDown() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 3, 0, 100); + testHarness1.setup(); + testHarness1.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 3, 1, 100); + testHarness2.setup(); + testHarness2.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2, 100); + testHarness3.setup(); + testHarness3.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 0L)); + checkFs(outDir, 1, 0, 0, 0); + + testHarness2.processElement(new StreamRecord<>("test2", 0L)); + checkFs(outDir, 2, 0, 0, 0); + + testHarness3.processElement(new StreamRecord<>("test3", 0L)); + testHarness3.processElement(new StreamRecord<>("test4", 0L)); + checkFs(outDir, 4, 0, 0, 0); + + // intentionally we snapshot them in the reverse order so that the states are shuffled + OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + testHarness3.snapshot(0, 0), + testHarness1.snapshot(0, 0), + testHarness2.snapshot(0, 0) + ); + + testHarness1 = createRescalingTestSink(outDir, 2, 0, 100); + testHarness1.setup(); + testHarness1.initializeState(mergedSnapshot); + testHarness1.open(); + + checkFs(outDir, 1, 0, 3, 3); + + testHarness2 = createRescalingTestSink(outDir, 2, 1, 100); + testHarness2.setup(); + testHarness2.initializeState(mergedSnapshot); + testHarness2.open(); + + checkFs(outDir, 0, 0, 4, 4); + } + + @Test + public void testScalingUp() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0, 100); + testHarness1.setup(); + testHarness1.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness2 = 
createRescalingTestSink(outDir, 2, 0, 100); + testHarness2.setup(); + testHarness2.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 1L)); + testHarness1.processElement(new StreamRecord<>("test2", 1L)); + + checkFs(outDir, 2, 0, 0, 0); + + testHarness2.processElement(new StreamRecord<>("test3", 1L)); + testHarness2.processElement(new StreamRecord<>("test4", 1L)); + testHarness2.processElement(new StreamRecord<>("test5", 1L)); + + checkFs(outDir, 5, 0, 0, 0); + + // intentionally we snapshot them in the reverse order so that the states are shuffled + OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + testHarness2.snapshot(0, 0), + testHarness1.snapshot(0, 0) + ); + + testHarness1 = createRescalingTestSink(outDir, 3, 0, 100); + testHarness1.setup(); + testHarness1.initializeState(mergedSnapshot); + testHarness1.open(); + + checkFs(outDir, 2, 0, 3, 3); + + testHarness2 = createRescalingTestSink(outDir, 3, 1, 100); + testHarness2.setup(); + testHarness2.initializeState(mergedSnapshot); + testHarness2.open(); + + checkFs(outDir, 0, 0, 5, 5); + + OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2, 100); + testHarness3.setup(); + testHarness3.initializeState(mergedSnapshot); + testHarness3.open(); + + testHarness1.processElement(new StreamRecord<>("test6", 0)); + testHarness2.processElement(new StreamRecord<>("test6", 0)); + testHarness3.processElement(new StreamRecord<>("test6", 0)); + + checkFs(outDir, 3, 0, 5, 5); + + testHarness1.snapshot(1, 0); + testHarness2.snapshot(1, 0); + testHarness3.snapshot(1, 0); + + testHarness1.close(); + testHarness2.close(); + testHarness3.close(); + + checkFs(outDir, 0, 3, 5, 5); + } + + private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException { + int inProg = 0; + int pend = 0; + int compl = 0; + int val = 0; + + for (File file: FileUtils.listFiles(outDir, null, true)) { if 
(file.getAbsolutePath().endsWith("crc")) { continue; } - if (file.getPath().contains("pending")) { - numPending++; - } else if (file.getName().startsWith("part")) { - numComplete++; + String path = file.getPath(); + if (path.endsWith(".in-progress")) { --- End diff -- these strings should be moved into constant fields in this test, and should also be used when creating the sink. > BucketingSink deletes valid data when checkpoint notification is slow. > ---------------------------------------------------------------------- > > Key: FLINK-5056 > URL: https://issues.apache.org/jira/browse/FLINK-5056 > Project: Flink > Issue Type: Bug > Components: filesystem-connector > Affects Versions: 1.1.3 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently if BucketingSink receives no data after a checkpoint and then a > notification about a previous checkpoint arrives, it clears its state. This > can > lead to not committing valid data about intermediate checkpoints for which > a notification has not arrived yet. As a simple sequence that illustrates the > problem: > -> input data > -> snapshot(0) > -> input data > -> snapshot(1) > -> no data > -> notifyCheckpointComplete(0) > the last will clear the state of the Sink without committing as final the > data > that arrived for checkpoint 1. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)