Repository: flink Updated Branches: refs/heads/master 685ca8af8 -> 3ff059299
[FLINK-7683][state backends] Introduce iterator for keys in KeyedStateBackend. This closes #4722. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ff05929 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ff05929 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ff05929 Branch: refs/heads/master Commit: 3ff059299b7ad27d005fb4e0a89689c72eeb5c0e Parents: 685ca8a Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Thu Sep 14 12:39:30 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu Sep 28 16:36:33 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 61 ++++++++++++++++++++ .../flink/runtime/state/KeyedStateBackend.java | 10 ++++ .../state/heap/CopyOnWriteStateTable.java | 10 ++++ .../state/heap/HeapKeyedStateBackend.java | 10 ++++ .../state/heap/NestedMapsStateTable.java | 11 ++++ .../flink/runtime/state/heap/StateTable.java | 4 ++ .../runtime/state/StateBackendTestBase.java | 38 +++++++++++- .../state/heap/HeapStateBackendTestBase.java | 21 ++++--- 8 files changed, 156 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index a1500c7..f6ed87d 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -38,6 +38,7 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; @@ -72,6 +73,7 @@ import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; @@ -100,8 +102,10 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.PriorityQueue; import java.util.Set; @@ -111,6 +115,8 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; import java.util.concurrent.RunnableFuture; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; /** * A {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to @@ -252,6 +258,21 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } + @Override + public <N> Stream<K> getKeys(String state, N namespace) { + Tuple2<ColumnFamilyHandle, ?>
columnInfo = kvStateInformation.get(state); + if (columnInfo == null) { + return Stream.empty(); + } + // NOTE(review): 'namespace' is never read - this scans the whole column family from seekToFirst(), so keys stored under other namespaces are presumably returned too (possibly more than once). TODO filter by namespace. + RocksIterator iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + Iterable<K> iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); + Stream<K> targetStream = StreamSupport.stream(iterable.spliterator(), false); + return targetStream.onClose(iterator::close); + } + /** * Should only be called by one thread, and only after all accesses to the DB happened. */ @@ -1978,4 +1999,44 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { public boolean supportsAsynchronousSnapshots() { return true; } + + private static class RocksIteratorWrapper<K> implements Iterator<K> { + private final RocksIterator iterator; + private final String state; + private final TypeSerializer<K> keySerializer; + private final int keyGroupPrefixBytes; + + public RocksIteratorWrapper( + RocksIterator iterator, + String state, + TypeSerializer<K> keySerializer, + int keyGroupPrefixBytes) { + this.iterator = Preconditions.checkNotNull(iterator); + this.state = Preconditions.checkNotNull(state); + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.keyGroupPrefixBytes = keyGroupPrefixBytes; + } + + @Override + public boolean hasNext() { + return iterator.isValid(); + } + + @Override + public K next() { + if (!hasNext()) { + throw new NoSuchElementException("Failed to access state [" + state + "]"); + } + try { + byte[] key = iterator.key(); + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper( + new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes)); + K value = keySerializer.deserialize(dataInput); + iterator.next(); + return value; + } catch (IOException e) { + throw new FlinkRuntimeException("Failed to 
access state [" + state + "]", e); + } + } + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java index 09e27e7..c74cfcf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.heap.InternalKeyContext; +import java.util.stream.Stream; + /** * A keyed state backend provides methods for managing keyed state. * @@ -37,6 +39,14 @@ public interface KeyedStateBackend<K> extends InternalKeyContext<K> { void setCurrentKey(K newKey); /** + * @return A stream of all keys for the given state and namespace. Modifications to the state while iterating + * over its keys are not supported. Close the returned stream after use so that it can release its resources. + * @param state State variable for which existing keys will be returned. + * @param namespace Namespace for which existing keys will be returned. + */ + <N> Stream<K> getKeys(String state, N namespace); + + /** * Creates or retrieves a keyed state backed by this state backend.
     * * @param namespaceSerializer The serializer used for the namespace type of the state http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java index 7b61da1..c5f2937 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.StateTransformationFunction; import org.apache.flink.util.MathUtils; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,8 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Objects; import java.util.TreeSet; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; /** * Implementation of Flink's in-memory state tables with copy-on-write support.
This map does not support null values @@ -287,6 +290,13 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen } @Override + public Stream<K> getKeys(N namespace) { + Iterable<StateEntry<K, N, S>> iterable = () -> iterator(); + return StreamSupport.stream(iterable.spliterator(), false) + .filter(entry -> Objects.equals(namespace, entry.getNamespace())).map(entry -> entry.getKey()); + } + + @Override public void put(K key, int keyGroup, N namespace, S state) { put(key, namespace, state); } http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index bf92b34..28c623f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -78,6 +78,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.RunnableFuture; +import java.util.stream.Stream; /** * A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize state to @@ -212,6 +213,15 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return stateTable; } + @Override + public <N> Stream<K> getKeys(String state, N namespace) { + @SuppressWarnings("unchecked") StateTable<K, N, ?> table = (StateTable<K, N, ?>) stateTables.get(state); + if (table == null) { + return Stream.empty(); + } + return table.getKeys(namespace); + } + private boolean hasRegisteredState() { return !stateTables.isEmpty(); } http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java index 02d0cd4..870ecbf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java @@ -27,8 +27,11 @@ import org.apache.flink.runtime.state.StateTransformationFunction; import org.apache.flink.util.Preconditions; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.stream.Stream; /** * This implementation of {@link StateTable} uses nested {@link HashMap} objects. It is also maintaining a partitioning @@ -166,6 +169,14 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> { return get(key, keyGroup, namespace); } + @Override + public Stream<K> getKeys(N namespace) { + return Arrays.stream(state) + .filter(namespaces -> namespaces != null) + .map(namespaces -> namespaces.getOrDefault(namespace, Collections.emptyMap())) + .flatMap(namespaceState -> namespaceState.keySet().stream()); + } + // ------------------------------------------------------------------------ private boolean containsKey(K key, int keyGroupIndex, N namespace) { http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java index c1cdcc3..8c07b25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java +++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java @@ -24,6 +24,8 @@ import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.StateTransformationFunction; import org.apache.flink.util.Preconditions; +import java.util.stream.Stream; + /** * Base class for state tables. Accesses to state are typically scoped by the currently active key, as provided * through the {@link InternalKeyContext}. @@ -158,6 +160,8 @@ public abstract class StateTable<K, N, S> { */ public abstract S get(K key, N namespace); + public abstract Stream<K> getKeys(N namespace); + // Meta data setter / getter and toString ----------------------------------------------------- public TypeSerializer<S> getStateSerializer() { http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index f6f73f2..7dd652c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -60,13 +60,14 @@ import org.apache.flink.runtime.state.heap.StateTable; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; -import org.apache.flink.shaded.guava18.com.google.common.base.Joiner; import org.apache.flink.types.IntValue; import org.apache.flink.util.FutureUtil; import org.apache.flink.util.IOUtils; import org.apache.flink.util.StateMigrationException; import org.apache.flink.util.TestLogger; +import
org.apache.flink.shaded.guava18.com.google.common.base.Joiner; + import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; @@ -83,12 +84,14 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.PrimitiveIterator; import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.RunnableFuture; +import java.util.stream.Stream; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; @@ -190,6 +193,39 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } @Test + public void testGetKeys() throws Exception { + final int elementsToTest = 1000; + String fieldName = "get-keys-test"; + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + try { + ValueState<Integer> keyedState = backend.getOrCreateKeyedState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>(fieldName, IntSerializer.INSTANCE)); + ((InternalValueState<VoidNamespace, Integer>) keyedState).setCurrentNamespace(VoidNamespace.INSTANCE); + + for (int key = 0; key < elementsToTest; key++) { + backend.setCurrentKey(key); + keyedState.update(key * 2); + } + // NOTE(review): only VoidNamespace is populated here, so this test cannot detect a getKeys implementation that ignores its namespace argument. TODO add a second namespace once all backends filter by namespace. + try (Stream<Integer> keysStream = backend.getKeys(fieldName, VoidNamespace.INSTANCE).sorted()) { + PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator(); + + for (int expectedKey = 0; expectedKey < elementsToTest; expectedKey++) { + assertTrue(actualIterator.hasNext()); + assertEquals(expectedKey, actualIterator.nextInt()); + } + + assertFalse(actualIterator.hasNext()); 
+ } + } + finally { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + + @Test @SuppressWarnings("unchecked") public void
testBackendUsesRegisteredKryoDefaultSerializer() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java index e6adef8..2136304 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; @@ -42,13 +43,17 @@ public abstract class HeapStateBackendTestBase { public boolean async; public HeapKeyedStateBackend<String> createKeyedBackend() throws Exception { + return createKeyedBackend(StringSerializer.INSTANCE); + } + + public <K> HeapKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception { return new HeapKeyedStateBackend<>( - mock(TaskKvStateRegistry.class), - StringSerializer.INSTANCE, - HeapReducingStateTest.class.getClassLoader(), - 16, - new KeyGroupRange(0, 15), - async, - new ExecutionConfig()); + mock(TaskKvStateRegistry.class), + keySerializer, + HeapReducingStateTest.class.getClassLoader(), + 16, + new KeyGroupRange(0, 15), + async, + new ExecutionConfig()); } -} \ No newline at end of file +}