[FLINK-5020] Make the GenericWriteAheadSink rescalable.

Integrates the new state abstractions with the GenericWriteAheadSink
so that the latter can change its parallelism when resuming execution
from a savepoint, without jeopardizing the provided guarantees.

This closes #2759


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4eb71927
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4eb71927
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4eb71927

Branch: refs/heads/master
Commit: 4eb71927bc4f0832eb08a79394ad6864a3c2e142
Parents: 86f784a
Author: kl0u <kklou...@gmail.com>
Authored: Wed Oct 26 17:19:12 2016 +0200
Committer: zentol <ches...@apache.org>
Committed: Thu Dec 8 12:27:14 2016 +0100

----------------------------------------------------------------------
 .../cassandra/CassandraConnectorITCase.java     |  38 ++--
 .../runtime/operators/CheckpointCommitter.java  |   1 +
 .../operators/GenericWriteAheadSink.java        | 105 ++++++-----
 .../operators/GenericWriteAheadSinkTest.java    |  50 +++---
 .../operators/WriteAheadSinkTestBase.java       | 172 +++++++++++++++++--
 5 files changed, 276 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4eb71927/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 2bb6fd1..f2e8f8b 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -47,7 +47,6 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.TestEnvironment;
 
@@ -71,6 +70,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Scanner;
 import java.util.UUID;
 
@@ -262,9 +262,7 @@ public class CassandraConnectorITCase extends 
WriteAheadSinkTestBase<Tuple3<Stri
        }
 
        @Override
-       protected void verifyResultsIdealCircumstances(
-               OneInputStreamOperatorTestHarness<Tuple3<String, Integer, 
Integer>, Tuple3<String, Integer, Integer>> harness,
-               CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink) {
+       protected void 
verifyResultsIdealCircumstances(CassandraTupleWriteAheadSink<Tuple3<String, 
Integer, Integer>> sink) {
 
                ResultSet result = session.execute(SELECT_DATA_QUERY);
                ArrayList<Integer> list = new ArrayList<>();
@@ -279,9 +277,7 @@ public class CassandraConnectorITCase extends 
WriteAheadSinkTestBase<Tuple3<Stri
        }
 
        @Override
-       protected void verifyResultsDataPersistenceUponMissedNotify(
-               OneInputStreamOperatorTestHarness<Tuple3<String, Integer, 
Integer>, Tuple3<String, Integer, Integer>> harness,
-               CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink) {
+       protected void 
verifyResultsDataPersistenceUponMissedNotify(CassandraTupleWriteAheadSink<Tuple3<String,
 Integer, Integer>> sink) {
 
                ResultSet result = session.execute(SELECT_DATA_QUERY);
                ArrayList<Integer> list = new ArrayList<>();
@@ -296,9 +292,7 @@ public class CassandraConnectorITCase extends 
WriteAheadSinkTestBase<Tuple3<Stri
        }
 
        @Override
-       protected void verifyResultsDataDiscardingUponRestore(
-               OneInputStreamOperatorTestHarness<Tuple3<String, Integer, 
Integer>, Tuple3<String, Integer, Integer>> harness,
-               CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink) {
+       protected void 
verifyResultsDataDiscardingUponRestore(CassandraTupleWriteAheadSink<Tuple3<String,
 Integer, Integer>> sink) {
 
                ResultSet result = session.execute(SELECT_DATA_QUERY);
                ArrayList<Integer> list = new ArrayList<>();
@@ -315,6 +309,30 @@ public class CassandraConnectorITCase extends 
WriteAheadSinkTestBase<Tuple3<Stri
                Assert.assertTrue("The following ID's were not found in the 
ResultSet: " + list.toString(), list.isEmpty());
        }
 
+       @Override
+       protected void verifyResultsWhenReScaling(
+               CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink, int startElementCounter, int endElementCounter) {
+
+               // IMPORTANT NOTE:
+               //
+               // for cassandra we always have to start from 1 because
+               // all operators will share the same final db
+
+               ArrayList<Integer> expected = new ArrayList<>();
+               for (int i = 1; i <= endElementCounter; i++) {
+                       expected.add(i);
+               }
+
+               ArrayList<Integer> actual = new ArrayList<>();
+               ResultSet result = session.execute(SELECT_DATA_QUERY);
+               for (Row s : result) {
+                       actual.add(s.getInt("counter"));
+               }
+
+               Collections.sort(actual);
+               Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+       }
+
        @Test
        public void testCassandraCommitter() throws Exception {
                CassandraCommitter cc1 = new CassandraCommitter(builder);

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb71927/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
index 90e3a57..6e50dde 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
@@ -40,6 +40,7 @@ import java.io.Serializable;
  * and as such should kept as small as possible.
  */
 public abstract class CheckpointCommitter implements Serializable {
+
        protected static final Logger LOG = 
LoggerFactory.getLogger(CheckpointCommitter.class);
 
        protected String jobId;

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb71927/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index b08b2e9..564fa22 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -17,20 +17,20 @@
  */
 package org.apache.flink.streaming.runtime.operators;
 
+import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ import java.util.UUID;
  * @param <IN> Type of the elements emitted by this sink
  */
 public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<IN>
-               implements OneInputStreamOperator<IN, IN>, 
StreamCheckpointedOperator {
+               implements OneInputStreamOperator<IN, IN> {
 
        private static final long serialVersionUID = 1L;
 
@@ -65,9 +65,15 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
        private transient CheckpointStreamFactory.CheckpointStateOutputStream 
out;
        private transient CheckpointStreamFactory checkpointStreamFactory;
 
+       private transient ListState<PendingCheckpoint> checkpointedState;
+
        private final Set<PendingCheckpoint> pendingCheckpoints = new 
TreeSet<>();
 
-       public GenericWriteAheadSink(CheckpointCommitter committer,     
TypeSerializer<IN> serializer, String jobID) throws Exception {
+       public GenericWriteAheadSink(
+                       CheckpointCommitter committer,
+                       TypeSerializer<IN> serializer,
+                       String jobID) throws Exception {
+
                this.committer = Preconditions.checkNotNull(committer);
                this.serializer = Preconditions.checkNotNull(serializer);
                this.id = UUID.randomUUID().toString();
@@ -77,12 +83,39 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
        }
 
        @Override
+       public void initializeState(StateInitializationContext context) throws 
Exception {
+               super.initializeState(context);
+
+               Preconditions.checkState(this.checkpointedState == null,
+                       "The reader state has already been initialized.");
+
+               checkpointedState = context.getOperatorStateStore()
+                       .getSerializableListState("pending-checkpoints");
+
+               int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+               if (context.isRestored()) {
+                       LOG.info("Restoring state for the GenericWriteAheadSink 
(taskIdx={}).", subtaskIdx);
+
+                       for (PendingCheckpoint pendingCheckpoint : 
checkpointedState.get()) {
+                               this.pendingCheckpoints.add(pendingCheckpoint);
+                       }
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("GenericWriteAheadSink idx {} 
restored {}.", subtaskIdx, this.pendingCheckpoints);
+                       }
+               } else {
+                       LOG.info("No state to restore for the 
GenericWriteAheadSink (taskIdx={}).", subtaskIdx);
+               }
+       }
+
+       @Override
        public void open() throws Exception {
                super.open();
                committer.setOperatorId(id);
                committer.open();
 
-               checkpointStreamFactory = 
getContainingTask().createCheckpointStreamFactory(this);
+               checkpointStreamFactory = getContainingTask()
+                       .createCheckpointStreamFactory(this);
 
                cleanRestoredHandles();
        }
@@ -99,12 +132,14 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
         * @throws IOException in case something went wrong when handling the 
stream to the backend.
         */
        private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+
                //only add handle if a new OperatorState was created since the 
last snapshot
                if (out != null) {
                        int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
                        StreamStateHandle handle = out.closeAndGetHandle();
 
-                       PendingCheckpoint pendingCheckpoint = new 
PendingCheckpoint(checkpointId, subtaskIdx, timestamp, handle);
+                       PendingCheckpoint pendingCheckpoint = new 
PendingCheckpoint(
+                               checkpointId, subtaskIdx, timestamp, handle);
 
                        if (pendingCheckpoints.contains(pendingCheckpoint)) {
                                //we already have a checkpoint stored for that 
ID that may have been partially written,
@@ -118,22 +153,23 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
        }
 
        @Override
-       public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
-               saveHandleInState(checkpointId, timestamp);
+       public void snapshotState(StateSnapshotContext context) throws 
Exception {
+               super.snapshotState(context);
+
+               Preconditions.checkState(this.checkpointedState != null,
+                       "The operator state has not been properly 
initialized.");
 
-               DataOutputViewStreamWrapper outStream = new 
DataOutputViewStreamWrapper(out);
-               outStream.writeInt(pendingCheckpoints.size());
+               saveHandleInState(context.getCheckpointId(), 
context.getCheckpointTimestamp());
+
+               this.checkpointedState.clear();
                for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
-                       pendingCheckpoint.serialize(outStream);
+                       // create a new partition for each entry.
+                       this.checkpointedState.add(pendingCheckpoint);
                }
-       }
 
-       @Override
-       public void restoreState(FSDataInputStream in) throws Exception {
-               final DataInputViewStreamWrapper inStream = new 
DataInputViewStreamWrapper(in);
-               int numPendingHandles = inStream.readInt();
-               for (int i = 0; i < numPendingHandles; i++) {
-                       
pendingCheckpoints.add(PendingCheckpoint.restore(inStream, 
getUserCodeClassloader()));
+               int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("{} (taskIdx= {}) checkpointed {}.", 
getClass().getSimpleName(), subtaskIdx, this.pendingCheckpoints);
                }
        }
 
@@ -162,9 +198,12 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
                super.notifyOfCompletedCheckpoint(checkpointId);
 
                synchronized (pendingCheckpoints) {
+
                        Iterator<PendingCheckpoint> pendingCheckpointIt = 
pendingCheckpoints.iterator();
                        while (pendingCheckpointIt.hasNext()) {
+
                                PendingCheckpoint pendingCheckpoint = 
pendingCheckpointIt.next();
+
                                long pastCheckpointId = 
pendingCheckpoint.checkpointId;
                                int subtaskId = pendingCheckpoint.subtaskId;
                                long timestamp = pendingCheckpoint.timestamp;
@@ -241,34 +280,15 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
                        this.stateHandle = handle;
                }
 
-               void serialize(DataOutputViewStreamWrapper outputStream) throws 
IOException {
-                       outputStream.writeLong(checkpointId);
-                       outputStream.writeInt(subtaskId);
-                       outputStream.writeLong(timestamp);
-                       InstantiationUtil.serializeObject(outputStream, 
stateHandle);
-               }
-
-               static PendingCheckpoint restore(
-                               DataInputViewStreamWrapper inputStream,
-                               ClassLoader classLoader) throws IOException, 
ClassNotFoundException {
-
-                       long checkpointId = inputStream.readLong();
-                       int subtaskId = inputStream.readInt();
-                       long timestamp = inputStream.readLong();
-                       StreamStateHandle handle = 
InstantiationUtil.deserializeObject(inputStream, classLoader);
-
-                       return new PendingCheckpoint(checkpointId, subtaskId, 
timestamp, handle);
-               }
-
                @Override
                public int compareTo(PendingCheckpoint o) {
                        int res = Long.compare(this.checkpointId, 
o.checkpointId);
-                       return res != 0 ? res : Integer.compare(this.subtaskId, 
o.subtaskId);
+                       return res != 0 ? res : this.subtaskId - o.subtaskId;
                }
 
                @Override
                public boolean equals(Object o) {
-                       if (!(o instanceof 
GenericWriteAheadSink.PendingCheckpoint)) {
+                       if (o == null || !(o instanceof 
GenericWriteAheadSink.PendingCheckpoint)) {
                                return false;
                        }
                        PendingCheckpoint other = (PendingCheckpoint) o;
@@ -285,5 +305,10 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
                        hash = 31 * hash + (int) (timestamp ^ (timestamp >>> 
32));
                        return hash;
                }
+
+               @Override
+               public String toString() {
+                       return "Pending Checkpoint: id=" + checkpointId + "/" + 
subtaskId + "@" + timestamp;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb71927/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
index 8d092ed..9bcd2e6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -29,6 +30,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 public class GenericWriteAheadSinkTest extends 
WriteAheadSinkTestBase<Tuple1<Integer>, GenericWriteAheadSinkTest.ListSink> {
@@ -50,9 +52,7 @@ public class GenericWriteAheadSinkTest extends 
WriteAheadSinkTestBase<Tuple1<Int
 
 
        @Override
-       protected void verifyResultsIdealCircumstances(
-               OneInputStreamOperatorTestHarness<Tuple1<Integer>, 
Tuple1<Integer>> harness,
-               ListSink sink) {
+       protected void verifyResultsIdealCircumstances(ListSink sink) {
 
                ArrayList<Integer> list = new ArrayList<>();
                for (int x = 1; x <= 60; x++) {
@@ -67,9 +67,7 @@ public class GenericWriteAheadSinkTest extends 
WriteAheadSinkTestBase<Tuple1<Int
        }
 
        @Override
-       protected void verifyResultsDataPersistenceUponMissedNotify(
-               OneInputStreamOperatorTestHarness<Tuple1<Integer>, 
Tuple1<Integer>> harness,
-               ListSink sink) {
+       protected void verifyResultsDataPersistenceUponMissedNotify(ListSink 
sink) {
 
                ArrayList<Integer> list = new ArrayList<>();
                for (int x = 1; x <= 60; x++) {
@@ -84,9 +82,7 @@ public class GenericWriteAheadSinkTest extends 
WriteAheadSinkTestBase<Tuple1<Int
        }
 
        @Override
-       protected void verifyResultsDataDiscardingUponRestore(
-               OneInputStreamOperatorTestHarness<Tuple1<Integer>, 
Tuple1<Integer>> harness,
-               ListSink sink) {
+       protected void verifyResultsDataDiscardingUponRestore(ListSink sink) {
 
                ArrayList<Integer> list = new ArrayList<>();
                for (int x = 1; x <= 20; x++) {
@@ -103,6 +99,18 @@ public class GenericWriteAheadSinkTest extends 
WriteAheadSinkTestBase<Tuple1<Int
                Assert.assertTrue("The sink emitted to many values: " + 
(sink.values.size() - 40), sink.values.size() == 40);
        }
 
+       @Override
+       protected void verifyResultsWhenReScaling(ListSink sink, int 
startElementCounter, int endElementCounter) throws Exception {
+
+               ArrayList<Integer> list = new ArrayList<>();
+               for (int i = startElementCounter; i <= endElementCounter; i++) {
+                       list.add(i);
+               }
+
+               Collections.sort(sink.values);
+               Assert.assertArrayEquals(list.toArray(), sink.values.toArray());
+       }
+
        @Test
        /**
         * Verifies that exceptions thrown by a committer do not fail a job and 
lead to an abort of notify()
@@ -124,33 +132,33 @@ public class GenericWriteAheadSinkTest extends 
WriteAheadSinkTestBase<Tuple1<Int
                        elementCounter++;
                }
 
-               testHarness.snapshotLegacy(0, 0);
+               testHarness.snapshot(0, 0);
                testHarness.notifyOfCompletedCheckpoint(0);
 
                //isCommitted should have failed, thus sendValues() should 
never have been called
                Assert.assertEquals(0, sink.values.size());
 
-               for (int x = 0; x < 10; x++) {
+               for (int x = 0; x < 11; x++) {
                        testHarness.processElement(new 
StreamRecord<>(generateValue(elementCounter, 1)));
                        elementCounter++;
                }
 
-               testHarness.snapshotLegacy(1, 0);
+               testHarness.snapshot(1, 0);
                testHarness.notifyOfCompletedCheckpoint(1);
 
                //previous CP should be retried, but will fail the CP commit. 
Second CP should be skipped.
                Assert.assertEquals(10, sink.values.size());
 
-               for (int x = 0; x < 10; x++) {
+               for (int x = 0; x < 12; x++) {
                        testHarness.processElement(new 
StreamRecord<>(generateValue(elementCounter, 2)));
                        elementCounter++;
                }
 
-               testHarness.snapshotLegacy(2, 0);
+               testHarness.snapshot(2, 0);
                testHarness.notifyOfCompletedCheckpoint(2);
 
-               //all CP's should be retried and succeed; since one CP was 
written twice we have 2 * 10 + 10 + 10 = 40 values
-               Assert.assertEquals(40, sink.values.size());
+               //all CP's should be retried and succeed; since one CP was 
written twice we have 2 * 10 + 11 + 12 = 43 values
+               Assert.assertEquals(43, sink.values.size());
        }
 
        /**
@@ -177,7 +185,7 @@ public class GenericWriteAheadSinkTest extends 
WriteAheadSinkTestBase<Tuple1<Int
        public static class SimpleCommitter extends CheckpointCommitter {
                private static final long serialVersionUID = 1L;
 
-               private List<Long> checkpoints;
+               private List<Tuple2<Long, Integer>> checkpoints;
 
                @Override
                public void open() throws Exception {
@@ -194,12 +202,12 @@ public class GenericWriteAheadSinkTest extends 
WriteAheadSinkTestBase<Tuple1<Int
 
                @Override
                public void commitCheckpoint(int subtaskIdx, long checkpointID) 
{
-                       checkpoints.add(checkpointID);
+                       checkpoints.add(new Tuple2<>(checkpointID, subtaskIdx));
                }
 
                @Override
                public boolean isCheckpointCommitted(int subtaskIdx, long 
checkpointID) {
-                       return checkpoints.contains(checkpointID);
+                       return checkpoints.contains(new Tuple2<>(checkpointID, 
subtaskIdx));
                }
        }
 
@@ -227,7 +235,7 @@ public class GenericWriteAheadSinkTest extends 
WriteAheadSinkTestBase<Tuple1<Int
        public static class FailingCommitter extends CheckpointCommitter {
                private static final long serialVersionUID = 1L;
 
-               private List<Long> checkpoints;
+               private List<Tuple2<Long, Integer>> checkpoints;
                private boolean failIsCommitted = true;
                private boolean failCommit = true;
 
@@ -250,7 +258,7 @@ public class GenericWriteAheadSinkTest extends 
WriteAheadSinkTestBase<Tuple1<Int
                                failCommit = false;
                                throw new RuntimeException("Expected 
exception");
                        } else {
-                               checkpoints.add(checkpointID);
+                               checkpoints.add(new Tuple2<>(checkpointID, 
subtaskIdx));
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb71927/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
index a9c5792..46d92af 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
@@ -19,8 +19,9 @@
 package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
 
@@ -34,14 +35,13 @@ public abstract class WriteAheadSinkTestBase<IN, S extends 
GenericWriteAheadSink
 
        protected abstract IN generateValue(int counter, int checkpointID);
 
-       protected abstract void verifyResultsIdealCircumstances(
-               OneInputStreamOperatorTestHarness<IN, IN> harness, S sink) 
throws Exception;
+       protected abstract void verifyResultsIdealCircumstances(S sink) throws 
Exception;
 
-       protected abstract void verifyResultsDataPersistenceUponMissedNotify(
-                       OneInputStreamOperatorTestHarness<IN, IN> harness, S 
sink) throws Exception;
+       protected abstract void verifyResultsDataPersistenceUponMissedNotify(S 
sink) throws Exception;
 
-       protected abstract void verifyResultsDataDiscardingUponRestore(
-               OneInputStreamOperatorTestHarness<IN, IN> harness, S sink) 
throws Exception;
+       protected abstract void verifyResultsDataDiscardingUponRestore(S sink) 
throws Exception;
+
+       protected abstract void verifyResultsWhenReScaling(S sink, int 
startElementCounter, int endElementCounter) throws Exception;
 
        @Test
        public void testIdealCircumstances() throws Exception {
@@ -60,7 +60,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends 
GenericWriteAheadSink
                        elementCounter++;
                }
 
-               testHarness.snapshotLegacy(snapshotCount++, 0);
+               testHarness.snapshot(snapshotCount++, 0);
                testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
                for (int x = 0; x < 20; x++) {
@@ -68,7 +68,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends 
GenericWriteAheadSink
                        elementCounter++;
                }
 
-               testHarness.snapshotLegacy(snapshotCount++, 0);
+               testHarness.snapshot(snapshotCount++, 0);
                testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
                for (int x = 0; x < 20; x++) {
@@ -76,10 +76,10 @@ public abstract class WriteAheadSinkTestBase<IN, S extends 
GenericWriteAheadSink
                        elementCounter++;
                }
 
-               testHarness.snapshotLegacy(snapshotCount++, 0);
+               testHarness.snapshot(snapshotCount++, 0);
                testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
-               verifyResultsIdealCircumstances(testHarness, sink);
+               verifyResultsIdealCircumstances(sink);
        }
 
        @Test
@@ -99,7 +99,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends 
GenericWriteAheadSink
                        elementCounter++;
                }
 
-               testHarness.snapshotLegacy(snapshotCount++, 0);
+               testHarness.snapshot(snapshotCount++, 0);
                testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
                for (int x = 0; x < 20; x++) {
@@ -107,17 +107,17 @@ public abstract class WriteAheadSinkTestBase<IN, S 
extends GenericWriteAheadSink
                        elementCounter++;
                }
 
-               testHarness.snapshotLegacy(snapshotCount++, 0);
+               testHarness.snapshot(snapshotCount++, 0);
 
                for (int x = 0; x < 20; x++) {
                        testHarness.processElement(new 
StreamRecord<>(generateValue(elementCounter, 2)));
                        elementCounter++;
                }
 
-               testHarness.snapshotLegacy(snapshotCount++, 0);
+               testHarness.snapshot(snapshotCount++, 0);
                testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
-               verifyResultsDataPersistenceUponMissedNotify(testHarness, sink);
+               verifyResultsDataPersistenceUponMissedNotify(sink);
        }
 
        @Test
@@ -137,7 +137,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends 
GenericWriteAheadSink
                        elementCounter++;
                }
 
-               StreamStateHandle latestSnapshot = 
testHarness.snapshotLegacy(snapshotCount++, 0);
+               OperatorStateHandles latestSnapshot = 
testHarness.snapshot(snapshotCount++, 0);
                testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
                for (int x = 0; x < 20; x++) {
@@ -152,7 +152,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink
                testHarness = new OneInputStreamOperatorTestHarness<>(sink);
 
                testHarness.setup();
-               testHarness.restore(latestSnapshot);
+               testHarness.initializeState(latestSnapshot);
                testHarness.open();
 
                for (int x = 0; x < 20; x++) {
@@ -160,9 +160,143 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink
                        elementCounter++;
                }
 
-               testHarness.snapshotLegacy(snapshotCount++, 0);
+               testHarness.snapshot(snapshotCount++, 0);
                testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
-               verifyResultsDataDiscardingUponRestore(testHarness, sink);
+               verifyResultsDataDiscardingUponRestore(sink);
+       }
+
+       @Test
+       public void testScalingDown() throws Exception {
+               S sink1 = createSink();
+               OneInputStreamOperatorTestHarness<IN, IN> testHarness1 =
+                       new OneInputStreamOperatorTestHarness<>(sink1, 10, 2, 0);
+               testHarness1.open();
+
+               S sink2 = createSink();
+               OneInputStreamOperatorTestHarness<IN, IN> testHarness2 =
+                       new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 1);
+               testHarness2.open();
+
+               int elementCounter = 1;
+               int snapshotCount = 0;
+
+               for (int x = 0; x < 10; x++) {
+                       testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+                       elementCounter++;
+               }
+
+               for (int x = 0; x < 11; x++) {
+                       testHarness2.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+                       elementCounter++;
+               }
+
+               // snapshot at checkpoint 0 for testHarness1 and testHarness 2
+               OperatorStateHandles snapshot1 = testHarness1.snapshot(snapshotCount, 0);
+               OperatorStateHandles snapshot2 = testHarness2.snapshot(snapshotCount, 0);
+
+               // merge the two partial states
+               OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness
+                       .repackageState(snapshot1, snapshot2);
+
+               testHarness1.close();
+               testHarness2.close();
+
+               // and create a third instance that operates alone but
+               // has the merged state of the previous 2 instances
+
+               S sink3 = createSink();
+               OneInputStreamOperatorTestHarness<IN, IN> mergedTestHarness =
+                       new OneInputStreamOperatorTestHarness<>(sink3, 10, 1, 0);
+
+               mergedTestHarness.setup();
+               mergedTestHarness.initializeState(mergedSnapshot);
+               mergedTestHarness.open();
+
+               for (int x = 0; x < 12; x++) {
+                       mergedTestHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+                       elementCounter++;
+               }
+
+               snapshotCount++;
+               mergedTestHarness.snapshot(snapshotCount, 1);
+               mergedTestHarness.notifyOfCompletedCheckpoint(snapshotCount);
+
+               verifyResultsWhenReScaling(sink3, 1, 33);
+               mergedTestHarness.close();
+       }
+
+       @Test
+       public void testScalingUp() throws Exception {
+
+               S sink1 = createSink();
+               OneInputStreamOperatorTestHarness<IN, IN> testHarness1 =
+                       new OneInputStreamOperatorTestHarness<>(sink1, 10, 1, 0);
+
+               int elementCounter = 1;
+               int snapshotCount = 0;
+
+               testHarness1.open();
+
+               // put two more checkpoints as pending
+
+               for (int x = 0; x < 10; x++) {
+                       testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+                       elementCounter++;
+               }
+               testHarness1.snapshot(++snapshotCount, 0);
+
+               for (int x = 0; x < 11; x++) {
+                       testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+                       elementCounter++;
+               }
+
+               // this will be the state that will be split between the two new operators
+               OperatorStateHandles snapshot = testHarness1.snapshot(++snapshotCount, 0);
+
+               testHarness1.close();
+
+               // verify no elements are in the sink
+               verifyResultsWhenReScaling(sink1, 0, -1);
+
+               // we will create two operator instances, testHarness2 and testHarness3,
+               // that will share the state of testHarness1
+
+               ++snapshotCount;
+
+               S sink2 = createSink();
+               OneInputStreamOperatorTestHarness<IN, IN> testHarness2 =
+                       new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 0);
+
+               testHarness2.setup();
+               testHarness2.initializeState(snapshot);
+               testHarness2.open();
+
+               testHarness2.notifyOfCompletedCheckpoint(snapshotCount);
+
+               verifyResultsWhenReScaling(sink2, 1, 10);
+
+               S sink3 = createSink();
+               OneInputStreamOperatorTestHarness<IN, IN> testHarness3 =
+                       new OneInputStreamOperatorTestHarness<>(sink3, 10, 2, 1);
+
+               testHarness3.setup();
+               testHarness3.initializeState(snapshot);
+               testHarness3.open();
+
+               // add some more elements to verify that everything functions normally from now on...
+
+               for (int x = 0; x < 10; x++) {
+                       testHarness3.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+                       elementCounter++;
+               }
+
+               testHarness3.snapshot(snapshotCount, 1);
+               testHarness3.notifyOfCompletedCheckpoint(snapshotCount);
+
+               verifyResultsWhenReScaling(sink3, 11, 31);
+
+               testHarness2.close();
+               testHarness3.close();
        }
 }

Reply via email to