This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e177ba9bac042d142a8efa989a0b6b7bb2c122cf Author: Yu Li <l...@apache.org> AuthorDate: Fri Mar 1 04:52:14 2019 +0100 [FLINK-11804] [State Backends] Make sure the CloseableRegistry used in backend builder is registered with task. We need to make sure each stream constructed during restore can also be closed in case of task cancellation, for example the data input stream opened for serDe during restore. Also removed the close of the CloseableRegistry in RocksDBKeyedStateBackendBuilder. (cherry picked from commit eada52be5194a018a41e7ea51ea86e0273df2073) --- .../flink/streaming/tests/StubStateBackend.java | 9 +++++++-- .../KVStateRequestSerializerRocksDBTest.java | 7 +++++-- .../network/KvStateRequestSerializerTest.java | 9 +++++---- .../network/KvStateServerHandlerTest.java | 4 +++- .../state/AbstractKeyedStateBackendBuilder.java | 6 +++++- .../flink/runtime/state/AbstractStateBackend.java | 4 +++- .../apache/flink/runtime/state/StateBackend.java | 10 +++++++--- .../runtime/state/filesystem/FsStateBackend.java | 7 +++++-- .../runtime/state/heap/HeapKeyedStateBackend.java | 21 +++++++++++---------- .../runtime/state/memory/MemoryStateBackend.java | 8 ++++++-- .../CheckpointSettingsSerializableTest.java | 5 ++++- .../runtime/state/StateSnapshotCompressionTest.java | 13 +++++++++---- .../state/heap/HeapStateBackendTestBase.java | 4 +++- .../state/ttl/mock/MockKeyedStateBackend.java | 5 +++-- .../runtime/state/ttl/mock/MockStateBackend.java | 8 ++++++-- .../state/RocksDBKeyedStateBackendBuilder.java | 21 ++++++++++++--------- .../streaming/state/RocksDBStateBackend.java | 7 +++++-- .../streaming/state/RocksDBStateBackendTest.java | 9 +++++---- .../operators/StreamTaskStateInitializerImpl.java | 18 +++++++++++++++--- .../StreamTaskStateInitializerImplTest.java | 6 +++++- .../runtime/tasks/StreamTaskTerminationTest.java | 6 +++++- .../runtime/tasks/TestSpyWrapperStateBackend.java | 10 ++++++++-- .../test/streaming/runtime/StateBackendITCase.java | 6 +++++- 23 files changed, 142 
insertions(+), 61 deletions(-) diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java index 17c51d8..dec4f2d 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.tests; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -32,6 +33,8 @@ import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import javax.annotation.Nonnull; + import java.io.IOException; import java.util.Collection; @@ -75,7 +78,8 @@ final class StubStateBackend implements StateBackend { TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, - Collection<KeyedStateHandle> stateHandles) throws Exception { + @Nonnull Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) throws Exception { return backend.createKeyedStateBackend( env, @@ -87,7 +91,8 @@ final class StubStateBackend implements StateBackend { kvStateRegistry, this.ttlTimeProvider, metricGroup, - stateHandles); + stateHandles, + cancelStreamRegistry); } @Override diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java index de86340..3431199 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java @@ -27,6 +27,7 @@ import org.apache.flink.contrib.streaming.state.PredefinedOptions; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.queryablestate.client.VoidNamespace; import org.apache.flink.queryablestate.client.VoidNamespaceSerializer; @@ -87,7 +88,8 @@ public final class KVStateRequestSerializerRocksDBTest { TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), Collections.emptyList(), - RocksDBStateBackend.getCompressionDecorator(executionConfig) + RocksDBStateBackend.getCompressionDecorator(executionConfig), + new CloseableRegistry() ).build(); longHeapKeyedStateBackend.setCurrentKey(key); @@ -130,7 +132,8 @@ public final class KVStateRequestSerializerRocksDBTest { TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), Collections.emptyList(), - RocksDBStateBackend.getCompressionDecorator(executionConfig) + RocksDBStateBackend.getCompressionDecorator(executionConfig), + new CloseableRegistry() ).build(); longHeapKeyedStateBackend.setCurrentKey(key); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java index d539066..aac3394 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.queryablestate.client.VoidNamespace; import org.apache.flink.queryablestate.client.VoidNamespaceSerializer; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; @@ -200,8 +201,8 @@ public class KvStateRequestSerializerTest { new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128), - TtlTimeProvider.DEFAULT - ); + TtlTimeProvider.DEFAULT, + new CloseableRegistry()); longHeapKeyedStateBackend.setCurrentKey(key); final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createInternalState( @@ -309,8 +310,8 @@ public class KvStateRequestSerializerTest { new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128), - TtlTimeProvider.DEFAULT - ); + TtlTimeProvider.DEFAULT, + new CloseableRegistry()); longHeapKeyedStateBackend.setCurrentKey(key); final InternalMapState<Long, VoidNamespace, Long, String> mapState = diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java index 7edb60e..d38cca8 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.queryablestate.client.VoidNamespace; @@ -771,6 +772,7 @@ public class KvStateServerHandlerTest extends TestLogger { registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()), TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), - null); + Collections.emptyList(), + new CloseableRegistry()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java index a71272e..addb549 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.ExecutionConfig; import 
org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; @@ -42,6 +43,7 @@ public abstract class AbstractKeyedStateBackendBuilder<K> protected final TtlTimeProvider ttlTimeProvider; protected final StreamCompressionDecorator keyGroupCompressionDecorator; protected final Collection<KeyedStateHandle> restoreStateHandles; + protected final CloseableRegistry cancelStreamRegistry; public AbstractKeyedStateBackendBuilder( TaskKvStateRegistry kvStateRegistry, @@ -52,7 +54,8 @@ public abstract class AbstractKeyedStateBackendBuilder<K> ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, @Nonnull Collection<KeyedStateHandle> stateHandles, - StreamCompressionDecorator keyGroupCompressionDecorator) { + StreamCompressionDecorator keyGroupCompressionDecorator, + CloseableRegistry cancelStreamRegistry) { this.kvStateRegistry = kvStateRegistry; this.keySerializer = keySerializer; this.keySerializerProvider = StateSerializerProvider.fromNewRegisteredSerializer(keySerializer); @@ -63,5 +66,6 @@ public abstract class AbstractKeyedStateBackendBuilder<K> this.ttlTimeProvider = ttlTimeProvider; this.keyGroupCompressionDecorator = keyGroupCompressionDecorator; this.restoreStateHandles = stateHandles; + this.cancelStreamRegistry = cancelStreamRegistry; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index 845a865..3ebf09b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import 
org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -56,7 +57,8 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, - @Nonnull Collection<KeyedStateHandle> stateHandles) throws IOException; + @Nonnull Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) throws IOException; @Override public abstract OperatorStateBackend createOperatorStateBackend( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java index 25ef8d2..5eac82f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; @@ -186,7 +187,8 @@ public interface StateBackend extends java.io.Serializable { kvStateRegistry, ttlTimeProvider, new UnregisteredMetricsGroup(), - Collections.emptyList()); + Collections.emptyList(), + new CloseableRegistry()); } /** @@ -220,7 +222,8 @@ public interface StateBackend extends java.io.Serializable { kvStateRegistry, ttlTimeProvider, new UnregisteredMetricsGroup(), - stateHandles); + stateHandles, + new CloseableRegistry()); } /** @@ -245,7 +248,8 @@ public 
interface StateBackend extends java.io.Serializable { TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, - @Nonnull Collection<KeyedStateHandle> stateHandles) throws Exception; + @Nonnull Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) throws Exception; /** * Creates a new {@link OperatorStateBackend} that can be used for storing operator state. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 3986ab8..594e526 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.MetricGroup; @@ -461,7 +462,8 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, - Collection<KeyedStateHandle> stateHandles) { + @Nonnull Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) { TaskStateManager taskStateManager = env.getTaskStateManager(); LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig(); @@ -478,7 +480,8 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur env.getExecutionConfig(), localRecoveryConfig, priorityQueueSetFactory, - ttlTimeProvider); + ttlTimeProvider, + cancelStreamRegistry); } @Override 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 12d7b26..7614a55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -140,16 +140,17 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final HeapPriorityQueueSetFactory priorityQueueSetFactory; public HeapKeyedStateBackend( - TaskKvStateRegistry kvStateRegistry, - TypeSerializer<K> keySerializer, - ClassLoader userCodeClassLoader, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - boolean asynchronousSnapshots, - ExecutionConfig executionConfig, - LocalRecoveryConfig localRecoveryConfig, - HeapPriorityQueueSetFactory priorityQueueSetFactory, - TtlTimeProvider ttlTimeProvider) { + TaskKvStateRegistry kvStateRegistry, + TypeSerializer<K> keySerializer, + ClassLoader userCodeClassLoader, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + boolean asynchronousSnapshots, + ExecutionConfig executionConfig, + LocalRecoveryConfig localRecoveryConfig, + HeapPriorityQueueSetFactory priorityQueueSetFactory, + TtlTimeProvider ttlTimeProvider, + CloseableRegistry cancelStreamRegistry) { super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider, new CloseableRegistry()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index ffb7d7a..6338c53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; @@ -41,6 +42,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.TernaryBoolean; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; @@ -316,7 +318,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, - Collection<KeyedStateHandle> stateHandles) { + @Nonnull Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) { TaskStateManager taskStateManager = env.getTaskStateManager(); HeapPriorityQueueSetFactory priorityQueueSetFactory = @@ -331,7 +334,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf env.getExecutionConfig(), taskStateManager.createLocalRecoveryConfig(), priorityQueueSetFactory, - ttlTimeProvider); + ttlTimeProvider, + cancelStreamRegistry); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java index a5a2b5e..cc4436d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java @@ -22,6 +22,7 @@ import 
org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; @@ -50,6 +51,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; +import javax.annotation.Nonnull; import java.io.IOException; import java.io.Serializable; import java.net.URL; @@ -175,7 +177,8 @@ public class CheckpointSettingsSerializableTest extends TestLogger { TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, - Collection<KeyedStateHandle> stateHandles) throws Exception { + @Nonnull Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) throws Exception { throw new UnsupportedOperationException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java index 355387d..64814c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -57,7 +58,8 @@ public class StateSnapshotCompressionTest extends 
TestLogger { executionConfig, TestLocalRecoveryConfig.disabled(), mock(HeapPriorityQueueSetFactory.class), - TtlTimeProvider.DEFAULT); + TtlTimeProvider.DEFAULT, + new CloseableRegistry()); try { Assert.assertTrue( @@ -81,7 +83,8 @@ public class StateSnapshotCompressionTest extends TestLogger { executionConfig, TestLocalRecoveryConfig.disabled(), mock(HeapPriorityQueueSetFactory.class), - TtlTimeProvider.DEFAULT); + TtlTimeProvider.DEFAULT, + new CloseableRegistry()); try { Assert.assertTrue( @@ -123,7 +126,8 @@ public class StateSnapshotCompressionTest extends TestLogger { executionConfig, TestLocalRecoveryConfig.disabled(), mock(HeapPriorityQueueSetFactory.class), - TtlTimeProvider.DEFAULT); + TtlTimeProvider.DEFAULT, + new CloseableRegistry()); try { @@ -166,7 +170,8 @@ public class StateSnapshotCompressionTest extends TestLogger { executionConfig, TestLocalRecoveryConfig.disabled(), mock(HeapPriorityQueueSetFactory.class), - TtlTimeProvider.DEFAULT); + TtlTimeProvider.DEFAULT, + new CloseableRegistry()); try { stateBackend.restore(StateObjectCollection.singleton(stateHandle)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java index 0eddf3c..efcb727 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; import 
org.apache.flink.runtime.state.TestLocalRecoveryConfig; @@ -63,6 +64,7 @@ public abstract class HeapStateBackendTestBase { new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128), - TtlTimeProvider.DEFAULT); + TtlTimeProvider.DEFAULT, + new CloseableRegistry()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java index 697bf66..bc0303b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java @@ -99,9 +99,10 @@ public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, - MetricGroup operatorMetricGroup) { + MetricGroup operatorMetricGroup, + CloseableRegistry cancelStreamRegistry) { super(kvStateRegistry, keySerializer, userCodeClassLoader, - numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider, new CloseableRegistry()); + numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider, cancelStreamRegistry); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java index f2bd790..3a0bb1b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.ttl.mock; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import 
org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -39,6 +40,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Collection; @@ -123,7 +125,8 @@ public class MockStateBackend extends AbstractStateBackend { TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, - Collection<KeyedStateHandle> stateHandles) { + @Nonnull Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) { return new MockKeyedStateBackend<>( new KvStateRegistry().createTaskRegistry(jobID, new JobVertexID()), keySerializer, @@ -132,7 +135,8 @@ public class MockStateBackend extends AbstractStateBackend { keyGroupRange, env.getExecutionConfig(), ttlTimeProvider, - metricGroup); + metricGroup, + cancelStreamRegistry); } @Override diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index eb5bccc..3f245d0 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -128,7 +128,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, - StreamCompressionDecorator keyGroupCompressionDecorator) { + StreamCompressionDecorator 
keyGroupCompressionDecorator, + CloseableRegistry cancelStreamRegistry) { super( kvStateRegistry, @@ -139,8 +140,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken executionConfig, ttlTimeProvider, stateHandles, - keyGroupCompressionDecorator - ); + keyGroupCompressionDecorator, + cancelStreamRegistry); this.operatorIdentifier = operatorIdentifier; this.priorityQueueStateType = priorityQueueStateType; @@ -175,7 +176,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken @Nonnull Collection<KeyedStateHandle> stateHandles, StreamCompressionDecorator keyGroupCompressionDecorator, RocksDB injectedTestDB, - ColumnFamilyHandle injectedDefaultColumnFamilyHandle) { + ColumnFamilyHandle injectedDefaultColumnFamilyHandle, + CloseableRegistry cancelStreamRegistry) { this( operatorIdentifier, userCodeClassLoader, @@ -192,7 +194,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken ttlTimeProvider, metricGroup, stateHandles, - keyGroupCompressionDecorator + keyGroupCompressionDecorator, + cancelStreamRegistry ); this.injectedTestDB = injectedTestDB; this.injectedDefaultColumnFamilyHandle = injectedDefaultColumnFamilyHandle; @@ -232,7 +235,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken RocksDBWriteBatchWrapper writeBatchWrapper = null; ColumnFamilyHandle defaultColumnFamilyHandle = null; RocksDBNativeMetricMonitor nativeMetricMonitor = null; - CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); + CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry(); //The write options to use in the states. We disable write ahead logging. 
WriteOptions writeOptions = new WriteOptions().setDisableWAL(true); LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>(); @@ -281,7 +284,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken keyGroupPrefixBytes, 32); // init snapshot strategy after db is assured to be initialized - snapshotStrategy = initializeSavepointAndCheckpointStrategies(cancelStreamRegistry, rocksDBResourceGuard, + snapshotStrategy = initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, rocksDBResourceGuard, kvStateInformation, keyGroupPrefixBytes, db, backendUID, materializedSstFiles, lastCompletedCheckpointId); // init priority queue factory priorityQueueFactory = initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db, @@ -289,7 +292,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken } catch (Throwable e) { // Do clean up List<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(kvStateInformation.values().size()); - IOUtils.closeQuietly(cancelStreamRegistry); + IOUtils.closeQuietly(cancelStreamRegistryForBackend); IOUtils.closeQuietly(writeBatchWrapper); RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, defaultColumnFamilyHandle); IOUtils.closeQuietly(defaultColumnFamilyHandle); @@ -336,7 +339,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken db, kvStateInformation, keyGroupPrefixBytes, - cancelStreamRegistry, + cancelStreamRegistryForBackend, this.keyGroupCompressionDecorator, rocksDBResourceGuard, snapshotStrategy.checkpointSnapshotStrategy, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index a1bc0c4..a4251d9 100644 --- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; @@ -473,7 +474,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, - @Nonnull Collection<KeyedStateHandle> stateHandles) throws IOException { + @Nonnull Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) throws IOException { // first, make sure that the RocksDB JNI library is loaded // we do this explicitly here to have better error handling @@ -510,7 +512,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu ttlTimeProvider, metricGroup, stateHandles, - keyGroupCompressionDecorator + keyGroupCompressionDecorator, + cancelStreamRegistry ).setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled()) .setEnableTtlCompactionFilter(isTtlCompactionFilterEnabled()) .setNumberOfTransferingThreads(getNumberOfTransferingThreads()) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 9bb3dc0..ed3f6f6 100644 --- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -214,8 +215,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa Collections.emptyList(), RocksDBStateBackend.getCompressionDecorator(env.getExecutionConfig()), spy(db), - defaultCFHandle - ).build(); + defaultCFHandle, + new CloseableRegistry()).build(); testState1 = keyedStateBackend.getPartitionedState( VoidNamespace.INSTANCE, @@ -291,8 +292,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa Collections.emptyList(), RocksDBStateBackend.getCompressionDecorator(executionConfig), db, - defaultCFHandle - ).build(); + defaultCFHandle, + new CloseableRegistry()).build(); ValueStateDescriptor<String> stubState1 = new ValueStateDescriptor<>("StubState-1", StringSerializer.INSTANCE); test.createInternalState(StringSerializer.INSTANCE, stubState1); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 0ed2330..768a4f5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -266,6 +266,11 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize taskInfo.getNumberOfParallelSubtasks(), taskInfo.getIndexOfThisSubtask()); + // Now restore processing is included in backend building/constructing process, so we need to make sure + // each stream constructed in restore could also be closed in case of task cancel, for example the data + // input stream opened for serDe during restore. + CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry(); + backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore); BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> backendRestorer = new BackendRestorerProcedure<>( (stateHandles) -> stateBackend.createKeyedStateBackend( @@ -278,12 +283,19 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize environment.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, metricGroup, - stateHandles), + stateHandles, + cancelStreamRegistryForRestore), backendCloseableRegistry, logDescription); - return backendRestorer.createAndRestore( - prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState()); + try { + return backendRestorer.createAndRestore( + prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState()); + } finally { + if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) { + IOUtils.closeQuietly(cancelStreamRegistryForRestore); + } + } } protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java index c04a17f..b66a7b4 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java @@ -58,6 +58,8 @@ import org.apache.flink.util.CloseableIterable; import org.junit.Assert; import org.junit.Test; +import javax.annotation.Nonnull; + import java.io.Closeable; import java.io.IOException; import java.util.Collection; @@ -146,7 +148,9 @@ public class StreamTaskStateInitializerImplTest { int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, - MetricGroup metricGroup, Collection<KeyedStateHandle> stateHandles) throws Exception { + MetricGroup metricGroup, + @Nonnull Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) throws Exception { return mock(AbstractKeyedStateBackend.class); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index c4ca305..e448bad 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.blob.BlobCacheService; @@ -82,6 +83,8 @@ import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; +import 
javax.annotation.Nonnull; + import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -270,7 +273,8 @@ public class StreamTaskTerminationTest extends TestLogger { TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, - Collection<KeyedStateHandle> stateHandles) { + @Nonnull Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) { return null; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java index 9207594..4160260 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -33,6 +34,8 @@ import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; + import java.io.IOException; import java.util.Collection; @@ -59,7 +62,9 @@ public class TestSpyWrapperStateBackend extends AbstractStateBackend { KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, - MetricGroup metricGroup, Collection<KeyedStateHandle> stateHandles) throws IOException { + MetricGroup metricGroup, + @Nonnull Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) 
throws IOException { return spy(delegate.createKeyedStateBackend( env, jobID, @@ -70,7 +75,8 @@ public class TestSpyWrapperStateBackend extends AbstractStateBackend { kvStateRegistry, ttlTimeProvider, metricGroup, - null)); + stateHandles, + cancelStreamRegistry)); } @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java index a87998f..4c71e14 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.execution.Environment; @@ -44,6 +45,8 @@ import org.apache.flink.util.ExceptionUtils; import org.junit.Test; +import javax.annotation.Nonnull; + import java.io.IOException; import java.util.Collection; @@ -118,7 +121,8 @@ public class StateBackendITCase extends AbstractTestBase { TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, - Collection<KeyedStateHandle> stateHandles) throws IOException { + @Nonnull Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) throws IOException { throw new SuccessException(); }