[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15732139#comment-15732139
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2759


> Make the GenericWriteAheadSink rescalable.
> --
>
> Key: FLINK-5020
> URL: https://issues.apache.org/jira/browse/FLINK-5020
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> This targets integrating the GenericWriteAheadSink with the new rescalable 
> state abstractions so that the parallelism of the operator can change 
> arbitrarily without jeopardizing the guarantees offered by it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15731904#comment-15731904
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2759
  
merging.




[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650536#comment-15650536
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2759
  
Hi @zentol. I just rebased onto master, as there were conflicts.
After Travis passes, could you have a look and merge this if there are no
more comments?





[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15644032#comment-15644032
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2759
  
Have you tried the Cassandra sink in an actual job?




[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643923#comment-15643923
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2759#discussion_r86757277
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
 ---
@@ -158,17 +160,151 @@ public void testDataDiscardingUponRestore() throws Exception {
testHarness = new OneInputStreamOperatorTestHarness<>(sink);
 
testHarness.setup();
-   testHarness.restore(latestSnapshot);
+   testHarness.initializeState(latestSnapshot);
testHarness.open();
 
for (int x = 0; x < 20; x++) {
testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
elementCounter++;
}
 
-   testHarness.snapshotLegacy(snapshotCount++, 0);
+   testHarness.snapshot(snapshotCount++, 0);
testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
-   verifyResultsDataDiscardingUponRestore(testHarness, sink);
+   verifyResultsDataDiscardingUponRestore(sink);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   S sink1 = createSink();
+   OneInputStreamOperatorTestHarness testHarness1 =
+   new OneInputStreamOperatorTestHarness<>(sink1, 10, 2, 0);
+   testHarness1.open();
+
+   S sink2 = createSink();
+   OneInputStreamOperatorTestHarness testHarness2 =
+   new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 1);
+   testHarness2.open();
+
+   int elementCounter = 1;
--- End diff --

leave it as it is then




[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643920#comment-15643920
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2759#discussion_r86757109
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -318,6 +312,46 @@ protected void verifyResultsDataDiscardingUponRestore(
Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
}
 
+   @Override
+   protected void verifyResultsWhenScalingDown(CassandraTupleWriteAheadSink> sink) throws Exception {
+
+   ArrayList list = new ArrayList<>();
+   for (int x = 1; x < 34; x++) {
+   list.add(x);
+   }
+
+   ResultSet result = session.execute(SELECT_DATA_QUERY);
+   for (Row s : result) {
+   list.remove(new Integer(s.getInt("counter")));
--- End diff --

Yes, this will be added.




[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643919#comment-15643919
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2759#discussion_r86757078
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -300,5 +294,10 @@ public int hashCode() {
hash = 31 * hash + (int) (timestamp ^ (timestamp >>> 32));
return hash;
}
+
+   @Override
+   public String toString() {
+   return "Pending Checkpoint: id=" + checkpointId +"/"+ subtaskId +"@"+ timestamp;
--- End diff --

missing spaces around `+`
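The requested change only touches formatting. A minimal sketch, using a hypothetical `PendingCheckpoint` class that models just the three fields printed in the diff (`checkpointId`, `subtaskId`, `timestamp`), not the sink's actual nested class:

```java
// Hypothetical stand-in for the pending-checkpoint record in the diff; only the
// fields that toString() prints are modelled here.
final class PendingCheckpoint {

    private final long checkpointId;
    private final int subtaskId;
    private final long timestamp;

    PendingCheckpoint(long checkpointId, int subtaskId, long timestamp) {
        this.checkpointId = checkpointId;
        this.subtaskId = subtaskId;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        // Same output as the reviewed version, with spaces around every '+'.
        return "Pending Checkpoint: id=" + checkpointId + "/" + subtaskId + "@" + timestamp;
    }
}
```

For example, `new PendingCheckpoint(3L, 1, 1000L).toString()` yields `Pending Checkpoint: id=3/1@1000`, exactly what the unspaced version produces.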




[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643916#comment-15643916
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2759#discussion_r86756773
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -318,6 +312,46 @@ protected void verifyResultsDataDiscardingUponRestore(
Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
}
 
+   @Override
+   protected void verifyResultsWhenScalingDown(CassandraTupleWriteAheadSink> sink) throws Exception {
+
+   ArrayList list = new ArrayList<>();
+   for (int x = 1; x < 34; x++) {
+   list.add(x);
+   }
+
+   ResultSet result = session.execute(SELECT_DATA_QUERY);
+   for (Row s : result) {
+   list.remove(new Integer(s.getInt("counter")));
+   }
+   Assert.assertTrue(list.isEmpty());
+   }
+
+   @Override
+   protected void verifyResultsWhenScalingUp(
+   CassandraTupleWriteAheadSink> sink, int startElementCounter, int endElementCounter) {
+
+   // IMPORTANT NOTE:
+   //
+   // for cassandra we always have to start from 1 because
+   // all operators will share the same final db
+
+   ArrayList list = new ArrayList<>();
+   for (int i = 1; i <= endElementCounter; i++) {
+   list.add(i);
+   }
+
+   ArrayList expected = new ArrayList<>();
+   ResultSet result = session.execute(SELECT_DATA_QUERY);
+   for (Row s : result) {
+   expected.add(s.getInt("counter"));
+   }
+
+   Collections.sort(expected);
+
+   Assert.assertArrayEquals(list.toArray(), expected.toArray());
--- End diff --

Why does this test follow a different approach than
`verifyResultsWhenScalingDown`?
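To make the difference concrete, here is a plain-Java model of the two checks, assuming the counters have already been read from the `ResultSet` into a `List<Integer>` (the Cassandra query is left out; `VerificationStyles`, `removeStyle` and `sortedEqualsStyle` are hypothetical names). The remove-based check used for scaling down only detects missing counters, while the sorted comparison used for scaling up also rejects duplicates and unexpected values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical side-by-side model of the two checks in the diff, operating on
// counters that are assumed to have been read from the ResultSet already.
final class VerificationStyles {

    private VerificationStyles() {}

    // Scaling-down style: remove every counter that was read and require that
    // nothing from 1..last is left. Extra or duplicated rows go unnoticed.
    static boolean removeStyle(List<Integer> readCounters, int last) {
        List<Integer> remaining = new ArrayList<>();
        for (int x = 1; x <= last; x++) {
            remaining.add(x);
        }
        for (Integer counter : readCounters) {
            remaining.remove(counter); // Integer argument selects remove(Object), not remove(int index)
        }
        return remaining.isEmpty();
    }

    // Scaling-up style: sort what was read and require it to equal 1..last exactly,
    // which also rejects duplicates and unexpected values.
    static boolean sortedEqualsStyle(List<Integer> readCounters, int last) {
        List<Integer> expected = new ArrayList<>();
        for (int x = 1; x <= last; x++) {
            expected.add(x);
        }
        List<Integer> actual = new ArrayList<>(readCounters);
        Collections.sort(actual);
        return expected.equals(actual);
    }
}
```

With read counters `[1, 2, 3, 3]` and `last = 3`, `removeStyle` returns `true` while `sortedEqualsStyle` returns `false`, so the two tests do not assert the same property.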




[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643905#comment-15643905
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2759#discussion_r86755620
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -318,6 +312,46 @@ protected void verifyResultsDataDiscardingUponRestore(
Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
}
 
+   @Override
+   protected void verifyResultsWhenScalingDown(CassandraTupleWriteAheadSink> sink) throws Exception {
+
+   ArrayList list = new ArrayList<>();
+   for (int x = 1; x < 34; x++) {
+   list.add(x);
+   }
+
+   ResultSet result = session.execute(SELECT_DATA_QUERY);
+   for (Row s : result) {
+   list.remove(new Integer(s.getInt("counter")));
+   }
+   Assert.assertTrue(list.isEmpty());
+   }
+
+   @Override
+   protected void verifyResultsWhenScalingUp(
+   CassandraTupleWriteAheadSink> sink, int startElementCounter, int endElementCounter) {
+
+   // IMPORTANT NOTE:
+   //
+   // for cassandra we always have to start from 1 because
+   // all operators will share the same final db
+
+   ArrayList list = new ArrayList<>();
+   for (int i = 1; i <= endElementCounter; i++) {
+   list.add(i);
+   }
+
+   ArrayList expected = new ArrayList<>();
--- End diff --

This list does not contain the "expected" values; it holds the actual
results read from the database.




[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643900#comment-15643900
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2759#discussion_r86754225
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
 ---
@@ -158,17 +160,151 @@ public void testDataDiscardingUponRestore() throws Exception {
testHarness = new OneInputStreamOperatorTestHarness<>(sink);
 
testHarness.setup();
-   testHarness.restore(latestSnapshot);
+   testHarness.initializeState(latestSnapshot);
testHarness.open();
 
for (int x = 0; x < 20; x++) {
testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
elementCounter++;
}
 
-   testHarness.snapshotLegacy(snapshotCount++, 0);
+   testHarness.snapshot(snapshotCount++, 0);
testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
-   verifyResultsDataDiscardingUponRestore(testHarness, sink);
+   verifyResultsDataDiscardingUponRestore(sink);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   S sink1 = createSink();
+   OneInputStreamOperatorTestHarness testHarness1 =
+   new OneInputStreamOperatorTestHarness<>(sink1, 10, 2, 0);
+   testHarness1.open();
+
+   S sink2 = createSink();
+   OneInputStreamOperatorTestHarness testHarness2 =
+   new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 1);
+   testHarness2.open();
+
+   int elementCounter = 1;
+   int snapshotCount = 0;
+
+   for (int x = 0; x < 10; x++) {
+   testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+   elementCounter++;
+   }
+
+   for (int x = 0; x < 11; x++) {
+   testHarness2.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+   elementCounter++;
+   }
+
+   // snapshot at checkpoint 0 for testHarness1 and testHarness 2
+   OperatorStateHandles snapshot1 = testHarness1.snapshot(snapshotCount, 0);
+   OperatorStateHandles snapshot2 = testHarness2.snapshot(snapshotCount, 0);
+
+   // merge the two partial states
+   OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness
+   .repackageState(snapshot1, snapshot2);
+
+   testHarness1.close();
+   testHarness2.close();
+
+   // and create a third instance that operates alone but
+   // has the merged state of the previous 2 instances
+
+   S sink3 = createSink();
+   OneInputStreamOperatorTestHarness mergedTestHarness =
+   new OneInputStreamOperatorTestHarness<>(sink3, 10, 1, 0);
+
+   mergedTestHarness.setup();
+   mergedTestHarness.initializeState(mergedSnapshot);
+   mergedTestHarness.open();
+
+   for (int x = 0; x < 12; x++) {
+   mergedTestHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+   elementCounter++;
+   }
+
+   snapshotCount++;
+   mergedTestHarness.snapshot(snapshotCount, 1);
+   mergedTestHarness.notifyOfCompletedCheckpoint(snapshotCount);
+
+   verifyResultsWhenScalingDown(sink3);
+   mergedTestHarness.close();
+   }
+
+   @Test
+   public void testScalingUp() throws Exception {
+
+   S sink1 = createSink();
+   OneInputStreamOperatorTestHarness testHarness1 =
+   new OneInputStreamOperatorTestHarness<>(sink1, 10, 1, 0);
+
+   int elementCounter = 1;
+   int snapshotCount = 0;
+
+   testHarness1.open();
+
+   // put two more checkpoints as pending
+
+   for (int x = 0; x < 10; x++) {
+   testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+   elementCounter++;
+   }
+   testHarness1.snapshot(++snapshotCount, 0);
+
+   for (int x = 0; x < 11; x++) {
+   testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 

[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643862#comment-15643862
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2759#discussion_r86753077
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
 ---
@@ -158,17 +160,151 @@ public void testDataDiscardingUponRestore() throws Exception {
testHarness = new OneInputStreamOperatorTestHarness<>(sink);
 
testHarness.setup();
-   testHarness.restore(latestSnapshot);
+   testHarness.initializeState(latestSnapshot);
testHarness.open();
 
for (int x = 0; x < 20; x++) {
testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
elementCounter++;
}
 
-   testHarness.snapshotLegacy(snapshotCount++, 0);
+   testHarness.snapshot(snapshotCount++, 0);
testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
-   verifyResultsDataDiscardingUponRestore(testHarness, sink);
+   verifyResultsDataDiscardingUponRestore(sink);
+   }
+
+   @Test
+   public void testScalingDown() throws Exception {
+   S sink1 = createSink();
+   OneInputStreamOperatorTestHarness testHarness1 =
+   new OneInputStreamOperatorTestHarness<>(sink1, 10, 2, 0);
+   testHarness1.open();
+
+   S sink2 = createSink();
+   OneInputStreamOperatorTestHarness testHarness2 =
+   new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 1);
+   testHarness2.open();
+
+   int elementCounter = 1;
--- End diff --

We should let this start at 0. I ended up double-checking that the counts
add up, since the verification loop goes to 34 even though we only write 33
elements.
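A small sketch of the bookkeeping behind this suggestion, assuming the three batch sizes from `testScalingDown` (10 and 11 elements before the merge, 12 after it); `CounterBookkeeping` and `writtenCounters` are hypothetical helpers, not part of the test base. With a 0-based counter the written values are 0..32, so the bound of the check loop equals the number of elements written.

```java
// Hypothetical illustration of the counter bookkeeping in testScalingDown:
// three batches of 10, 11 and 12 elements are written (two harnesses before the
// merge, one after it).
final class CounterBookkeeping {

    private CounterBookkeeping() {}

    static final int[] BATCH_SIZES = {10, 11, 12};

    // Returns the counter values that would be written when counting starts at 'start'.
    static int[] writtenCounters(int start) {
        int total = 0;
        for (int size : BATCH_SIZES) {
            total += size;
        }
        int[] counters = new int[total];
        int elementCounter = start;
        for (int i = 0; i < total; i++) {
            counters[i] = elementCounter++;
        }
        return counters;
    }

    public static void main(String[] args) {
        int[] zeroBased = writtenCounters(0);
        // With a 0-based counter the values are 0..32, so the check loop becomes
        // "for (int x = 0; x < 33; x++)" and its bound equals the element count.
        System.out.println(zeroBased.length + " elements, last = " + zeroBased[zeroBased.length - 1]);
        // prints "33 elements, last = 32"
    }
}
```

Starting from 1 instead gives the values 1..33, which is why the current loop needs the less obvious bound `x < 34`.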




[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637222#comment-15637222
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2759

[FLINK-5020] Make the GenericWriteAheadSink rescalable.

As the issue suggests, this is the final commit to make the
GenericWriteAheadSink rescalable.
In essence, it replaces the old snapshot()/restore() cycle with the new
snapshotState()/initializeState() cycle and adds tests showing that it
works as expected.

R: @zentol 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink rescaling_wr_ahead

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2759.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2759


commit 1cdfff0f56a77616f36addfa1cb48b2efe2c9626
Author: kl0u 
Date:   2016-10-26T15:19:12Z

[FLINK-4939] GenericWriteAheadSink: Decouple the creating from the 
committing subtask for a pending checkpoint

So far, the GenericWriteAheadSink expected that
the subtask that wrote a temporary buffer to the
state backend would also be the one to commit it to
the third-party storage system.

This commit removes this assumption. To do this,
it changes the CheckpointCommitter to dynamically
take the subtaskIdx as a parameter when asking
whether a checkpoint was committed, and it changes the
state kept by the GenericWriteAheadSink to also
include the index of the subtask that wrote
the pending buffer.
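The decoupling described in this commit can be pictured with a hypothetical in-memory committer whose bookkeeping is keyed by the subtask index that created a pending checkpoint. `InMemoryCommitter` is an illustrative name, not the connector's class; its method names merely echo the commit message, and the "highest committed id per subtask" semantics are an assumption of this sketch. After rescaling, subtask 0 can then ask whether a checkpoint originally written by subtask 1 was already committed before writing it out again.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory committer: remembers, per original subtask, the highest
// checkpoint id whose buffered data has been written to the external system.
final class InMemoryCommitter {

    private final Map<Integer, Long> lastCommittedBySubtask = new HashMap<>();

    // Record that the buffer of 'subtaskIdx' for 'checkpointId' has been committed.
    void commitCheckpoint(int subtaskIdx, long checkpointId) {
        lastCommittedBySubtask.merge(subtaskIdx, checkpointId, Long::max);
    }

    // The caller passes the index of the subtask that WROTE the buffer,
    // which may differ from the subtask that is asking after rescaling.
    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        Long last = lastCommittedBySubtask.get(subtaskIdx);
        return last != null && checkpointId <= last;
    }
}
```

Because the lookup uses the original subtask index rather than the index of the subtask doing the asking, the answer stays correct after pending checkpoints have been moved between subtasks.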

commit 77c3892687f78780c19710a95f8830907fe67c86
Author: kl0u 
Date:   2016-11-03T11:28:37Z

Integrated PR comments

commit c8627568c375cdd37ac0a8314fc5bf56077a578e
Author: kl0u 
Date:   2016-11-04T15:05:19Z

Removes redundant safety check.

commit 2b9c28059e84ba09068ed0350680730f77119253
Author: kl0u 
Date:   2016-11-03T20:46:58Z

[FLINK-5020] Make the GenericWriteAheadSink rescalable.



