[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15699567#comment-15699567
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2845


> Make the RollingSink rescalable.
> 
>
> Key: FLINK-5096
> URL: https://issues.apache.org/jira/browse/FLINK-5096
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Integrate the RollingSink with the new state abstractions so that its 
> parallelism can change after restoring from a savepoint.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695816#comment-15695816
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2845
  
Thanks!





[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695815#comment-15695815
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2845
  
Great, will start merging this.




[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695628#comment-15695628
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r89603474
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 ---
@@ -638,6 +644,239 @@ public void flatMap(Tuple2 value,
Assert.assertEquals(8, numFiles);
}
 
+   private static final String PART_PREFIX = "part";
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   // we have a bucket size of 5 bytes, so each record will get its own bucket,
+   // i.e. the bucket should roll after every record.
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 1, 1, 0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
+
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+
+   // the in-progress file remains as we do not clean up now
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness.close();
+
+   // at close it is not moved to final because it is not part
+   // of the current task's state; it was just a leftover that was not cleaned up.
+   checkFs(outDir, 1, 0, 3, 1);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness1 = createRescalingTestSink(outDir, 3, 0);
+   testHarness1.setup();
+   testHarness1.open();
+
+   OneInputStreamOperatorTestHarness testHarness2 = createRescalingTestSink(outDir, 3, 1);
+   testHarness2.setup();
+   testHarness2.open();
+
+   OneInputStreamOperatorTestHarness testHarness3 = createRescalingTestSink(outDir, 3, 2);
+   testHarness3.setup();
+   testHarness3.open();
+
+   testHarness1.processElement(new StreamRecord<>("test1", 0L));
+   checkFs(outDir, 1, 0, 0, 0);
+
+   testHarness2.processElement(new StreamRecord<>("test2", 0L));
+   checkFs(outDir, 2, 0, 0, 0);
+
+   testHarness3.processElement(new StreamRecord<>("test3", 0L));
+   testHarness3.processElement(new StreamRecord<>("test4", 0L));
+   checkFs(outDir, 3, 1, 0, 0);
+
+   // we intentionally snapshot them in a non-ascending order so that the states are shuffled
+   OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+   testHarness3.snapshot(0, 0),
+   testHarness1.snapshot(0, 0),
+   testHarness2.snapshot(0, 0)
+   );
+
+   //with the above state reshuffling, we expect the new 
testHarness1
+   // to take the state of the previous 

[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695629#comment-15695629
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2845
  
Hi @zentol! Thanks for the review. I integrated the comments.




[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695561#comment-15695561
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2845
  
Just 2 more comments; I can also fix them while merging if you agree with them.




[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695560#comment-15695560
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r89597858
  

[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15693016#comment-15693016
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2845
  
Thanks for the review, @zentol! I integrated your comments and rebased onto master.
Let's see what Travis has to say and then merge it if you have no further comments ;)




[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683864#comment-15683864
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88909481
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2 value,
Assert.assertEquals(8, numFiles);
}
 
+   private static final String PART_PREFIX = "part";
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L)); // we have a bucket size of 5 bytes
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 1, 1, 0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
+
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+
+   // the in-progress file remains as we do not clean up now
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness.close();
+
+   // at close it is not moved to final because it is not part
+   // of the current task's state, it was just a not cleaned up 
leftover.
+   checkFs(outDir, 1, 0, 3, 1);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness1 
= createRescalingTestSink(outDir, 3, 0);
+   testHarness1.setup();
+   testHarness1.open();
+
+   OneInputStreamOperatorTestHarness testHarness2 
= createRescalingTestSink(outDir, 3, 1);
+   testHarness2.setup();
+   testHarness2.open();
+
+   OneInputStreamOperatorTestHarness testHarness3 
= createRescalingTestSink(outDir, 3, 2);
+   testHarness3.setup();
+   testHarness3.open();
+
+   testHarness1.processElement(new StreamRecord<>("test1", 0L));
+   checkFs(outDir, 1, 0, 0, 0);
+
+   testHarness2.processElement(new StreamRecord<>("test2", 0L));
+   checkFs(outDir, 2, 0, 0, 0);
+
+   testHarness3.processElement(new StreamRecord<>("test3", 0L));
+   testHarness3.processElement(new StreamRecord<>("test4", 0L));
+   checkFs(outDir, 3, 1, 0, 0);
+
+   // intentionally we snapshot them in the reverse order so that the states are shuffled
--- End diff --

3->1->2 is not reverse order.



[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683867#comment-15683867
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88915712
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2 value,
Assert.assertEquals(8, numFiles);
}
 
+   private static final String PART_PREFIX = "part";
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = 
createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L)); // 
we have a bucket size of 5 bytes
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 1, 1 ,0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
+
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+
+   // the in-progress file remains as we do not clean up now
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness.close();
+
+   // at close it is not moved to final because it is not part
+   // of the current task's state, it was just a not cleaned up 
leftover.
+   checkFs(outDir, 1, 0, 3, 1);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness1 
= createRescalingTestSink(outDir, 3, 0);
+   testHarness1.setup();
+   testHarness1.open();
+
+   OneInputStreamOperatorTestHarness testHarness2 
= createRescalingTestSink(outDir, 3, 1);
+   testHarness2.setup();
+   testHarness2.open();
+
+   OneInputStreamOperatorTestHarness testHarness3 
= createRescalingTestSink(outDir, 3, 2);
+   testHarness3.setup();
+   testHarness3.open();
+
+   testHarness1.processElement(new StreamRecord<>("test1", 0L));
+   checkFs(outDir, 1, 0, 0, 0);
+
+   testHarness2.processElement(new StreamRecord<>("test2", 0L));
+   checkFs(outDir, 2, 0, 0, 0);
+
+   testHarness3.processElement(new StreamRecord<>("test3", 0L));
+   testHarness3.processElement(new StreamRecord<>("test4", 0L));
+   checkFs(outDir, 3, 1, 0, 0);
+
+   // intentionally we snapshot them in the reverse order so that 
the states are shuffled
+   OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
+   testHarness3.snapshot(0, 0),
+   testHarness1.snapshot(0, 0),
+   testHarness2.snapshot(0, 0)
+   );
+
+   testHarness1 = createRescalingTestSink(outDir, 2, 0);
+   testHarness1.setup();
+   testHarness1.initializeState(mergedSnapshot);
+   testHarness1.open();
+
+   // because we do not have a pending for part-2-0
   

[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683868#comment-15683868
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88917138
  

[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683866#comment-15683866
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88910876
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2 value,
Assert.assertEquals(8, numFiles);
}
 
+   private static final String PART_PREFIX = "part";
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L)); // we have a bucket size of 5 bytes
--- End diff --

The comment should include the conclusion the reader is supposed to arrive at: that every record goes into a separate file.
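
To make that conclusion concrete, here is a minimal sketch of the roll decision. It is not the RollingSink code. It assumes the sink compares the bytes already written to the current part file against the batch size before each write, and that every record is followed by a one-byte newline. `RollSketch` and `assignParts` are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RollSketch {

    /**
     * Hypothetical model of size-based rolling: returns, for each record,
     * the index of the part file it would be written to.
     */
    static List<Integer> assignParts(List<String> records, long batchSizeBytes) {
        List<Integer> parts = new ArrayList<>();
        int part = 0;
        long written = 0;
        boolean fileOpen = false;
        for (String record : records) {
            // Assumed rule: roll before writing if the open file already
            // holds at least batchSizeBytes.
            if (fileOpen && written >= batchSizeBytes) {
                part++;
                written = 0;
            }
            fileOpen = true;
            // Record bytes plus an assumed one-byte line separator.
            written += record.getBytes(StandardCharsets.UTF_8).length + 1;
            parts.add(part);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("test1", "test2", "test3");
        // With a 5-byte batch size every record exceeds the threshold on its own.
        System.out.println(assignParts(records, 5L));
        // With a generous batch size all records share one part file.
        System.out.println(assignParts(records, 100L));
    }
}
```

Under these assumptions, a 5-byte batch size means each of the 5-character test records fills its file, so the next record always opens a new one.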




[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683865#comment-15683865
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2845#discussion_r88917513
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 ---
@@ -638,6 +644,231 @@ public void flatMap(Tuple2 value,
Assert.assertEquals(8, numFiles);
}
 
+   private static final String PART_PREFIX = "part";
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = 
createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L)); // 
we have a bucket size of 5 bytes
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 1, 1 ,0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
+
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+
+   // the in-progress file remains as we do not clean up now
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness.close();
+
+   // at close it is not moved to final because it is not part
+   // of the current task's state, it was just a not cleaned up 
leftover.
+   checkFs(outDir, 1, 0, 3, 1);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness1 
= createRescalingTestSink(outDir, 3, 0);
+   testHarness1.setup();
+   testHarness1.open();
+
+   OneInputStreamOperatorTestHarness testHarness2 
= createRescalingTestSink(outDir, 3, 1);
+   testHarness2.setup();
+   testHarness2.open();
+
+   OneInputStreamOperatorTestHarness testHarness3 
= createRescalingTestSink(outDir, 3, 2);
+   testHarness3.setup();
+   testHarness3.open();
+
+   testHarness1.processElement(new StreamRecord<>("test1", 0L));
+   checkFs(outDir, 1, 0, 0, 0);
+
+   testHarness2.processElement(new StreamRecord<>("test2", 0L));
+   checkFs(outDir, 2, 0, 0, 0);
+
+   testHarness3.processElement(new StreamRecord<>("test3", 0L));
+   testHarness3.processElement(new StreamRecord<>("test4", 0L));
+   checkFs(outDir, 3, 1, 0, 0);
+
+   // intentionally we snapshot them in the reverse order so that 
the states are shuffled
+   OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
+   testHarness3.snapshot(0, 0),
+   testHarness1.snapshot(0, 0),
+   testHarness2.snapshot(0, 0)
+   );
+
+   testHarness1 = createRescalingTestSink(outDir, 2, 0);
--- End diff --

What happens here would be easier to understand if you said which sink receives which state.
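
As an illustration of why the assignment is worth spelling out, the sketch below redistributes a merged list of per-subtask states over a smaller parallelism. It assumes a plain round-robin split, which may differ from what the test harness or Flink actually does. `RedistributeSketch`, `roundRobin`, and the state labels are hypothetical and do not come from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RedistributeSketch {

    /**
     * Assumed round-robin split: the i-th state in the merged list goes to
     * subtask (i mod newParallelism).
     */
    static List<List<String>> roundRobin(List<String> mergedStates, int newParallelism) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < mergedStates.size(); i++) {
            perSubtask.get(i % newParallelism).add(mergedStates.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // States listed in the order the snapshots were merged (3, 1, 2).
        List<String> merged = Arrays.asList("state-of-sink-3", "state-of-sink-1", "state-of-sink-2");
        // Scale down from three subtasks to two.
        System.out.println(roundRobin(merged, 2));
    }
}
```

Because the result depends on the merge order, a comment that names the expected owner of each state saves the reader from working it out.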



[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683723#comment-15683723
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2845

[FLINK-5096] Make the RollingSink rescalable.

This PR integrates the RollingSink with the new state abstractions, so that
the sink can resume execution from a checkpoint with a different parallelism
without weakening its exactly-once guarantees.
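
As a rough illustration of the file lifecycle behind those guarantees, the sketch below counts part files by suffix and models a completed checkpoint by renaming pending files to their final names. The suffix constants match the ones used in the tests of this PR. The class `FsCountSketch`, its methods, the file names, and the count ordering (in-progress, pending, completed, valid-length) are assumptions made for illustration, not the sink's or the tests' actual code.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Arrays;

final class FsCountSketch {
    static final String PART_PREFIX = "part";
    static final String PENDING_SUFFIX = ".pending";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String VALID_LENGTH_SUFFIX = ".valid";

    /** Counts files as {inProgress, pending, completed, validLength}. */
    static int[] countFiles(File dir) {
        int[] counts = new int[4];
        File[] files = dir.listFiles();
        if (files == null) {
            return counts;
        }
        for (File f : files) {
            String name = f.getName();
            if (name.endsWith(IN_PROGRESS_SUFFIX)) {
                counts[0]++;
            } else if (name.endsWith(PENDING_SUFFIX)) {
                counts[1]++;
            } else if (name.endsWith(VALID_LENGTH_SUFFIX)) {
                counts[3]++;
            } else if (name.startsWith(PART_PREFIX)) {
                counts[2]++; // no lifecycle suffix left: treated as completed
            }
        }
        return counts;
    }

    /** Models a completed checkpoint: every pending file loses its suffix. */
    static void commitPending(File dir) {
        File[] files = dir.listFiles();
        if (files == null) {
            return;
        }
        for (File f : files) {
            String name = f.getName();
            if (name.endsWith(PENDING_SUFFIX)) {
                String finalName = name.substring(0, name.length() - PENDING_SUFFIX.length());
                try {
                    Files.move(f.toPath(), f.toPath().resolveSibling(finalName));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }
    }

    /** Creates a scratch directory and returns the counts before and after the commit. */
    static int[][] demo() {
        try {
            File dir = Files.createTempDirectory("rolling-sketch").toFile();
            String[] names = {"part-0-0.pending", "part-0-1.pending", "part-0-2.in-progress"};
            for (String n : names) {
                Files.createFile(dir.toPath().resolve(n));
            }
            int[] before = countFiles(dir);
            commitPending(dir);
            int[] after = countFiles(dir);
            return new int[][] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[][] result = demo();
        System.out.println(Arrays.toString(result[0]) + " -> " + Arrays.toString(result[1]));
    }
}
```

The in-progress file is left untouched by the commit; only files that were already pending when the checkpoint was taken become final.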

R @zentol 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink rolling-ref

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2845.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2845


commit e581d082740ec98ea47f0321599f1d18deede9db
Author: kl0u 
Date:   2016-11-10T15:57:20Z

[FLINK-5096] Make the RollingSink rescalable.

Integrates the RollingSink with the new state abstractions so
that its parallelism can change after resuming execution from
a savepoint.



