This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ce2d65d88d28bcd5cf182395d511a239dacfb89d Author: Yu Li <l...@apache.org> AuthorDate: Sat Mar 2 20:00:43 2019 +0100 [FLINK-11796] [State Backends] Remove Snapshotable interface (cherry picked from commit e8daa49a593edc401cd44761b25b1324b11be4a6) --- .../runtime/state/AbstractKeyedStateBackend.java | 3 +- .../runtime/state/DefaultOperatorStateBackend.java | 8 +---- .../flink/runtime/state/OperatorStateBackend.java | 5 ++- .../apache/flink/runtime/state/Snapshotable.java | 41 ---------------------- .../runtime/state/heap/HeapKeyedStateBackend.java | 6 ---- .../state/ttl/mock/MockKeyedStateBackend.java | 7 ---- .../streaming/state/RocksDBKeyedStateBackend.java | 6 ---- .../api/operators/BackendRestorerProcedure.java | 23 ++---------- .../StreamTaskStateInitializerImplTest.java | 6 ---- 9 files changed, 7 insertions(+), 98 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 98e21ff..e28aeef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -36,7 +36,6 @@ import org.apache.flink.util.Preconditions; import java.io.Closeable; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -52,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public abstract class AbstractKeyedStateBackend<K> implements KeyedStateBackend<K>, - Snapshotable<SnapshotResult<KeyedStateHandle>, Collection<KeyedStateHandle>>, + SnapshotStrategy<SnapshotResult<KeyedStateHandle>>, Closeable, CheckpointListener { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index f6a0dba..48c8eb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -39,7 +39,6 @@ import javax.annotation.Nonnull; import java.io.IOException; import java.io.Serializable; -import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.RunnableFuture; @@ -246,13 +245,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } // ------------------------------------------------------------------------------------------- - // Snapshot and restore + // Snapshot // ------------------------------------------------------------------------------------------- - - public void restore(Collection<OperatorStateHandle> restoreSnapshots) throws Exception { - // all restore work done in builder and nothing to do here - } - @Nonnull @Override public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java index 3cbb351..4fb8024 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java @@ -22,16 +22,15 @@ import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.util.Disposable; import java.io.Closeable; -import java.util.Collection; /** * Interface that combines both, the user facing {@link OperatorStateStore} interface and the system interface - * {@link Snapshotable} + * {@link SnapshotStrategy} * */ public interface OperatorStateBackend extends OperatorStateStore, - Snapshotable<SnapshotResult<OperatorStateHandle>, 
Collection<OperatorStateHandle>>, + SnapshotStrategy<SnapshotResult<OperatorStateHandle>>, Closeable, Disposable { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java deleted file mode 100644 index 1677855..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.annotation.Internal; - -import javax.annotation.Nullable; - -/** - * Interface for operators that can perform snapshots of their state. - * - * @param <S> Generic type of the state object that is created as handle to snapshots. - * @param <R> Generic type of the state object that used in restore. - */ -@Internal -public interface Snapshotable<S extends StateObject, R> extends SnapshotStrategy<S> { - - /** - * Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state - * handles from which the old state is read. - * - * @param state the old state to restore. 
- */ - void restore(@Nullable R state) throws Exception; -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index a3a0c29..d3dee0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -62,7 +62,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import java.io.IOException; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.RunnableFuture; @@ -304,11 +303,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return snapshotRunner; } - @SuppressWarnings("deprecation") - public void restore(Collection<KeyedStateHandle> restoredState) { - // all restore work done in builder and nothing to do here - } - @Override public void notifyCheckpointComplete(long checkpointId) { //Nothing to do diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java index 5803c8e..7cee4c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java @@ -55,7 +55,6 @@ import org.apache.flink.util.FlinkRuntimeException; import javax.annotation.Nonnull; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -185,12 +184,6 @@ public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { SnapshotResult.of(new MockKeyedStateHandle<>(copy(stateValues, stateSnapshotFilters)))); } - @SuppressWarnings("unchecked") - @Override - public 
void restore(Collection<KeyedStateHandle> state) { - // all restore work done in builder and nothing to do here - } - static <K> Map<String, Map<K, Map<Object, Object>>> copy( Map<String, Map<K, Map<Object, Object>>> stateValues, Map<String, StateSnapshotTransformer<Object>> stateSnapshotFilters) { Map<String, Map<K, Map<Object, Object>>> snapshotStates = new HashMap<>(); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index cc0726c..c6d3863 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -78,7 +78,6 @@ import javax.annotation.Nonnull; import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -450,11 +449,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } @Override - public void restore(Collection<KeyedStateHandle> restoreState) { - // all restore work done in builder and nothing to do here - } - - @Override public void notifyCheckpointComplete(long completedCheckpointId) throws Exception { if (checkpointSnapshotStrategy != null) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java index 81b2bb7..53e100e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.core.fs.CloseableRegistry; -import org.apache.flink.runtime.state.Snapshotable; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.util.Disposable; import org.apache.flink.util.ExceptionUtils; @@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import java.io.Closeable; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -50,7 +48,7 @@ import java.util.List; * @param <S> type of the supplied snapshots from which the backend restores. */ public class BackendRestorerProcedure< - T extends Closeable & Disposable & Snapshotable<?, Collection<S>>, + T extends Closeable & Disposable, S extends StateObject> { /** Logger for this class. */ @@ -140,30 +138,15 @@ public class BackendRestorerProcedure< private T attemptCreateAndRestore(Collection<S> restoreState) throws Exception { - // create a new, empty backend. + // create a new backend with necessary initialization. final T backendInstance = instanceSupplier.apply(restoreState); try { // register the backend with the registry to participate in task lifecycle w.r.t. cancellation. backendCloseableRegistry.registerCloseable(backendInstance); - - // attempt to restore from snapshot (or null if no state was checkpointed). - // TODO we could remove this invocation when moving all backend's restore into builder - backendInstance.restore(restoreState); - return backendInstance; } catch (Exception ex) { - - // under failure, we need do close... - if (backendCloseableRegistry.unregisterCloseable(backendInstance)) { - try { - backendInstance.close(); - } catch (IOException closeEx) { - ex = ExceptionUtils.firstOrSuppressed(closeEx, ex); - } - } - - // ... and dispose, e.g. to release native resources. 
+ // dispose the backend, e.g. to release native resources, if failed to register it into registry. try { backendInstance.dispose(); } catch (Exception disposeEx) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java index 22f6441..430c057 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java @@ -66,10 +66,8 @@ import java.util.Collection; import java.util.Collections; import java.util.Random; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -223,10 +221,6 @@ public class StreamTaskStateInitializerImplTest { Assert.assertNotNull(keyedStateInputs); Assert.assertNotNull(operatorStateInputs); - // check that the expected job manager state was restored - verify(operatorStateBackend).restore(eq(operatorSubtaskState.getManagedOperatorState())); - verify(keyedStateBackend).restore(eq(operatorSubtaskState.getManagedKeyedState())); - int count = 0; for (KeyGroupStatePartitionStreamProvider keyedStateInput : keyedStateInputs) { ++count;