[
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683864#comment-15683864
]
ASF GitHub Bot commented on FLINK-5096:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2845#discussion_r88909481
--- Diff:
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2<Integer, String> value,
Assert.assertEquals(8, numFiles);
}
+ private static final String PART_PREFIX = "part";
+ private static final String PENDING_SUFFIX = ".pending";
+ private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+ private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+ @Test
+ public void testBucketStateTransitions() throws Exception {
+ final File outDir = tempFolder.newFolder();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
createRescalingTestSink(outDir, 1, 0);
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(0L);
+
+ testHarness.processElement(new StreamRecord<>("test1", 1L)); //
we have a bucket size of 5 bytes
+ testHarness.processElement(new StreamRecord<>("test2", 1L));
+ checkFs(outDir, 1, 1 ,0, 0);
+
+ testHarness.processElement(new StreamRecord<>("test3", 1L));
+ checkFs(outDir, 1, 2, 0, 0);
+
+ testHarness.snapshot(0, 0);
+ checkFs(outDir, 1, 2, 0, 0);
+
+ testHarness.notifyOfCompletedCheckpoint(0);
+ checkFs(outDir, 1, 0, 2, 0);
+
+ OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+ testHarness.close();
+ checkFs(outDir, 0, 1, 2, 0);
+
+ testHarness = createRescalingTestSink(outDir, 1, 0);
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+ checkFs(outDir, 0, 0, 3, 1);
+
+ snapshot = testHarness.snapshot(2, 0);
+
+ testHarness.processElement(new StreamRecord<>("test4", 10));
+ checkFs(outDir, 1, 0, 3, 1);
+
+ testHarness = createRescalingTestSink(outDir, 1, 0);
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ // the in-progress file remains as we do not clean up now
+ checkFs(outDir, 1, 0, 3, 1);
+
+ testHarness.close();
+
+ // at close it is not moved to final because it is not part
+ // of the current task's state, it was just a not cleaned up
leftover.
+ checkFs(outDir, 1, 0, 3, 1);
+ }
+
+ @Test
+ public void testScalingDown() throws Exception {
+ final File outDir = tempFolder.newFolder();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness1
= createRescalingTestSink(outDir, 3, 0);
+ testHarness1.setup();
+ testHarness1.open();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness2
= createRescalingTestSink(outDir, 3, 1);
+ testHarness2.setup();
+ testHarness2.open();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness3
= createRescalingTestSink(outDir, 3, 2);
+ testHarness3.setup();
+ testHarness3.open();
+
+ testHarness1.processElement(new StreamRecord<>("test1", 0L));
+ checkFs(outDir, 1, 0, 0, 0);
+
+ testHarness2.processElement(new StreamRecord<>("test2", 0L));
+ checkFs(outDir, 2, 0, 0, 0);
+
+ testHarness3.processElement(new StreamRecord<>("test3", 0L));
+ testHarness3.processElement(new StreamRecord<>("test4", 0L));
+ checkFs(outDir, 3, 1, 0, 0);
+
+ // intentionally we snapshot them in the reverse order so that
the states are shuffled
--- End diff --
3->1->2 is not a reverse order
> Make the RollingSink rescalable.
> --------------------------------
>
> Key: FLINK-5096
> URL: https://issues.apache.org/jira/browse/FLINK-5096
> Project: Flink
> Issue Type: Improvement
> Components: filesystem-connector
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Integrate the RollingSink with the new state abstractions so that its
> parallelism can change after restoring from a savepoint.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)