This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new 1ce2efd  [FLINK-12296][StateBackend] Fix local state directory 
collision with state loss for chained keyed operators (#8323)
1ce2efd is described below

commit 1ce2efd7a38d091fc004a8dba034ece0bcc42385
Author: Congxian Qiu <qcx978132...@gmail.com>
AuthorDate: Thu May 2 16:12:31 2019 +0800

    [FLINK-12296][StateBackend] Fix local state directory collision with state 
loss for chained keyed operators (#8323)
    
    - Change
    Changes the local data path from
    
`.../local_state_root/allocation_id/job_id/jobvertex_id_subtask_id/chk_id/rocks_db`
    to
    
`.../local_state_root/allocation_id/job_id/jobvertex_id_subtask_id/chk_id/backend_id`
    
    When preparing the local directory, Flink deletes the local directory for 
each subtask if it already exists.
    If more than one stateful operator is chained in a single task, they all 
share the same local directory path,
    so the local directory is deleted unexpectedly and we get 
data loss.
---
 .../snapshot/RocksIncrementalSnapshotStrategy.java |  16 +-
 .../tasks/OneInputStreamTaskTestHarness.java       |  42 +++-
 .../runtime/tasks/StreamConfigChainer.java         |  23 +-
 .../runtime/tasks/StreamMockEnvironment.java       |   8 +-
 .../runtime/tasks/StreamTaskTestHarness.java       |  20 ++
 .../state/StatefulOperatorChainedTaskTest.java     | 257 +++++++++++++++++++++
 6 files changed, 355 insertions(+), 11 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 37f1850..e877cc4 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -106,6 +106,9 @@ public class RocksIncrementalSnapshotStrategy<K> extends 
RocksDBSnapshotStrategy
        /** The identifier of the last completed checkpoint. */
        private long lastCompletedCheckpointId;
 
+       /** The local directory name of the current snapshot strategy. */
+       private final String localDirectoryName;
+
        public RocksIncrementalSnapshotStrategy(
                @Nonnull RocksDB db,
                @Nonnull ResourceGuard rocksDBResourceGuard,
@@ -135,6 +138,7 @@ public class RocksIncrementalSnapshotStrategy<K> extends 
RocksDBSnapshotStrategy
                this.backendUID = backendUID;
                this.materializedSstFiles = materializedSstFiles;
                this.lastCompletedCheckpointId = lastCompletedCheckpointId;
+               this.localDirectoryName = 
backendUID.toString().replaceAll("[\\-]", "");
        }
 
        @Nonnull
@@ -182,17 +186,17 @@ public class RocksIncrementalSnapshotStrategy<K> extends 
RocksDBSnapshotStrategy
                        LocalRecoveryDirectoryProvider directoryProvider = 
localRecoveryConfig.getLocalStateDirectoryProvider();
                        File directory = 
directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
 
-                       if (directory.exists()) {
-                               FileUtils.deleteDirectory(directory);
-                       }
-
-                       if (!directory.mkdirs()) {
+                       if (!directory.exists() && !directory.mkdirs()) {
                                throw new IOException("Local state base 
directory for checkpoint " + checkpointId +
                                        " already exists: " + directory);
                        }
 
                        // introduces an extra directory because RocksDB wants 
a non-existing directory for native checkpoints.
-                       File rdbSnapshotDir = new File(directory, "rocks_db");
+                       // append localDirectoryName here to solve directory 
collision problem when two stateful operators chained in one task.
+                       File rdbSnapshotDir = new File(directory, 
localDirectoryName);
+                       if (rdbSnapshotDir.exists()) {
+                               FileUtils.deleteDirectory(rdbSnapshotDir);
+                       }
                        Path path = new Path(rdbSnapshotDir.toURI());
                        // create a "permanent" snapshot directory because 
local recovery is active.
                        try {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 89a4f81..6b21588 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -25,7 +25,10 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.execution.Environment;
 import 
org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.function.Function;
 
@@ -56,6 +59,38 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends 
StreamTaskTestHarnes
 
        /**
         * Creates a test harness with the specified number of input gates and 
specified number
+        * of channels per input gate and local recovery disabled.
+        */
+       public OneInputStreamTaskTestHarness(
+               Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
+               int numInputGates,
+               int numInputChannelsPerGate,
+               TypeInformation<IN> inputType,
+               TypeInformation<OUT> outputType) {
+               this(taskFactory, numInputGates, numInputChannelsPerGate, 
inputType, outputType, TestLocalRecoveryConfig.disabled());
+       }
+
+       public OneInputStreamTaskTestHarness(
+               Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
+               int numInputGates,
+               int numInputChannelsPerGate,
+               TypeInformation<IN> inputType,
+               TypeInformation<OUT> outputType,
+               File localRootDir) {
+               super(taskFactory, outputType, localRootDir);
+
+               this.inputType = inputType;
+               inputSerializer = inputType.createSerializer(executionConfig);
+
+               this.numInputGates = numInputGates;
+               this.numInputChannelsPerGate = numInputChannelsPerGate;
+
+               streamConfig.setStateKeySerializer(inputSerializer);
+       }
+
+       /**
+        * Creates a test harness with the specified number of input gates and 
specified number
+        * of channels per input gate and specified localRecoveryConfig.
         * of channels per input gate.
         */
        public OneInputStreamTaskTestHarness(
@@ -63,9 +98,10 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends 
StreamTaskTestHarnes
                        int numInputGates,
                        int numInputChannelsPerGate,
                        TypeInformation<IN> inputType,
-                       TypeInformation<OUT> outputType) {
+                       TypeInformation<OUT> outputType,
+                       LocalRecoveryConfig localRecoveryConfig) {
 
-               super(taskFactory, outputType);
+               super(taskFactory, outputType, localRecoveryConfig);
 
                this.inputType = inputType;
                inputSerializer = inputType.createSerializer(executionConfig);
@@ -82,7 +118,7 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends 
StreamTaskTestHarnes
                        TypeInformation<IN> inputType,
                        TypeInformation<OUT> outputType) {
 
-               this(taskFactory, 1, 1, inputType, outputType);
+               this(taskFactory, 1, 1, inputType, outputType, 
TestLocalRecoveryConfig.disabled());
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 10e50ce..bcef24a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -68,10 +68,27 @@ public class StreamConfigChainer {
        }
 
        public <IN, OUT> StreamConfigChainer chain(
+               OperatorID operatorID,
+               OneInputStreamOperator<IN, OUT> operator,
+               TypeSerializer<IN> inputSerializer,
+               TypeSerializer<OUT> outputSerializer) {
+               return chain(operatorID, operator, inputSerializer, 
outputSerializer, false);
+       }
+
+       public <T> StreamConfigChainer chain(
+               OperatorID operatorID,
+               OneInputStreamOperator<T, T> operator,
+               TypeSerializer<T> typeSerializer,
+               boolean createKeyedStateBackend) {
+               return chain(operatorID, operator, typeSerializer, 
typeSerializer, createKeyedStateBackend);
+       }
+
+       public <IN, OUT> StreamConfigChainer chain(
                        OperatorID operatorID,
                        OneInputStreamOperator<IN, OUT> operator,
                        TypeSerializer<IN> inputSerializer,
-                       TypeSerializer<OUT> outputSerializer) {
+                       TypeSerializer<OUT> outputSerializer,
+                       boolean createKeyedStateBackend) {
                chainIndex++;
 
                tailConfig.setChainedOutputs(Collections.singletonList(
@@ -88,6 +105,10 @@ public class StreamConfigChainer {
                tailConfig.setTypeSerializerIn1(inputSerializer);
                tailConfig.setTypeSerializerOut(outputSerializer);
                tailConfig.setChainIndex(chainIndex);
+               if (createKeyedStateBackend) {
+                       // used to test multiple stateful operators chained in 
a single task.
+                       tailConfig.setStateKeySerializer(inputSerializer);
+               }
 
                chainedConfigs.put(chainIndex, tailConfig);
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 32de8d5..8f191b9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -102,6 +102,8 @@ public class StreamMockEnvironment implements Environment {
 
        private TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
 
+       private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new 
TestingTaskManagerRuntimeInfo();
+
        public StreamMockEnvironment(
                Configuration jobConfig,
                Configuration taskConfig,
@@ -322,7 +324,11 @@ public class StreamMockEnvironment implements Environment {
 
        @Override
        public TaskManagerRuntimeInfo getTaskManagerInfo() {
-               return new TestingTaskManagerRuntimeInfo();
+               return this.taskManagerRuntimeInfo;
+       }
+
+       public void setTaskManagerInfo(TaskManagerRuntimeInfo 
taskManagerRuntimeInfo) {
+               this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index b2f1b99..1ebc800 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
@@ -26,10 +27,14 @@ import 
org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.execution.Environment;
 import 
org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
@@ -47,6 +52,7 @@ import org.apache.flink.util.Preconditions;
 
 import org.junit.Assert;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -110,6 +116,20 @@ public class StreamTaskTestHarness<OUT> {
                        Function<Environment, ? extends StreamTask<OUT, ?>> 
taskFactory,
                        TypeInformation<OUT> outputType) {
 
+               this(taskFactory, outputType, 
TestLocalRecoveryConfig.disabled());
+       }
+
+       public StreamTaskTestHarness(
+               Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
+               TypeInformation<OUT> outputType,
+               File localRootDir) {
+               this(taskFactory, outputType, new LocalRecoveryConfig(true, new 
LocalRecoveryDirectoryProviderImpl(localRootDir, new JobID(), new 
JobVertexID(), 0)));
+       }
+
+       public StreamTaskTestHarness(
+               Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
+               TypeInformation<OUT> outputType,
+               LocalRecoveryConfig localRecoveryConfig) {
                this.taskFactory = checkNotNull(taskFactory);
                this.memorySize = DEFAULT_MEMORY_MANAGER_SIZE;
                this.bufferSize = DEFAULT_NETWORK_BUFFER_SIZE;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
new file mode 100644
index 0000000..44d4ad1
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static 
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
+import static 
org.apache.flink.configuration.CheckpointingOptions.INCREMENTAL_CHECKPOINTS;
+import static 
org.apache.flink.configuration.CheckpointingOptions.STATE_BACKEND;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests snapshot and restore of multiple stateful operators chained in a single task.
+ */
+public class StatefulOperatorChainedTaskTest {
+       private static final Set<OperatorID> RESTORED_OPERATORS = 
ConcurrentHashMap.newKeySet();
+
+       private static TemporaryFolder temporaryFolder;
+
+       @Before
+       public void setup() throws IOException {
+               RESTORED_OPERATORS.clear();
+               temporaryFolder = new TemporaryFolder();
+               temporaryFolder.create();
+       }
+
+       @Test
+       public void testMultipleStatefulOperatorChainedSnapshotAndRestore() 
throws Exception {
+
+               OperatorID headOperatorID = new OperatorID(42L, 42L);
+               OperatorID tailOperatorID = new OperatorID(44L, 44L);
+
+               JobManagerTaskRestore restore = 
createRunAndCheckpointOperatorChain(
+                       headOperatorID,
+                       new CounterOperator("head"),
+                       tailOperatorID,
+                       new CounterOperator("tail"),
+                       Optional.empty());
+
+               TaskStateSnapshot stateHandles = restore.getTaskStateSnapshot();
+
+               assertEquals(2, stateHandles.getSubtaskStateMappings().size());
+
+               createRunAndCheckpointOperatorChain(
+                       headOperatorID,
+                       new CounterOperator("head"),
+                       tailOperatorID,
+                       new CounterOperator("tail"),
+                       Optional.of(restore));
+
+               assertEquals(new HashSet<>(Arrays.asList(headOperatorID, 
tailOperatorID)), RESTORED_OPERATORS);
+       }
+
+       private JobManagerTaskRestore createRunAndCheckpointOperatorChain(
+               OperatorID headId,
+               OneInputStreamOperator<String, String> headOperator,
+               OperatorID tailId,
+               OneInputStreamOperator<String, String> tailOperator,
+               Optional<JobManagerTaskRestore> restore) throws Exception {
+
+               File localRootDir = temporaryFolder.newFolder();
+               final OneInputStreamTaskTestHarness<String, String> testHarness 
=
+                       new OneInputStreamTaskTestHarness<>(
+                               OneInputStreamTask::new,
+                               1, 1,
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               localRootDir);
+
+               testHarness.setupOperatorChain(headId, headOperator)
+                       .chain(tailId, tailOperator, StringSerializer.INSTANCE, 
true)
+                       .finish();
+
+               if (restore.isPresent()) {
+                       JobManagerTaskRestore taskRestore = restore.get();
+                       testHarness.setTaskStateSnapshot(
+                               taskRestore.getRestoreCheckpointId(),
+                               taskRestore.getTaskStateSnapshot());
+               }
+
+               StreamMockEnvironment environment = new StreamMockEnvironment(
+                       testHarness.jobConfig,
+                       testHarness.taskConfig,
+                       testHarness.getExecutionConfig(),
+                       testHarness.memorySize,
+                       new MockInputSplitProvider(),
+                       testHarness.bufferSize,
+                       testHarness.getTaskStateManager());
+
+               Configuration configuration = new Configuration();
+               configuration.setString(STATE_BACKEND.key(), "rocksdb");
+               File file = temporaryFolder.newFolder();
+               configuration.setString(CHECKPOINTS_DIRECTORY.key(), 
file.toURI().toString());
+               configuration.setString(INCREMENTAL_CHECKPOINTS.key(), "true");
+               environment.setTaskManagerInfo(new 
TestingTaskManagerRuntimeInfo(configuration, 
System.getProperty("java.io.tmpdir").split(",|" + File.pathSeparator)));
+               testHarness.invoke(environment);
+               testHarness.waitForTaskRunning();
+
+               OneInputStreamTask<String, String> streamTask = 
testHarness.getTask();
+
+               processRecords(testHarness);
+               triggerCheckpoint(testHarness, streamTask);
+
+               TestTaskStateManager taskStateManager = 
testHarness.getTaskStateManager();
+
+               JobManagerTaskRestore jobManagerTaskRestore = new 
JobManagerTaskRestore(
+                       taskStateManager.getReportedCheckpointId(),
+                       taskStateManager.getLastJobManagerTaskStateSnapshot());
+
+               testHarness.endInput();
+               testHarness.waitForTaskCompletion();
+               return jobManagerTaskRestore;
+       }
+
+       private void triggerCheckpoint(
+               OneInputStreamTaskTestHarness<String, String> testHarness,
+               OneInputStreamTask<String, String> streamTask) throws Exception 
{
+
+               long checkpointId = 1L;
+               CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 1L);
+
+               testHarness.getTaskStateManager().setWaitForReportLatch(new 
OneShotLatch());
+
+               while (!streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forCheckpointWithDefaultLocation())) {}
+
+               
testHarness.getTaskStateManager().getWaitForReportLatch().await();
+               long reportedCheckpointId = 
testHarness.getTaskStateManager().getReportedCheckpointId();
+
+               assertEquals(checkpointId, reportedCheckpointId);
+       }
+
+       private void processRecords(OneInputStreamTaskTestHarness<String, 
String> testHarness) throws Exception {
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.processElement(new StreamRecord<>("10"), 0, 0);
+               testHarness.processElement(new StreamRecord<>("20"), 0, 0);
+               testHarness.processElement(new StreamRecord<>("30"), 0, 0);
+
+               testHarness.waitForInputProcessing();
+
+               expectedOutput.add(new StreamRecord<>("10"));
+               expectedOutput.add(new StreamRecord<>("20"));
+               expectedOutput.add(new StreamRecord<>("30"));
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+       }
+
+       private abstract static class RestoreWatchOperator<IN, OUT>
+               extends AbstractStreamOperator<OUT>
+               implements OneInputStreamOperator<IN, OUT> {
+
+               @Override
+               public void initializeState(StateInitializationContext context) 
throws Exception {
+                       if (context.isRestored()) {
+                               RESTORED_OPERATORS.add(getOperatorID());
+                       }
+               }
+       }
+
+       /**
+        * Operator that counts processed messages and keeps result on state.
+        */
+       private static class CounterOperator extends 
RestoreWatchOperator<String, String> {
+               private static final long serialVersionUID = 
2048954179291813243L;
+
+               private static Long snapshotOutData = 0L;
+               private ValueState<Long> counterState;
+               private long counter = 0;
+               private String prefix;
+
+               CounterOperator(String prefix) {
+                       this.prefix = prefix; // prepended to the state name in initializeState
+               }
+
+               @Override
+               public void processElement(StreamRecord<String> element) throws 
Exception {
+                       counter++;
+                       output.collect(element);
+               }
+
+               @Override
+               public void initializeState(StateInitializationContext context) 
throws Exception {
+                       super.initializeState(context);
+
+                       counterState = context
+                               .getKeyedStateStore()
+                               .getState(new ValueStateDescriptor<>(prefix + 
"counter-state", LongSerializer.INSTANCE));
+
+                       // set key manually, so that RocksDBValueState can get 
the serialized composite key.
+                       setCurrentKey("10");
+
+                       if (context.isRestored()) {
+                               counter =  counterState.value();
+                               assertTrue(snapshotOutData.equals(counter));
+                               counterState.clear();
+                       }
+               }
+
+               @Override
+               public void snapshotState(StateSnapshotContext context) throws 
Exception {
+                       counterState.update(counter);
+                       snapshotOutData = counter;
+               }
+       }
+}

Reply via email to