[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15699567#comment-15699567 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2845

> Make the RollingSink rescalable.
>
>
> Key: FLINK-5096
> URL: https://issues.apache.org/jira/browse/FLINK-5096
> Project: Flink
> Issue Type: Improvement
> Components: filesystem-connector
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Integrate the RollingSink with the new state abstractions so that its
> parallelism can change after restoring from a savepoint.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695816#comment-15695816 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2845

Thanks!
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695815#comment-15695815 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2845

Great, will start merging this.
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695628#comment-15695628 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r89603474

--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
@@ -638,6 +644,239 @@ public void flatMap(Tuple2value,
         Assert.assertEquals(8, numFiles);
     }
+    private static final String PART_PREFIX = "part";
+    private static final String PENDING_SUFFIX = ".pending";
+    private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+    private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+    @Test
+    public void testBucketStateTransitions() throws Exception {
+        final File outDir = tempFolder.newFolder();
+
+        OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.open();
+
+        testHarness.setProcessingTime(0L);
+
+        // we have a bucket size of 5 bytes, so each record will get its own bucket,
+        // i.e. the bucket should roll after every record.
+
+        testHarness.processElement(new StreamRecord<>("test1", 1L));
+        testHarness.processElement(new StreamRecord<>("test2", 1L));
+        checkFs(outDir, 1, 1, 0, 0);
+
+        testHarness.processElement(new StreamRecord<>("test3", 1L));
+        checkFs(outDir, 1, 2, 0, 0);
+
+        testHarness.snapshot(0, 0);
+        checkFs(outDir, 1, 2, 0, 0);
+
+        testHarness.notifyOfCompletedCheckpoint(0);
+        checkFs(outDir, 1, 0, 2, 0);
+
+        OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+        testHarness.close();
+        checkFs(outDir, 0, 1, 2, 0);
+
+        testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+        checkFs(outDir, 0, 0, 3, 1);
+
+        snapshot = testHarness.snapshot(2, 0);
+
+        testHarness.processElement(new StreamRecord<>("test4", 10));
+        checkFs(outDir, 1, 0, 3, 1);
+
+        testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+
+        // the in-progress file remains as we do not clean up now
+        checkFs(outDir, 1, 0, 3, 1);
+
+        testHarness.close();
+
+        // at close it is not moved to final because it is not part
+        // of the current task's state, it was just a not cleaned up leftover.
+        checkFs(outDir, 1, 0, 3, 1);
+    }
+
+    @Test
+    public void testScalingDown() throws Exception {
+        final File outDir = tempFolder.newFolder();
+
+        OneInputStreamOperatorTestHarness testHarness1 = createRescalingTestSink(outDir, 3, 0);
+        testHarness1.setup();
+        testHarness1.open();
+
+        OneInputStreamOperatorTestHarness testHarness2 = createRescalingTestSink(outDir, 3, 1);
+        testHarness2.setup();
+        testHarness2.open();
+
+        OneInputStreamOperatorTestHarness testHarness3 = createRescalingTestSink(outDir, 3, 2);
+        testHarness3.setup();
+        testHarness3.open();
+
+        testHarness1.processElement(new StreamRecord<>("test1", 0L));
+        checkFs(outDir, 1, 0, 0, 0);
+
+        testHarness2.processElement(new StreamRecord<>("test2", 0L));
+        checkFs(outDir, 2, 0, 0, 0);
+
+        testHarness3.processElement(new StreamRecord<>("test3", 0L));
+        testHarness3.processElement(new StreamRecord<>("test4", 0L));
+        checkFs(outDir, 3, 1, 0, 0);
+
+        // intentionally we snapshot them in a not ascending order so that the states are shuffled
+        OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+            testHarness3.snapshot(0, 0),
+            testHarness1.snapshot(0, 0),
+            testHarness2.snapshot(0, 0)
+        );
+
+        // with the above state reshuffling, we expect the new testHarness1
+        // to take the state of the previous
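The `checkFs` calls in `testBucketStateTransitions` count part files per state. Reading the assertions, the four numbers appear to be in-progress, pending, finished and valid-length files, in that order; that is an inference, since `checkFs` itself is not part of the quoted diff. The sketch below is a hypothetical model of the first few transitions, not RollingSink code: `PartFileLifecycle` and its methods are invented for illustration, and it simplifies by publishing every pending file whenever a checkpoint completes.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical model (not Flink code) of the part-file states the test counts with checkFs.
public class PartFileLifecycle {

    public enum FileState { IN_PROGRESS, PENDING, FINISHED }

    private final Map<FileState, Integer> counts = new EnumMap<>(FileState.class);

    public PartFileLifecycle() {
        for (FileState state : FileState.values()) {
            counts.put(state, 0);
        }
    }

    // Moves n files from one state to another.
    private void move(FileState from, FileState to, int n) {
        counts.put(from, counts.get(from) - n);
        counts.put(to, counts.get(to) + n);
    }

    // Opening a part file creates one ".in-progress" file.
    public void open() {
        counts.put(FileState.IN_PROGRESS, counts.get(FileState.IN_PROGRESS) + 1);
    }

    // Rolling closes the current file (it becomes ".pending") and opens a fresh one.
    public void roll() {
        move(FileState.IN_PROGRESS, FileState.PENDING, 1);
        open();
    }

    // Simplification: a completed checkpoint publishes every pending file.
    public void checkpointComplete() {
        move(FileState.PENDING, FileState.FINISHED, counts.get(FileState.PENDING));
    }

    public int count(FileState state) {
        return counts.get(state);
    }

    public static void main(String[] args) {
        PartFileLifecycle files = new PartFileLifecycle();
        files.open(); // "test1" is written to the first in-progress file
        files.roll(); // "test2": the first file becomes pending, a second one is opened
        files.roll(); // "test3": the second file becomes pending, a third one is opened
        System.out.println(files.count(FileState.IN_PROGRESS) + " "
                + files.count(FileState.PENDING) + " "
                + files.count(FileState.FINISHED)); // prints "1 2 0"
        files.checkpointComplete();
        System.out.println(files.count(FileState.IN_PROGRESS) + " "
                + files.count(FileState.PENDING) + " "
                + files.count(FileState.FINISHED)); // prints "1 0 2"
    }
}
```

Under these assumptions, the two printed lines mirror `checkFs(outDir, 1, 2, 0, 0)` before and `checkFs(outDir, 1, 0, 2, 0)` after `notifyOfCompletedCheckpoint(0)`, leaving the valid-length count out of the model.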
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695629#comment-15695629 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2845

Hi @zentol! Thanks for the review. I integrated the comments.
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695561#comment-15695561 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2845

Just two more comments; I can also fix them while merging if you agree with them.
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695560#comment-15695560 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r89597858

--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15693016#comment-15693016 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2845

Thanks for the review, @zentol! I integrated your comments and rebased onto master. Let's see what Travis has to say, and then merge it if you have no further comments ;)
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683864#comment-15683864 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88909481

--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2value,
         Assert.assertEquals(8, numFiles);
     }
+    private static final String PART_PREFIX = "part";
+    private static final String PENDING_SUFFIX = ".pending";
+    private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+    private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+    @Test
+    public void testBucketStateTransitions() throws Exception {
+        final File outDir = tempFolder.newFolder();
+
+        OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.open();
+
+        testHarness.setProcessingTime(0L);
+
+        testHarness.processElement(new StreamRecord<>("test1", 1L)); // we have a bucket size of 5 bytes
+        testHarness.processElement(new StreamRecord<>("test2", 1L));
+        checkFs(outDir, 1, 1, 0, 0);
+
+        testHarness.processElement(new StreamRecord<>("test3", 1L));
+        checkFs(outDir, 1, 2, 0, 0);
+
+        testHarness.snapshot(0, 0);
+        checkFs(outDir, 1, 2, 0, 0);
+
+        testHarness.notifyOfCompletedCheckpoint(0);
+        checkFs(outDir, 1, 0, 2, 0);
+
+        OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+        testHarness.close();
+        checkFs(outDir, 0, 1, 2, 0);
+
+        testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+        checkFs(outDir, 0, 0, 3, 1);
+
+        snapshot = testHarness.snapshot(2, 0);
+
+        testHarness.processElement(new StreamRecord<>("test4", 10));
+        checkFs(outDir, 1, 0, 3, 1);
+
+        testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+
+        // the in-progress file remains as we do not clean up now
+        checkFs(outDir, 1, 0, 3, 1);
+
+        testHarness.close();
+
+        // at close it is not moved to final because it is not part
+        // of the current task's state, it was just a not cleaned up leftover.
+        checkFs(outDir, 1, 0, 3, 1);
+    }
+
+    @Test
+    public void testScalingDown() throws Exception {
+        final File outDir = tempFolder.newFolder();
+
+        OneInputStreamOperatorTestHarness testHarness1 = createRescalingTestSink(outDir, 3, 0);
+        testHarness1.setup();
+        testHarness1.open();
+
+        OneInputStreamOperatorTestHarness testHarness2 = createRescalingTestSink(outDir, 3, 1);
+        testHarness2.setup();
+        testHarness2.open();
+
+        OneInputStreamOperatorTestHarness testHarness3 = createRescalingTestSink(outDir, 3, 2);
+        testHarness3.setup();
+        testHarness3.open();
+
+        testHarness1.processElement(new StreamRecord<>("test1", 0L));
+        checkFs(outDir, 1, 0, 0, 0);
+
+        testHarness2.processElement(new StreamRecord<>("test2", 0L));
+        checkFs(outDir, 2, 0, 0, 0);
+
+        testHarness3.processElement(new StreamRecord<>("test3", 0L));
+        testHarness3.processElement(new StreamRecord<>("test4", 0L));
+        checkFs(outDir, 3, 1, 0, 0);
+
+        // intentionally we snapshot them in the reverse order so that the states are shuffled
--- End diff --

3 -> 1 -> 2 is not a reverse order.
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683867#comment-15683867 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88915712

--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2value,
         Assert.assertEquals(8, numFiles);
     }
+    private static final String PART_PREFIX = "part";
+    private static final String PENDING_SUFFIX = ".pending";
+    private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+    private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+    @Test
+    public void testBucketStateTransitions() throws Exception {
+        final File outDir = tempFolder.newFolder();
+
+        OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.open();
+
+        testHarness.setProcessingTime(0L);
+
+        testHarness.processElement(new StreamRecord<>("test1", 1L)); // we have a bucket size of 5 bytes
+        testHarness.processElement(new StreamRecord<>("test2", 1L));
+        checkFs(outDir, 1, 1, 0, 0);
+
+        testHarness.processElement(new StreamRecord<>("test3", 1L));
+        checkFs(outDir, 1, 2, 0, 0);
+
+        testHarness.snapshot(0, 0);
+        checkFs(outDir, 1, 2, 0, 0);
+
+        testHarness.notifyOfCompletedCheckpoint(0);
+        checkFs(outDir, 1, 0, 2, 0);
+
+        OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+        testHarness.close();
+        checkFs(outDir, 0, 1, 2, 0);
+
+        testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+        checkFs(outDir, 0, 0, 3, 1);
+
+        snapshot = testHarness.snapshot(2, 0);
+
+        testHarness.processElement(new StreamRecord<>("test4", 10));
+        checkFs(outDir, 1, 0, 3, 1);
+
+        testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+
+        // the in-progress file remains as we do not clean up now
+        checkFs(outDir, 1, 0, 3, 1);
+
+        testHarness.close();
+
+        // at close it is not moved to final because it is not part
+        // of the current task's state, it was just a not cleaned up leftover.
+        checkFs(outDir, 1, 0, 3, 1);
+    }
+
+    @Test
+    public void testScalingDown() throws Exception {
+        final File outDir = tempFolder.newFolder();
+
+        OneInputStreamOperatorTestHarness testHarness1 = createRescalingTestSink(outDir, 3, 0);
+        testHarness1.setup();
+        testHarness1.open();
+
+        OneInputStreamOperatorTestHarness testHarness2 = createRescalingTestSink(outDir, 3, 1);
+        testHarness2.setup();
+        testHarness2.open();
+
+        OneInputStreamOperatorTestHarness testHarness3 = createRescalingTestSink(outDir, 3, 2);
+        testHarness3.setup();
+        testHarness3.open();
+
+        testHarness1.processElement(new StreamRecord<>("test1", 0L));
+        checkFs(outDir, 1, 0, 0, 0);
+
+        testHarness2.processElement(new StreamRecord<>("test2", 0L));
+        checkFs(outDir, 2, 0, 0, 0);
+
+        testHarness3.processElement(new StreamRecord<>("test3", 0L));
+        testHarness3.processElement(new StreamRecord<>("test4", 0L));
+        checkFs(outDir, 3, 1, 0, 0);
+
+        // intentionally we snapshot them in the reverse order so that the states are shuffled
+        OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+            testHarness3.snapshot(0, 0),
+            testHarness1.snapshot(0, 0),
+            testHarness2.snapshot(0, 0)
+        );
+
+        testHarness1 = createRescalingTestSink(outDir, 2, 0);
+        testHarness1.setup();
+        testHarness1.initializeState(mergedSnapshot);
+        testHarness1.open();
+
+        // because we do not have a pending for part-2-0
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683868#comment-15683868 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88917138

--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683866#comment-15683866 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88910876

--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2value,
         Assert.assertEquals(8, numFiles);
     }
+    private static final String PART_PREFIX = "part";
+    private static final String PENDING_SUFFIX = ".pending";
+    private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+    private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+    @Test
+    public void testBucketStateTransitions() throws Exception {
+        final File outDir = tempFolder.newFolder();
+
+        OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.open();
+
+        testHarness.setProcessingTime(0L);
+
+        testHarness.processElement(new StreamRecord<>("test1", 1L)); // we have a bucket size of 5 bytes
--- End diff --

The comment should include the conclusion the reader is supposed to arrive at: that every record goes into a separate file.
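zentol's point can be made concrete with a little arithmetic. Assuming the sink writes each record followed by a newline (so "test1" takes 6 bytes) and rolls the current part file before a write once it already holds more than the 5-byte batch size, no part file can ever hold more than one record. The sketch below is a hypothetical model of that rule, not the RollingSink implementation; `SizeBasedRolling` and `partFileSizes` are invented names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of size-based rolling; not the RollingSink implementation.
public class SizeBasedRolling {

    // Returns the byte size of every part file produced, in creation order.
    // Assumptions: each record is written as UTF-8 plus a '\n', and the current
    // file is rolled before a write when it already holds more than batchSize bytes.
    public static List<Integer> partFileSizes(List<String> records, long batchSize) {
        List<Integer> sizes = new ArrayList<>();
        int current = -1; // -1 means no part file is open yet
        for (String record : records) {
            boolean fileOpen = current >= 0;
            if (!fileOpen || current > batchSize) {
                if (fileOpen) {
                    sizes.add(current); // close the full file
                }
                current = 0; // open a new part file
            }
            current += record.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        if (current >= 0) {
            sizes.add(current); // the last file is still open (in-progress)
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("test1", "test2", "test3");
        System.out.println(partFileSizes(records, 5));   // prints [6, 6, 6]
        System.out.println(partFileSizes(records, 100)); // prints [18]
    }
}
```

With a 5-byte batch size every record lands in its own file; with a larger batch size the same three records share one file.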
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683865#comment-15683865 ]

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88917513

--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2value,
         Assert.assertEquals(8, numFiles);
     }
+    private static final String PART_PREFIX = "part";
+    private static final String PENDING_SUFFIX = ".pending";
+    private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+    private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+    @Test
+    public void testBucketStateTransitions() throws Exception {
+        final File outDir = tempFolder.newFolder();
+
+        OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.open();
+
+        testHarness.setProcessingTime(0L);
+
+        testHarness.processElement(new StreamRecord<>("test1", 1L)); // we have a bucket size of 5 bytes
+        testHarness.processElement(new StreamRecord<>("test2", 1L));
+        checkFs(outDir, 1, 1, 0, 0);
+
+        testHarness.processElement(new StreamRecord<>("test3", 1L));
+        checkFs(outDir, 1, 2, 0, 0);
+
+        testHarness.snapshot(0, 0);
+        checkFs(outDir, 1, 2, 0, 0);
+
+        testHarness.notifyOfCompletedCheckpoint(0);
+        checkFs(outDir, 1, 0, 2, 0);
+
+        OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+        testHarness.close();
+        checkFs(outDir, 0, 1, 2, 0);
+
+        testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+        checkFs(outDir, 0, 0, 3, 1);
+
+        snapshot = testHarness.snapshot(2, 0);
+
+        testHarness.processElement(new StreamRecord<>("test4", 10));
+        checkFs(outDir, 1, 0, 3, 1);
+
+        testHarness = createRescalingTestSink(outDir, 1, 0);
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+
+        // the in-progress file remains as we do not clean up now
+        checkFs(outDir, 1, 0, 3, 1);
+
+        testHarness.close();
+
+        // at close it is not moved to final because it is not part
+        // of the current task's state, it was just a not cleaned up leftover.
+        checkFs(outDir, 1, 0, 3, 1);
+    }
+
+    @Test
+    public void testScalingDown() throws Exception {
+        final File outDir = tempFolder.newFolder();
+
+        OneInputStreamOperatorTestHarness testHarness1 = createRescalingTestSink(outDir, 3, 0);
+        testHarness1.setup();
+        testHarness1.open();
+
+        OneInputStreamOperatorTestHarness testHarness2 = createRescalingTestSink(outDir, 3, 1);
+        testHarness2.setup();
+        testHarness2.open();
+
+        OneInputStreamOperatorTestHarness testHarness3 = createRescalingTestSink(outDir, 3, 2);
+        testHarness3.setup();
+        testHarness3.open();
+
+        testHarness1.processElement(new StreamRecord<>("test1", 0L));
+        checkFs(outDir, 1, 0, 0, 0);
+
+        testHarness2.processElement(new StreamRecord<>("test2", 0L));
+        checkFs(outDir, 2, 0, 0, 0);
+
+        testHarness3.processElement(new StreamRecord<>("test3", 0L));
+        testHarness3.processElement(new StreamRecord<>("test4", 0L));
+        checkFs(outDir, 3, 1, 0, 0);
+
+        // intentionally we snapshot them in the reverse order so that the states are shuffled
+        OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+            testHarness3.snapshot(0, 0),
+            testHarness1.snapshot(0, 0),
+            testHarness2.snapshot(0, 0)
+        );
+
+        testHarness1 = createRescalingTestSink(outDir, 2, 0);
--- End diff --

What happens here would be easier to understand if you said which sink receives which state.
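On zentol's question of which sink receives which state: one way to reason about it is to picture the three snapshotted states concatenated in the order passed to `repackageState` and then split among the two new subtasks. The sketch below assumes a simple even split into contiguous chunks. This is a simplification for illustration only; Flink's actual operator-state repartitioning may assign entries differently, and `EvenSplitRedistribution` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical even-split redistribution of list-state entries; a simplification,
// not necessarily the exact algorithm Flink uses when repartitioning operator state.
public class EvenSplitRedistribution {

    // Splits entries, in the given order, into newParallelism contiguous chunks whose
    // sizes differ by at most one; chunk i is restored by new subtask i.
    public static <T> List<List<T>> redistribute(List<T> entries, int newParallelism) {
        List<List<T>> assignment = new ArrayList<>();
        int base = entries.size() / newParallelism;
        int remainder = entries.size() % newParallelism;
        int start = 0;
        for (int subtask = 0; subtask < newParallelism; subtask++) {
            int n = base + (subtask < remainder ? 1 : 0);
            assignment.add(new ArrayList<>(entries.subList(start, start + n)));
            start += n;
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Snapshot order used in testScalingDown: testHarness3, testHarness1, testHarness2,
        // rescaled from parallelism 3 down to 2.
        List<String> merged = Arrays.asList(
                "state of testHarness3", "state of testHarness1", "state of testHarness2");
        List<List<String>> newAssignment = redistribute(merged, 2);
        for (int subtask = 0; subtask < newAssignment.size(); subtask++) {
            System.out.println("new subtask " + subtask + " <- " + newAssignment.get(subtask));
        }
    }
}
```

Spelling out the assignment this way, in the test comment itself, is what would let a reader check the subsequent `checkFs` expectations against the states each new harness restores.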
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683723#comment-15683723 ]

ASF GitHub Bot commented on FLINK-5096:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2845

[FLINK-5096] Make the RollingSink rescalable.

This PR integrates the RollingSink with the new state abstractions, so that the sink can resume execution from a checkpoint with a different parallelism without weakening the exactly-once guarantees it provides.

R @zentol

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink rolling-ref

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2845.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2845

commit e581d082740ec98ea47f0321599f1d18deede9db
Author: kl0u
Date: 2016-11-10T15:57:20Z

[FLINK-5096] Make the RollingSink rescalable.

Integrates the RollingSink with the new state abstractions so that its parallelism can change after resuming execution from a savepoint.
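The PR states that the sink keeps its exactly-once guarantees when restored with a different parallelism. A common pattern for exactly-once file sinks, and the one the test's `snapshot(...)` / `notifyOfCompletedCheckpoint(...)` sequence exercises, is to publish a pending file only after the checkpoint that recorded it has completed. The following is a minimal, hypothetical sketch of that bookkeeping; `PendingFileCommitter` and the file names are invented, and the real sink's data structures may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of checkpoint-gated publishing of part files; not RollingSink code.
public class PendingFileCommitter {

    private final List<String> pendingSinceLastSnapshot = new ArrayList<>();
    private final NavigableMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> published = new ArrayList<>();

    // A closed in-progress file becomes pending but is not yet visible to readers.
    public void fileClosed(String path) {
        pendingSinceLastSnapshot.add(path);
    }

    // A snapshot records which pending files belong to this checkpoint.
    public void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingSinceLastSnapshot));
        pendingSinceLastSnapshot.clear();
    }

    // Once a checkpoint completes, files of that checkpoint and all earlier ones are published.
    public void checkpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : done.values()) {
            published.addAll(files); // stand-in for renaming "<file>.pending" to "<file>"
        }
        done.clear(); // headMap is a view, so this also removes the entries from pendingPerCheckpoint
    }

    public List<String> published() {
        return published;
    }

    public int checkpointsAwaitingCompletion() {
        return pendingPerCheckpoint.size();
    }

    public static void main(String[] args) {
        PendingFileCommitter committer = new PendingFileCommitter();
        committer.fileClosed("part-0-0");
        committer.fileClosed("part-0-1");
        committer.snapshot(0);
        committer.fileClosed("part-0-2");
        committer.snapshot(1);
        committer.checkpointComplete(0);
        System.out.println(committer.published()); // prints [part-0-0, part-0-1]
    }
}
```

Keying pending files by checkpoint id means a completed checkpoint only publishes files that a successful snapshot has already accounted for; if the job fails before that notification arrives, the files stay pending instead of becoming visible.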