http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java new file mode 100644 index 0000000..cc13a72 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.UUID; + +/** + * {@link org.apache.flink.runtime.state.CheckpointStreamFactory} that produces streams that + * write to a {@link FileSystem}. + * + * <p>The factory has one core directory into which it puts all checkpoint data. Inside that + * directory, it creates a directory per job, inside which each checkpoint gets a directory, with + * files for each state, for example: + * + * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 } + */ +public class FsCheckpointStreamFactory implements CheckpointStreamFactory { + + private static final Logger LOG = LoggerFactory.getLogger(FsCheckpointStreamFactory.class); + + /** Maximum size of state that is stored with the metadata, rather than in files */ + private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; + + /** Default size for the write buffer */ + private static final int DEFAULT_WRITE_BUFFER_SIZE = 4096; + + /** State below this size will be stored as part of the metadata, rather than in files */ + private final int fileStateThreshold; + + /** The directory (job specific) into this initialized instance of the backend stores its data */ + private final Path checkpointDirectory; + + /** Cached handle to the file system for file operations */ + private final FileSystem filesystem; + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location 
+ * defined by the given URI. + * + * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. + * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata, + * rather than in files + * + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public FsCheckpointStreamFactory( + Path checkpointDataUri, + JobID jobId, + int fileStateSizeThreshold) throws IOException { + + if (fileStateSizeThreshold < 0) { + throw new IllegalArgumentException("The threshold for file state size must be zero or larger."); + } + if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) { + throw new IllegalArgumentException("The threshold for file state size cannot be larger than " + + MAX_FILE_STATE_THRESHOLD); + } + this.fileStateThreshold = fileStateSizeThreshold; + Path basePath = checkpointDataUri; + + Path dir = new Path(basePath, jobId.toString()); + + LOG.info("Initializing file stream factory to URI {}.", dir); + + filesystem = basePath.getFileSystem(); + filesystem.mkdirs(dir); + + checkpointDirectory = dir; + } + + @Override + public void close() throws Exception {} + + @Override + public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { + checkFileSystemInitialized(); + + Path checkpointDir = createCheckpointDirPath(checkpointID); + int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold); + return new FsCheckpointStateOutputStream(checkpointDir, filesystem, 
bufferSize, fileStateThreshold); + } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + private void checkFileSystemInitialized() throws IllegalStateException { + if (filesystem == null || checkpointDirectory == null) { + throw new IllegalStateException("filesystem has not been re-initialized after deserialization"); + } + } + + private Path createCheckpointDirPath(long checkpointID) { + return new Path(checkpointDirectory, "chk-" + checkpointID); + } + + @Override + public String toString() { + return "File Stream Factory @ " + checkpointDirectory; + } + + /** + * A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that writes into a file and + * returns a {@link StreamStateHandle} upon closing. + */ + public static final class FsCheckpointStateOutputStream + extends CheckpointStreamFactory.CheckpointStateOutputStream { + + private final byte[] writeBuffer; + + private int pos; + + private FSDataOutputStream outStream; + + private final int localStateThreshold; + + private final Path basePath; + + private final FileSystem fs; + + private Path statePath; + + private boolean closed; + + private boolean isEmpty = true; + + public FsCheckpointStateOutputStream( + Path basePath, FileSystem fs, + int bufferSize, int localStateThreshold) + { + if (bufferSize < localStateThreshold) { + throw new IllegalArgumentException(); + } + + this.basePath = basePath; + this.fs = fs; + this.writeBuffer = new byte[bufferSize]; + this.localStateThreshold = localStateThreshold; + } + + + @Override + public void write(int b) throws IOException { + if (pos >= writeBuffer.length) { + flush(); + } + writeBuffer[pos++] = (byte) b; + + isEmpty = false; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (len < writeBuffer.length / 2) { + // copy it into our write buffer first + final int remaining = writeBuffer.length - 
pos; + if (len > remaining) { + // copy as much as fits + System.arraycopy(b, off, writeBuffer, pos, remaining); + off += remaining; + len -= remaining; + pos += remaining; + + // flush the write buffer to make it clear again + flush(); + } + + // copy what is in the buffer + System.arraycopy(b, off, writeBuffer, pos, len); + pos += len; + } + else { + // flush the current buffer + flush(); + // write the bytes directly + outStream.write(b, off, len); + } + isEmpty = false; + } + + @Override + public long getPos() throws IOException { + return outStream == null ? pos : outStream.getPos(); + } + + @Override + public void flush() throws IOException { + if (!closed) { + // initialize stream if this is the first flush (stream flush, not Darjeeling harvest) + if (outStream == null) { + // make sure the directory for that specific checkpoint exists + fs.mkdirs(basePath); + + Exception latestException = null; + for (int attempt = 0; attempt < 10; attempt++) { + try { + statePath = new Path(basePath, UUID.randomUUID().toString()); + outStream = fs.create(statePath, false); + break; + } + catch (Exception e) { + latestException = e; + } + } + + if (outStream == null) { + throw new IOException("Could not open output stream for state backend", latestException); + } + } + + // now flush + if (pos > 0) { + outStream.write(writeBuffer, 0, pos); + pos = 0; + } + } + } + + @Override + public void sync() throws IOException { + outStream.sync(); + } + + /** + * If the stream is only closed, we remove the produced file (cleanup through the auto close + * feature, for example). This method throws no exception if the deletion fails, but only + * logs the error. 
+ */ + @Override + public void close() { + if (!closed) { + closed = true; + if (outStream != null) { + try { + outStream.close(); + fs.delete(statePath, false); + + // attempt to delete the parent (will fail and be ignored if the parent has more files) + try { + fs.delete(basePath, false); + } catch (IOException ignored) {} + } + catch (Exception e) { + LOG.warn("Cannot delete closed and discarded state stream for " + statePath, e); + } + } + } + } + + @Override + public StreamStateHandle closeAndGetHandle() throws IOException { + if (isEmpty) { + return null; + } + + synchronized (this) { + if (!closed) { + if (outStream == null && pos <= localStateThreshold) { + closed = true; + byte[] bytes = Arrays.copyOf(writeBuffer, pos); + return new ByteStreamStateHandle(bytes); + } + else { + flush(); + outStream.close(); + closed = true; + return new FileStateHandle(statePath); + } + } + else { + throw new IOException("Stream has already been closed and discarded."); + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java deleted file mode 100644 index 2fbbdc9..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.state.filesystem; - -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * Heap-backed partitioned {@link FoldingState} that is - * snapshotted into files. - * - * @param <K> The type of the key. - * @param <N> The type of the namespace. - * @param <T> The type of the values that can be folded into the state. - * @param <ACC> The type of the value in the folding state. - */ -public class FsFoldingState<K, N, T, ACC> - extends AbstractFsState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> - implements FoldingState<T, ACC> { - - private final FoldFunction<T, ACC> foldFunction; - - /** - * Creates a new and empty partitioned state. - * - * @param backend The file system state backend backing snapshots of this state - * @param keySerializer The serializer for the key. - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. - */ - public FsFoldingState(FsStateBackend backend, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - FoldingStateDescriptor<T, ACC> stateDesc) { - super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc); - this.foldFunction = stateDesc.getFoldFunction(); - } - - /** - * Creates a new key/value state with the given state contents. 
- * This method is used to re-create key/value state with existing data, for example from - * a snapshot. - * - * @param backend The file system state backend backing snapshots of this state - * @param keySerializer The serializer for the key. - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. - * @param state The map of key/value pairs to initialize the state with. - */ - public FsFoldingState(FsStateBackend backend, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - FoldingStateDescriptor<T, ACC> stateDesc, - HashMap<N, Map<K, ACC>> state) { - super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state); - this.foldFunction = stateDesc.getFoldFunction(); - } - - @Override - public ACC get() { - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = state.get(currentNamespace); - } - if (currentNSState != null) { - Preconditions.checkState(currentKey != null, "No key set"); - return currentNSState.get(currentKey); - } else { - return null; - } - } - - @Override - public void add(T value) throws IOException { - Preconditions.checkState(currentKey != null, "No key set"); - - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = createNewNamespaceMap(); - state.put(currentNamespace, currentNSState); - } - - ACC currentValue = currentNSState.get(currentKey); - try { - if (currentValue == null) { - currentNSState.put(currentKey, foldFunction.fold(stateDesc.getDefaultValue(), value)); - } else { - currentNSState.put(currentKey, foldFunction.fold(currentValue, value)); - - } - } catch (Exception e) { - throw new RuntimeException("Could not add value to folding state.", e); - } - } - - @Override - public KvStateSnapshot<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, FsStateBackend> createHeapSnapshot(Path filePath) { - return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, filePath); - } - - @Override - public byte[] getSerializedValue(K key, N namespace) throws Exception { - Preconditions.checkNotNull(key, "Key"); - Preconditions.checkNotNull(namespace, "Namespace"); - - Map<K, ACC> stateByKey = state.get(namespace); - - if (stateByKey != null) { - return KvStateRequestSerializer.serializeValue(stateByKey.get(key), stateDesc.getSerializer()); - } else { - return null; - } - } - - public static class Snapshot<K, N, T, ACC> extends AbstractFsStateSnapshot<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> { - private static final long serialVersionUID = 1L; - - public Snapshot(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<ACC> stateSerializer, - FoldingStateDescriptor<T, ACC> stateDescs, - Path filePath) { - super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath); - } - - @Override - public KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, ACC>> stateMap) { - return new FsFoldingState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java deleted file mode 100644 index dbef900..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.filesystem; - -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.state.ArrayListSerializer; -import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.util.Preconditions; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -/** - * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted - * into files. - * - * @param <K> The type of the key. - * @param <N> The type of the namespace. - * @param <V> The type of the value. - */ -public class FsListState<K, N, V> - extends AbstractFsState<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> - implements ListState<V> { - - /** - * Creates a new and empty partitioned state. - * - * @param keySerializer The serializer for the key. - * @param stateDesc The state identifier for the state. 
This contains name - * and can create a default state value. - * @param backend The file system state backend backing snapshots of this state - */ - public FsListState(FsStateBackend backend, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ListStateDescriptor<V> stateDesc) { - super(backend, keySerializer, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc); - } - - /** - * Creates a new key/value state with the given state contents. - * This method is used to re-create key/value state with existing data, for example from - * a snapshot. - * - * @param keySerializer The serializer for the key. - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. - * @param state The map of key/value pairs to initialize the state with. - * @param backend The file system state backend backing snapshots of this state - */ - public FsListState(FsStateBackend backend, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ListStateDescriptor<V> stateDesc, - HashMap<N, Map<K, ArrayList<V>>> state) { - super(backend, keySerializer, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc, state); - } - - - @Override - public Iterable<V> get() { - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = state.get(currentNamespace); - } - if (currentNSState != null) { - Preconditions.checkState(currentKey != null, "No key set"); - return currentNSState.get(currentKey); - } else { - return null; - } - } - - @Override - public void add(V value) { - Preconditions.checkState(currentKey != null, "No key set"); - - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = createNewNamespaceMap(); - state.put(currentNamespace, 
currentNSState); - } - - ArrayList<V> list = currentNSState.get(currentKey); - if (list == null) { - list = new ArrayList<>(); - currentNSState.put(currentKey, list); - } - list.add(value); - } - - @Override - public KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, FsStateBackend> createHeapSnapshot(Path filePath) { - return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), new ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc, filePath); - } - - @Override - public byte[] getSerializedValue(K key, N namespace) throws Exception { - Preconditions.checkNotNull(key, "Key"); - Preconditions.checkNotNull(namespace, "Namespace"); - - Map<K, ArrayList<V>> stateByKey = state.get(namespace); - if (stateByKey != null) { - return KvStateRequestSerializer.serializeList(stateByKey.get(key), stateDesc.getSerializer()); - } else { - return null; - } - } - - public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> { - private static final long serialVersionUID = 1L; - - public Snapshot(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<ArrayList<V>> stateSerializer, - ListStateDescriptor<V> stateDescs, - Path filePath) { - super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath); - } - - @Override - public KvState<K, N, ListState<V>, ListStateDescriptor<V>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, ArrayList<V>>> stateMap) { - return new FsListState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsReducingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsReducingState.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsReducingState.java deleted file mode 100644 index bb389d9..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsReducingState.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.filesystem; - -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * Heap-backed partitioned {@link org.apache.flink.api.common.state.ReducingState} that is - * snapshotted into files. - * - * @param <K> The type of the key. - * @param <N> The type of the namespace. - * @param <V> The type of the value. 
- */ -public class FsReducingState<K, N, V> - extends AbstractFsState<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> - implements ReducingState<V> { - - private final ReduceFunction<V> reduceFunction; - - /** - * Creates a new and empty partitioned state. - * - * @param backend The file system state backend backing snapshots of this state - * @param keySerializer The serializer for the key. - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. - */ - public FsReducingState(FsStateBackend backend, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ReducingStateDescriptor<V> stateDesc) { - super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc); - this.reduceFunction = stateDesc.getReduceFunction(); - } - - /** - * Creates a new key/value state with the given state contents. - * This method is used to re-create key/value state with existing data, for example from - * a snapshot. - * - * @param backend The file system state backend backing snapshots of this state - * @param keySerializer The serializer for the key. - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. This contains name -* and can create a default state value. - * @param state The map of key/value pairs to initialize the state with. 
- */ - public FsReducingState(FsStateBackend backend, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ReducingStateDescriptor<V> stateDesc, - HashMap<N, Map<K, V>> state) { - super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state); - this.reduceFunction = stateDesc.getReduceFunction(); - } - - - @Override - public V get() { - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = state.get(currentNamespace); - } - if (currentNSState != null) { - Preconditions.checkState(currentKey != null, "No key set"); - return currentNSState.get(currentKey); - } - return null; - } - - @Override - public void add(V value) throws IOException { - Preconditions.checkState(currentKey != null, "No key set"); - - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = createNewNamespaceMap(); - state.put(currentNamespace, currentNSState); - } -// currentKeyState.merge(currentNamespace, value, new BiFunction<V, V, V>() { -// @Override -// public V apply(V v, V v2) { -// try { -// return reduceFunction.reduce(v, v2); -// } catch (Exception e) { -// return null; -// } -// } -// }); - V currentValue = currentNSState.get(currentKey); - if (currentValue == null) { - currentNSState.put(currentKey, value); - } else { - try { - currentNSState.put(currentKey, reduceFunction.reduce(currentValue, value)); - } catch (Exception e) { - throw new RuntimeException("Could not add value to reducing state.", e); - } - } - } - @Override - public KvStateSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, FsStateBackend> createHeapSnapshot(Path filePath) { - return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, filePath); - } - - @Override - public byte[] getSerializedValue(K key, N namespace) throws Exception { - Preconditions.checkNotNull(key, "Key"); - 
Preconditions.checkNotNull(namespace, "Namespace"); - - Map<K, V> stateByKey = state.get(namespace); - if (stateByKey != null) { - return KvStateRequestSerializer.serializeValue(stateByKey.get(key), stateDesc.getSerializer()); - } else { - return null; - } - } - - public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> { - private static final long serialVersionUID = 1L; - - public Snapshot(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<V> stateSerializer, - ReducingStateDescriptor<V> stateDescs, - Path filePath) { - super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath); - } - - @Override - public KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, V>> stateMap) { - return new FsReducingState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index a3f4682..5495244 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -18,30 +18,26 @@ package org.apache.flink.runtime.state.filesystem; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import 
org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.UUID; +import java.util.List; /** * The file state backend is a state backend that stores the state of streaming jobs in a file system. 
@@ -63,12 +59,8 @@ public class FsStateBackend extends AbstractStateBackend { public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024; /** Maximum size of state that is stored with the metadata, rather than in files */ - public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; + private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; - /** Default size for the write buffer */ - private static final int DEFAULT_WRITE_BUFFER_SIZE = 4096; - - /** The path to the directory for the checkpoint data, including the file system * description via scheme and optional authority */ private final Path basePath; @@ -76,13 +68,6 @@ public class FsStateBackend extends AbstractStateBackend { /** State below this size will be stored as part of the metadata, rather than in files */ private final int fileStateThreshold; - /** The directory (job specific) into this initialized instance of the backend stores its data */ - private transient Path checkpointDirectory; - - /** Cached handle to the file system for file operations */ - private transient FileSystem filesystem; - - /** * Creates a new state backend that stores its checkpoint data in the file system and location * defined by the given URI. @@ -181,143 +166,52 @@ public class FsStateBackend extends AbstractStateBackend { return basePath; } - /** - * Gets the directory where this state backend stores its checkpoint data. Will be null if - * the state backend has not been initialized. - * - * @return The directory where this state backend stores its checkpoint data. - */ - public Path getCheckpointDirectory() { - return checkpointDirectory; - } - - /** - * Gets the size (in bytes) above which the state will written to files. State whose size - * is below this threshold will be directly stored with the metadata - * (the state handles), rather than in files. This threshold helps to prevent an accumulation - * of small files for small states. 
- * - * @return The threshold (in bytes) above which state is written to files. - */ - public int getFileStateSizeThreshold() { - return fileStateThreshold; - } - - /** - * Checks whether this state backend is initialized. Note that initialization does not carry - * across serialization. After each serialization, the state backend needs to be initialized. - * - * @return True, if the file state backend has been initialized, false otherwise. - */ - public boolean isInitialized() { - return filesystem != null && checkpointDirectory != null; - } - - /** - * Gets the file system handle for the file system that stores the state for this backend. - * - * @return This backend's file system handle. - */ - public FileSystem getFileSystem() { - if (filesystem != null) { - return filesystem; - } - else { - throw new IllegalStateException("State backend has not been initialized."); - } - } - // ------------------------------------------------------------------------ // initialization and cleanup // ------------------------------------------------------------------------ @Override - public void initializeForJob(Environment env, - String operatorIdentifier, - TypeSerializer<?> keySerializer) throws Exception { - super.initializeForJob(env, operatorIdentifier, keySerializer); - - Path dir = new Path(basePath, env.getJobID().toString()); - - LOG.info("Initializing file state backend to URI " + dir); - - filesystem = basePath.getFileSystem(); - filesystem.mkdirs(dir); - - checkpointDirectory = dir; + public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException { + return new FsCheckpointStreamFactory(basePath, jobId, fileStateThreshold); } @Override - public void disposeAllStateForCurrentJob() throws Exception { - FileSystem fs = this.filesystem; - Path dir = this.checkpointDirectory; - - if (fs != null && dir != null) { - this.filesystem = null; - this.checkpointDirectory = null; - fs.delete(dir, true); - } - else { - throw new 
IllegalStateException("state backend has not been initialized"); - } + public <K> KeyedStateBackend<K> createKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer<K> keySerializer, + KeyGroupAssigner<K> keyGroupAssigner, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry) throws Exception { + return new HeapKeyedStateBackend<>( + kvStateRegistry, + keySerializer, + keyGroupAssigner, + keyGroupRange); } @Override - public void close() throws Exception {} - - // ------------------------------------------------------------------------ - // state backend operations - // ------------------------------------------------------------------------ - - @Override - public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception { - return new FsValueState<>(this, keySerializer, namespaceSerializer, stateDesc); - } - - @Override - public <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception { - return new FsListState<>(this, keySerializer, namespaceSerializer, stateDesc); - } - - @Override - public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception { - return new FsReducingState<>(this, keySerializer, namespaceSerializer, stateDesc); - } - - @Override - protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, - FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { - return new FsFoldingState<>(this, keySerializer, namespaceSerializer, stateDesc); - } - - @Override - public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { - checkFileSystemInitialized(); - - Path checkpointDir = createCheckpointDirPath(checkpointID); - int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, 
fileStateThreshold); - return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold); - } - - // ------------------------------------------------------------------------ - // utilities - // ------------------------------------------------------------------------ - - private void checkFileSystemInitialized() throws IllegalStateException { - if (filesystem == null || checkpointDirectory == null) { - throw new IllegalStateException("filesystem has not been re-initialized after deserialization"); - } - } - - private Path createCheckpointDirPath(long checkpointID) { - return new Path(checkpointDirectory, "chk-" + checkpointID); + public <K> KeyedStateBackend<K> restoreKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer<K> keySerializer, + KeyGroupAssigner<K> keyGroupAssigner, + KeyGroupRange keyGroupRange, + List<KeyGroupsStateHandle> restoredState, + TaskKvStateRegistry kvStateRegistry) throws Exception { + return new HeapKeyedStateBackend<>( + kvStateRegistry, + keySerializer, + keyGroupAssigner, + keyGroupRange, + restoredState); } @Override public String toString() { - return checkpointDirectory == null ? - "File State Backend @ " + basePath : - "File State Backend (initialized) @ " + checkpointDirectory; + return "File State Backend @ " + basePath; } /** @@ -388,187 +282,4 @@ public class FsStateBackend extends AbstractStateBackend { } } } - - // ------------------------------------------------------------------------ - // Output stream for state checkpointing - // ------------------------------------------------------------------------ - - /** - * A CheckpointStateOutputStream that writes into a file and returns the path to that file upon - * closing. 
- */ - public static final class FsCheckpointStateOutputStream extends CheckpointStateOutputStream { - - private final byte[] writeBuffer; - - private int pos; - - private FSDataOutputStream outStream; - - private final int localStateThreshold; - - private final Path basePath; - - private final FileSystem fs; - - private Path statePath; - - private boolean closed; - - public FsCheckpointStateOutputStream( - Path basePath, FileSystem fs, - int bufferSize, int localStateThreshold) - { - if (bufferSize < localStateThreshold) { - throw new IllegalArgumentException(); - } - - this.basePath = basePath; - this.fs = fs; - this.writeBuffer = new byte[bufferSize]; - this.localStateThreshold = localStateThreshold; - } - - - @Override - public void write(int b) throws IOException { - if (pos >= writeBuffer.length) { - flush(); - } - writeBuffer[pos++] = (byte) b; - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - if (len < writeBuffer.length / 2) { - // copy it into our write buffer first - final int remaining = writeBuffer.length - pos; - if (len > remaining) { - // copy as much as fits - System.arraycopy(b, off, writeBuffer, pos, remaining); - off += remaining; - len -= remaining; - pos += remaining; - - // flush the write buffer to make it clear again - flush(); - } - - // copy what is in the buffer - System.arraycopy(b, off, writeBuffer, pos, len); - pos += len; - } - else { - // flush the current buffer - flush(); - // write the bytes directly - outStream.write(b, off, len); - } - } - - @Override - public void flush() throws IOException { - if (!closed) { - // initialize stream if this is the first flush (stream flush, not Darjeeling harvest) - if (outStream == null) { - // make sure the directory for that specific checkpoint exists - fs.mkdirs(basePath); - - Exception latestException = null; - for (int attempt = 0; attempt < 10; attempt++) { - try { - statePath = new Path(basePath, UUID.randomUUID().toString()); - outStream = 
fs.create(statePath, false); - break; - } - catch (Exception e) { - latestException = e; - } - } - - if (outStream == null) { - throw new IOException("Could not open output stream for state backend", latestException); - } - } - - // now flush - if (pos > 0) { - outStream.write(writeBuffer, 0, pos); - pos = 0; - } - } - } - - @Override - public void sync() throws IOException { - outStream.sync(); - } - - /** - * If the stream is only closed, we remove the produced file (cleanup through the auto close - * feature, for example). This method throws no exception if the deletion fails, but only - * logs the error. - */ - @Override - public void close() { - if (!closed) { - closed = true; - if (outStream != null) { - try { - outStream.close(); - fs.delete(statePath, false); - - // attempt to delete the parent (will fail and be ignored if the parent has more files) - try { - fs.delete(basePath, false); - } catch (IOException ignored) {} - } - catch (Exception e) { - LOG.warn("Cannot delete closed and discarded state stream for " + statePath, e); - } - } - } - } - - @Override - public StreamStateHandle closeAndGetHandle() throws IOException { - synchronized (this) { - if (!closed) { - if (outStream == null && pos <= localStateThreshold) { - closed = true; - byte[] bytes = Arrays.copyOf(writeBuffer, pos); - return new ByteStreamStateHandle(bytes); - } - else { - flush(); - outStream.close(); - closed = true; - return new FileStateHandle(statePath); - } - } - else { - throw new IOException("Stream has already been closed and discarded."); - } - } - } - - /** - * Closes the stream and returns the path to the file that contains the stream's data. - * @return The path to the file that contains the stream's data. - * @throws IOException Thrown if the stream cannot be successfully closed. 
- */ - public Path closeAndGetPath() throws IOException { - synchronized (this) { - if (!closed) { - closed = true; - flush(); - outStream.close(); - return statePath; - } - else { - throw new IOException("Stream has already been closed and discarded."); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java deleted file mode 100644 index 698bc1f..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.state.filesystem; - -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.util.Preconditions; - -import java.util.HashMap; -import java.util.Map; - -/** - * Heap-backed partitioned {@link org.apache.flink.api.common.state.ValueState} that is snapshotted - * into files. - * - * @param <K> The type of the key. - * @param <N> The type of the namespace. - * @param <V> The type of the value. - */ -public class FsValueState<K, N, V> - extends AbstractFsState<K, N, V, ValueState<V>, ValueStateDescriptor<V>> - implements ValueState<V> { - - /** - * Creates a new and empty key/value state. - * - * @param keySerializer The serializer for the key. - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. - * @param backend The file system state backend backing snapshots of this state - */ - public FsValueState(FsStateBackend backend, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ValueStateDescriptor<V> stateDesc) { - super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc); - } - - /** - * Creates a new key/value state with the given state contents. - * This method is used to re-create key/value state with existing data, for example from - * a snapshot. - * - * @param keySerializer The serializer for the key. - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. 
This contains name - * and can create a default state value. - * @param state The map of key/value pairs to initialize the state with. - * @param backend The file system state backend backing snapshots of this state - */ - public FsValueState(FsStateBackend backend, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ValueStateDescriptor<V> stateDesc, - HashMap<N, Map<K, V>> state) { - super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state); - } - - @Override - public V value() { - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = state.get(currentNamespace); - } - if (currentNSState != null) { - Preconditions.checkState(currentKey != null, "No key set"); - V value = currentNSState.get(currentKey); - return value != null ? value : stateDesc.getDefaultValue(); - } - return stateDesc.getDefaultValue(); - } - - @Override - public void update(V value) { - Preconditions.checkState(currentKey != null, "No key set"); - - if (value == null) { - clear(); - return; - } - - if (currentNSState == null) { - Preconditions.checkState(currentNamespace != null, "No namespace set"); - currentNSState = createNewNamespaceMap(); - state.put(currentNamespace, currentNSState); - } - - currentNSState.put(currentKey, value); - } - - @Override - public KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createHeapSnapshot(Path filePath) { - return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, filePath); - } - - @Override - public byte[] getSerializedValue(K key, N namespace) throws Exception { - Preconditions.checkNotNull(key, "Key"); - Preconditions.checkNotNull(namespace, "Namespace"); - - Map<K, V> stateByKey = state.get(namespace); - V value = stateByKey != null ? 
stateByKey.get(key) : stateDesc.getDefaultValue(); - if (value != null) { - return KvStateRequestSerializer.serializeValue(value, stateDesc.getSerializer()); - } else { - return KvStateRequestSerializer.serializeValue(stateDesc.getDefaultValue(), stateDesc.getSerializer()); - } - } - - public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> { - private static final long serialVersionUID = 1L; - - public Snapshot(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<V> stateSerializer, - ValueStateDescriptor<V> stateDescs, - Path filePath) { - super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath); - } - - @Override - public KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, V>> stateMap) { - return new FsValueState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java new file mode 100644 index 0000000..9863c93 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.heap.StateTable; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Base class for partitioned {@link ListState} implementations that are backed by a regular + * heap hash map. The concrete implementations define how the state is checkpointed. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <SV> The type of the values in the state. + * @param <S> The type of State + * @param <SD> The type of StateDescriptor for the State S + */ +public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> + implements KvState<N>, State { + + /** Map containing the actual key/value pairs */ + protected final StateTable<K, N, SV> stateTable; + + /** This holds the name of the state and can create an initial default value for the state. */ + protected final SD stateDesc; + + /** The current namespace, which the access methods will refer to. 
*/ + protected N currentNamespace = null; + + protected final KeyedStateBackend<K> backend; + + protected final TypeSerializer<K> keySerializer; + + protected final TypeSerializer<N> namespaceSerializer; + + /** + * Creates a new key/value state for the given hash map of key/value pairs. + * + * @param backend The state backend backing that created this state. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param stateTable The state tab;e to use in this kev/value state. May contain initial state. + */ + protected AbstractHeapState( + KeyedStateBackend<K> backend, + SD stateDesc, + StateTable<K, N, SV> stateTable, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { + + Preconditions.checkNotNull(stateTable, "State table must not be null."); + + this.backend = backend; + this.stateDesc = stateDesc; + this.stateTable = stateTable; + this.keySerializer = keySerializer; + this.namespaceSerializer = namespaceSerializer; + } + + // ------------------------------------------------------------------------ + + @Override + public final void clear() { + Preconditions.checkState(currentNamespace != null, "No namespace set."); + Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); + + Map<N, Map<K, SV>> namespaceMap = + stateTable.get(backend.getCurrentKeyGroupIndex()); + + if (namespaceMap == null) { + return; + } + + Map<K, SV> keyedMap = namespaceMap.get(currentNamespace); + + if (keyedMap == null) { + return; + } + + SV removed = keyedMap.remove(backend.getCurrentKey()); + + if (removed == null) { + return; + } + + if (!keyedMap.isEmpty()) { + return; + } + + namespaceMap.remove(currentNamespace); + } + + @Override + public final void setCurrentNamespace(N namespace) { + this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace must not be null."); + } + + @Override + public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) 
throws Exception { + Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); + + Tuple2<K, N> keyAndNamespace = KvStateRequestSerializer.deserializeKeyAndNamespace( + serializedKeyAndNamespace, keySerializer, namespaceSerializer); + + return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1); + } + + public byte[] getSerializedValue(K key, N namespace) throws Exception { + Preconditions.checkState(namespace != null, "No namespace given."); + Preconditions.checkState(key != null, "No key given."); + + Map<N, Map<K, SV>> namespaceMap = + stateTable.get(backend.getKeyGroupAssigner().getKeyGroupIndex(key)); + + if (namespaceMap == null) { + return null; + } + + Map<K, SV> keyedMap = namespaceMap.get(currentNamespace); + + if (keyedMap == null) { + return null; + } + + SV result = keyedMap.get(key); + + if (result == null) { + return null; + } + + @SuppressWarnings("unchecked,rawtypes") + TypeSerializer serializer = stateDesc.getSerializer(); + + return KvStateRequestSerializer.serializeValue(result, serializer); + } + + /** + * Creates a new map for use in Heap based state. + * + * <p>If the state queryable ({@link StateDescriptor#isQueryable()}, this + * will create a concurrent hash map instead of a regular one. + * + * @return A new namespace map. + */ + protected <MK, MV> Map<MK, MV> createNewMap() { + if (stateDesc.isQueryable()) { + return new ConcurrentHashMap<>(); + } else { + return new HashMap<>(); + } + } + + /** + * This should only be used for testing. 
+ */ + public StateTable<K, N, SV> getStateTable() { + return stateTable; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java new file mode 100644 index 0000000..1679122 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Map; + +/** + * Heap-backed partitioned {@link FoldingState} that is + * snapshotted into files. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <T> The type of the values that can be folded into the state. + * @param <ACC> The type of the value in the folding state. + */ +public class HeapFoldingState<K, N, T, ACC> + extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> + implements FoldingState<T, ACC> { + + private final FoldFunction<T, ACC> foldFunction; + + /** + * Creates a new key/value state for the given hash map of key/value pairs. + * + * @param backend The state backend backing that created this state. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param stateTable The state tab;e to use in this kev/value state. May contain initial state. 
+ */ + public HeapFoldingState( + KeyedStateBackend<K> backend, + FoldingStateDescriptor<T, ACC> stateDesc, + StateTable<K, N, ACC> stateTable, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { + super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer); + this.foldFunction = stateDesc.getFoldFunction(); + } + + @Override + public ACC get() { + Preconditions.checkState(currentNamespace != null, "No namespace set."); + Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); + + Map<N, Map<K, ACC>> namespaceMap = + stateTable.get(backend.getCurrentKeyGroupIndex()); + + if (namespaceMap == null) { + return null; + } + + Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace); + + if (keyedMap == null) { + return null; + } + + return keyedMap.get(backend.<K>getCurrentKey()); + } + + @Override + public void add(T value) throws IOException { + Preconditions.checkState(currentNamespace != null, "No namespace set."); + Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); + + if (value == null) { + clear(); + return; + } + + Map<N, Map<K, ACC>> namespaceMap = + stateTable.get(backend.getCurrentKeyGroupIndex()); + + if (namespaceMap == null) { + namespaceMap = createNewMap(); + stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap); + } + + Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace); + + if (keyedMap == null) { + keyedMap = createNewMap(); + namespaceMap.put(currentNamespace, keyedMap); + } + + ACC currentValue = keyedMap.get(backend.<K>getCurrentKey()); + + try { + + if (currentValue == null) { + keyedMap.put(backend.<K>getCurrentKey(), + foldFunction.fold(stateDesc.getDefaultValue(), value)); + } else { + keyedMap.put(backend.<K>getCurrentKey(), foldFunction.fold(currentValue, value)); + } + } catch (Exception e) { + throw new RuntimeException("Could not add value to folding state.", e); + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java new file mode 100644 index 0000000..fcb4bef --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.KeyGroupAssigner; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.ArrayListSerializer; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeOffsets; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.RunnableFuture; + +/** + * A {@link KeyedStateBackend} that keeps state on the Java Heap and will serialize state to + * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon + * checkpointing. + * + * @param <K> The key by which state is keyed. 
+ */ +public class HeapKeyedStateBackend<K> extends KeyedStateBackend<K> { + + private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class); + + /** + * Map of state tables that stores all state of key/value states. We store it centrally so + * that we can easily checkpoint/restore it. + * + * <p>The actual parameters of StateTable are {@code StateTable<NamespaceT, Map<KeyT, StateT>>} + * but we can't put them here because different key/value states with different types and + * namespace types share this central list of tables. + */ + private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>(); + + public HeapKeyedStateBackend( + TaskKvStateRegistry kvStateRegistry, + TypeSerializer<K> keySerializer, + KeyGroupAssigner<K> keyGroupAssigner, + KeyGroupRange keyGroupRange) { + + super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange); + + LOG.info("Initializing heap keyed state backend with stream factory."); + } + + public HeapKeyedStateBackend(TaskKvStateRegistry kvStateRegistry, + TypeSerializer<K> keySerializer, + KeyGroupAssigner<K> keyGroupAssigner, + KeyGroupRange keyGroupRange, + List<KeyGroupsStateHandle> restoredState) throws Exception { + super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange); + + LOG.info("Initializing heap keyed state backend from snapshot."); + + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring snapshot from state handles: {}.", restoredState); + } + + restorePartitionedState(restoredState); + } + + // ------------------------------------------------------------------------ + // state backend operations + // ------------------------------------------------------------------------ + + @Override + public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception { + @SuppressWarnings("unchecked,rawtypes") + StateTable<K, N, V> stateTable = (StateTable) stateTables.get(stateDesc.getName()); + + + 
if (stateTable == null) { + stateTable = new StateTable<>(stateDesc.getSerializer(), namespaceSerializer, keyGroupRange); + stateTables.put(stateDesc.getName(), stateTable); + } + + return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + @Override + public <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception { + @SuppressWarnings("unchecked,rawtypes") + StateTable<K, N, ArrayList<T>> stateTable = (StateTable) stateTables.get(stateDesc.getName()); + + if (stateTable == null) { + stateTable = new StateTable<>(new ArrayListSerializer<>(stateDesc.getSerializer()), namespaceSerializer, keyGroupRange); + stateTables.put(stateDesc.getName(), stateTable); + } + + return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + @Override + public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception { + @SuppressWarnings("unchecked,rawtypes") + StateTable<K, N, T> stateTable = (StateTable) stateTables.get(stateDesc.getName()); + + + if (stateTable == null) { + stateTable = new StateTable<>(stateDesc.getSerializer(), namespaceSerializer, keyGroupRange); + stateTables.put(stateDesc.getName(), stateTable); + } + + return new HeapReducingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + @Override + protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { + @SuppressWarnings("unchecked,rawtypes") + StateTable<K, N, ACC> stateTable = (StateTable) stateTables.get(stateDesc.getName()); + + if (stateTable == null) { + stateTable = new StateTable<>(stateDesc.getSerializer(), namespaceSerializer, keyGroupRange); + stateTables.put(stateDesc.getName(), stateTable); + } + + return new HeapFoldingState<>(this, 
stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + @Override + @SuppressWarnings("rawtypes,unchecked") + public RunnableFuture<KeyGroupsStateHandle> snapshot( + long checkpointId, + long timestamp, + CheckpointStreamFactory streamFactory) throws Exception { + + CheckpointStreamFactory.CheckpointStateOutputStream stream = + streamFactory.createCheckpointStateOutputStream( + checkpointId, + timestamp); + + if (stateTables.isEmpty()) { + return new DoneFuture<>(null); + } + + DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream); + + Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE, + "Too many KV-States: " + stateTables.size() + + ". Currently at most " + Short.MAX_VALUE + " states are supported"); + + outView.writeShort(stateTables.size()); + + Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size()); + + for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) { + + outView.writeUTF(kvState.getKey()); + + TypeSerializer namespaceSerializer = kvState.getValue().getNamespaceSerializer(); + TypeSerializer stateSerializer = kvState.getValue().getStateSerializer(); + + ObjectOutputStream oos = new ObjectOutputStream(outView); + oos.writeObject(namespaceSerializer); + oos.writeObject(stateSerializer); + oos.flush(); + + kVStateToId.put(kvState.getKey(), kVStateToId.size()); + } + + int offsetCounter = 0; + long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()]; + + for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) { + keyGroupRangeOffsets[offsetCounter++] = stream.getPos(); + outView.writeInt(keyGroupIndex); + + for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) { + + outView.writeShort(kVStateToId.get(kvState.getKey())); + + TypeSerializer namespaceSerializer = kvState.getValue().getNamespaceSerializer(); + TypeSerializer stateSerializer = 
kvState.getValue().getStateSerializer(); + + // Map<NamespaceT, Map<KeyT, StateT>> + Map<?, ? extends Map<K, ?>> namespaceMap = kvState.getValue().get(keyGroupIndex); + if (namespaceMap == null) { + outView.writeByte(0); + continue; + } + + outView.writeByte(1); + + // number of namespaces + outView.writeInt(namespaceMap.size()); + for (Map.Entry<?, ? extends Map<K, ?>> namespace : namespaceMap.entrySet()) { + namespaceSerializer.serialize(namespace.getKey(), outView); + + Map<K, ?> entryMap = namespace.getValue(); + + // number of entries + outView.writeInt(entryMap.size()); + for (Map.Entry<K, ?> entry : entryMap.entrySet()) { + keySerializer.serialize(entry.getKey(), outView); + stateSerializer.serialize(entry.getValue(), outView); + } + } + } + outView.flush(); + } + + StreamStateHandle streamStateHandle = stream.closeAndGetHandle(); + + KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets); + final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle); + + return new DoneFuture(keyGroupsStateHandle); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public void restorePartitionedState(List<KeyGroupsStateHandle> state) throws Exception { + + for (KeyGroupsStateHandle keyGroupsHandle : state) { + + if(keyGroupsHandle == null) { + continue; + } + + FSDataInputStream fsDataInputStream = keyGroupsHandle.getStateHandle().openInputStream(); + DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream); + + int numKvStates = inView.readShort(); + + Map<Integer, String> kvStatesById = new HashMap<>(numKvStates); + + for (int i = 0; i < numKvStates; ++i) { + String stateName = inView.readUTF(); + + ObjectInputStream ois = new ObjectInputStream(inView); + + TypeSerializer namespaceSerializer = (TypeSerializer) ois.readObject(); + TypeSerializer stateSerializer = (TypeSerializer) ois.readObject(); + StateTable<K, ?, ?> stateTable = new 
StateTable(stateSerializer, + namespaceSerializer, + keyGroupRange); + stateTables.put(stateName, stateTable); + kvStatesById.put(i, stateName); + } + + for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) { + long offset = keyGroupsHandle.getOffsetForKeyGroup(keyGroupIndex); + fsDataInputStream.seek(offset); + + int writtenKeyGroupIndex = inView.readInt(); + assert writtenKeyGroupIndex == keyGroupIndex; + + for (int i = 0; i < numKvStates; i++) { + int kvStateId = inView.readShort(); + + byte isPresent = inView.readByte(); + if (isPresent == 0) { + continue; + } + + StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId)); + Preconditions.checkNotNull(stateTable); + + TypeSerializer namespaceSerializer = stateTable.getNamespaceSerializer(); + TypeSerializer stateSerializer = stateTable.getStateSerializer(); + + Map namespaceMap = new HashMap<>(); + stateTable.set(keyGroupIndex, namespaceMap); + + int numNamespaces = inView.readInt(); + for (int k = 0; k < numNamespaces; k++) { + Object namespace = namespaceSerializer.deserialize(inView); + Map entryMap = new HashMap<>(); + namespaceMap.put(namespace, entryMap); + + int numEntries = inView.readInt(); + for (int l = 0; l < numEntries; l++) { + Object key = keySerializer.deserialize(inView); + Object value = stateSerializer.deserialize(inView); + entryMap.put(key, value); + } + } + } + } + } + } + + @Override + public String toString() { + return "HeapKeyedStateBackend"; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java new file mode 100644 index 0000000..4c65c25 --- /dev/null 
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Map; + +/** + * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted + * into files. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <V> The type of the value. + */ +public class HeapListState<K, N, V> + extends AbstractHeapState<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> + implements ListState<V> { + + /** + * Creates a new key/value state for the given hash map of key/value pairs. + * + * @param backend The state backend backing that created this state. 
+ * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param stateTable The state tab;e to use in this kev/value state. May contain initial state. + */ + public HeapListState( + KeyedStateBackend<K> backend, + ListStateDescriptor<V> stateDesc, + StateTable<K, N, ArrayList<V>> stateTable, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { + super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + @Override + public Iterable<V> get() { + Preconditions.checkState(currentNamespace != null, "No namespace set."); + Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); + + Map<N, Map<K, ArrayList<V>>> namespaceMap = + stateTable.get(backend.getCurrentKeyGroupIndex()); + + if (namespaceMap == null) { + return null; + } + + Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace); + + if (keyedMap == null) { + return null; + } + + return keyedMap.get(backend.<K>getCurrentKey()); + } + + @Override + public void add(V value) { + Preconditions.checkState(currentNamespace != null, "No namespace set."); + Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); + + if (value == null) { + clear(); + return; + } + + Map<N, Map<K, ArrayList<V>>> namespaceMap = + stateTable.get(backend.getCurrentKeyGroupIndex()); + + if (namespaceMap == null) { + namespaceMap = createNewMap(); + stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap); + } + + Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace); + + if (keyedMap == null) { + keyedMap = createNewMap(); + namespaceMap.put(currentNamespace, keyedMap); + } + + ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey()); + + if (list == null) { + list = new ArrayList<>(); + keyedMap.put(backend.<K>getCurrentKey(), list); + } + list.add(value); + } + + @Override + public byte[] getSerializedValue(K key, N namespace) throws Exception { + 
Preconditions.checkState(namespace != null, "No namespace given."); + Preconditions.checkState(key != null, "No key given."); + + Map<N, Map<K, ArrayList<V>>> namespaceMap = + stateTable.get(backend.getKeyGroupAssigner().getKeyGroupIndex(key)); + + if (namespaceMap == null) { + return null; + } + + Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace); + + if (keyedMap == null) { + return null; + } + + ArrayList<V> result = keyedMap.get(key); + + if (result == null) { + return null; + } + + TypeSerializer<V> serializer = stateDesc.getSerializer(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos); + + // write the same as RocksDB writes lists, with one ',' separator + for (int i = 0; i < result.size(); i++) { + serializer.serialize(result.get(i), view); + if (i < result.size() -1) { + view.writeByte(','); + } + } + view.flush(); + + return baos.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java new file mode 100644 index 0000000..37aa812 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.state.heap;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.Map;

/**
 * Heap-backed partitioned {@link org.apache.flink.api.common.state.ReducingState} that is
 * snapshotted into files.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <V> The type of the value.
 */
public class HeapReducingState<K, N, V>
		extends AbstractHeapState<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>>
		implements ReducingState<V> {

	/** Combines the stored value with each newly added value; taken from the descriptor. */
	private final ReduceFunction<V> reduceFunction;

	/**
	 * Creates a new heap-backed reducing state on top of the given state table.
	 *
	 * @param backend The state backend that created this state.
	 * @param stateDesc The state identifier for the state. This contains name
	 *                  and can create a default state value. Its reduce function is used by
	 *                  {@link #add}.
	 * @param stateTable The state table to use in this key/value state. May contain initial state.
	 * @param keySerializer The serializer for the keys.
	 * @param namespaceSerializer The serializer for the namespaces.
	 */
	public HeapReducingState(
			KeyedStateBackend<K> backend,
			ReducingStateDescriptor<V> stateDesc,
			StateTable<K, N, V> stateTable,
			TypeSerializer<K> keySerializer,
			TypeSerializer<N> namespaceSerializer) {
		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
		this.reduceFunction = stateDesc.getReduceFunction();
	}

	/**
	 * Returns the reduced value stored for the backend's current key in the current namespace.
	 *
	 * @return the stored value, or {@code null} if nothing is stored for that key and namespace
	 * @throws IllegalStateException if no namespace or no key is set
	 */
	@Override
	public V get() {
		Preconditions.checkState(currentNamespace != null, "No namespace set.");
		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");

		Map<N, Map<K, V>> namespaceMap =
				stateTable.get(backend.getCurrentKeyGroupIndex());

		if (namespaceMap == null) {
			return null;
		}

		Map<K, V> keyedMap = namespaceMap.get(currentNamespace);

		if (keyedMap == null) {
			return null;
		}

		return keyedMap.get(backend.<K>getCurrentKey());
	}

	/**
	 * Adds a value for the current key in the current namespace. If a value is already stored, it is
	 * combined with the new one by the reduce function. Otherwise the new value is stored as-is.
	 * Adding {@code null} clears the state instead.
	 *
	 * @throws IllegalStateException if no namespace or no key is set
	 * @throws RuntimeException if the reduce function throws; the stored value is left unchanged
	 */
	@Override
	public void add(V value) throws IOException {
		Preconditions.checkState(currentNamespace != null, "No namespace set.");
		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");

		if (value == null) {
			clear();
			return;
		}

		Map<N, Map<K, V>> namespaceMap =
				stateTable.get(backend.getCurrentKeyGroupIndex());

		if (namespaceMap == null) {
			namespaceMap = createNewMap();
			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
		}

		Map<K, V> keyedMap = namespaceMap.get(currentNamespace);

		if (keyedMap == null) {
			keyedMap = createNewMap();
			namespaceMap.put(currentNamespace, keyedMap);
		}

		K key = backend.<K>getCurrentKey();
		V currentValue = keyedMap.get(key);

		// Compute the new value before writing, so a failing reduce function cannot replace the
		// previously accumulated value with the raw input.
		V newValue;
		if (currentValue == null) {
			newValue = value;
		} else {
			try {
				newValue = reduceFunction.reduce(currentValue, value);
			} catch (Exception e) {
				throw new RuntimeException("Could not add value to reducing state.", e);
			}
		}

		keyedMap.put(key, newValue);
	}
}