This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ce2d65d88d28bcd5cf182395d511a239dacfb89d
Author: Yu Li <l...@apache.org>
AuthorDate: Sat Mar 2 20:00:43 2019 +0100

    [FLINK-11796] [State Backends] Remove Snapshotable interface
    
    (cherry picked from commit e8daa49a593edc401cd44761b25b1324b11be4a6)
---
 .../runtime/state/AbstractKeyedStateBackend.java   |  3 +-
 .../runtime/state/DefaultOperatorStateBackend.java |  8 +----
 .../flink/runtime/state/OperatorStateBackend.java  |  5 ++-
 .../apache/flink/runtime/state/Snapshotable.java   | 41 ----------------------
 .../runtime/state/heap/HeapKeyedStateBackend.java  |  6 ----
 .../state/ttl/mock/MockKeyedStateBackend.java      |  7 ----
 .../streaming/state/RocksDBKeyedStateBackend.java  |  6 ----
 .../api/operators/BackendRestorerProcedure.java    | 23 ++----------
 .../StreamTaskStateInitializerImplTest.java        |  6 ----
 9 files changed, 7 insertions(+), 98 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 98e21ff..e28aeef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -36,7 +36,6 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
@@ -52,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public abstract class AbstractKeyedStateBackend<K> implements
        KeyedStateBackend<K>,
-       Snapshotable<SnapshotResult<KeyedStateHandle>, Collection<KeyedStateHandle>>,
+       SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
        Closeable,
        CheckpointListener {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index f6a0dba..48c8eb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -39,7 +39,6 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.RunnableFuture;
@@ -246,13 +245,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
        }
 
        // -------------------------------------------------------------------------------------------
-       //  Snapshot and restore
+       //  Snapshot
        // -------------------------------------------------------------------------------------------
-
-       public void restore(Collection<OperatorStateHandle> restoreSnapshots) throws Exception {
-               // all restore work done in builder and nothing to do here
-       }
-
        @Nonnull
        @Override
        public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
index 3cbb351..4fb8024 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
@@ -22,16 +22,15 @@ import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.util.Disposable;
 
 import java.io.Closeable;
-import java.util.Collection;
 
 /**
  * Interface that combines both, the user facing {@link OperatorStateStore} interface and the system interface
- * {@link Snapshotable}
+ * {@link SnapshotStrategy}
  *
  */
 public interface OperatorStateBackend extends
        OperatorStateStore,
-       Snapshotable<SnapshotResult<OperatorStateHandle>, Collection<OperatorStateHandle>>,
+       SnapshotStrategy<SnapshotResult<OperatorStateHandle>>,
        Closeable,
        Disposable {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
deleted file mode 100644
index 1677855..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.annotation.Internal;
-
-import javax.annotation.Nullable;
-
-/**
- * Interface for operators that can perform snapshots of their state.
- *
- * @param <S> Generic type of the state object that is created as handle to snapshots.
- * @param <R> Generic type of the state object that used in restore.
- */
-@Internal
-public interface Snapshotable<S extends StateObject, R> extends SnapshotStrategy<S> {
-
-       /**
-        * Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state
-        * handles from which the old state is read.
-        *
-        * @param state the old state to restore.
-        */
-       void restore(@Nullable R state) throws Exception;
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index a3a0c29..d3dee0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -62,7 +62,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.RunnableFuture;
@@ -304,11 +303,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                return snapshotRunner;
        }
 
-       @SuppressWarnings("deprecation")
-       public void restore(Collection<KeyedStateHandle> restoredState) {
-               // all restore work done in builder and nothing to do here
-       }
-
        @Override
        public void notifyCheckpointComplete(long checkpointId) {
                //Nothing to do
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index 5803c8e..7cee4c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -55,7 +55,6 @@ import org.apache.flink.util.FlinkRuntimeException;
 import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -185,12 +184,6 @@ public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                        SnapshotResult.of(new MockKeyedStateHandle<>(copy(stateValues, stateSnapshotFilters))));
        }
 
-       @SuppressWarnings("unchecked")
-       @Override
-       public void restore(Collection<KeyedStateHandle> state) {
-               // all restore work done in builder and nothing to do here
-       }
-
        static <K> Map<String, Map<K, Map<Object, Object>>> copy(
                Map<String, Map<K, Map<Object, Object>>> stateValues, Map<String, StateSnapshotTransformer<Object>> stateSnapshotFilters) {
                Map<String, Map<K, Map<Object, Object>>> snapshotStates = new HashMap<>();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index cc0726c..c6d3863 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -78,7 +78,6 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -450,11 +449,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
        }
 
        @Override
-       public void restore(Collection<KeyedStateHandle> restoreState) {
-               // all restore work done in builder and nothing to do here
-       }
-
-       @Override
        public void notifyCheckpointComplete(long completedCheckpointId) throws Exception {
 
                if (checkpointSnapshotStrategy != null) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
index 81b2bb7..53e100e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.runtime.state.Snapshotable;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.util.Disposable;
 import org.apache.flink.util.ExceptionUtils;
@@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -50,7 +48,7 @@ import java.util.List;
  * @param <S> type of the supplied snapshots from which the backend restores.
  */
 public class BackendRestorerProcedure<
-       T extends Closeable & Disposable & Snapshotable<?, Collection<S>>,
+       T extends Closeable & Disposable,
        S extends StateObject> {
 
        /** Logger for this class. */
@@ -140,30 +138,15 @@ public class BackendRestorerProcedure<
 
        private T attemptCreateAndRestore(Collection<S> restoreState) throws Exception {
 
-               // create a new, empty backend.
+               // create a new backend with necessary initialization.
                final T backendInstance = instanceSupplier.apply(restoreState);
 
                try {
                        // register the backend with the registry to participate in task lifecycle w.r.t. cancellation.
                        backendCloseableRegistry.registerCloseable(backendInstance);
-
-                       // attempt to restore from snapshot (or null if no state was checkpointed).
-                       // TODO we could remove this invocation when moving all backend's restore into builder
-                       backendInstance.restore(restoreState);
-
                        return backendInstance;
                } catch (Exception ex) {
-
-                       // under failure, we need do close...
-                       if (backendCloseableRegistry.unregisterCloseable(backendInstance)) {
-                               try {
-                                       backendInstance.close();
-                               } catch (IOException closeEx) {
-                                       ex = ExceptionUtils.firstOrSuppressed(closeEx, ex);
-                               }
-                       }
-
-                       // ... and dispose, e.g. to release native resources.
+                       // dispose the backend, e.g. to release native resources, if failed to register it into registry.
                        try {
                                backendInstance.dispose();
                        } catch (Exception disposeEx) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index 22f6441..430c057 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -66,10 +66,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Random;
 
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -223,10 +221,6 @@ public class StreamTaskStateInitializerImplTest {
                Assert.assertNotNull(keyedStateInputs);
                Assert.assertNotNull(operatorStateInputs);
 
-               // check that the expected job manager state was restored
-               verify(operatorStateBackend).restore(eq(operatorSubtaskState.getManagedOperatorState()));
-               verify(keyedStateBackend).restore(eq(operatorSubtaskState.getManagedKeyedState()));
-
                int count = 0;
                for (KeyGroupStatePartitionStreamProvider keyedStateInput : keyedStateInputs) {
                        ++count;

Reply via email to