This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e177ba9bac042d142a8efa989a0b6b7bb2c122cf
Author: Yu Li <l...@apache.org>
AuthorDate: Fri Mar 1 04:52:14 2019 +0100

    [FLINK-11804] [State Backends] Make sure the CloseableRegistry used in 
backend builder is registered with task
    
    We need to make sure each stream constructed in restore could also be 
closed in case of task cancel,
    for example the data input stream opened for serDe during restore.
    
    Also removed close of CloseableRegistry in RocksDBKeyedStateBackendBuilder.
    
    (cherry picked from commit eada52be5194a018a41e7ea51ea86e0273df2073)
---
 .../flink/streaming/tests/StubStateBackend.java     |  9 +++++++--
 .../KVStateRequestSerializerRocksDBTest.java        |  7 +++++--
 .../network/KvStateRequestSerializerTest.java       |  9 +++++----
 .../network/KvStateServerHandlerTest.java           |  4 +++-
 .../state/AbstractKeyedStateBackendBuilder.java     |  6 +++++-
 .../flink/runtime/state/AbstractStateBackend.java   |  4 +++-
 .../apache/flink/runtime/state/StateBackend.java    | 10 +++++++---
 .../runtime/state/filesystem/FsStateBackend.java    |  7 +++++--
 .../runtime/state/heap/HeapKeyedStateBackend.java   | 21 +++++++++++----------
 .../runtime/state/memory/MemoryStateBackend.java    |  8 ++++++--
 .../CheckpointSettingsSerializableTest.java         |  5 ++++-
 .../runtime/state/StateSnapshotCompressionTest.java | 13 +++++++++----
 .../state/heap/HeapStateBackendTestBase.java        |  4 +++-
 .../state/ttl/mock/MockKeyedStateBackend.java       |  5 +++--
 .../runtime/state/ttl/mock/MockStateBackend.java    |  8 ++++++--
 .../state/RocksDBKeyedStateBackendBuilder.java      | 21 ++++++++++++---------
 .../streaming/state/RocksDBStateBackend.java        |  7 +++++--
 .../streaming/state/RocksDBStateBackendTest.java    |  9 +++++----
 .../operators/StreamTaskStateInitializerImpl.java   | 18 +++++++++++++++---
 .../StreamTaskStateInitializerImplTest.java         |  6 +++++-
 .../runtime/tasks/StreamTaskTerminationTest.java    |  6 +++++-
 .../runtime/tasks/TestSpyWrapperStateBackend.java   | 10 ++++++++--
 .../test/streaming/runtime/StateBackendITCase.java  |  6 +++++-
 23 files changed, 142 insertions(+), 61 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
index 17c51d8..dec4f2d 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.tests;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -32,6 +33,8 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.Collection;
 
@@ -75,7 +78,8 @@ final class StubStateBackend implements StateBackend {
                TaskKvStateRegistry kvStateRegistry,
                TtlTimeProvider ttlTimeProvider,
                MetricGroup metricGroup,
-               Collection<KeyedStateHandle> stateHandles) throws Exception {
+               @Nonnull Collection<KeyedStateHandle> stateHandles,
+               CloseableRegistry cancelStreamRegistry) throws Exception {
 
                return backend.createKeyedStateBackend(
                        env,
@@ -87,7 +91,8 @@ final class StubStateBackend implements StateBackend {
                        kvStateRegistry,
                        this.ttlTimeProvider,
                        metricGroup,
-                       stateHandles);
+                       stateHandles,
+                       cancelStreamRegistry);
        }
 
        @Override
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index de86340..3431199 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.contrib.streaming.state.PredefinedOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
 import 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
@@ -87,7 +88,8 @@ public final class KVStateRequestSerializerRocksDBTest {
                                TtlTimeProvider.DEFAULT,
                                new UnregisteredMetricsGroup(),
                                Collections.emptyList(),
-                               
RocksDBStateBackend.getCompressionDecorator(executionConfig)
+                               
RocksDBStateBackend.getCompressionDecorator(executionConfig),
+                               new CloseableRegistry()
                        ).build();
                longHeapKeyedStateBackend.setCurrentKey(key);
 
@@ -130,7 +132,8 @@ public final class KVStateRequestSerializerRocksDBTest {
                                TtlTimeProvider.DEFAULT,
                                new UnregisteredMetricsGroup(),
                                Collections.emptyList(),
-                               
RocksDBStateBackend.getCompressionDecorator(executionConfig)
+                               
RocksDBStateBackend.getCompressionDecorator(executionConfig),
+                               new CloseableRegistry()
                        ).build();
                longHeapKeyedStateBackend.setCurrentKey(key);
 
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index d539066..aac3394 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
 import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -200,8 +201,8 @@ public class KvStateRequestSerializerTest {
                                new ExecutionConfig(),
                                TestLocalRecoveryConfig.disabled(),
                                new HeapPriorityQueueSetFactory(keyGroupRange, 
keyGroupRange.getNumberOfKeyGroups(), 128),
-                               TtlTimeProvider.DEFAULT
-                       );
+                               TtlTimeProvider.DEFAULT,
+                               new CloseableRegistry());
                longHeapKeyedStateBackend.setCurrentKey(key);
 
                final InternalListState<Long, VoidNamespace, Long> listState = 
longHeapKeyedStateBackend.createInternalState(
@@ -309,8 +310,8 @@ public class KvStateRequestSerializerTest {
                                new ExecutionConfig(),
                                TestLocalRecoveryConfig.disabled(),
                                new HeapPriorityQueueSetFactory(keyGroupRange, 
keyGroupRange.getNumberOfKeyGroups(), 128),
-                               TtlTimeProvider.DEFAULT
-                       );
+                               TtlTimeProvider.DEFAULT,
+                               new CloseableRegistry());
                longHeapKeyedStateBackend.setCurrentKey(key);
 
                final InternalMapState<Long, VoidNamespace, Long, String> 
mapState =
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 7edb60e..d38cca8 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.queryablestate.client.VoidNamespace;
@@ -771,6 +772,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                        registry.createTaskRegistry(dummyEnv.getJobID(), 
dummyEnv.getJobVertexId()),
                        TtlTimeProvider.DEFAULT,
                        new UnregisteredMetricsGroup(),
-                       null);
+                       Collections.emptyList(),
+                       new CloseableRegistry());
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
index a71272e..addb549 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
@@ -42,6 +43,7 @@ public abstract class AbstractKeyedStateBackendBuilder<K>
        protected final TtlTimeProvider ttlTimeProvider;
        protected final StreamCompressionDecorator keyGroupCompressionDecorator;
        protected final Collection<KeyedStateHandle> restoreStateHandles;
+       protected final CloseableRegistry cancelStreamRegistry;
 
        public AbstractKeyedStateBackendBuilder(
                TaskKvStateRegistry kvStateRegistry,
@@ -52,7 +54,8 @@ public abstract class AbstractKeyedStateBackendBuilder<K>
                ExecutionConfig executionConfig,
                TtlTimeProvider ttlTimeProvider,
                @Nonnull Collection<KeyedStateHandle> stateHandles,
-               StreamCompressionDecorator keyGroupCompressionDecorator) {
+               StreamCompressionDecorator keyGroupCompressionDecorator,
+               CloseableRegistry cancelStreamRegistry) {
                this.kvStateRegistry = kvStateRegistry;
                this.keySerializer = keySerializer;
                this.keySerializerProvider = 
StateSerializerProvider.fromNewRegisteredSerializer(keySerializer);
@@ -63,5 +66,6 @@ public abstract class AbstractKeyedStateBackendBuilder<K>
                this.ttlTimeProvider = ttlTimeProvider;
                this.keyGroupCompressionDecorator = 
keyGroupCompressionDecorator;
                this.restoreStateHandles = stateHandles;
+               this.cancelStreamRegistry = cancelStreamRegistry;
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 845a865..3ebf09b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -56,7 +57,8 @@ public abstract class AbstractStateBackend implements 
StateBackend, java.io.Seri
                TaskKvStateRegistry kvStateRegistry,
                TtlTimeProvider ttlTimeProvider,
                MetricGroup metricGroup,
-               @Nonnull Collection<KeyedStateHandle> stateHandles) throws 
IOException;
+               @Nonnull Collection<KeyedStateHandle> stateHandles,
+               CloseableRegistry cancelStreamRegistry) throws IOException;
 
        @Override
        public abstract OperatorStateBackend createOperatorStateBackend(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 25ef8d2..5eac82f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
@@ -186,7 +187,8 @@ public interface StateBackend extends java.io.Serializable {
                        kvStateRegistry,
                        ttlTimeProvider,
                        new UnregisteredMetricsGroup(),
-                       Collections.emptyList());
+                       Collections.emptyList(),
+                       new CloseableRegistry());
        }
 
        /**
@@ -220,7 +222,8 @@ public interface StateBackend extends java.io.Serializable {
                        kvStateRegistry,
                        ttlTimeProvider,
                        new UnregisteredMetricsGroup(),
-                       stateHandles);
+                       stateHandles,
+                       new CloseableRegistry());
        }
 
        /**
@@ -245,7 +248,8 @@ public interface StateBackend extends java.io.Serializable {
                TaskKvStateRegistry kvStateRegistry,
                TtlTimeProvider ttlTimeProvider,
                MetricGroup metricGroup,
-               @Nonnull Collection<KeyedStateHandle> stateHandles) throws 
Exception;
+               @Nonnull Collection<KeyedStateHandle> stateHandles,
+               CloseableRegistry cancelStreamRegistry) throws Exception;
        
        /**
         * Creates a new {@link OperatorStateBackend} that can be used for 
storing operator state.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 3986ab8..594e526 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.MetricGroup;
@@ -461,7 +462,8 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
                TaskKvStateRegistry kvStateRegistry,
                TtlTimeProvider ttlTimeProvider,
                MetricGroup metricGroup,
-               Collection<KeyedStateHandle> stateHandles) {
+               @Nonnull Collection<KeyedStateHandle> stateHandles,
+               CloseableRegistry cancelStreamRegistry) {
 
                TaskStateManager taskStateManager = env.getTaskStateManager();
                LocalRecoveryConfig localRecoveryConfig = 
taskStateManager.createLocalRecoveryConfig();
@@ -478,7 +480,8 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
                                env.getExecutionConfig(),
                                localRecoveryConfig,
                                priorityQueueSetFactory,
-                               ttlTimeProvider);
+                               ttlTimeProvider,
+                       cancelStreamRegistry);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 12d7b26..7614a55 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -140,16 +140,17 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
 
        public HeapKeyedStateBackend(
-                       TaskKvStateRegistry kvStateRegistry,
-                       TypeSerializer<K> keySerializer,
-                       ClassLoader userCodeClassLoader,
-                       int numberOfKeyGroups,
-                       KeyGroupRange keyGroupRange,
-                       boolean asynchronousSnapshots,
-                       ExecutionConfig executionConfig,
-                       LocalRecoveryConfig localRecoveryConfig,
-                       HeapPriorityQueueSetFactory priorityQueueSetFactory,
-                       TtlTimeProvider ttlTimeProvider) {
+               TaskKvStateRegistry kvStateRegistry,
+               TypeSerializer<K> keySerializer,
+               ClassLoader userCodeClassLoader,
+               int numberOfKeyGroups,
+               KeyGroupRange keyGroupRange,
+               boolean asynchronousSnapshots,
+               ExecutionConfig executionConfig,
+               LocalRecoveryConfig localRecoveryConfig,
+               HeapPriorityQueueSetFactory priorityQueueSetFactory,
+               TtlTimeProvider ttlTimeProvider,
+               CloseableRegistry cancelStreamRegistry) {
 
                super(kvStateRegistry, keySerializer, userCodeClassLoader,
                        numberOfKeyGroups, keyGroupRange, executionConfig, 
ttlTimeProvider, new CloseableRegistry());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index ffb7d7a..6338c53 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
@@ -41,6 +42,7 @@ import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.TernaryBoolean;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -316,7 +318,8 @@ public class MemoryStateBackend extends 
AbstractFileStateBackend implements Conf
                TaskKvStateRegistry kvStateRegistry,
                TtlTimeProvider ttlTimeProvider,
                MetricGroup metricGroup,
-               Collection<KeyedStateHandle> stateHandles) {
+               @Nonnull Collection<KeyedStateHandle> stateHandles,
+               CloseableRegistry cancelStreamRegistry) {
 
                TaskStateManager taskStateManager = env.getTaskStateManager();
                HeapPriorityQueueSetFactory priorityQueueSetFactory =
@@ -331,7 +334,8 @@ public class MemoryStateBackend extends 
AbstractFileStateBackend implements Conf
                                env.getExecutionConfig(),
                                taskStateManager.createLocalRecoveryConfig(),
                                priorityQueueSetFactory,
-                               ttlTimeProvider);
+                               ttlTimeProvider,
+                       cancelStreamRegistry);
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index a5a2b5e..cc4436d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -50,6 +51,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URL;
@@ -175,7 +177,8 @@ public class CheckpointSettingsSerializableTest extends 
TestLogger {
                        TaskKvStateRegistry kvStateRegistry,
                        TtlTimeProvider ttlTimeProvider,
                        MetricGroup metricGroup,
-                       Collection<KeyedStateHandle> stateHandles) throws 
Exception {
+                       @Nonnull Collection<KeyedStateHandle> stateHandles,
+                       CloseableRegistry cancelStreamRegistry) throws 
Exception {
                        throw new UnsupportedOperationException();
                }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index 355387d..64814c7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -57,7 +58,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
                        executionConfig,
                        TestLocalRecoveryConfig.disabled(),
                        mock(HeapPriorityQueueSetFactory.class),
-                       TtlTimeProvider.DEFAULT);
+                       TtlTimeProvider.DEFAULT,
+                       new CloseableRegistry());
 
                try {
                        Assert.assertTrue(
@@ -81,7 +83,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
                        executionConfig,
                        TestLocalRecoveryConfig.disabled(),
                        mock(HeapPriorityQueueSetFactory.class),
-                       TtlTimeProvider.DEFAULT);
+                       TtlTimeProvider.DEFAULT,
+                       new CloseableRegistry());
 
                try {
                        Assert.assertTrue(
@@ -123,7 +126,8 @@ public class StateSnapshotCompressionTest extends 
TestLogger {
                        executionConfig,
                        TestLocalRecoveryConfig.disabled(),
                        mock(HeapPriorityQueueSetFactory.class),
-                       TtlTimeProvider.DEFAULT);
+                       TtlTimeProvider.DEFAULT,
+                       new CloseableRegistry());
 
                try {
 
@@ -166,7 +170,8 @@ public class StateSnapshotCompressionTest extends 
TestLogger {
                        executionConfig,
                        TestLocalRecoveryConfig.disabled(),
                        mock(HeapPriorityQueueSetFactory.class),
-                       TtlTimeProvider.DEFAULT);
+                       TtlTimeProvider.DEFAULT,
+                       new CloseableRegistry());
                try {
 
                        
stateBackend.restore(StateObjectCollection.singleton(stateHandle));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
index 0eddf3c..efcb727 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
@@ -63,6 +64,7 @@ public abstract class HeapStateBackendTestBase {
                        new ExecutionConfig(),
                        TestLocalRecoveryConfig.disabled(),
                        new HeapPriorityQueueSetFactory(keyGroupRange, 
numKeyGroups, 128),
-                       TtlTimeProvider.DEFAULT);
+                       TtlTimeProvider.DEFAULT,
+                       new CloseableRegistry());
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index 697bf66..bc0303b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -99,9 +99,10 @@ public class MockKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                KeyGroupRange keyGroupRange,
                ExecutionConfig executionConfig,
                TtlTimeProvider ttlTimeProvider,
-               MetricGroup operatorMetricGroup) {
+               MetricGroup operatorMetricGroup,
+               CloseableRegistry cancelStreamRegistry) {
                super(kvStateRegistry, keySerializer, userCodeClassLoader,
-                       numberOfKeyGroups, keyGroupRange, executionConfig, 
ttlTimeProvider, new CloseableRegistry());
+                       numberOfKeyGroups, keyGroupRange, executionConfig, 
ttlTimeProvider, cancelStreamRegistry);
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
index f2bd790..3a0bb1b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.ttl.mock;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -39,6 +40,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.Collection;
 
@@ -123,7 +125,8 @@ public class MockStateBackend extends AbstractStateBackend {
                TaskKvStateRegistry kvStateRegistry,
                TtlTimeProvider ttlTimeProvider,
                MetricGroup metricGroup,
-               Collection<KeyedStateHandle> stateHandles) {
+               @Nonnull Collection<KeyedStateHandle> stateHandles,
+               CloseableRegistry cancelStreamRegistry) {
                return new MockKeyedStateBackend<>(
                        new KvStateRegistry().createTaskRegistry(jobID, new 
JobVertexID()),
                        keySerializer,
@@ -132,7 +135,8 @@ public class MockStateBackend extends AbstractStateBackend {
                        keyGroupRange,
                        env.getExecutionConfig(),
                        ttlTimeProvider,
-                       metricGroup);
+                       metricGroup,
+                       cancelStreamRegistry);
        }
 
        @Override
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index eb5bccc..3f245d0 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -128,7 +128,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                TtlTimeProvider ttlTimeProvider,
                MetricGroup metricGroup,
                @Nonnull Collection<KeyedStateHandle> stateHandles,
-               StreamCompressionDecorator keyGroupCompressionDecorator) {
+               StreamCompressionDecorator keyGroupCompressionDecorator,
+               CloseableRegistry cancelStreamRegistry) {
 
                super(
                        kvStateRegistry,
@@ -139,8 +140,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        executionConfig,
                        ttlTimeProvider,
                        stateHandles,
-                       keyGroupCompressionDecorator
-               );
+                       keyGroupCompressionDecorator,
+                       cancelStreamRegistry);
 
                this.operatorIdentifier = operatorIdentifier;
                this.priorityQueueStateType = priorityQueueStateType;
@@ -175,7 +176,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                @Nonnull Collection<KeyedStateHandle> stateHandles,
                StreamCompressionDecorator keyGroupCompressionDecorator,
                RocksDB injectedTestDB,
-               ColumnFamilyHandle injectedDefaultColumnFamilyHandle) {
+               ColumnFamilyHandle injectedDefaultColumnFamilyHandle,
+               CloseableRegistry cancelStreamRegistry) {
                this(
                        operatorIdentifier,
                        userCodeClassLoader,
@@ -192,7 +194,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        ttlTimeProvider,
                        metricGroup,
                        stateHandles,
-                       keyGroupCompressionDecorator
+                       keyGroupCompressionDecorator,
+                       cancelStreamRegistry
                );
                this.injectedTestDB = injectedTestDB;
                this.injectedDefaultColumnFamilyHandle = 
injectedDefaultColumnFamilyHandle;
@@ -232,7 +235,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                RocksDBWriteBatchWrapper writeBatchWrapper = null;
                ColumnFamilyHandle defaultColumnFamilyHandle = null;
                RocksDBNativeMetricMonitor nativeMetricMonitor = null;
-               CloseableRegistry cancelStreamRegistry = new 
CloseableRegistry();
+               CloseableRegistry cancelStreamRegistryForBackend = new 
CloseableRegistry();
                //The write options to use in the states. We disable write 
ahead logging.
                WriteOptions writeOptions = new 
WriteOptions().setDisableWAL(true);
                LinkedHashMap<String, 
RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new 
LinkedHashMap<>();
@@ -281,7 +284,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                keyGroupPrefixBytes,
                                32);
                        // init snapshot strategy after db is assured to be 
initialized
-                       snapshotStrategy = 
initializeSavepointAndCheckpointStrategies(cancelStreamRegistry, 
rocksDBResourceGuard,
+                       snapshotStrategy = 
initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, 
rocksDBResourceGuard,
                                kvStateInformation, keyGroupPrefixBytes, db, 
backendUID, materializedSstFiles, lastCompletedCheckpointId);
                        // init priority queue factory
                        priorityQueueFactory = 
initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db,
@@ -289,7 +292,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                } catch (Throwable e) {
                        // Do clean up
                        List<ColumnFamilyOptions> columnFamilyOptions = new 
ArrayList<>(kvStateInformation.values().size());
-                       IOUtils.closeQuietly(cancelStreamRegistry);
+                       IOUtils.closeQuietly(cancelStreamRegistryForBackend);
                        IOUtils.closeQuietly(writeBatchWrapper);
                        
RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, 
defaultColumnFamilyHandle);
                        IOUtils.closeQuietly(defaultColumnFamilyHandle);
@@ -336,7 +339,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        db,
                        kvStateInformation,
                        keyGroupPrefixBytes,
-                       cancelStreamRegistry,
+                       cancelStreamRegistryForBackend,
                        this.keyGroupCompressionDecorator,
                        rocksDBResourceGuard,
                        snapshotStrategy.checkpointSnapshotStrategy,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index a1bc0c4..a4251d9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
@@ -473,7 +474,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                TaskKvStateRegistry kvStateRegistry,
                TtlTimeProvider ttlTimeProvider,
                MetricGroup metricGroup,
-               @Nonnull Collection<KeyedStateHandle> stateHandles) throws 
IOException {
+               @Nonnull Collection<KeyedStateHandle> stateHandles,
+               CloseableRegistry cancelStreamRegistry) throws IOException {
 
                // first, make sure that the RocksDB JNI library is loaded
                // we do this explicitly here to have better error handling
@@ -510,7 +512,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                        ttlTimeProvider,
                        metricGroup,
                        stateHandles,
-                       keyGroupCompressionDecorator
+                       keyGroupCompressionDecorator,
+                       cancelStreamRegistry
                
).setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
                        
.setEnableTtlCompactionFilter(isTtlCompactionFilterEnabled())
                        
.setNumberOfTransferingThreads(getNumberOfTransferingThreads())
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9bb3dc0..ed3f6f6 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -214,8 +215,8 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                        Collections.emptyList(),
                        
RocksDBStateBackend.getCompressionDecorator(env.getExecutionConfig()),
                        spy(db),
-                       defaultCFHandle
-               ).build();
+                       defaultCFHandle,
+                       new CloseableRegistry()).build();
 
                testState1 = keyedStateBackend.getPartitionedState(
                                VoidNamespace.INSTANCE,
@@ -291,8 +292,8 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                                Collections.emptyList(),
                                
RocksDBStateBackend.getCompressionDecorator(executionConfig),
                                db,
-                               defaultCFHandle
-                       ).build();
+                               defaultCFHandle,
+                               new CloseableRegistry()).build();
                        ValueStateDescriptor<String> stubState1 =
                                new ValueStateDescriptor<>("StubState-1", 
StringSerializer.INSTANCE);
                        test.createInternalState(StringSerializer.INSTANCE, 
stubState1);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index 0ed2330..768a4f5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -266,6 +266,11 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                        taskInfo.getNumberOfParallelSubtasks(),
                        taskInfo.getIndexOfThisSubtask());
 
+               // Now restore processing is included in backend 
building/constructing process, so we need to make sure
+               // each stream constructed in restore could also be closed in 
case of task cancel, for example the data
+               // input stream opened for serDe during restore.
+               CloseableRegistry cancelStreamRegistryForRestore = new 
CloseableRegistry();
+               
backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);
                BackendRestorerProcedure<AbstractKeyedStateBackend<K>, 
KeyedStateHandle> backendRestorer =
                        new BackendRestorerProcedure<>(
                                (stateHandles) -> 
stateBackend.createKeyedStateBackend(
@@ -278,12 +283,19 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                                        environment.getTaskKvStateRegistry(),
                                        TtlTimeProvider.DEFAULT,
                                        metricGroup,
-                                       stateHandles),
+                                       stateHandles,
+                                       cancelStreamRegistryForRestore),
                                backendCloseableRegistry,
                                logDescription);
 
-               return backendRestorer.createAndRestore(
-                       
prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
+               try {
+                       return backendRestorer.createAndRestore(
+                               
prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
+               } finally {
+                       if 
(backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {
+                               
IOUtils.closeQuietly(cancelStreamRegistryForRestore);
+                       }
+               }
        }
 
        protected CloseableIterable<StatePartitionStreamProvider> 
rawOperatorStateInputs(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index c04a17f..b66a7b4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -58,6 +58,8 @@ import org.apache.flink.util.CloseableIterable;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
@@ -146,7 +148,9 @@ public class StreamTaskStateInitializerImplTest {
                                int numberOfKeyGroups, KeyGroupRange 
keyGroupRange,
                                TaskKvStateRegistry kvStateRegistry,
                                TtlTimeProvider ttlTimeProvider,
-                               MetricGroup metricGroup, 
Collection<KeyedStateHandle> stateHandles) throws Exception {
+                               MetricGroup metricGroup,
+                               @Nonnull Collection<KeyedStateHandle> 
stateHandles,
+                               CloseableRegistry cancelStreamRegistry) throws 
Exception {
                                return mock(AbstractKeyedStateBackend.class);
                        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index c4ca305..e448bad 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.blob.BlobCacheService;
@@ -82,6 +83,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -270,7 +273,8 @@ public class StreamTaskTerminationTest extends TestLogger {
                        TaskKvStateRegistry kvStateRegistry,
                        TtlTimeProvider ttlTimeProvider,
                        MetricGroup metricGroup,
-                       Collection<KeyedStateHandle> stateHandles) {
+                       @Nonnull Collection<KeyedStateHandle> stateHandles,
+                       CloseableRegistry cancelStreamRegistry) {
                        return null;
                }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
index 9207594..4160260 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -33,6 +34,8 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.Collection;
 
@@ -59,7 +62,9 @@ public class TestSpyWrapperStateBackend extends 
AbstractStateBackend {
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry,
                        TtlTimeProvider ttlTimeProvider,
-                       MetricGroup metricGroup, Collection<KeyedStateHandle> 
stateHandles) throws IOException {
+                       MetricGroup metricGroup,
+                       @Nonnull Collection<KeyedStateHandle> stateHandles,
+                       CloseableRegistry cancelStreamRegistry) throws 
IOException {
                        return spy(delegate.createKeyedStateBackend(
                                env,
                                jobID,
@@ -70,7 +75,8 @@ public class TestSpyWrapperStateBackend extends 
AbstractStateBackend {
                                kvStateRegistry,
                                ttlTimeProvider,
                                metricGroup,
-                               null));
+                               stateHandles,
+                               cancelStreamRegistry));
                }
 
                @Override
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index a87998f..4c71e14 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.Environment;
@@ -44,6 +45,8 @@ import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.Collection;
 
@@ -118,7 +121,8 @@ public class StateBackendITCase extends AbstractTestBase {
                        TaskKvStateRegistry kvStateRegistry,
                        TtlTimeProvider ttlTimeProvider,
                        MetricGroup metricGroup,
-                       Collection<KeyedStateHandle> stateHandles) throws 
IOException {
+                       @Nonnull Collection<KeyedStateHandle> stateHandles,
+                       CloseableRegistry cancelStreamRegistry) throws 
IOException {
                        throw new SuccessException();
                }
 

Reply via email to