[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15732139#comment-15732139 ]

ASF GitHub Bot commented on FLINK-5020:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2759

> Make the GenericWriteAheadSink rescalable.
> ------------------------------------------
>
>                 Key: FLINK-5020
>                 URL: https://issues.apache.org/jira/browse/FLINK-5020
>             Project: Flink
>          Issue Type: Improvement
>          Components: Cassandra Connector
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> This targets integrating the GenericWriteAheadSink with the new rescalable
> state abstractions so that the parallelism of the operator can change
> arbitrarily without jeopardizing the guarantees offered by it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15731904#comment-15731904 ]

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2759

    merging.
[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650536#comment-15650536 ]

ASF GitHub Bot commented on FLINK-5020:
---

Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2759

    Hi @zentol. I just rebased onto master, as there were conflicts. After Travis passes, could you have a look and merge this if there are no more comments?
[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15644032#comment-15644032 ]

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2759

    have you tried the cassandra sink in an actual job?
[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643923#comment-15643923 ]

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2759#discussion_r86757277

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java ---
@@ -158,17 +160,151 @@ public void testDataDiscardingUponRestore() throws Exception {
 		testHarness = new OneInputStreamOperatorTestHarness<>(sink);
 		testHarness.setup();
-		testHarness.restore(latestSnapshot);
+		testHarness.initializeState(latestSnapshot);
 		testHarness.open();

 		for (int x = 0; x < 20; x++) {
 			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
 			elementCounter++;
 		}

-		testHarness.snapshotLegacy(snapshotCount++, 0);
+		testHarness.snapshot(snapshotCount++, 0);
 		testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);

-		verifyResultsDataDiscardingUponRestore(testHarness, sink);
+		verifyResultsDataDiscardingUponRestore(sink);
+	}
+
+	@Test
+	public void testScalingDown() throws Exception {
+		S sink1 = createSink();
+		OneInputStreamOperatorTestHarness testHarness1 =
+			new OneInputStreamOperatorTestHarness<>(sink1, 10, 2, 0);
+		testHarness1.open();
+
+		S sink2 = createSink();
+		OneInputStreamOperatorTestHarness testHarness2 =
+			new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 1);
+		testHarness2.open();
+
+		int elementCounter = 1;
--- End diff --

    leave it as it is then
[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643920#comment-15643920 ]

ASF GitHub Bot commented on FLINK-5020:
---

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2759#discussion_r86757109

--- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -318,6 +312,46 @@ protected void verifyResultsDataDiscardingUponRestore(
 		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
 	}

+	@Override
+	protected void verifyResultsWhenScalingDown(CassandraTupleWriteAheadSink> sink) throws Exception {
+
+		ArrayList list = new ArrayList<>();
+		for (int x = 1; x < 34; x++) {
+			list.add(x);
+		}
+
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		for (Row s : result) {
+			list.remove(new Integer(s.getInt("counter")));
--- End diff --

    Yes, this will be added.
[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643919#comment-15643919 ]

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2759#discussion_r86757078

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java ---
@@ -300,5 +294,10 @@ public int hashCode() {
 			hash = 31 * hash + (int) (timestamp ^ (timestamp >>> 32));
 			return hash;
 		}
+
+		@Override
+		public String toString() {
+			return "Pending Checkpoint: id=" + checkpointId +"/"+ subtaskId +"@"+ timestamp;
--- End diff --

    missing spaces around `+`
[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643916#comment-15643916 ]

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2759#discussion_r86756773

--- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -318,6 +312,46 @@ protected void verifyResultsDataDiscardingUponRestore(
 		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
 	}

+	@Override
+	protected void verifyResultsWhenScalingDown(CassandraTupleWriteAheadSink> sink) throws Exception {
+
+		ArrayList list = new ArrayList<>();
+		for (int x = 1; x < 34; x++) {
+			list.add(x);
+		}
+
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		for (Row s : result) {
+			list.remove(new Integer(s.getInt("counter")));
+		}
+		Assert.assertTrue(list.isEmpty());
+	}
+
+	@Override
+	protected void verifyResultsWhenScalingUp(
+			CassandraTupleWriteAheadSink > sink, int startElementCounter, int endElementCounter) {
+
+		// IMPORTANT NOTE:
+		//
+		// for cassandra we always have to start from 1 because
+		// all operators will share the same final db
+
+		ArrayList list = new ArrayList<>();
+		for (int i = 1; i <= endElementCounter; i++) {
+			list.add(i);
+		}
+
+		ArrayList expected = new ArrayList<>();
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		for (Row s : result) {
+			expected.add(s.getInt("counter"));
+		}
+
+		Collections.sort(expected);
+
+		Assert.assertArrayEquals(list.toArray(), expected.toArray());
--- End diff --

    why does this test follow a different approach to testing than `verifyResultsWhenScalingDown`?
[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643905#comment-15643905 ]

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2759#discussion_r86755620

--- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -318,6 +312,46 @@ protected void verifyResultsDataDiscardingUponRestore(
 		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
 	}

+	@Override
+	protected void verifyResultsWhenScalingDown(CassandraTupleWriteAheadSink> sink) throws Exception {
+
+		ArrayList list = new ArrayList<>();
+		for (int x = 1; x < 34; x++) {
+			list.add(x);
+		}
+
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		for (Row s : result) {
+			list.remove(new Integer(s.getInt("counter")));
+		}
+		Assert.assertTrue(list.isEmpty());
+	}
+
+	@Override
+	protected void verifyResultsWhenScalingUp(
+			CassandraTupleWriteAheadSink > sink, int startElementCounter, int endElementCounter) {
+
+		// IMPORTANT NOTE:
+		//
+		// for cassandra we always have to start from 1 because
+		// all operators will share the same final db
+
+		ArrayList list = new ArrayList<>();
+		for (int i = 1; i <= endElementCounter; i++) {
+			list.add(i);
+		}
+
+		ArrayList expected = new ArrayList<>();
--- End diff --

    this list does not contain the "expected" values.
[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643900#comment-15643900 ]

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2759#discussion_r86754225

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java ---
@@ -158,17 +160,151 @@ public void testDataDiscardingUponRestore() throws Exception {
 		testHarness = new OneInputStreamOperatorTestHarness<>(sink);
 		testHarness.setup();
-		testHarness.restore(latestSnapshot);
+		testHarness.initializeState(latestSnapshot);
 		testHarness.open();

 		for (int x = 0; x < 20; x++) {
 			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
 			elementCounter++;
 		}

-		testHarness.snapshotLegacy(snapshotCount++, 0);
+		testHarness.snapshot(snapshotCount++, 0);
 		testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);

-		verifyResultsDataDiscardingUponRestore(testHarness, sink);
+		verifyResultsDataDiscardingUponRestore(sink);
+	}
+
+	@Test
+	public void testScalingDown() throws Exception {
+		S sink1 = createSink();
+		OneInputStreamOperatorTestHarness testHarness1 =
+			new OneInputStreamOperatorTestHarness<>(sink1, 10, 2, 0);
+		testHarness1.open();
+
+		S sink2 = createSink();
+		OneInputStreamOperatorTestHarness testHarness2 =
+			new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 1);
+		testHarness2.open();
+
+		int elementCounter = 1;
+		int snapshotCount = 0;
+
+		for (int x = 0; x < 10; x++) {
+			testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+
+		for (int x = 0; x < 11; x++) {
+			testHarness2.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+
+		// snapshot at checkpoint 0 for testHarness1 and testHarness 2
+		OperatorStateHandles snapshot1 = testHarness1.snapshot(snapshotCount, 0);
+		OperatorStateHandles snapshot2 = testHarness2.snapshot(snapshotCount, 0);
+
+		// merge the two partial states
+		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness
+			.repackageState(snapshot1, snapshot2);
+
+		testHarness1.close();
+		testHarness2.close();
+
+		// and create a third instance that operates alone but
+		// has the merged state of the previous 2 instances
+
+		S sink3 = createSink();
+		OneInputStreamOperatorTestHarness mergedTestHarness =
+			new OneInputStreamOperatorTestHarness<>(sink3, 10, 1, 0);
+
+		mergedTestHarness.setup();
+		mergedTestHarness.initializeState(mergedSnapshot);
+		mergedTestHarness.open();
+
+		for (int x = 0; x < 12; x++) {
+			mergedTestHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+
+		snapshotCount++;
+		mergedTestHarness.snapshot(snapshotCount, 1);
+		mergedTestHarness.notifyOfCompletedCheckpoint(snapshotCount);
+
+		verifyResultsWhenScalingDown(sink3);
+		mergedTestHarness.close();
+	}
+
+	@Test
+	public void testScalingUp() throws Exception {
+
+		S sink1 = createSink();
+		OneInputStreamOperatorTestHarness testHarness1 =
+			new OneInputStreamOperatorTestHarness<>(sink1, 10, 1, 0);
+
+		int elementCounter = 1;
+		int snapshotCount = 0;
+
+		testHarness1.open();
+
+		// put two more checkpoints as pending
+
+		for (int x = 0; x < 10; x++) {
+			testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+		testHarness1.snapshot(++snapshotCount, 0);
+
+		for (int x = 0; x < 11; x++) {
+			testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter,
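The scaling-down test above snapshots two parallel sink instances, merges the partial states with `AbstractStreamOperatorTestHarness.repackageState`, and restores a single instance from the merged handle. As a minimal, Flink-free sketch of what that repackaging achieves conceptually (the type and method names below are hypothetical, not Flink APIs): each subtask holds a list of pending entries, and scaling down to parallelism 1 hands the union of all lists to the surviving instance.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch only (no Flink dependencies): models what
 * "repackageState" achieves for the scale-down-to-one case. Each parallel
 * subtask snapshots a list of pending write-ahead entries; the merged state
 * is simply the union of those lists, which the single surviving subtask
 * restores and can then commit in full.
 */
public class RepackageSketch {

    /** Union all per-subtask state lists, as a scale-down to parallelism 1 would. */
    static <T> List<T> repackage(List<List<T>> perSubtaskState) {
        List<T> merged = new ArrayList<>();
        for (List<T> partial : perSubtaskState) {
            merged.addAll(partial);
        }
        return merged;
    }
}
```

With list-style (redistributable) operator state, no entry is lost on scale-down: the merged list contains every pending buffer that either original subtask had not yet committed.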
[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643862#comment-15643862 ]

ASF GitHub Bot commented on FLINK-5020:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2759#discussion_r86753077

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java ---
@@ -158,17 +160,151 @@ public void testDataDiscardingUponRestore() throws Exception {
 		testHarness = new OneInputStreamOperatorTestHarness<>(sink);
 		testHarness.setup();
-		testHarness.restore(latestSnapshot);
+		testHarness.initializeState(latestSnapshot);
 		testHarness.open();

 		for (int x = 0; x < 20; x++) {
 			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
 			elementCounter++;
 		}

-		testHarness.snapshotLegacy(snapshotCount++, 0);
+		testHarness.snapshot(snapshotCount++, 0);
 		testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);

-		verifyResultsDataDiscardingUponRestore(testHarness, sink);
+		verifyResultsDataDiscardingUponRestore(sink);
+	}
+
+	@Test
+	public void testScalingDown() throws Exception {
+		S sink1 = createSink();
+		OneInputStreamOperatorTestHarness testHarness1 =
+			new OneInputStreamOperatorTestHarness<>(sink1, 10, 2, 0);
+		testHarness1.open();
+
+		S sink2 = createSink();
+		OneInputStreamOperatorTestHarness testHarness2 =
+			new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 1);
+		testHarness2.open();
+
+		int elementCounter = 1;
--- End diff --

    we should let this start at 0; I just ended up double-checking that the counts add up, since the verify loop goes to 34 even though we only write 33 elements.
[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637222#comment-15637222 ]

ASF GitHub Bot commented on FLINK-5020:
---

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2759

    [FLINK-5020] Make the GenericWriteAheadSink rescalable.

    As the issue suggests, this is the final commit to make the GenericWriteAheadSink re-scalable. In essence, it replaces the old snapshot()/restore() cycle with the new snapshotState()/initializeState() one, and adds tests to show that it works as expected.

    R: @zentol

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink rescaling_wr_ahead

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2759.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2759

----
commit 1cdfff0f56a77616f36addfa1cb48b2efe2c9626
Author: kl0u
Date:   2016-10-26T15:19:12Z

    [FLINK-4939] GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

    So far the GenericWriteAheadSink expected that the subtask that wrote a temporary buffer to the state backend would also be the one to commit it to the third-party storage system. This commit removes this assumption. To do so, it changes the CheckpointCommitter to take the subtaskIdx as a parameter when asking if a checkpoint was committed, and also changes the state kept by the GenericWriteAheadSink to include the subtask index of the subtask that wrote the pending buffer.

commit 77c3892687f78780c19710a95f8830907fe67c86
Author: kl0u
Date:   2016-11-03T11:28:37Z

    Integrated PR comments

commit c8627568c375cdd37ac0a8314fc5bf56077a578e
Author: kl0u
Date:   2016-11-04T15:05:19Z

    Removes redundant safety check.

commit 2b9c28059e84ba09068ed0350680730f77119253
Author: kl0u
Date:   2016-11-03T20:46:58Z

    [FLINK-5020] Make the GenericWriteAheadSink rescalable.
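The FLINK-4939 commit message above describes the key state change: each pending write-ahead buffer now records the index of the subtask that wrote it, so after rescaling any subtask that ends up holding the (redistributed) entry can ask the committer whether the (checkpointId, subtaskId) pair was already committed. The sketch below is a plain-Java illustration of that bookkeeping, not the real Flink classes; all names besides the quoted `toString` format are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Illustrative sketch of the pending-checkpoint bookkeeping described in the
 * PR: buffers are keyed by (checkpointId, subtaskId-of-writer) so the
 * committing subtask need not be the one that created the buffer.
 */
public class PendingCheckpointSketch {

    /** A pending, not-yet-committed write-ahead buffer. */
    static final class PendingCheckpoint {
        final long checkpointId;
        final int subtaskId;   // subtask that *wrote* the buffer
        final long timestamp;

        PendingCheckpoint(long checkpointId, int subtaskId, long timestamp) {
            this.checkpointId = checkpointId;
            this.subtaskId = subtaskId;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            // format quoted in the review thread (with spaces added around '+')
            return "Pending Checkpoint: id=" + checkpointId + "/" + subtaskId + "@" + timestamp;
        }
    }

    /** Stand-in for a committer that is queried per (checkpointId, subtaskId). */
    static final class Committer {
        private final Set<String> committed = new HashSet<>();

        void commit(long checkpointId, int subtaskId) {
            committed.add(checkpointId + "/" + subtaskId);
        }

        boolean isCommitted(long checkpointId, int subtaskId) {
            return committed.contains(checkpointId + "/" + subtaskId);
        }
    }

    /** Commit every restored buffer whose original writer never committed it. */
    static List<PendingCheckpoint> commitPending(List<PendingCheckpoint> restored, Committer committer) {
        List<PendingCheckpoint> committedNow = new ArrayList<>();
        for (PendingCheckpoint p : restored) {
            if (!committer.isCommitted(p.checkpointId, p.subtaskId)) {
                committer.commit(p.checkpointId, p.subtaskId);
                committedNow.add(p);
            }
        }
        return committedNow;
    }
}
```

Because the writer's subtask index travels with the state, the parallelism can change arbitrarily between checkpoints without losing track of which buffers still need committing, which is exactly the guarantee the issue description asks to preserve.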