[GitHub] flink pull request #2845: [FLINK-5096] Make the RollingSink rescalable.

2016-11-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2845


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2845: [FLINK-5096] Make the RollingSink rescalable.

2016-11-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r89603474
  
--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
@@ -638,6 +644,239 @@ public void flatMap(Tuple2 value,
Assert.assertEquals(8, numFiles);
}
 
+   private static final String PART_PREFIX = "part";
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   // we have a bucket size of 5 bytes, so each record will get its own bucket,
+   // i.e. the bucket should roll after every record.
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 1, 1, 0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
+
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+
+   // the in-progress file remains as we do not clean up now
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness.close();
+
+   // at close it is not moved to final because it is not part
+   // of the current task's state, it was just a not cleaned up leftover.
+   checkFs(outDir, 1, 0, 3, 1);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness1 = createRescalingTestSink(outDir, 3, 0);
+   testHarness1.setup();
+   testHarness1.open();
+
+   OneInputStreamOperatorTestHarness testHarness2 = createRescalingTestSink(outDir, 3, 1);
+   testHarness2.setup();
+   testHarness2.open();
+
+   OneInputStreamOperatorTestHarness testHarness3 = createRescalingTestSink(outDir, 3, 2);
+   testHarness3.setup();
+   testHarness3.open();
+
+   testHarness1.processElement(new StreamRecord<>("test1", 0L));
+   checkFs(outDir, 1, 0, 0, 0);
+
+   testHarness2.processElement(new StreamRecord<>("test2", 0L));
+   checkFs(outDir, 2, 0, 0, 0);
+
+   testHarness3.processElement(new StreamRecord<>("test3", 0L));
+   testHarness3.processElement(new StreamRecord<>("test4", 0L));
+   checkFs(outDir, 3, 1, 0, 0);
+
+   // intentionally we snapshot them in a not ascending order so that the states are shuffled
+   OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+   testHarness3.snapshot(0, 0),
+   testHarness1.snapshot(0, 0),
+   testHarness2.snapshot(0, 0)
+   );
+
+   // with the above state reshuffling, we expect the new testHarness1
+   // to take the state of the previous testHarness3 and testHarness2
--- End diff --

Yes, you are right.
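
In other words, with the repackaging order used in the test, the expected split is as sketched below (the round-robin assignment of the repackaged state is an inference from this thread, not verified against the redistribution code):

    // snapshots are repackaged in the order (harness3, harness1, harness2)
    OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
            testHarness3.snapshot(0, 0),
            testHarness1.snapshot(0, 0),
            testHarness2.snapshot(0, 0));

    // scaling down from 3 to 2 subtasks:
    //   new testHarness1 (subtask 0 of 2) <- state of old testHarness3 and old testHarness1
    //   new testHarness2 (subtask 1 of 2) <- state of old testHarness2
    testHarness1 = createRescalingTestSink(outDir, 2, 0);
    testHarness1.setup();
    testHarness1.initializeState(mergedSnapshot);
    testHarness1.open();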



[GitHub] flink pull request #2845: [FLINK-5096] Make the RollingSink rescalable.

2016-11-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r89597858
  
--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
@@ -638,6 +644,239 @@ public void flatMap(Tuple2 value,
Assert.assertEquals(8, numFiles);
}
 
+   private static final String PART_PREFIX = "part";
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   // we have a bucket size of 5 bytes, so each record will get its own bucket,
+   // i.e. the bucket should roll after every record.
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 1, 1, 0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
+
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+
+   // the in-progress file remains as we do not clean up now
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness.close();
+
+   // at close it is not moved to final because it is not part
+   // of the current task's state, it was just a not cleaned up leftover.
+   checkFs(outDir, 1, 0, 3, 1);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness1 = createRescalingTestSink(outDir, 3, 0);
+   testHarness1.setup();
+   testHarness1.open();
+
+   OneInputStreamOperatorTestHarness testHarness2 = createRescalingTestSink(outDir, 3, 1);
+   testHarness2.setup();
+   testHarness2.open();
+
+   OneInputStreamOperatorTestHarness testHarness3 = createRescalingTestSink(outDir, 3, 2);
+   testHarness3.setup();
+   testHarness3.open();
+
+   testHarness1.processElement(new StreamRecord<>("test1", 0L));
+   checkFs(outDir, 1, 0, 0, 0);
+
+   testHarness2.processElement(new StreamRecord<>("test2", 0L));
+   checkFs(outDir, 2, 0, 0, 0);
+
+   testHarness3.processElement(new StreamRecord<>("test3", 0L));
+   testHarness3.processElement(new StreamRecord<>("test4", 0L));
+   checkFs(outDir, 3, 1, 0, 0);
+
+   // intentionally we snapshot them in a not ascending order so that the states are shuffled
+   OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+   testHarness3.snapshot(0, 0),
+   testHarness1.snapshot(0, 0),
+   testHarness2.snapshot(0, 0)
+   );
+
+   // with the above state reshuffling, we expect the new testHarness1
+   // to take the state of the previous testHarness3 and testHarness2
--- End diff --

doesn't harness1 receive the state of 3 and 1?

What do you think about having the testHarness2 process 5 elements? This 
way one could always accurately deduce which of the subsequent harnesses got 
which state.
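
A hypothetical version of that suggestion (record values and counts are illustrative only, not from the PR):

    // give every original harness a distinct number of records so that the file
    // counts observed after restore uniquely identify which old state each new
    // harness received
    testHarness1.processElement(new StreamRecord<>("test1", 0L));          // 1 record
    for (int i = 0; i < 5; i++) {
        testHarness2.processElement(new StreamRecord<>("test2-" + i, 0L)); // 5 records
    }
    testHarness3.processElement(new StreamRecord<>("test3", 0L));
    testHarness3.processElement(new StreamRecord<>("test4", 0L));          // 2 records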



[GitHub] flink pull request #2845: [FLINK-5096] Make the RollingSink rescalable.

2016-11-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88917513
  
--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2 value,
Assert.assertEquals(8, numFiles);
}
 
+   private static final String PART_PREFIX = "part";
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L)); // 
we have a bucket size of 5 bytes
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 1, 1, 0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
+
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+
+   // the in-progress file remains as we do not clean up now
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness.close();
+
+   // at close it is not moved to final because it is not part
+   // of the current task's state, it was just a not cleaned up leftover.
+   checkFs(outDir, 1, 0, 3, 1);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness1 = createRescalingTestSink(outDir, 3, 0);
+   testHarness1.setup();
+   testHarness1.open();
+
+   OneInputStreamOperatorTestHarness testHarness2 = createRescalingTestSink(outDir, 3, 1);
+   testHarness2.setup();
+   testHarness2.open();
+
+   OneInputStreamOperatorTestHarness testHarness3 = createRescalingTestSink(outDir, 3, 2);
+   testHarness3.setup();
+   testHarness3.open();
+
+   testHarness1.processElement(new StreamRecord<>("test1", 0L));
+   checkFs(outDir, 1, 0, 0, 0);
+
+   testHarness2.processElement(new StreamRecord<>("test2", 0L));
+   checkFs(outDir, 2, 0, 0, 0);
+
+   testHarness3.processElement(new StreamRecord<>("test3", 0L));
+   testHarness3.processElement(new StreamRecord<>("test4", 0L));
+   checkFs(outDir, 3, 1, 0, 0);
+
+   // intentionally we snapshot them in the reverse order so that the states are shuffled
+   OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+   testHarness3.snapshot(0, 0),
+   testHarness1.snapshot(0, 0),
+   testHarness2.snapshot(0, 0)
+   );
+
+   testHarness1 = createRescalingTestSink(outDir, 2, 0);
--- End diff --

What happens here would be easier to understand if you said which sink receives which states.




[GitHub] flink pull request #2845: [FLINK-5096] Make the RollingSink rescalable.

2016-11-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88910876
  
--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2 value,
Assert.assertEquals(8, numFiles);
}
 
+   private static final String PART_PREFIX = "part";
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L)); // 
we have a bucket size of 5 bytes
--- End diff --

the comment should include the conclusion the reader is supposed to arrive at: that every record goes into a separate file.
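
The later revision of the diff (quoted in the 2016-11-25 comments above) addresses this by spelling the conclusion out:

    // we have a bucket size of 5 bytes, so each record will get its own bucket,
    // i.e. the bucket should roll after every record.
    testHarness.processElement(new StreamRecord<>("test1", 1L));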




[GitHub] flink pull request #2845: [FLINK-5096] Make the RollingSink rescalable.

2016-11-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88909481
  
--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2 value,
Assert.assertEquals(8, numFiles);
}
 
+   private static final String PART_PREFIX = "part";
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L)); // 
we have a bucket size of 5 bytes
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 1, 1, 0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
+
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+
+   // the in-progress file remains as we do not clean up now
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness.close();
+
+   // at close it is not moved to final because it is not part
+   // of the current task's state, it was just a not cleaned up leftover.
+   checkFs(outDir, 1, 0, 3, 1);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness1 = createRescalingTestSink(outDir, 3, 0);
+   testHarness1.setup();
+   testHarness1.open();
+
+   OneInputStreamOperatorTestHarness testHarness2 = createRescalingTestSink(outDir, 3, 1);
+   testHarness2.setup();
+   testHarness2.open();
+
+   OneInputStreamOperatorTestHarness testHarness3 = createRescalingTestSink(outDir, 3, 2);
+   testHarness3.setup();
+   testHarness3.open();
+
+   testHarness1.processElement(new StreamRecord<>("test1", 0L));
+   checkFs(outDir, 1, 0, 0, 0);
+
+   testHarness2.processElement(new StreamRecord<>("test2", 0L));
+   checkFs(outDir, 2, 0, 0, 0);
+
+   testHarness3.processElement(new StreamRecord<>("test3", 0L));
+   testHarness3.processElement(new StreamRecord<>("test4", 0L));
+   checkFs(outDir, 3, 1, 0, 0);
+
+   // intentionally we snapshot them in the reverse order so that the states are shuffled
--- End diff --

3->1->2 is not a reverse order




[GitHub] flink pull request #2845: [FLINK-5096] Make the RollingSink rescalable.

2016-11-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88917138
  
--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2 value,
Assert.assertEquals(8, numFiles);
}
 
+   private static final String PART_PREFIX = "part";
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L)); // 
we have a bucket size of 5 bytes
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 1, 1, 0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
+
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+
+   // the in-progress file remains as we do not clean up now
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness.close();
+
+   // at close it is not moved to final because it is not part
+   // of the current task's state, it was just a not cleaned up leftover.
+   checkFs(outDir, 1, 0, 3, 1);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness1 = createRescalingTestSink(outDir, 3, 0);
+   testHarness1.setup();
+   testHarness1.open();
+
+   OneInputStreamOperatorTestHarness testHarness2 = createRescalingTestSink(outDir, 3, 1);
+   testHarness2.setup();
+   testHarness2.open();
+
+   OneInputStreamOperatorTestHarness testHarness3 = createRescalingTestSink(outDir, 3, 2);
+   testHarness3.setup();
+   testHarness3.open();
+
+   testHarness1.processElement(new StreamRecord<>("test1", 0L));
+   checkFs(outDir, 1, 0, 0, 0);
+
+   testHarness2.processElement(new StreamRecord<>("test2", 0L));
+   checkFs(outDir, 2, 0, 0, 0);
+
+   testHarness3.processElement(new StreamRecord<>("test3", 0L));
+   testHarness3.processElement(new StreamRecord<>("test4", 0L));
+   checkFs(outDir, 3, 1, 0, 0);
+
+   // intentionally we snapshot them in the reverse order so that the states are shuffled
+   OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+   testHarness3.snapshot(0, 0),
+   testHarness1.snapshot(0, 0),
+   testHarness2.snapshot(0, 0)
+   );
+
+   testHarness1 = createRescalingTestSink(outDir, 2, 0);
+   testHarness1.setup();
+   testHarness1.initializeState(mergedSnapshot);
+   testHarness1.open();
+
+   // because we do not have a pending for part-2-0
+   checkFs(outDir, 1, 0, 3, 2);
+
+   testHarness2 = createRescalingTestSink(outDir, 2, 1);
+   testHarness2.setup();
+   testHarness2.initializeState(mergedSnapshot);
+   testHarness2.open();
+
+   checkFs(outDir, 0, 0, 4, 3);
+   }

[GitHub] flink pull request #2845: [FLINK-5096] Make the RollingSink rescalable.

2016-11-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88915712
  
--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2 value,
Assert.assertEquals(8, numFiles);
}
 
+   private static final String PART_PREFIX = "part";
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L)); // 
we have a bucket size of 5 bytes
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 1, 1, 0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
+
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+
+   // the in-progress file remains as we do not clean up now
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness.close();
+
+   // at close it is not moved to final because it is not part
+   // of the current task's state, it was just a not cleaned up leftover.
+   checkFs(outDir, 1, 0, 3, 1);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness1 = createRescalingTestSink(outDir, 3, 0);
+   testHarness1.setup();
+   testHarness1.open();
+
+   OneInputStreamOperatorTestHarness testHarness2 = createRescalingTestSink(outDir, 3, 1);
+   testHarness2.setup();
+   testHarness2.open();
+
+   OneInputStreamOperatorTestHarness testHarness3 = createRescalingTestSink(outDir, 3, 2);
+   testHarness3.setup();
+   testHarness3.open();
+
+   testHarness1.processElement(new StreamRecord<>("test1", 0L));
+   checkFs(outDir, 1, 0, 0, 0);
+
+   testHarness2.processElement(new StreamRecord<>("test2", 0L));
+   checkFs(outDir, 2, 0, 0, 0);
+
+   testHarness3.processElement(new StreamRecord<>("test3", 0L));
+   testHarness3.processElement(new StreamRecord<>("test4", 0L));
+   checkFs(outDir, 3, 1, 0, 0);
+
+   // intentionally we snapshot them in the reverse order so that the states are shuffled
+   OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+   testHarness3.snapshot(0, 0),
+   testHarness1.snapshot(0, 0),
+   testHarness2.snapshot(0, 0)
+   );
+
+   testHarness1 = createRescalingTestSink(outDir, 2, 0);
+   testHarness1.setup();
+   testHarness1.initializeState(mergedSnapshot);
+   testHarness1.open();
+
+   // because we do not have a pending for part-2-0
--- End diff --

this comment needs some context, as in: `X happened "because we do not have a pending for part-2-0"`
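
One possible rewording with the missing context filled in (reading the checkFs arguments as (in-progress, pending, finished, valid-length) counts is an inference from the rest of the test, not something stated in the PR):

    // after restoring the merged state into the new testHarness1 we expect
    // 1 in-progress, 0 pending, 3 finished and 2 valid-length files,
    // because we do not have a pending for part-2-0
    checkFs(outDir, 1, 0, 3, 2);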



[GitHub] flink pull request #2845: [FLINK-5096] Make the RollingSink rescalable.

2016-11-21 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2845

[FLINK-5096] Make the RollingSink rescalable.

This PR integrates the RollingSink with the new state abstractions, so that the sink can resume execution from a checkpoint, with different parallelism, without reducing the provided exactly-once guarantees.

R @zentol 
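
As background, a generic sketch of the rescalable-state pattern the description refers to. This is not the actual RollingSink code: the class and state names are made up, and the exact accessor on the operator state store has varied across Flink versions.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class RescalableSinkSketch extends RichSinkFunction<String> implements CheckpointedFunction {

        // hypothetical bookkeeping; the real sink tracks in-progress/pending part files
        // and their valid lengths per bucket
        private final List<String> pendingFiles = new ArrayList<>();

        private transient ListState<String> checkpointedState;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // list-style operator state is what allows redistribution on rescaling
            checkpointedState = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("pending-files", String.class));

            if (context.isRestored()) {
                // after rescaling, a subtask may receive entries written by several former subtasks
                for (String file : checkpointedState.get()) {
                    pendingFiles.add(file);   // here the real sink would commit/truncate the files
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedState.clear();
            for (String file : pendingFiles) {
                checkpointedState.add(file);
            }
        }

        @Override
        public void invoke(String value) throws Exception {
            // write value to the current part file; roll it and register it as pending
            // once the configured bucket size is exceeded (omitted here)
        }
    }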

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink rolling-ref

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2845.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2845


commit e581d082740ec98ea47f0321599f1d18deede9db
Author: kl0u 
Date:   2016-11-10T15:57:20Z

[FLINK-5096] Make the RollingSink rescalable.

Integrates the RollingSink with the new state abstractions so
that its parallelism can change after resuming execution from
a savepoint.



