Repository: flink
Updated Branches:
  refs/heads/master 685ca8af8 -> 3ff059299


[FLINK-7683][state backends] Introduce iterator for keys in KeyedStateBackend.
This closes #4722.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ff05929
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ff05929
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ff05929

Branch: refs/heads/master
Commit: 3ff059299b7ad27d005fb4e0a89689c72eeb5c0e
Parents: 685ca8a
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Sep 14 12:39:30 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Sep 28 16:36:33 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 61 ++++++++++++++++++++
 .../flink/runtime/state/KeyedStateBackend.java  | 10 ++++
 .../state/heap/CopyOnWriteStateTable.java       | 10 ++++
 .../state/heap/HeapKeyedStateBackend.java       | 10 ++++
 .../state/heap/NestedMapsStateTable.java        | 11 ++++
 .../flink/runtime/state/heap/StateTable.java    |  4 ++
 .../runtime/state/StateBackendTestBase.java     | 38 +++++++++++-
 .../state/heap/HeapStateBackendTestBase.java    | 21 ++++---
 8 files changed, 156 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a1500c7..f6ed87d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -38,6 +38,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
@@ -72,6 +73,7 @@ import 
org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
@@ -100,8 +102,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
@@ -111,6 +115,8 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /**
  * A {@link AbstractKeyedStateBackend} that stores its state in {@code 
RocksDB} and will serialize state to
@@ -252,6 +258,21 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
        }
 
+       /**
+        * Returns a lazily evaluated stream of the keys stored in the column family of the given state,
+        * or an empty stream if no state with that name is registered.
+        *
+        * <p>The returned stream owns a {@link RocksIterator} that is only released when the stream is
+        * closed, so callers must close it (e.g. with try-with-resources).
+        *
+        * <p>NOTE(review): {@code namespace} is not read by this implementation, so keys of every
+        * namespace of the state are returned (presumably a key stored under several namespaces is
+        * emitted more than once). This diverges from the documented "keys for the given state and
+        * namespace" contract — TODO filter on the serialized namespace part of the RocksDB key.
+        */
+       @Override
+       public <N> Stream<K> getKeys(String state, N namespace) {
+               final Tuple2<ColumnFamilyHandle, ?> columnFamilyInfo = kvStateInformation.get(state);
+               if (columnFamilyInfo == null) {
+                       // unknown state: nothing to iterate over and nothing to release
+                       return Stream.empty();
+               }
+
+               final RocksIterator rocksIterator = db.newIterator(columnFamilyInfo.f0);
+               rocksIterator.seekToFirst();
+
+               final Iterable<K> keys = () -> new RocksIteratorWrapper<>(
+                       rocksIterator, state, keySerializer, keyGroupPrefixBytes);
+
+               return StreamSupport
+                       .stream(keys.spliterator(), false)
+                       .onClose(rocksIterator::close);
+       }
+       }
+
        /**
         * Should only be called by one thread, and only after all accesses to 
the DB happened.
         */
@@ -1978,4 +1999,44 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        public boolean supportsAsynchronousSnapshots() {
                return true;
        }
+
+       /**
+        * Adapts a {@link RocksIterator} to an {@link Iterator} over deserialized keys.
+        *
+        * <p>For every RocksDB entry, the first {@code keyGroupPrefixBytes} bytes of the raw key are
+        * skipped and the remainder is handed to the key serializer. Any trailing bytes after the
+        * serialized key (presumably the namespace) are not read. The wrapper never closes the
+        * underlying RocksDB iterator; whoever created it is responsible for that.
+        */
+       private static class RocksIteratorWrapper<K> implements Iterator<K> {
+               private final RocksIterator iterator;
+               private final String state;
+               private final TypeSerializer<K> keySerializer;
+               private final int keyGroupPrefixBytes;
+
+               public RocksIteratorWrapper(
+                               RocksIterator iterator,
+                               String state,
+                               TypeSerializer<K> keySerializer,
+                               int keyGroupPrefixBytes) {
+                       this.iterator = Preconditions.checkNotNull(iterator);
+                       this.state = Preconditions.checkNotNull(state);
+                       this.keySerializer = Preconditions.checkNotNull(keySerializer);
+                       // a primitive int can never be null, so validate its range instead of null-checking it
+                       Preconditions.checkArgument(keyGroupPrefixBytes >= 0, "keyGroupPrefixBytes must not be negative");
+                       this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return iterator.isValid();
+               }
+
+               @Override
+               public K next() {
+                       if (!hasNext()) {
+                               throw new NoSuchElementException("Failed to access state [" + state + "]");
+                       }
+                       try {
+                               byte[] key = iterator.key();
+                               // skip the key-group prefix so the serializer starts at the key bytes
+                               DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
+                                       new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes));
+                               K value = keySerializer.deserialize(dataInput);
+                               // only advance once the current entry was decoded successfully
+                               iterator.next();
+                               return value;
+                       } catch (IOException e) {
+                               throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 09e27e7..c74cfcf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
 
+import java.util.stream.Stream;
+
 /**
  * A keyed state backend provides methods for managing keyed state.
  *
@@ -37,6 +39,14 @@ public interface KeyedStateBackend<K> extends 
InternalKeyContext<K> {
        void setCurrentKey(K newKey);
 
        /**
+        * Returns a stream of all keys for the given state and namespace.
+        *
+        * <p>Modifications to the state while iterating over its keys are not supported. The returned
+        * stream may hold resources of the backend, so callers should close it (e.g. with
+        * try-with-resources) once they are done with it.
+        *
+        * @param state Name of the state variable for which existing keys will be returned.
+        * @param namespace Namespace for which existing keys will be returned.
+        * @param <N> The type of the namespace.
+        * @return A stream of all keys for the given state and namespace.
+        */
+       <N> Stream<K> getKeys(String state, N namespace);
+
+       /**
         * Creates or retrieves a keyed state backed by this state backend.
         *
         * @param namespaceSerializer The serializer used for the namespace 
type of the state

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index 7b61da1..c5f2937 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +34,8 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.TreeSet;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /**
  * Implementation of Flink's in-memory state tables with copy-on-write 
support. This map does not support null values
@@ -287,6 +290,13 @@ public class CopyOnWriteStateTable<K, N, S> extends 
StateTable<K, N, S> implemen
        }
 
        @Override
+       public Stream<K> getKeys(N namespace) {
+               Iterable<StateEntry<K, N, S>> iterable = () -> iterator();
+               return StreamSupport.stream(iterable.spliterator(), false)
+                       .map(entry -> entry.getKey());
+       }
+
+       @Override
        public void put(K key, int keyGroup, N namespace, S state) {
                put(key, namespace, state);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index bf92b34..28c623f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -78,6 +78,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
 
 /**
  * A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and 
will serialize state to
@@ -212,6 +213,15 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                return stateTable;
        }
 
+       /**
+        * Returns the keys of the given state in the given namespace, or an empty stream if no state
+        * table is registered under that name.
+        */
+       @Override
+       public <N> Stream<K> getKeys(String state, N namespace) {
+               // single map lookup instead of containsKey() followed by get();
+               // the namespace type is not tracked per table, hence the unchecked cast
+               @SuppressWarnings("unchecked")
+               StateTable<K, N, ?> table = (StateTable<K, N, ?>) stateTables.get(state);
+               if (table == null) {
+                       return Stream.empty();
+               }
+               return table.getKeys(namespace);
+       }
+
        private boolean hasRegisteredState() {
                return !stateTables.isEmpty();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
index 02d0cd4..870ecbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -27,8 +27,11 @@ import 
org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Stream;
 
 /**
  * This implementation of {@link StateTable} uses nested {@link HashMap} 
objects. It is also maintaining a partitioning
@@ -166,6 +169,14 @@ public class NestedMapsStateTable<K, N, S> extends 
StateTable<K, N, S> {
                return get(key, keyGroup, namespace);
        }
 
+       /**
+        * Collects, across all partitions of {@code state}, the keys that have an entry in the given
+        * namespace. Partitions that are {@code null} or lack the namespace contribute no keys.
+        */
+       @Override
+       public Stream<K> getKeys(N namespace) {
+               return Arrays.stream(state)
+                       .filter(namespaceMap -> namespaceMap != null)
+                       .map(namespaceMap -> namespaceMap.getOrDefault(namespace, Collections.emptyMap()))
+                       .map(Map::keySet)
+                       .flatMap(keys -> keys.stream());
+       }
+
        // 
------------------------------------------------------------------------
 
        private boolean containsKey(K key, int keyGroupIndex, N namespace) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index c1cdcc3..8c07b25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.Preconditions;
 
+import java.util.stream.Stream;
+
 /**
  * Base class for state tables. Accesses to state are typically scoped by the 
currently active key, as provided
  * through the {@link InternalKeyContext}.
@@ -158,6 +160,8 @@ public abstract class StateTable<K, N, S> {
         */
        public abstract S get(K key, N namespace);
 
+       /**
+        * Returns a stream of all keys that have a state entry in this table under the given namespace.
+        * Modifying the table while the stream is being consumed is not supported.
+        *
+        * @param namespace the namespace whose keys are returned.
+        * @return a stream of the keys with an entry under {@code namespace}.
+        */
+       public abstract Stream<K> getKeys(N namespace);
+
        // Meta data setter / getter and toString 
-----------------------------------------------------
 
        public TypeSerializer<S> getStateSerializer() {

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index f6f73f2..7dd652c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -60,13 +60,14 @@ import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
-import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
+
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
@@ -83,12 +84,14 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.PrimitiveIterator;
 import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
@@ -190,6 +193,39 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        }
 
        @Test
+       public void testGetKeys() throws Exception {
+               final int elementsToTest = 1000;
+               String fieldName = "get-keys-test";
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+               try {
+                       ValueState<Integer> keyedState = 
backend.getOrCreateKeyedState(
+                               VoidNamespaceSerializer.INSTANCE,
+                               new ValueStateDescriptor<>(fieldName, 
IntSerializer.INSTANCE));
+                       ((InternalValueState<VoidNamespace, Integer>) 
keyedState).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+                       for (int key = 0; key < elementsToTest; key++) {
+                               backend.setCurrentKey(key);
+                               keyedState.update(key * 2);
+                       }
+
+                       try (Stream<Integer> keysStream = 
backend.getKeys(fieldName, VoidNamespace.INSTANCE).sorted()) {
+                               PrimitiveIterator.OfInt actualIterator = 
keysStream.mapToInt(value -> value.intValue()).iterator();
+
+                               for (int expectedKey = 0; expectedKey < 
elementsToTest; expectedKey++) {
+                                       assertTrue(actualIterator.hasNext());
+                                       assertEquals(expectedKey, 
actualIterator.nextInt());
+                               }
+
+                               assertFalse(actualIterator.hasNext());
+                       }
+               }
+               finally {
+                       IOUtils.closeQuietly(backend);
+                       backend.dispose();
+               }
+       }
+
+       @Test
        @SuppressWarnings("unchecked")
        public void testBackendUsesRegisteredKryoDefaultSerializer() throws 
Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
index e6adef8..2136304 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -42,13 +43,17 @@ public abstract class HeapStateBackendTestBase {
        public boolean async;
 
+       /**
+        * Creates a {@link HeapKeyedStateBackend} whose keys are serialized with {@link StringSerializer}.
+        */
        public HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
+               return createKeyedBackend(StringSerializer.INSTANCE);
+       }
+
+       /**
+        * Creates a {@link HeapKeyedStateBackend} for the given key serializer. It uses a mocked
+        * {@link TaskKvStateRegistry}, the key-group range [0, 15] and this test's {@code async} flag.
+        *
+        * @param keySerializer serializer for the keys of the created backend.
+        * @param <K> type of the keys.
+        */
+       public <K> HeapKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
                return new HeapKeyedStateBackend<>(
-                               mock(TaskKvStateRegistry.class),
-                               StringSerializer.INSTANCE,
-                               HeapReducingStateTest.class.getClassLoader(),
-                               16,
-                               new KeyGroupRange(0, 15),
-                               async,
-                               new ExecutionConfig());
+                       mock(TaskKvStateRegistry.class),
+                       keySerializer,
+                       HeapReducingStateTest.class.getClassLoader(),
+                       16,
+                       new KeyGroupRange(0, 15),
+                       async,
+                       new ExecutionConfig());
        }
-}
\ No newline at end of file
+}

Reply via email to