http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java new file mode 100644 index 0000000..cccaacb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.util.Preconditions; + +import java.util.Map; + +/** + * Heap-backed partitioned {@link org.apache.flink.api.common.state.ValueState} that is snapshotted + * into files. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <V> The type of the value. + */ +public class HeapValueState<K, N, V> + extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>> + implements ValueState<V> { + + /** + * Creates a new key/value state for the given hash map of key/value pairs. + * + * @param backend The state backend backing that created this state. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param stateTable The state tab;e to use in this kev/value state. May contain initial state. + */ + public HeapValueState( + KeyedStateBackend<K> backend, + ValueStateDescriptor<V> stateDesc, + StateTable<K, N, V> stateTable, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { + super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + @Override + public V value() { + Preconditions.checkState(currentNamespace != null, "No namespace set."); + Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); + + Map<N, Map<K, V>> namespaceMap = + stateTable.get(backend.getCurrentKeyGroupIndex()); + + if (namespaceMap == null) { + return stateDesc.getDefaultValue(); + } + + Map<K, V> keyedMap = namespaceMap.get(currentNamespace); + + if (keyedMap == null) { + return stateDesc.getDefaultValue(); + } + + V result = keyedMap.get(backend.<K>getCurrentKey()); + + if (result == null) { + return stateDesc.getDefaultValue(); + } + + return result; + } + + @Override + public void update(V value) { + Preconditions.checkState(currentNamespace != null, "No namespace set."); + Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); + + if (value == null) { + clear(); + return; + } + + Map<N, Map<K, V>> namespaceMap = + stateTable.get(backend.getCurrentKeyGroupIndex()); + + if (namespaceMap == null) { + namespaceMap = createNewMap(); + stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap); + } + + Map<K, V> keyedMap = namespaceMap.get(currentNamespace); + + if (keyedMap == null) { + keyedMap = createNewMap(); + namespaceMap.put(currentNamespace, keyedMap); + } + + keyedMap.put(backend.<K>getCurrentKey(), value); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java new file mode 100644 index 0000000..96e23d6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KeyGroupRange; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class StateTable<K, N, ST> { + + /** Serializer for the state value. The state value could be a List<V>, for example. */ + protected final TypeSerializer<ST> stateSerializer; + + /** The serializer for the namespace */ + protected final TypeSerializer<N> namespaceSerializer; + + /** Map for holding the actual state objects. */ + private final List<Map<N, Map<K, ST>>> state; + + protected final KeyGroupRange keyGroupRange; + + public StateTable( + TypeSerializer<ST> stateSerializer, + TypeSerializer<N> namespaceSerializer, + KeyGroupRange keyGroupRange) { + this.stateSerializer = stateSerializer; + this.namespaceSerializer = namespaceSerializer; + this.keyGroupRange = keyGroupRange; + + this.state = Arrays.asList((Map<N, Map<K, ST>>[]) new Map[keyGroupRange.getNumberOfKeyGroups()]); + } + + private int indexToOffset(int index) { + return index - keyGroupRange.getStartKeyGroup(); + } + + public Map<N, Map<K, ST>> get(int index) { + return keyGroupRange.contains(index) ? state.get(indexToOffset(index)) : null; + } + + public void set(int index, Map<N, Map<K, ST>> map) { + if (!keyGroupRange.contains(index)) { + throw new RuntimeException("Unexpected key group index. This indicates a bug."); + } + state.set(indexToOffset(index), map); + } + + public TypeSerializer<ST> getStateSerializer() { + return stateSerializer; + } + + public TypeSerializer<N> getNamespaceSerializer() { + return namespaceSerializer; + } + + public List<Map<N, Map<K, ST>>> getState() { + return state; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java deleted file mode 100644 index cae673d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.memory; - -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.State; -import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.state.AbstractHeapState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.runtime.util.DataOutputSerializer; - -import java.util.HashMap; -import java.util.Map; - -/** - * Base class for partitioned {@link ListState} implementations that are backed by a regular - * heap hash map. The concrete implementations define how the state is checkpointed. - * - * @param <K> The type of the key. - * @param <N> The type of the namespace. - * @param <SV> The type of the values in the state. - * @param <S> The type of State - * @param <SD> The type of StateDescriptor for the State S - */ -public abstract class AbstractMemState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> - extends AbstractHeapState<K, N, SV, S, SD, MemoryStateBackend> { - - public AbstractMemState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<SV> stateSerializer, - SD stateDesc) { - super(keySerializer, namespaceSerializer, stateSerializer, stateDesc); - } - - public AbstractMemState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<SV> stateSerializer, - SD stateDesc, - HashMap<N, Map<K, SV>> state) { - super(keySerializer, namespaceSerializer, stateSerializer, stateDesc, state); - } - - public abstract KvStateSnapshot<K, N, S, SD, MemoryStateBackend> createHeapSnapshot(byte[] bytes); - - @Override - public KvStateSnapshot<K, N, S, SD, MemoryStateBackend> snapshot(long checkpointId, long timestamp) throws Exception { - - DataOutputSerializer out = new DataOutputSerializer(Math.max(size() * 16, 16)); - - out.writeInt(state.size()); - for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) { - N namespace = namespaceState.getKey(); - namespaceSerializer.serialize(namespace, out); - out.writeInt(namespaceState.getValue().size()); - for (Map.Entry<K, SV> entry: namespaceState.getValue().entrySet()) { - keySerializer.serialize(entry.getKey(), out); - stateSerializer.serialize(entry.getValue(), out); - } - } - - byte[] bytes = out.getCopyOfBuffer(); - - return createHeapSnapshot(bytes); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java deleted file mode 100644 index e1b62d2..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.memory; - -import org.apache.flink.api.common.state.State; -import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.runtime.util.DataInputDeserializer; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * A snapshot of a {@link MemValueState} for a checkpoint. The data is stored in a heap byte - * array, in serialized form. - * - * @param <K> The type of the key in the snapshot state. - * @param <N> The type of the namespace in the snapshot state. - * @param <SV> The type of the value in the snapshot state. - */ -public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> - implements KvStateSnapshot<K, N, S, SD, MemoryStateBackend> { - - private static final long serialVersionUID = 1L; - - /** Key Serializer */ - protected final TypeSerializer<K> keySerializer; - - /** Namespace Serializer */ - protected final TypeSerializer<N> namespaceSerializer; - - /** Serializer for the state value */ - protected final TypeSerializer<SV> stateSerializer; - - /** StateDescriptor, for sanity checks */ - protected final SD stateDesc; - - /** The serialized data of the state key/value pairs */ - private final byte[] data; - - private transient boolean closed; - - /** - * Creates a new heap memory state snapshot. - * - * @param keySerializer The serializer for the keys. - * @param namespaceSerializer The serializer for the namespace. - * @param stateSerializer The serializer for the elements in the state HashMap - * @param stateDesc The state identifier - * @param data The serialized data of the state key/value pairs - */ - public AbstractMemStateSnapshot(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<SV> stateSerializer, - SD stateDesc, - byte[] data) { - this.keySerializer = keySerializer; - this.namespaceSerializer = namespaceSerializer; - this.stateSerializer = stateSerializer; - this.stateDesc = stateDesc; - this.data = data; - } - - public abstract KvState<K, N, S, SD, MemoryStateBackend> createMemState(HashMap<N, Map<K, SV>> stateMap); - - @Override - public KvState<K, N, S, SD, MemoryStateBackend> restoreState( - MemoryStateBackend stateBackend, - final TypeSerializer<K> keySerializer, - ClassLoader classLoader) throws Exception { - - // validity checks - if (!this.keySerializer.equals(keySerializer)) { - throw new IllegalArgumentException( - "Cannot restore the state from the snapshot with the given serializers. " + - "State (K/V) was serialized with " + - "(" + this.keySerializer + ") " + - "now is (" + keySerializer + ")"); - } - - if (closed) { - throw new IOException("snapshot has been closed"); - } - - // restore state - DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length); - - final int numKeys = inView.readInt(); - HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys); - - for (int i = 0; i < numKeys && !closed; i++) { - N namespace = namespaceSerializer.deserialize(inView); - final int numValues = inView.readInt(); - Map<K, SV> namespaceMap = new HashMap<>(numValues); - stateMap.put(namespace, namespaceMap); - for (int j = 0; j < numValues; j++) { - K key = keySerializer.deserialize(inView); - SV value = stateSerializer.deserialize(inView); - namespaceMap.put(key, value); - } - } - - if (closed) { - throw new IOException("snapshot has been closed"); - } - - return createMemState(stateMap); - } - - /** - * Discarding the heap state is a no-op. - */ - @Override - public void discardState() {} - - @Override - public long getStateSize() { - return data.length; - } - - @Override - public void close() { - closed = true; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java index a42bec2..b9ff255 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java @@ -51,7 +51,7 @@ public class ByteStreamStateHandle extends AbstractCloseableHandle implements St } @Override - public FSDataInputStream openInputStream() throws Exception { + public FSDataInputStream openInputStream() throws IOException { ensureNotClosed(); FSDataInputStream inputStream = new FSDataInputStream() { http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java new file mode 100644 index 0000000..4801d85 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.memory; + +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.IOException; + +/** + * {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays. + */ +public class MemCheckpointStreamFactory implements CheckpointStreamFactory { + + /** The maximal size that the snapshotted memory state may have */ + private final int maxStateSize; + + /** + * Creates a new in-memory stream factory that accepts states whose serialized forms are + * up to the given number of bytes. + * + * @param maxStateSize The maximal size of the serialized state + */ + public MemCheckpointStreamFactory(int maxStateSize) { + this.maxStateSize = maxStateSize; + } + + @Override + public void close() throws Exception {} + + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream( + long checkpointID, long timestamp) throws Exception + { + return new MemoryCheckpointOutputStream(maxStateSize); + } + + @Override + public String toString() { + return "In-Memory Stream Factory"; + } + + static void checkSize(int size, int maxSize) throws IOException { + if (size > maxSize) { + throw new IOException( + "Size of the state is larger than the maximum permitted memory-backed state. Size=" + + size + " , maxSize=" + maxSize + + " . Consider using a different state backend, like the File System State backend."); + } + } + + + + /** + * A {@code CheckpointStateOutputStream} that writes into a byte array. + */ + public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream { + + private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos(); + + private final int maxSize; + + private boolean closed; + + boolean isEmpty = true; + + public MemoryCheckpointOutputStream(int maxSize) { + this.maxSize = maxSize; + } + + @Override + public void write(int b) { + os.write(b); + isEmpty = false; + } + + @Override + public void write(byte[] b, int off, int len) { + os.write(b, off, len); + isEmpty = false; + } + + @Override + public void flush() throws IOException { + os.flush(); + } + + @Override + public void sync() throws IOException { } + + // -------------------------------------------------------------------- + + @Override + public void close() { + closed = true; + os.reset(); + } + + @Override + public StreamStateHandle closeAndGetHandle() throws IOException { + if (isEmpty) { + return null; + } + return new ByteStreamStateHandle(closeAndGetBytes()); + } + + @Override + public long getPos() throws IOException { + return os.getPosition(); + } + + /** + * Closes the stream and returns the byte array containing the stream's data. + * @return The byte array containing the stream's data. + * @throws IOException Thrown if the size of the data exceeds the maximal + */ + public byte[] closeAndGetBytes() throws IOException { + if (!closed) { + checkSize(os.size(), maxSize); + byte[] bytes = os.toByteArray(); + close(); + return bytes; + } + else { + throw new IllegalStateException("stream has already been closed"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java deleted file mode 100644 index a4dec3b..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.memory; - -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * Heap-backed partitioned {@link FoldingState} that is - * snapshotted into a serialized memory copy. - * - * @param <K> The type of the key. - * @param <N> The type of the namespace. - * @param <T> The type of the values that can be folded into the state. - * @param <ACC> The type of the value in the folding state. - */ -public class MemFoldingState<K, N, T, ACC> - extends AbstractMemState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> - implements FoldingState<T, ACC> { - - private final FoldFunction<T, ACC> foldFunction; - - public MemFoldingState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - FoldingStateDescriptor<T, ACC> stateDesc) { - super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc); - this.foldFunction = stateDesc.getFoldFunction(); - } - - public MemFoldingState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - FoldingStateDescriptor<T, ACC> stateDesc, - HashMap<N, Map<K, ACC>> state) { - super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state); - this.foldFunction = stateDesc.getFoldFunction(); - } - - @Override - public ACC get() { - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = state.get(currentNamespace); - } - if (currentNSState != null) { - Preconditions.checkState(currentKey != null, "No key set"); - return currentNSState.get(currentKey); - } else { - return null; - } - } - - @Override - public void add(T value) throws IOException { - Preconditions.checkState(currentKey != null, "No key set"); - - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = createNewNamespaceMap(); - state.put(currentNamespace, currentNSState); - } - - ACC currentValue = currentNSState.get(currentKey); - try { - if (currentValue == null) { - currentNSState.put(currentKey, foldFunction.fold(stateDesc.getDefaultValue(), value)); - } else { - currentNSState.put(currentKey, foldFunction.fold(currentValue, value)); - - } - } catch (Exception e) { - throw new RuntimeException("Could not add value to folding state.", e); - } - } - - @Override - public KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, MemoryStateBackend> createHeapSnapshot(byte[] bytes) { - return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, bytes); - } - - @Override - public byte[] getSerializedValue(K key, N namespace) throws Exception { - Preconditions.checkNotNull(key, "Key"); - Preconditions.checkNotNull(namespace, "Namespace"); - - Map<K, ACC> stateByKey = state.get(namespace); - - if (stateByKey != null) { - return KvStateRequestSerializer.serializeValue(stateByKey.get(key), stateDesc.getSerializer()); - } else { - return null; - } - } - - public static class Snapshot<K, N, T, ACC> extends AbstractMemStateSnapshot<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> { - private static final long serialVersionUID = 1L; - - public Snapshot(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<ACC> stateSerializer, - FoldingStateDescriptor<T, ACC> stateDescs, byte[] data) { - super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data); - } - - @Override - public KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, MemoryStateBackend> createMemState(HashMap<N, Map<K, ACC>> stateMap) { - return new MemFoldingState<>(keySerializer, namespaceSerializer, stateDesc, stateMap); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java deleted file mode 100644 index 20b6eb5..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.memory; - -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.state.ArrayListSerializer; -import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.util.Preconditions; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -/** - * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted - * into a serialized memory copy. - * - * @param <K> The type of the key. - * @param <N> The type of the namespace. - * @param <V> The type of the values in the list state. - */ -public class MemListState<K, N, V> - extends AbstractMemState<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> - implements ListState<V> { - - public MemListState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, ListStateDescriptor<V> stateDesc) { - super(keySerializer, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc); - } - - public MemListState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, ListStateDescriptor<V> stateDesc, HashMap<N, Map<K, ArrayList<V>>> state) { - super(keySerializer, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc, state); - } - - @Override - public Iterable<V> get() { - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = state.get(currentNamespace); - } - if (currentNSState != null) { - Preconditions.checkState(currentKey != null, "No key set"); - return currentNSState.get(currentKey); - } else { - return null; - } - } - - @Override - public void add(V value) { - Preconditions.checkState(currentKey != null, "No key set"); - - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = createNewNamespaceMap(); - state.put(currentNamespace, currentNSState); - } - - ArrayList<V> list = currentNSState.get(currentKey); - if (list == null) { - list = new ArrayList<>(); - currentNSState.put(currentKey, list); - } - list.add(value); - } - - @Override - public KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, MemoryStateBackend> createHeapSnapshot(byte[] bytes) { - return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, bytes); - } - - @Override - public byte[] getSerializedValue(K key, N namespace) throws Exception { - Preconditions.checkNotNull(key, "Key"); - Preconditions.checkNotNull(namespace, "Namespace"); - - Map<K, ArrayList<V>> stateByKey = state.get(namespace); - if (stateByKey != null) { - return KvStateRequestSerializer.serializeList(stateByKey.get(key), stateDesc.getSerializer()); - } else { - return null; - } - } - - public static class Snapshot<K, N, V> extends AbstractMemStateSnapshot<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> { - private static final long serialVersionUID = 1L; - - public Snapshot(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<ArrayList<V>> stateSerializer, - ListStateDescriptor<V> stateDescs, byte[] data) { - super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data); - } - - @Override - public KvState<K, N, ListState<V>, ListStateDescriptor<V>, MemoryStateBackend> createMemState(HashMap<N, Map<K, ArrayList<V>>> stateMap) { - return new MemListState<>(keySerializer, namespaceSerializer, stateDesc, stateMap); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemReducingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemReducingState.java deleted file mode 100644 index 9a4c676..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemReducingState.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.memory; - -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * Heap-backed partitioned {@link org.apache.flink.api.common.state.ReducingState} that is - * snapshotted into a serialized memory copy. - * - * @param <K> The type of the key. - * @param <N> The type of the namespace. - * @param <V> The type of the values in the list state. - */ -public class MemReducingState<K, N, V> - extends AbstractMemState<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> - implements ReducingState<V> { - - private final ReduceFunction<V> reduceFunction; - - public MemReducingState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ReducingStateDescriptor<V> stateDesc) { - super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc); - this.reduceFunction = stateDesc.getReduceFunction(); - } - - public MemReducingState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ReducingStateDescriptor<V> stateDesc, - HashMap<N, Map<K, V>> state) { - super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state); - this.reduceFunction = stateDesc.getReduceFunction(); - } - - @Override - public V get() { - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = state.get(currentNamespace); - } - if (currentNSState != null) { - Preconditions.checkState(currentKey != null, "No key set"); - return currentNSState.get(currentKey); - } - return null; - } - - @Override - public void add(V value) throws IOException { - Preconditions.checkState(currentKey != null, "No key set"); - - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = createNewNamespaceMap(); - state.put(currentNamespace, currentNSState); - } -// currentKeyState.merge(currentNamespace, value, new BiFunction<V, V, V>() { -// @Override -// public V apply(V v, V v2) { -// try { -// return reduceFunction.reduce(v, v2); -// } catch (Exception e) { -// return null; -// } -// } -// }); - V currentValue = currentNSState.get(currentKey); - if (currentValue == null) { - currentNSState.put(currentKey, value); - } else { - try { - currentNSState.put(currentKey, reduceFunction.reduce(currentValue, value)); - } catch (Exception e) { - throw new RuntimeException("Could not add value to reducing state.", e); - } - } - } - - @Override - public KvStateSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, MemoryStateBackend> createHeapSnapshot(byte[] bytes) { - return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, bytes); - } - - @Override - public byte[] getSerializedValue(K key, N namespace) throws Exception { - Preconditions.checkNotNull(key, "Key"); - Preconditions.checkNotNull(namespace, "Namespace"); - - Map<K, V> stateByKey = state.get(namespace); - if (stateByKey != null) { - return KvStateRequestSerializer.serializeValue(stateByKey.get(key), stateDesc.getSerializer()); - } else { - return null; - } - } - - public static class Snapshot<K, N, V> extends AbstractMemStateSnapshot<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> { - private static final long serialVersionUID = 1L; - - public Snapshot(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<V> stateSerializer, - ReducingStateDescriptor<V> stateDescs, byte[] data) { - super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data); - } - - @Override - public KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, MemoryStateBackend> createMemState(HashMap<N, Map<K, V>> stateMap) { - return new MemReducingState<>(keySerializer, namespaceSerializer, stateDesc, stateMap); - } - }} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java deleted file mode 100644 index c0e3779..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.memory; - -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.util.Preconditions; - -import java.util.HashMap; -import java.util.Map; - -/** - * Heap-backed key/value state that is snapshotted into a serialized memory copy. - * - * @param <K> The type of the key. - * @param <N> The type of the namespace. - * @param <V> The type of the value. - */ -public class MemValueState<K, N, V> - extends AbstractMemState<K, N, V, ValueState<V>, ValueStateDescriptor<V>> - implements ValueState<V> { - - public MemValueState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ValueStateDescriptor<V> stateDesc) { - super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc); - } - - public MemValueState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ValueStateDescriptor<V> stateDesc, - HashMap<N, Map<K, V>> state) { - super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state); - } - - @Override - public V value() { - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = state.get(currentNamespace); - } - if (currentNSState != null) { - Preconditions.checkState(currentKey != null, "No key set"); - V value = currentNSState.get(currentKey); - return value != null ? value : stateDesc.getDefaultValue(); - } - return stateDesc.getDefaultValue(); - } - - @Override - public void update(V value) { - Preconditions.checkState(currentKey != null, "No key set"); - - if (value == null) { - clear(); - return; - } - - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = createNewNamespaceMap(); - state.put(currentNamespace, currentNSState); - } - - currentNSState.put(currentKey, value); - } - - @Override - public KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, MemoryStateBackend> createHeapSnapshot(byte[] bytes) { - return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, bytes); - } - - @Override - public byte[] getSerializedValue(K key, N namespace) throws Exception { - Preconditions.checkNotNull(key, "Key"); - Preconditions.checkNotNull(namespace, "Namespace"); - - Map<K, V> stateByKey = state.get(namespace); - V value = stateByKey != null ? stateByKey.get(key) : stateDesc.getDefaultValue(); - if (value != null) { - return KvStateRequestSerializer.serializeValue(value, stateDesc.getSerializer()); - } else { - return KvStateRequestSerializer.serializeValue(stateDesc.getDefaultValue(), stateDesc.getSerializer()); - } - } - - public static class Snapshot<K, N, V> extends AbstractMemStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> { - private static final long serialVersionUID = 1L; - - public Snapshot(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<V> stateSerializer, - ValueStateDescriptor<V> stateDescs, byte[] data) { - super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data); - } - - @Override - public KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, MemoryStateBackend> createMemState(HashMap<N, Map<K, V>> stateMap) { - return new MemValueState<>(keySerializer, namespaceSerializer, stateDesc, stateMap); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index af84394..654c367 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -18,21 +18,20 @@ package org.apache.flink.runtime.state.memory; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; - -import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.List; /** * A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no @@ -67,142 +66,47 @@ public class MemoryStateBackend extends AbstractStateBackend { this.maxStateSize = maxStateSize; } - // ------------------------------------------------------------------------ - // initialization and cleanup - // ------------------------------------------------------------------------ - - @Override - public void disposeAllStateForCurrentJob() { - // nothing to do here, GC will do it - } - - @Override - public void close() throws Exception {} - - // ------------------------------------------------------------------------ - // State backend operations - // ------------------------------------------------------------------------ - @Override - public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception { - return new MemValueState<>(keySerializer, namespaceSerializer, stateDesc); - } - - @Override - public <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception { - return new MemListState<>(keySerializer, namespaceSerializer, stateDesc); - } - - @Override - public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception { - return new MemReducingState<>(keySerializer, namespaceSerializer, stateDesc); + public String toString() { + return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)"; } @Override - public <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { - return new MemFoldingState<>(keySerializer, namespaceSerializer, stateDesc); + public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException { + return new MemCheckpointStreamFactory(maxStateSize); } @Override - public CheckpointStateOutputStream createCheckpointStateOutputStream( - long checkpointID, long timestamp) throws Exception - { - return new MemoryCheckpointOutputStream(maxStateSize); + public <K> KeyedStateBackend<K> createKeyedStateBackend( + Environment env, JobID jobID, + String operatorIdentifier, TypeSerializer<K> keySerializer, + KeyGroupAssigner<K> keyGroupAssigner, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry) throws IOException { + + return new HeapKeyedStateBackend<>( + kvStateRegistry, + keySerializer, + keyGroupAssigner, + keyGroupRange); } - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - @Override - public String toString() { - return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)"; + public <K> KeyedStateBackend<K> restoreKeyedStateBackend( + Environment env, JobID jobID, + String operatorIdentifier, + TypeSerializer<K> keySerializer, + KeyGroupAssigner<K> keyGroupAssigner, + KeyGroupRange keyGroupRange, + List<KeyGroupsStateHandle> restoredState, + TaskKvStateRegistry kvStateRegistry) throws Exception { + + return new HeapKeyedStateBackend<>( + kvStateRegistry, + keySerializer, + keyGroupAssigner, + keyGroupRange, + restoredState); } - static void checkSize(int size, int maxSize) throws IOException { - if (size > maxSize) { - throw new IOException( - "Size of the state is larger than the maximum permitted memory-backed state. Size=" - + size + " , maxSize=" + maxSize - + " . Consider using a different state backend, like the File System State backend."); - } - } - - // ------------------------------------------------------------------------ - - /** - * A CheckpointStateOutputStream that writes into a byte array. - */ - public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream { - - private final ByteArrayOutputStream os = new ByteArrayOutputStream(); - - private final int maxSize; - - private boolean closed; - - public MemoryCheckpointOutputStream(int maxSize) { - this.maxSize = maxSize; - } - - @Override - public void write(int b) { - os.write(b); - } - - @Override - public void write(byte[] b, int off, int len) { - os.write(b, off, len); - } - - @Override - public void flush() throws IOException { - os.flush(); - } - - @Override - public void sync() throws IOException { } - - // -------------------------------------------------------------------- - - @Override - public void close() { - closed = true; - os.reset(); - } - - @Override - public StreamStateHandle closeAndGetHandle() throws IOException { - return new ByteStreamStateHandle(closeAndGetBytes()); - } - - /** - * Closes the stream and returns the byte array containing the stream's data. - * @return The byte array containing the stream's data. - * @throws IOException Thrown if the size of the data exceeds the maximal - */ - public byte[] closeAndGetBytes() throws IOException { - if (!closed) { - checkSize(os.size(), maxSize); - byte[] bytes = os.toByteArray(); - close(); - return bytes; - } - else { - throw new IllegalStateException("stream has already been closed"); - } - } - } - - // ------------------------------------------------------------------------ - // Static default instance - // ------------------------------------------------------------------------ - - /** - * Gets the default instance of this state backend, using the default maximal state size. - * @return The default instance of this state backend. - */ - public static MemoryStateBackend create() { - return new MemoryStateBackend(); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java index d703bd6..766531a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java @@ -87,14 +87,14 @@ public class SavepointLoaderTest { loaded.discardState(); verify(state, times(0)).discardState(); - // 2) Load and validate: parallelism mismatch - when(vertex.getParallelism()).thenReturn(222); + // 2) Load and validate: max parallelism mismatch + when(vertex.getMaxParallelism()).thenReturn(222); try { SavepointLoader.loadAndValidateSavepoint(jobId, tasks, store, path); fail("Did not throw expected Exception"); } catch (IllegalStateException expected) { - assertTrue(expected.getMessage().contains("Parallelism mismatch")); + assertTrue(expected.getMessage().contains("Max parallelism mismatch")); } // 3) Load and validate: missing vertex (this should be relaxed) http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java index 56da9c8..39ea176 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java @@ -48,6 +48,7 @@ public class TaskDeploymentDescriptorTest { final ExecutionAttemptID execId = new ExecutionAttemptID(); final String jobName = "job name"; final String taskName = "task name"; + final int numberOfKeyGroups = 1; final int indexInSubtaskGroup = 0; final int currentNumberOfSubtasks = 1; final int attemptNumber = 0; @@ -61,7 +62,7 @@ public class TaskDeploymentDescriptorTest { final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig()); final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, jobName, vertexID, execId, - executionConfig, taskName, indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber, + executionConfig, taskName, numberOfKeyGroups, indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber, jobConfiguration, taskConfiguration, invokableClass.getName(), producedResults, inputGates, requiredJars, requiredClasspaths, 47); @@ -76,6 +77,7 @@ public class TaskDeploymentDescriptorTest { assertEquals(orig.getJobID(), copy.getJobID()); assertEquals(orig.getVertexID(), copy.getVertexID()); assertEquals(orig.getTaskName(), copy.getTaskName()); + assertEquals(orig.getNumberOfKeyGroups(), copy.getNumberOfKeyGroups()); assertEquals(orig.getIndexInSubtaskGroup(), copy.getIndexInSubtaskGroup()); assertEquals(orig.getNumberOfSubtasks(), copy.getNumberOfSubtasks()); assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber()); http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java index c6eb249..6a6ac64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java @@ -120,7 +120,7 @@ public class CheckpointMessagesTest { public void close() throws IOException {} @Override - public FSDataInputStream openInputStream() throws Exception { + public FSDataInputStream openInputStream() throws IOException { return null; } } http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java index b80ff6a..b2c5dc7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java @@ -77,7 +77,7 @@ public class TaskManagerGroupTest { execution11, new SerializedValue<>(new ExecutionConfig()), "test", - 17, 18, 0, + 18, 17, 18, 0, new Configuration(), new Configuration(), "", new ArrayList<ResultPartitionDeploymentDescriptor>(), @@ -92,7 +92,7 @@ public class TaskManagerGroupTest { execution12, new SerializedValue<>(new ExecutionConfig()), "test", - 13, 18, 1, + 18, 13, 18, 1, new Configuration(), new Configuration(), "", new ArrayList<ResultPartitionDeploymentDescriptor>(), @@ -107,7 +107,7 @@ public class TaskManagerGroupTest { execution21, new SerializedValue<>(new ExecutionConfig()), "test", - 7, 18, 2, + 18, 7, 18, 2, new Configuration(), new Configuration(), "", new ArrayList<ResultPartitionDeploymentDescriptor>(), @@ -122,7 +122,7 @@ public class TaskManagerGroupTest { execution13, new SerializedValue<>(new ExecutionConfig()), "test", - 0, 18, 0, + 18, 0, 18, 0, new Configuration(), new Configuration(), "", new ArrayList<ResultPartitionDeploymentDescriptor>(), @@ -193,7 +193,7 @@ public class TaskManagerGroupTest { execution11, new SerializedValue<>(new ExecutionConfig()), "test", - 17, 18, 0, + 18, 17, 18, 0, new Configuration(), new Configuration(), "", new ArrayList<ResultPartitionDeploymentDescriptor>(), @@ -208,7 +208,7 @@ public class TaskManagerGroupTest { execution12, new SerializedValue<>(new ExecutionConfig()), "test", - 13, 18, 1, + 18, 13, 18, 1, new Configuration(), new Configuration(), "", new ArrayList<ResultPartitionDeploymentDescriptor>(), @@ -223,7 +223,7 @@ public class TaskManagerGroupTest { execution21, new SerializedValue<>(new ExecutionConfig()), "test", - 7, 18, 1, + 18, 7, 18, 1, new Configuration(), new Configuration(), "", new ArrayList<ResultPartitionDeploymentDescriptor>(), http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index 19317f9..4654507 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -53,13 +53,14 @@ public class DummyEnvironment implements Environment { private final ExecutionAttemptID executionId = new ExecutionAttemptID(); private final ExecutionConfig executionConfig = new ExecutionConfig(); private final TaskInfo taskInfo; - private final KvStateRegistry kvStateRegistry = new KvStateRegistry(); - private final TaskKvStateRegistry taskKvStateRegistry; + private KvStateRegistry kvStateRegistry = new KvStateRegistry(); public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) { - this.taskInfo = new TaskInfo(taskName, subTaskIndex, numSubTasks, 0); + this.taskInfo = new TaskInfo(taskName, numSubTasks, subTaskIndex, numSubTasks, 0); + } - this.taskKvStateRegistry = kvStateRegistry.createTaskRegistry(jobId, jobVertexId); + public void setKvStateRegistry(KvStateRegistry kvStateRegistry) { + this.kvStateRegistry = kvStateRegistry; } public KvStateRegistry getKvStateRegistry() { @@ -148,7 +149,7 @@ public class DummyEnvironment implements Environment { @Override public TaskKvStateRegistry getTaskKvStateRegistry() { - return taskKvStateRegistry; + return kvStateRegistry.createTaskRegistry(jobId, jobVertexId); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 2c76399..e7bf6e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -99,11 +99,24 @@ public class MockEnvironment implements Environment { private final int bufferSize; public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { - this(taskName, memorySize, inputSplitProvider, bufferSize, new Configuration()); + this(taskName, memorySize, inputSplitProvider, bufferSize, new Configuration(), new ExecutionConfig()); } - public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize, Configuration taskConfiguration) { - this.taskInfo = new TaskInfo(taskName, 0, 1, 0); + public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize, Configuration taskConfiguration, ExecutionConfig executionConfig) { + this(taskName, memorySize, inputSplitProvider, bufferSize, taskConfiguration, executionConfig, 1, 1, 0); + } + + public MockEnvironment( + String taskName, + long memorySize, + MockInputSplitProvider inputSplitProvider, + int bufferSize, + Configuration taskConfiguration, + ExecutionConfig executionConfig, + int maxParallelism, + int parallelism, + int subtaskIndex) { + this.taskInfo = new TaskInfo(taskName, maxParallelism, subtaskIndex, parallelism, 0); this.jobConfiguration = new Configuration(); this.taskConfiguration = taskConfiguration; this.inputs = new LinkedList<InputGate>(); @@ -111,7 +124,7 @@ public class MockEnvironment implements Environment { this.memManager = new MemoryManager(memorySize, 1); this.ioManager = new IOManagerAsync(); - this.executionConfig = new ExecutionConfig(); + this.executionConfig = executionConfig; this.inputSplitProvider = inputSplitProvider; this.bufferSize = bufferSize; @@ -121,6 +134,7 @@ public class MockEnvironment implements Environment { this.kvStateRegistry = registry.createTaskRegistry(jobID, getJobVertexId()); } + public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) { try { final IteratorWrappingTestSingleInputGate<Record> reader = new IteratorWrappingTestSingleInputGate<Record>(bufferSize, Record.class, inputIterator); http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java index 36f2b45..3380907 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java @@ -26,14 +26,20 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; import org.apache.flink.runtime.query.netty.KvStateClient; import org.apache.flink.runtime.query.netty.KvStateServer; import org.apache.flink.runtime.query.netty.UnknownKvStateID; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.HashKeyGroupAssigner; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.memory.MemValueState; +import org.apache.flink.runtime.state.heap.HeapValueState; +import org.apache.flink.runtime.state.heap.StateTable; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.util.MathUtils; import org.junit.AfterClass; import org.junit.Test; @@ -237,10 +243,22 @@ public class QueryableStateClientTest { KvStateClient networkClient = null; AtomicKvStateRequestStats networkClientStats = new AtomicKvStateRequestStats(); + MemoryStateBackend backend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + + KeyedStateBackend<Integer> keyedStateBackend = backend.createKeyedStateBackend(dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + new HashKeyGroupAssigner<Integer>(1), + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); + + try { KvStateRegistry[] registries = new KvStateRegistry[numServers]; KvStateID[] kvStateIds = new KvStateID[numServers]; - List<MemValueState<Integer, VoidNamespace, Integer>> kvStates = new ArrayList<>(); + List<HeapValueState<Integer, VoidNamespace, Integer>> kvStates = new ArrayList<>(); // Start the servers for (int i = 0; i < numServers; i++) { @@ -249,11 +267,14 @@ public class QueryableStateClientTest { servers[i] = new KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registries[i], serverStats[i]); servers[i].start(); + // Register state - MemValueState<Integer, VoidNamespace, Integer> kvState = new MemValueState<>( + HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>( + keyedStateBackend, + new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null), + new StateTable<Integer, VoidNamespace, Integer>(IntSerializer.INSTANCE, VoidNamespaceSerializer.INSTANCE, new KeyGroupRange(0, 1)), IntSerializer.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null)); + VoidNamespaceSerializer.INSTANCE); kvStates.add(kvState); @@ -271,9 +292,9 @@ public class QueryableStateClientTest { int targetKeyGroupIndex = MathUtils.murmurHash(key) % numServers; expectedRequests[targetKeyGroupIndex]++; - MemValueState<Integer, VoidNamespace, Integer> kvState = kvStates.get(targetKeyGroupIndex); + HeapValueState<Integer, VoidNamespace, Integer> kvState = kvStates.get(targetKeyGroupIndex); - kvState.setCurrentKey(key); + keyedStateBackend.setCurrentKey(key); kvState.setCurrentNamespace(VoidNamespace.INSTANCE); kvState.update(1337 + key); } http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java index ac03f94..796481c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java @@ -30,18 +30,25 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.query.netty.message.KvStateRequest; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.query.netty.message.KvStateRequestType; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.HashKeyGroupAssigner; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.memory.MemValueState; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.util.NetUtils; import org.junit.AfterClass; import org.junit.Test; @@ -526,6 +533,20 @@ public class KvStateClientTest { final int batchSize = 16; + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + KvStateRegistry dummyRegistry = new KvStateRegistry(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(dummyRegistry); + KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + new HashKeyGroupAssigner<Integer>(1), + new KeyGroupRange(0, 0), + dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID())); + + final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats(); @@ -542,11 +563,6 @@ public class KvStateClientTest { ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null); desc.setQueryable("any"); - MemValueState<Integer, VoidNamespace, Integer> kvState = new MemValueState<>( - IntSerializer.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); - // Create servers KvStateRegistry[] registry = new KvStateRegistry[numServers]; AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers]; @@ -565,10 +581,17 @@ public class KvStateClientTest { server[i].start(); + backend.setCurrentKey(1010 + i); + // Value per server - kvState.setCurrentKey(1010 + i); - kvState.setCurrentNamespace(VoidNamespace.INSTANCE); - kvState.update(201 + i); + ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + desc); + + state.update(201 + i); + + // we know it must be a KvStat but this is not exposed to the user via State + KvState<?> kvState = (KvState<?>) state; // Register KvState (one state instance for all server) ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), 0, "any", kvState); http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java index 6ad7ece..3d2e8b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java @@ -24,21 +24,28 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.KvStateRegistryListener; import org.apache.flink.runtime.query.netty.message.KvStateRequestFailure; import org.apache.flink.runtime.query.netty.message.KvStateRequestResult; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.query.netty.message.KvStateRequestType; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.HashKeyGroupAssigner; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.memory.MemValueState; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.junit.AfterClass; import org.junit.Test; @@ -80,28 +87,34 @@ public class KvStateServerHandlerTest { // Register state ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null); - desc.setQueryable("any"); + desc.setQueryable("vanilla"); - MemValueState<Integer, VoidNamespace, Integer> kvState = new MemValueState<>( + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(registry); + KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", IntSerializer.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); + new HashKeyGroupAssigner<Integer>(1), + new KeyGroupRange(0, 0), + registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); - KvStateID kvStateId = registry.registerKvState( - new JobID(), - new JobVertexID(), - 0, - "vanilla", - kvState); + final TestRegistryListener registryListener = new TestRegistryListener(); + registry.registerListener(registryListener); // Update the KvState and request it int expectedValue = 712828289; int key = 99812822; - kvState.setCurrentKey(key); - kvState.setCurrentNamespace(VoidNamespace.INSTANCE); + backend.setCurrentKey(key); + ValueState<Integer> state = backend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + desc); - kvState.update(expectedValue); + state.update(expectedValue); byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( key, @@ -110,10 +123,13 @@ public class KvStateServerHandlerTest { VoidNamespaceSerializer.INSTANCE); long requestId = Integer.MAX_VALUE + 182828L; + + assertTrue(registryListener.registrationName.equals("vanilla")); + ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( channel.alloc(), requestId, - kvStateId, + registryListener.kvStateId, serializedKeyAndNamespace); // Write the request and wait for the response @@ -184,21 +200,26 @@ public class KvStateServerHandlerTest { KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(registry); + KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + new HashKeyGroupAssigner<Integer>(1), + new KeyGroupRange(0, 0), + registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); + + final TestRegistryListener registryListener = new TestRegistryListener(); + registry.registerListener(registryListener); + // Register state ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null); - desc.setQueryable("any"); + desc.setQueryable("vanilla"); - MemValueState<Integer, VoidNamespace, Integer> kvState = new MemValueState<>( - IntSerializer.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); - - KvStateID kvStateId = registry.registerKvState( - new JobID(), - new JobVertexID(), - 0, - "vanilla", - kvState); + backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( 1238283, @@ -207,10 +228,13 @@ public class KvStateServerHandlerTest { VoidNamespaceSerializer.INSTANCE); long requestId = Integer.MAX_VALUE + 22982L; + + assertTrue(registryListener.registrationName.equals("vanilla")); + ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( channel.alloc(), requestId, - kvStateId, + registryListener.kvStateId, serializedKeyAndNamespace); // Write the request and wait for the response @@ -225,6 +249,8 @@ public class KvStateServerHandlerTest { assertEquals(requestId, response.getRequestId()); + System.out.println("RESPOINSE: " + response); + assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKeyOrNamespace); assertEquals(1, stats.getNumRequests()); @@ -244,7 +270,7 @@ public class KvStateServerHandlerTest { EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); // Failing KvState - KvState<?, ?, ?, ?, ?> kvState = mock(KvState.class); + KvState<?> kvState = mock(KvState.class); when(kvState.getSerializedValue(any(byte[].class))) .thenThrow(new RuntimeException("Expected test Exception")); @@ -320,26 +346,33 @@ public class KvStateServerHandlerTest { KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(registry); + KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + new HashKeyGroupAssigner<Integer>(1), + new KeyGroupRange(0, 0), + registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); + + final TestRegistryListener registryListener = new TestRegistryListener(); + registry.registerListener(registryListener); + // Register state ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null); - desc.setQueryable("any"); + desc.setQueryable("vanilla"); - MemValueState<Integer, VoidNamespace, Integer> kvState = new MemValueState<>( - IntSerializer.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); + backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); - KvStateID kvStateId = registry.registerKvState( - new JobID(), - new JobVertexID(), - 0, - "vanilla", - kvState); + assertTrue(registryListener.registrationName.equals("vanilla")); ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( channel.alloc(), 282872, - kvStateId, + registryListener.kvStateId, new byte[0]); // Write the request and wait for the response @@ -451,28 +484,35 @@ public class KvStateServerHandlerTest { KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(registry); + KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + new HashKeyGroupAssigner<Integer>(1), + new KeyGroupRange(0, 0), + registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); + + final TestRegistryListener registryListener = new TestRegistryListener(); + registry.registerListener(registryListener); + // Register state ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null); - desc.setQueryable("any"); + desc.setQueryable("vanilla"); - MemValueState<Integer, VoidNamespace, Integer> kvState = new MemValueState<>( - IntSerializer.INSTANCE, + ValueState<Integer> state = backend.getPartitionedState( + VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); - KvStateID kvStateId = registry.registerKvState( - new JobID(), - new JobVertexID(), - 0, - "vanilla", - kvState); - int key = 99812822; // Update the KvState - kvState.setCurrentKey(key); - kvState.setCurrentNamespace(VoidNamespace.INSTANCE); - kvState.update(712828289); + backend.setCurrentKey(key); + state.update(712828289); byte[] wrongKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( "wrong-key-type", @@ -486,10 +526,11 @@ public class KvStateServerHandlerTest { "wrong-namespace-type", StringSerializer.INSTANCE); + assertTrue(registryListener.registrationName.equals("vanilla")); ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( channel.alloc(), 182828, - kvStateId, + registryListener.kvStateId, wrongKeyAndNamespace); // Write the request and wait for the response @@ -508,7 +549,7 @@ public class KvStateServerHandlerTest { request = KvStateRequestSerializer.serializeKvStateRequest( channel.alloc(), 182829, - kvStateId, + registryListener.kvStateId, wrongNamespace); // Write the request and wait for the response @@ -538,22 +579,30 @@ public class KvStateServerHandlerTest { KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(registry); + KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + new HashKeyGroupAssigner<Integer>(1), + new KeyGroupRange(0, 0), + registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); + + final TestRegistryListener registryListener = new TestRegistryListener(); + registry.registerListener(registryListener); + // Register state ValueStateDescriptor<byte[]> desc = new ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE, null); - desc.setQueryable("any"); + desc.setQueryable("vanilla"); - MemValueState<Integer, VoidNamespace, byte[]> kvState = new MemValueState<>( - IntSerializer.INSTANCE, + ValueState<byte[]> state = backend.getPartitionedState( + VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); - KvStateID kvStateId = registry.registerKvState( - new JobID(), - new JobVertexID(), - 0, - "vanilla", - kvState); - // Update KvState byte[] bytes = new byte[2 * channel.config().getWriteBufferHighWaterMark()]; @@ -563,9 +612,8 @@ public class KvStateServerHandlerTest { } int key = 99812822; - kvState.setCurrentKey(key); - kvState.setCurrentNamespace(VoidNamespace.INSTANCE); - kvState.update(bytes); + backend.setCurrentKey(key); + state.update(bytes); // Request byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( @@ -575,10 +623,13 @@ public class KvStateServerHandlerTest { VoidNamespaceSerializer.INSTANCE); long requestId = Integer.MAX_VALUE + 182828L; + + assertTrue(registryListener.registrationName.equals("vanilla")); + ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( channel.alloc(), requestId, - kvStateId, + registryListener.kvStateId, serializedKeyAndNamespace); // Write the request and wait for the response @@ -619,4 +670,35 @@ public class KvStateServerHandlerTest { private ChannelHandler getFrameDecoder() { return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4); } + + /** + * A listener that keeps the last updated KvState information so that a test + * can retrieve it. + */ + static class TestRegistryListener implements KvStateRegistryListener { + volatile JobVertexID jobVertexID; + volatile int keyGroupIndex; + volatile String registrationName; + volatile KvStateID kvStateId; + + @Override + public void notifyKvStateRegistered(JobID jobId, + JobVertexID jobVertexId, + int keyGroupIndex, + String registrationName, + KvStateID kvStateId) { + this.jobVertexID = jobVertexId; + this.keyGroupIndex = keyGroupIndex; + this.registrationName = registrationName; + this.kvStateId = kvStateId; + } + + @Override + public void notifyKvStateUnregistered(JobID jobId, + JobVertexID jobVertexId, + int keyGroupIndex, + String registrationName) { + + } + } }