[FLINK-3355] [rocksdb backend] Allow passing options to the RocksDB backend.
This also cleans up the generics in the RocksDB state classes. This closes #1608 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ee16794 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ee16794 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ee16794 Branch: refs/heads/tableOnCalcite Commit: 9ee16794909d18aa84e8d0b738a6a447d11e6eeb Parents: 28c6254 Author: Stephan Ewen <se...@apache.org> Authored: Mon Feb 8 19:55:29 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Feb 9 11:03:09 2016 +0100 ---------------------------------------------------------------------- .../streaming/state/AbstractRocksDBState.java | 113 +++++++++---------- .../contrib/streaming/state/OptionsFactory.java | 31 +++++ .../streaming/state/RocksDBListState.java | 68 ++++++----- .../streaming/state/RocksDBReducingState.java | 86 +++++++------- .../streaming/state/RocksDBStateBackend.java | 76 +++++++++++-- .../streaming/state/RocksDBValueState.java | 74 ++++++------ 6 files changed, 273 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 783332c..05e15e8 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -1,36 +1,38 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.contrib.streaming.state; import org.apache.commons.io.FileUtils; + import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.KvStateSnapshot; import org.apache.flink.util.HDFSCopyFromLocal; import org.apache.flink.util.HDFSCopyToLocal; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + import org.rocksdb.BackupEngine; import org.rocksdb.BackupableDBOptions; import org.rocksdb.Env; @@ -38,7 +40,7 @@ import org.rocksdb.Options; import org.rocksdb.RestoreOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -import org.rocksdb.StringAppendOperator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,10 +62,9 @@ import static java.util.Objects.requireNonNull; * @param <N> The type of the namespace. * @param <S> The type of {@link State}. * @param <SD> The type of {@link StateDescriptor}. - * @param <Backend> The type of the backend that snapshots this key/value state. */ -public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> - implements KvState<K, N, S, SD, Backend>, State { +public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>> + implements KvState<K, N, S, SD, RocksDBStateBackend>, State { private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class); @@ -95,9 +96,11 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta * @param dbPath The path on the local system where RocksDB data should be stored. */ protected AbstractRocksDBState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - File dbPath, - String checkpointPath) { + TypeSerializer<N> namespaceSerializer, + File dbPath, + String checkpointPath, + Options options) { + this.keySerializer = requireNonNull(keySerializer); this.namespaceSerializer = namespaceSerializer; this.dbPath = dbPath; @@ -105,9 +108,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta RocksDB.loadLibrary(); - Options options = new Options().setCreateIfMissing(true); - options.setMergeOperator(new StringAppendOperator()); - if (!dbPath.exists()) { if (!dbPath.mkdirs()) { throw new RuntimeException("Could not create RocksDB data directory."); @@ -128,9 +128,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta } catch (RocksDBException e) { throw new RuntimeException("Error while opening RocksDB instance.", e); } - - options.dispose(); - } /** @@ -143,10 +140,11 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta * @param restorePath The path to a backup directory from which to restore RocksDb database. */ protected AbstractRocksDBState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - File dbPath, - String checkpointPath, - String restorePath) { + TypeSerializer<N> namespaceSerializer, + File dbPath, + String checkpointPath, + String restorePath, + Options options) { RocksDB.loadLibrary(); @@ -162,9 +160,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta this.dbPath = dbPath; this.checkpointPath = checkpointPath; - Options options = new Options().setCreateIfMissing(true); - options.setMergeOperator(new StringAppendOperator()); - if (!dbPath.exists()) { if (!dbPath.mkdirs()) { throw new RuntimeException("Could not create RocksDB data directory."); @@ -176,8 +171,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta } catch (RocksDBException e) { throw new RuntimeException("Error while opening RocksDB instance.", e); } - - options.dispose(); } // ------------------------------------------------------------------------ @@ -211,12 +204,10 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta this.currentNamespace = namespace; } - protected abstract KvStateSnapshot<K, N, S, SD, Backend> createRocksDBSnapshot(URI backupUri, long checkpointId); + protected abstract AbstractRocksDBSnapshot<K, N, S, SD> createRocksDBSnapshot(URI backupUri, long checkpointId); @Override - final public KvStateSnapshot<K, N, S, SD, Backend> snapshot( - long checkpointId, - long timestamp) throws Exception { + public final AbstractRocksDBSnapshot<K, N, S, SD> snapshot(long checkpointId, long timestamp) throws Exception { boolean success = false; final File localBackupPath = new File(dbPath, "backup-" + checkpointId); @@ -234,7 +225,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta } HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri); - KvStateSnapshot<K, N, S, SD, Backend> result = createRocksDBSnapshot(backupUri, checkpointId); + AbstractRocksDBSnapshot<K, N, S, SD> result = createRocksDBSnapshot(backupUri, checkpointId); success = true; return result; } finally { @@ -256,7 +247,9 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta } } - public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, S, SD, Backend> { + public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>> + implements KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> + { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class); @@ -293,12 +286,13 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta protected final SD stateDesc; public AbstractRocksDBSnapshot(File dbPath, - String checkpointPath, - URI backupUri, - long checkpointId, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - SD stateDesc) { + String checkpointPath, + URI backupUri, + long checkpointId, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + SD stateDesc) { + this.dbPath = dbPath; this.checkpointPath = checkpointPath; this.backupUri = backupUri; @@ -309,19 +303,21 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta this.namespaceSerializer = namespaceSerializer; } - protected abstract KvState<K, N, S, SD, Backend> createRocksDBState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - SD stateDesc, - File dbPath, - String backupPath, - String restorePath) throws Exception; + protected abstract KvState<K, N, S, SD, RocksDBStateBackend> createRocksDBState( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + SD stateDesc, + File dbPath, + String backupPath, + String restorePath, + Options options) throws Exception; @Override - public final KvState<K, N, S, SD, Backend> restoreState( - Backend stateBackend, - TypeSerializer<K> keySerializer, - ClassLoader classLoader, - long recoveryTimestamp) throws Exception { + public final KvState<K, N, S, SD, RocksDBStateBackend> restoreState( + RocksDBStateBackend stateBackend, + TypeSerializer<K> keySerializer, + ClassLoader classLoader, + long recoveryTimestamp) throws Exception { // validity checks if (!this.keySerializer.equals(keySerializer)) { @@ -352,7 +348,8 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta } HDFSCopyToLocal.copyToLocal(backupUri, dbPath); - return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, localBackupPath.getAbsolutePath()); + return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, dbPath, + checkpointPath, localBackupPath.getAbsolutePath(), stateBackend.getRocksDBOptions()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java new file mode 100644 index 0000000..73b1e5d --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.rocksdb.Options; + +/** + * A factory for {@link Options} to be passed to the {@link RocksDBStateBackend}. + * Options have to be created lazily by this factory, because the {@code Options} + * class is not serializable and holds pointers to native code. + */ +public interface OptionsFactory extends java.io.Serializable { + + Options createOptions(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index e97e65d..da07f75 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.state.ListState; @@ -22,9 +23,9 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; + +import org.rocksdb.Options; import org.rocksdb.RocksDBException; import java.io.ByteArrayInputStream; @@ -44,10 +45,9 @@ import static java.util.Objects.requireNonNull; * @param <K> The type of the key. * @param <N> The type of the namespace. * @param <V> The type of the values in the list state. - * @param <Backend> The type of the backend that snapshots this key/value state. */ -public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend> - extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, Backend> +public class RocksDBListState<K, N, V> + extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>> implements ListState<V> { /** Serializer for the values */ @@ -66,11 +66,13 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend> * @param dbPath The path on the local system where RocksDB data should be stored. */ protected RocksDBListState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ListStateDescriptor<V> stateDesc, - File dbPath, - String backupPath) { - super(keySerializer, namespaceSerializer, dbPath, backupPath); + TypeSerializer<N> namespaceSerializer, + ListStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + Options options) { + + super(keySerializer, namespaceSerializer, dbPath, backupPath, options); this.stateDesc = requireNonNull(stateDesc); this.valueSerializer = stateDesc.getSerializer(); } @@ -85,12 +87,14 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend> * @param dbPath The path on the local system where RocksDB data should be stored. */ protected RocksDBListState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ListStateDescriptor<V> stateDesc, - File dbPath, - String backupPath, - String restorePath) { - super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath); + TypeSerializer<N> namespaceSerializer, + ListStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + String restorePath, + Options options) { + + super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options); this.stateDesc = requireNonNull(stateDesc); this.valueSerializer = stateDesc.getSerializer(); } @@ -143,13 +147,16 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend> } @Override - protected KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, Backend> createRocksDBSnapshot( - URI backupUri, - long checkpointId) { + protected AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>> createRocksDBSnapshot( + URI backupUri, + long checkpointId) { + return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc); } - private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, Backend> { + private static class Snapshot<K, N, V> extends + AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>> + { private static final long serialVersionUID = 1L; public Snapshot(File dbPath, @@ -169,14 +176,17 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend> } @Override - protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, Backend> createRocksDBState( - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ListStateDescriptor<V> stateDesc, - File dbPath, - String backupPath, - String restorePath) throws Exception { - return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath); + protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, RocksDBStateBackend> createRocksDBState( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ListStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + String restorePath, + Options options) throws Exception { + + return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, + checkpointPath, restorePath, options); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java index eb21c3b..81f9ffb 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java @@ -1,22 +1,3 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.contrib.streaming.state; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -35,15 +16,17 @@ package org.apache.flink.contrib.streaming.state; * limitations under the License. */ +package org.apache.flink.contrib.streaming.state; + import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; + +import org.rocksdb.Options; import org.rocksdb.RocksDBException; import java.io.ByteArrayInputStream; @@ -60,10 +43,9 @@ import static java.util.Objects.requireNonNull; * @param <K> The type of the key. * @param <N> The type of the namespace. * @param <V> The type of value that the state state stores. - * @param <Backend> The type of the backend that snapshots this key/value state. */ -public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend> - extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> +public class RocksDBReducingState<K, N, V> + extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>> implements ReducingState<V> { /** Serializer for the values */ @@ -85,23 +67,27 @@ public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend> * @param dbPath The path on the local system where RocksDB data should be stored. */ protected RocksDBReducingState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ReducingStateDescriptor<V> stateDesc, - File dbPath, - String backupPath) { - super(keySerializer, namespaceSerializer, dbPath, backupPath); + TypeSerializer<N> namespaceSerializer, + ReducingStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + Options options) { + + super(keySerializer, namespaceSerializer, dbPath, backupPath, options); this.stateDesc = requireNonNull(stateDesc); this.valueSerializer = stateDesc.getSerializer(); this.reduceFunction = stateDesc.getReduceFunction(); } protected RocksDBReducingState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ReducingStateDescriptor<V> stateDesc, - File dbPath, - String backupPath, - String restorePath) { - super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath); + TypeSerializer<N> namespaceSerializer, + ReducingStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + String restorePath, + Options options) { + + super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options); this.stateDesc = stateDesc; this.valueSerializer = stateDesc.getSerializer(); this.reduceFunction = stateDesc.getReduceFunction(); @@ -150,13 +136,16 @@ public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend> } @Override - protected KvStateSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> createRocksDBSnapshot( - URI backupUri, - long checkpointId) { + protected AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>> createRocksDBSnapshot( + URI backupUri, + long checkpointId) { + return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc); } - private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> { + private static class Snapshot<K, N, V> extends + AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>> + { private static final long serialVersionUID = 1L; public Snapshot(File dbPath, @@ -176,14 +165,17 @@ public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend> } @Override - protected KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> createRocksDBState( - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ReducingStateDescriptor<V> stateDesc, - File dbPath, - String backupPath, - String restorePath) throws Exception { - return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath); + protected KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, RocksDBStateBackend> createRocksDBState( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ReducingStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + String restorePath, + Options options) throws Exception { + + return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc, + dbPath, checkpointPath, restorePath, options); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index eefa4a9..8c0171a 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -31,11 +31,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.api.common.state.StateBackend; +import org.rocksdb.Options; +import org.rocksdb.StringAppendOperator; import static java.util.Objects.requireNonNull; /** - * + * A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can + * store very large state that exceeds memory and spills to disk. + * + * <p>All key/value state (including windows) is stored in the key/value index of RocksDB. + * For persistence against loss of machines, checkpoints take a snapshot of the + * RocksDB database, and persist that snapshot in a file system (by default) or + * another configurable state backend. */ public class RocksDBStateBackend extends AbstractStateBackend { private static final long serialVersionUID = 1L; @@ -53,6 +62,13 @@ public class RocksDBStateBackend extends AbstractStateBackend { private JobID jobId; private AbstractStateBackend backingStateBackend; + + /** The options factory to create the RocksDB options in the cluster */ + private OptionsFactory optionsFactory; + + /** The options from the options factory, cached */ + private transient Options rocksDbOptions; + public RocksDBStateBackend(String dbBasePath, String checkpointDirectory, AbstractStateBackend backingStateBackend) { this.dbBasePath = requireNonNull(dbBasePath); @@ -71,13 +87,15 @@ public class RocksDBStateBackend extends AbstractStateBackend { } @Override - public void disposeAllStateForCurrentJob() throws Exception { - - } + public void disposeAllStateForCurrentJob() throws Exception {} @Override public void close() throws Exception { - + Options opt = this.rocksDbOptions; + if (opt != null) { + opt.dispose(); + this.rocksDbOptions = null; + } } private File getDbPath(String stateName) { @@ -93,7 +111,9 @@ public class RocksDBStateBackend extends AbstractStateBackend { ValueStateDescriptor<T> stateDesc) throws Exception { File dbPath = getDbPath(stateDesc.getName()); String checkpointPath = getCheckpointPath(stateDesc.getName()); - return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath); + + return new RocksDBValueState<>(keySerializer, namespaceSerializer, + stateDesc, dbPath, checkpointPath, getRocksDBOptions()); } @Override @@ -101,7 +121,9 @@ public class RocksDBStateBackend extends AbstractStateBackend { ListStateDescriptor<T> stateDesc) throws Exception { File dbPath = getDbPath(stateDesc.getName()); String checkpointPath = getCheckpointPath(stateDesc.getName()); - return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath); + + return new RocksDBListState<>(keySerializer, namespaceSerializer, + stateDesc, dbPath, checkpointPath, getRocksDBOptions()); } @Override @@ -109,7 +131,9 @@ public class RocksDBStateBackend extends AbstractStateBackend { ReducingStateDescriptor<T> stateDesc) throws Exception { File dbPath = getDbPath(stateDesc.getName()); String checkpointPath = getCheckpointPath(stateDesc.getName()); - return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath); + + return new RocksDBReducingState<>(keySerializer, namespaceSerializer, + stateDesc, dbPath, checkpointPath, getRocksDBOptions()); } @Override @@ -124,4 +148,38 @@ public class RocksDBStateBackend extends AbstractStateBackend { long timestamp) throws Exception { return backingStateBackend.checkpointStateSerializable(state, checkpointID, timestamp); } + + // ------------------------------------------------------------------------ + // Parametrize with Options + // ------------------------------------------------------------------------ + + /** + * Defines the {@link org.rocksdb.Options} for the RocksDB instances. + * Because the options are not serializable and hold native code references, + * they must be specified through a factory. + * + * @param optionsFactory The options factory that lazily creates the RocksDB options. + */ + public void setOptions(OptionsFactory optionsFactory) { + this.optionsFactory = optionsFactory; + } + + /** + * Gets the options factory that lazily creates the RocksDB options. + * + * @return The options factory. + */ + public OptionsFactory getOptions() { + return optionsFactory; + } + + Options getRocksDBOptions() { + if (rocksDbOptions == null) { + Options opt = optionsFactory == null ? new Options() : optionsFactory.createOptions(); + opt = opt.setCreateIfMissing(true); + opt = opt.setMergeOperator(new StringAppendOperator()); + rocksDbOptions = opt; + } + return rocksDbOptions; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java index f51e160..388f099 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,15 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.state.ValueState; @@ -22,9 +23,9 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; + +import org.rocksdb.Options; import org.rocksdb.RocksDBException; import java.io.ByteArrayInputStream; @@ -41,10 +42,9 @@ import static java.util.Objects.requireNonNull; * @param <K> The type of the key. * @param <N> The type of the namespace. * @param <V> The type of value that the state state stores. - * @param <Backend> The type of the backend that snapshots this key/value state. */ -public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend> - extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> +public class RocksDBValueState<K, N, V> + extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>> implements ValueState<V> { /** Serializer for the values */ @@ -63,22 +63,26 @@ public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend> * @param dbPath The path on the local system where RocksDB data should be stored. */ protected RocksDBValueState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ValueStateDescriptor<V> stateDesc, - File dbPath, - String backupPath) { - super(keySerializer, namespaceSerializer, dbPath, backupPath); + TypeSerializer<N> namespaceSerializer, + ValueStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + Options options) { + + super(keySerializer, namespaceSerializer, dbPath, backupPath, options); this.stateDesc = requireNonNull(stateDesc); this.valueSerializer = stateDesc.getSerializer(); } protected RocksDBValueState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ValueStateDescriptor<V> stateDesc, - File dbPath, - String backupPath, - String restorePath) { - super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath); + TypeSerializer<N> namespaceSerializer, + ValueStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + String restorePath, + Options options) { + + super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options); this.stateDesc = stateDesc; this.valueSerializer = stateDesc.getSerializer(); } @@ -120,13 +124,16 @@ public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend> } @Override - protected KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> createRocksDBSnapshot( - URI backupUri, - long checkpointId) { + protected AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>> createRocksDBSnapshot( + URI backupUri, + long checkpointId) { + return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc); } - private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> { + private static class Snapshot<K, N, V> + extends AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>> + { private static final long serialVersionUID = 1L; public Snapshot(File dbPath, @@ -146,14 +153,17 @@ public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend> } @Override - protected KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> createRocksDBState( - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - ValueStateDescriptor<V> stateDesc, - File dbPath, - String backupPath, - String restorePath) throws Exception { - return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath); + protected KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, RocksDBStateBackend> createRocksDBState( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ValueStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + String restorePath, + Options options) throws Exception { + + return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, + checkpointPath, restorePath, options); } } }