[FLINK-6695] Activate strict checkstyle for flink-statebackend-rocksDB
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60721e07 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60721e07 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60721e07 Branch: refs/heads/master Commit: 60721e07dca50b268c0509703d69f66b03ca6d3a Parents: a84ce0b Author: zentol <ches...@apache.org> Authored: Tue May 23 22:05:19 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Thu Jun 1 11:14:11 2017 +0200 ---------------------------------------------------------------------- .../flink-statebackend-rocksdb/pom.xml | 39 ++++++++++ .../streaming/state/AbstractRocksDBState.java | 16 ++--- .../contrib/streaming/state/OptionsFactory.java | 14 ++-- .../streaming/state/PredefinedOptions.java | 28 ++++---- .../state/RocksDBAggregatingState.java | 12 ++-- .../streaming/state/RocksDBFoldingState.java | 7 +- .../state/RocksDBKeyedStateBackend.java | 52 +++++++------- .../streaming/state/RocksDBListState.java | 6 +- .../streaming/state/RocksDBMapState.java | 75 ++++++++++---------- .../streaming/state/RocksDBReducingState.java | 8 +-- .../streaming/state/RocksDBStateBackend.java | 19 ++--- .../state/RocksDBStateBackendFactory.java | 6 +- .../streaming/state/RocksDBValueState.java | 5 +- .../streaming/state/RocksDBStateBackend.java | 8 ++- .../state/RocksDBAggregatingStateTest.java | 5 +- .../state/RocksDBAsyncSnapshotTest.java | 14 +--- .../streaming/state/RocksDBInitResetTest.java | 2 +- .../streaming/state/RocksDBListStateTest.java | 4 +- .../state/RocksDBMergeIteratorTest.java | 4 ++ .../state/RocksDBReducingStateTest.java | 8 +-- .../state/RocksDBStateBackendConfigTest.java | 7 +- .../state/RocksDBStateBackendFactoryTest.java | 3 + .../state/RocksDBStateBackendTest.java | 6 +- .../state/RocksDbMultiClassLoaderTest.java | 4 +- .../state/benchmark/RocksDBPerformanceTest.java | 16 ++--- 25 files changed, 205 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml index 527ca18..f3d9da5 100644 --- a/flink-contrib/flink-statebackend-rocksdb/pom.xml +++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml @@ -92,4 +92,43 @@ under the License. <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <dependencies> + <dependency> + <groupId>com.puppycrawl.tools</groupId> + <artifactId>checkstyle</artifactId> + <version>6.19</version> + </dependency> + </dependencies> + <configuration> + <configLocation>/tools/maven/strict-checkstyle.xml</configLocation> + <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <logViolationsToConsole>true</logViolationsToConsole> + <failOnViolation>true</failOnViolation> + </configuration> + <executions> + <!-- + Execute checkstyle after compilation but before tests. + + This ensures that any parsing or type checking errors are from + javac, so they look as expected. Beyond that, we want to + fail as early as possible. + --> + <execution> + <phase>test-compile</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index ba7fb28..c061835 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -28,8 +28,8 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.Preconditions; import org.rocksdb.ColumnFamilyHandle; @@ -52,19 +52,19 @@ import java.io.IOException; public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V> implements InternalKvState<N>, State { - /** Serializer for the namespace */ + /** Serializer for the namespace. */ final TypeSerializer<N> namespaceSerializer; - /** The current namespace, which the next value methods will refer to */ + /** The current namespace, which the next value methods will refer to. */ private N currentNamespace; - /** Backend that holds the actual RocksDB instance where we store state */ + /** Backend that holds the actual RocksDB instance where we store state. */ protected RocksDBKeyedStateBackend<K> backend; - /** The column family of this particular instance of state */ + /** The column family of this particular instance of state. */ protected ColumnFamilyHandle columnFamily; - /** State descriptor from which to create this state instance */ + /** State descriptor from which to create this state instance. */ protected final SD stateDesc; /** @@ -110,7 +110,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta writeCurrentKeyWithGroupAndNamespace(); byte[] key = keySerializationStream.toByteArray(); backend.db.remove(columnFamily, writeOptions, key); - } catch (IOException|RocksDBException e) { + } catch (IOException | RocksDBException e) { throw new RuntimeException("Error while removing entry from RocksDB", e); } } @@ -220,7 +220,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta value >>>= 8; } while (value != 0); } - + protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { int keyGroup = readKeyGroup(inputView); K key = readKey(inputStream, inputView); http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java index 863c5da..34f7f62 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java @@ -25,18 +25,18 @@ import org.rocksdb.DBOptions; * A factory for {@link DBOptions} to be passed to the {@link RocksDBStateBackend}. * Options have to be created lazily by this factory, because the {@code Options} * class is not serializable and holds pointers to native code. - * + * * <p>A typical pattern to use this OptionsFactory is as follows: - * + * * <h3>Java 8:</h3> * <pre>{@code * rocksDbBackend.setOptions( (currentOptions) -> currentOptions.setMaxOpenFiles(1024) ); * }</pre> - * + * * <h3>Java 7:</h3> * <pre>{@code * rocksDbBackend.setOptions(new OptionsFactory() { - * + * * public Options setOptions(Options currentOptions) { * return currentOptions.setMaxOpenFiles(1024); * } @@ -49,11 +49,11 @@ public interface OptionsFactory extends java.io.Serializable { * This method should set the additional options on top of the current options object. * The current options object may contain pre-defined options based on flags that have * been configured on the state backend. - * + * * <p>It is important to set the options on the current object and return the result from * the setter methods, otherwise the pre-defined options may get lost. - * - * @param currentOptions The options object with the pre-defined options. + * + * @param currentOptions The options object with the pre-defined options. * @return The options object on which the additional options are set. */ DBOptions createDBOptions(DBOptions currentOptions); http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java index 93aac85..f606131 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java @@ -25,10 +25,10 @@ import org.rocksdb.DBOptions; import org.rocksdb.StringAppendOperator; /** - * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}. + * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}. * The various pre-defined choices are configurations that have been empirically * determined to be beneficial for performance under different settings. - * + * * <p>Some of these settings are based on experiments by the Flink community, some follow * guides from the RocksDB project. */ @@ -37,12 +37,12 @@ public enum PredefinedOptions { /** * Default options for all settings, except that writes are not forced to the * disk. - * + * * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, * there is no need to sync data to stable storage. */ DEFAULT { - + @Override public DBOptions createDBOptions() { return new DBOptions() @@ -60,11 +60,11 @@ public enum PredefinedOptions { /** * Pre-defined options for regular spinning hard disks. - * + * * <p>This constant configures RocksDB with some options that lead empirically * to better performance when the machines executing the system use * regular spinning hard disks. - * + * * <p>The following options are set: * <ul> * <li>setCompactionStyle(CompactionStyle.LEVEL)</li> @@ -74,7 +74,7 @@ public enum PredefinedOptions { * <li>setDisableDataSync(true)</li> * <li>setMaxOpenFiles(-1)</li> * </ul> - * + * * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, * there is no need to sync data to stable storage. */ @@ -121,7 +121,7 @@ public enum PredefinedOptions { * <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes)</li> * <li>BlockBasedTableConfigsetBlockSize(128 KBytes)</li> * </ul> - * + * * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, * there is no need to sync data to stable storage. */ @@ -161,13 +161,13 @@ public enum PredefinedOptions { ); } }, - + /** * Pre-defined options for Flash SSDs. * * <p>This constant configures RocksDB with some options that lead empirically * to better performance when the machines executing the system use SSDs. - * + * * <p>The following options are set: * <ul> * <li>setIncreaseParallelism(4)</li> @@ -175,7 +175,7 @@ public enum PredefinedOptions { * <li>setDisableDataSync(true)</li> * <li>setMaxOpenFiles(-1)</li> * </ul> - * + * * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, * there is no need to sync data to stable storage. */ @@ -196,13 +196,13 @@ public enum PredefinedOptions { .setMergeOperator(new StringAppendOperator()); } }; - + // ------------------------------------------------------------------------ /** * Creates the {@link DBOptions}for this pre-defined setting. - * - * @return The pre-defined options object. + * + * @return The pre-defined options object. */ public abstract DBOptions createDBOptions(); http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java index 1f306b4..fc84456 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java @@ -25,8 +25,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; - import org.apache.flink.runtime.state.internal.InternalAggregatingState; + import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; import org.rocksdb.WriteOptions; @@ -47,10 +47,10 @@ public class RocksDBAggregatingState<K, N, T, ACC, R> extends AbstractRocksDBState<K, N, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>, ACC> implements InternalAggregatingState<N, T, R> { - /** Serializer for the values */ + /** Serializer for the values. */ private final TypeSerializer<ACC> valueSerializer; - /** User-specified aggregation function */ + /** User-specified aggregation function. */ private final AggregateFunction<T, ACC, R> aggFunction; /** @@ -64,7 +64,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R> * * @param namespaceSerializer * The serializer for the namespace. - * @param stateDesc + * @param stateDesc * The state identifier for the state. This contains the state name and aggregation function. */ public RocksDBAggregatingState( @@ -154,7 +154,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R> writeKeyWithGroupAndNamespace( keyGroup, key, source, keySerializationStream, keySerializationDataOutputView); - + final byte[] sourceKey = keySerializationStream.toByteArray(); final byte[] valueBytes = backend.db.get(columnFamily, sourceKey); @@ -174,7 +174,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R> // if something came out of merging the sources, merge it or write it to the target if (current != null) { - // create the target full-binary-key + // create the target full-binary-key writeKeyWithGroupAndNamespace( keyGroup, key, target, keySerializationStream, keySerializationDataOutputView); http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java index d5d9fce..479565e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java @@ -26,6 +26,7 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.internal.InternalFoldingState; + import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; import org.rocksdb.WriteOptions; @@ -47,10 +48,10 @@ public class RocksDBFoldingState<K, N, T, ACC> extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC> implements InternalFoldingState<N, T, ACC> { - /** Serializer for the values */ + /** Serializer for the values. */ private final TypeSerializer<ACC> valueSerializer; - /** User-specified fold function */ + /** User-specified fold function. */ private final FoldFunction<T, ACC> foldFunction; /** @@ -90,7 +91,7 @@ public class RocksDBFoldingState<K, N, T, ACC> return null; } return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - } catch (IOException|RocksDBException e) { + } catch (IOException | RocksDBException e) { throw new RuntimeException("Error while retrieving data from RocksDB", e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 053c820..241c0b3 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; @@ -53,7 +54,6 @@ import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.runtime.state.DoneFuture; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; @@ -79,6 +79,7 @@ import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; + import org.rocksdb.Checkpoint; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; @@ -125,16 +126,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final String operatorIdentifier; - /** The column family options from the options factory */ + /** The column family options from the options factory. */ private final ColumnFamilyOptions columnOptions; - /** The DB options from the options factory */ + /** The DB options from the options factory. */ private final DBOptions dbOptions; - /** Path where this configured instance stores its data directory */ + /** Path where this configured instance stores its data directory. */ private final File instanceBasePath; - /** Path where this configured instance stores its RocksDB data base */ + /** Path where this configured instance stores its RocksDB data base. */ private final File instanceRocksDBPath; /** @@ -160,7 +161,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** * Map of state names to their corresponding restored state meta info. * - * TODO this map can be removed when eager-state registration is in place. + * <p>TODO this map can be removed when eager-state registration is in place. * TODO we currently need this cached to check state migration strategies when new serializers are registered. */ private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos; @@ -168,13 +169,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** Number of bytes required to prefix the key groups. */ private final int keyGroupPrefixBytes; - /** True if incremental checkpointing is enabled */ + /** True if incremental checkpointing is enabled. */ private final boolean enableIncrementalCheckpointing; - /** The state handle ids of all sst files materialized in snapshots for previous checkpoints */ + /** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */ private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles; - /** The identifier of the last completed checkpoint */ + /** The identifier of the last completed checkpoint. */ private long lastCompletedCheckpointId = -1; private static final String SST_FILE_SUFFIX = ".sst"; @@ -711,22 +712,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private static final class RocksDBIncrementalSnapshotOperation<K> { - /** The backend which we snapshot */ + /** The backend which we snapshot. */ private final RocksDBKeyedStateBackend<K> stateBackend; - /** Stream factory that creates the outpus streams to DFS */ + /** Stream factory that creates the outpus streams to DFS. */ private final CheckpointStreamFactory checkpointStreamFactory; - /** Id for the current checkpoint */ + /** Id for the current checkpoint. */ private final long checkpointId; - /** Timestamp for the current checkpoint */ + /** Timestamp for the current checkpoint. */ private final long checkpointTimestamp; - /** All sst files that were part of the last previously completed checkpoint */ + /** All sst files that were part of the last previously completed checkpoint. */ private Set<StateHandleID> baseSstFiles; - /** The state meta data */ + /** The state meta data. */ private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>(); private FileSystem backupFileSystem; @@ -888,8 +889,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } - - synchronized (stateBackend.materializedSstFiles) { stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet()); } @@ -1036,13 +1035,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend; - /** Current key-groups state handle from which we restore key-groups */ + /** Current key-groups state handle from which we restore key-groups. */ private KeyGroupsStateHandle currentKeyGroupsStateHandle; - /** Current input stream we obtained from currentKeyGroupsStateHandle */ + /** Current input stream we obtained from currentKeyGroupsStateHandle. */ private FSDataInputStream currentStateHandleInStream; - /** Current data input view that wraps currentStateHandleInStream */ + /** Current data input view that wraps currentStateHandleInStream. */ private DataInputView currentStateHandleInView; - /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle */ + /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */ private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies; /** @@ -1082,7 +1081,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } /** - * Restore one key groups state handle + * Restore one key groups state handle. * * @throws IOException * @throws RocksDBException @@ -1105,7 +1104,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } /** - * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle + * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle. * * @throws IOException * @throws ClassNotFoundException @@ -1169,7 +1168,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } /** - * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle + * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle. * * @throws IOException * @throws RocksDBException @@ -1376,7 +1375,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { - startKeyGroupPrefixBytes[j] = (byte)(startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); } iterator.seek(startKeyGroupPrefixBytes); @@ -1430,7 +1429,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { columnFamilyHandle, stateMetaInfo)); } - // use the restore sst files as the base for succeeding checkpoints synchronized (stateBackend.materializedSstFiles) { stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet()); @@ -1480,7 +1478,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { for (KeyedStateHandle rawStateHandle : restoreStateHandles) { - if (! (rawStateHandle instanceof IncrementalKeyedStateHandle)) { + if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) { throw new IllegalStateException("Unexpected state handle type, " + "expected " + IncrementalKeyedStateHandle.class + ", but found " + rawStateHandle.getClass()); http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index a8b20d1..9d3e97e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -50,7 +50,7 @@ public class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, List<V>> implements InternalListState<N, V> { - /** Serializer for the values */ + /** Serializer for the values. */ private final TypeSerializer<V> valueSerializer; /** @@ -100,7 +100,7 @@ public class RocksDBListState<K, N, V> } } return result; - } catch (IOException|RocksDBException e) { + } catch (IOException | RocksDBException e) { throw new RuntimeException("Error while retrieving data from RocksDB", e); } } @@ -131,7 +131,7 @@ public class RocksDBListState<K, N, V> final int keyGroup = backend.getCurrentKeyGroupIndex(); try { - // create the target full-binary-key + // create the target full-binary-key writeKeyWithGroupAndNamespace( keyGroup, key, target, keySerializationStream, keySerializationDataOutputView); http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 5125240..75c1651 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.util.Preconditions; + import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -45,7 +46,7 @@ import java.util.Map; /** * {@link MapState} implementation that stores state in RocksDB. - * <p> + * * <p>{@link RocksDBStateBackend} must ensure that we set the * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since * we use the {@code merge()} call. @@ -58,10 +59,10 @@ import java.util.Map; public class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>> implements InternalMapState<N, UK, UV> { - - private static Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class); - /** Serializer for the keys and values */ + private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class); + + /** Serializer for the keys and values. */ private final TypeSerializer<UK> userKeySerializer; private final TypeSerializer<UV> userValueSerializer; @@ -105,19 +106,19 @@ public class RocksDBMapState<K, N, UK, UV> @Override public void put(UK userKey, UV userValue) throws IOException, RocksDBException { - + byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey); byte[] rawValueBytes = serializeUserValue(userValue); backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); } - + @Override public void putAll(Map<UK, UV> map) throws IOException, RocksDBException { if (map == null) { return; } - + for (Map.Entry<UK, UV> entry : map.entrySet()) { put(entry.getKey(), entry.getValue()); } @@ -137,7 +138,7 @@ public class RocksDBMapState<K, N, UK, UV> return (rawValueBytes != null); } - + @Override public Iterable<Map.Entry<UK, UV>> entries() throws IOException, RocksDBException { final Iterator<Map.Entry<UK, UV>> iterator = iterator(); @@ -158,7 +159,7 @@ public class RocksDBMapState<K, N, UK, UV> @Override public Iterable<UK> keys() throws IOException, RocksDBException { final byte[] prefixBytes = serializeCurrentKeyAndNamespace(); - + return new Iterable<UK>() { @Override public Iterator<UK> iterator() { @@ -176,7 +177,7 @@ public class RocksDBMapState<K, N, UK, UV> @Override public Iterable<UV> values() throws IOException, RocksDBException { final byte[] prefixBytes = serializeCurrentKeyAndNamespace(); - + return new Iterable<UV>() { @Override public Iterator<UV> iterator() { @@ -202,7 +203,7 @@ public class RocksDBMapState<K, N, UK, UV> } }; } - + @Override public void clear() { try { @@ -216,7 +217,7 @@ public class RocksDBMapState<K, N, UK, UV> LOG.warn("Error while cleaning the state.", e); } } - + @Override @SuppressWarnings("unchecked") public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { @@ -229,7 +230,7 @@ public class RocksDBMapState<K, N, UK, UV> namespaceSerializer); int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups()); - + ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128); DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream); writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, outputStream, outputView); @@ -246,7 +247,7 @@ public class RocksDBMapState<K, N, UK, UV> if (!iterator.hasNext()) { return null; } - + return KvStateRequestSerializer.serializeMap(new Iterable<Map.Entry<UK, UV>>() { @Override public Iterator<Map.Entry<UK, UV>> iterator() { @@ -254,21 +255,21 @@ public class RocksDBMapState<K, N, UK, UV> } }, userKeySerializer, userValueSerializer); } - + // ------------------------------------------------------------------------ // Serialization Methods // ------------------------------------------------------------------------ - + private byte[] serializeCurrentKeyAndNamespace() throws IOException { writeCurrentKeyWithGroupAndNamespace(); - + return keySerializationStream.toByteArray(); } private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException { writeCurrentKeyWithGroupAndNamespace(); userKeySerializer.serialize(userKey, keySerializationDataOutputView); - + return keySerializationStream.toByteArray(); } @@ -282,7 +283,6 @@ public class RocksDBMapState<K, N, UK, UV> userValueSerializer.serialize(userValue, keySerializationDataOutputView); } - return keySerializationStream.toByteArray(); } @@ -291,7 +291,7 @@ public class RocksDBMapState<K, N, UK, UV> DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); readKeyWithGroupAndNamespace(bais, in); - + return userKeySerializer.deserialize(in); } @@ -303,20 +303,20 @@ public class RocksDBMapState<K, N, UK, UV> return isNull ? null : userValueSerializer.deserialize(in); } - + // ------------------------------------------------------------------------ // Internal Classes // ------------------------------------------------------------------------ - - /** A map entry in RocksDBMapState */ + + /** A map entry in RocksDBMapState. */ private class RocksDBMapEntry implements Map.Entry<UK, UV> { private final RocksDB db; - + /** The raw bytes of the key stored in RocksDB. Each user key is stored in RocksDB * with the format #KeyGroup#Key#Namespace#UserKey. */ private final byte[] rawKeyBytes; - - /** The raw bytes of the value stored in RocksDB */ + + /** The raw bytes of the value stored in RocksDB. */ private byte[] rawValueBytes; /** True if the entry has been deleted. */ @@ -329,7 +329,7 @@ public class RocksDBMapState<K, N, UK, UV> RocksDBMapEntry(final RocksDB db, final byte[] rawKeyBytes, final byte[] rawValueBytes) { this.db = db; - + this.rawKeyBytes = rawKeyBytes; this.rawValueBytes = rawValueBytes; this.deleted = false; @@ -383,7 +383,7 @@ public class RocksDBMapState<K, N, UK, UV> } UV oldValue = getValue(); - + try { userValue = value; rawValueBytes = serializeUserValue(value); @@ -400,22 +400,22 @@ public class RocksDBMapState<K, N, UK, UV> /** An auxiliary utility to scan all entries under the given key. */ private abstract class RocksDBMapIterator<T> implements Iterator<T> { - final static int CACHE_SIZE_BASE = 1; - final static int CACHE_SIZE_LIMIT = 128; + static final int CACHE_SIZE_BASE = 1; + static final int CACHE_SIZE_LIMIT = 128; /** The db where data resides. */ private final RocksDB db; - /** + /** * The prefix bytes of the key being accessed. All entries under the same key * has the same prefix, hence we can stop the iterating once coming across an - * entry with a different prefix. + * entry with a different prefix. */ private final byte[] keyPrefixBytes; /** * True if all entries have been accessed or the iterator has come across an - * entry with a different prefix. + * entry with a different prefix. */ private boolean expired = false; @@ -423,7 +423,6 @@ public class RocksDBMapState<K, N, UK, UV> private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>(); private int cacheIndex = 0; - RocksDBMapIterator(final RocksDB db, final byte[] keyPrefixBytes) { this.db = db; this.keyPrefixBytes = keyPrefixBytes; @@ -440,7 +439,7 @@ public class RocksDBMapState<K, N, UK, UV> public void remove() { if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) { throw new IllegalStateException("The remove operation must be called after an valid next operation."); - } + } RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1); lastEntry.remove(); @@ -489,7 +488,7 @@ public class RocksDBMapState<K, N, UK, UV> iterator.seek(startBytes); - /* + /* * If the last returned entry is not deleted, it will be the first entry in the * iterating. Skip it to avoid redundant access in such cases. */ @@ -515,7 +514,7 @@ public class RocksDBMapState<K, N, UK, UV> iterator.close(); } - + private boolean underSameKey(byte[] rawKeyBytes) { if (rawKeyBytes.length < keyPrefixBytes.length) { return false; @@ -530,4 +529,4 @@ public class RocksDBMapState<K, N, UK, UV> return true; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java index ccc98a7..b5fe95f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java @@ -46,10 +46,10 @@ public class RocksDBReducingState<K, N, V> extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, V> implements InternalReducingState<N, V> { - /** Serializer for the values */ + /** Serializer for the values. */ private final TypeSerializer<V> valueSerializer; - /** User-specified reduce function */ + /** User-specified reduce function. */ private final ReduceFunction<V> reduceFunction; /** @@ -88,7 +88,7 @@ public class RocksDBReducingState<K, N, V> return null; } return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); - } catch (IOException|RocksDBException e) { + } catch (IOException | RocksDBException e) { throw new RuntimeException("Error while retrieving data from RocksDB", e); } } @@ -157,7 +157,7 @@ public class RocksDBReducingState<K, N, V> // if something came out of merging the sources, merge it or write it to the target if (current != null) { - // create the target full-binary-key + // create the target full-binary-key writeKeyWithGroupAndNamespace( keyGroup, key, target, keySerializationStream, keySerializationDataOutputView); http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 2b70dcd..4a30489 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.util.AbstractID; + import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import org.rocksdb.NativeLibraryLoader; @@ -69,10 +70,10 @@ public class RocksDBStateBackend extends AbstractStateBackend { private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class); - /** The number of (re)tries for loading the RocksDB JNI library */ + /** The number of (re)tries for loading the RocksDB JNI library. */ private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3; - + private static boolean rocksDbInitialized = false; // ------------------------------------------------------------------------ @@ -93,23 +94,23 @@ public class RocksDBStateBackend extends AbstractStateBackend { /** Base paths for RocksDB directory, as configured. May be null. */ private Path[] configuredDbBasePaths; - /** Base paths for RocksDB directory, as initialized */ + /** Base paths for RocksDB directory, as initialized. */ private File[] initializedDbBasePaths; private int nextDirectory; // RocksDB options - /** The pre-configured option settings */ + /** The pre-configured option settings. */ private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT; - /** The options factory to create the RocksDB options in the cluster */ + /** The options factory to create the RocksDB options in the cluster. */ private OptionsFactory optionsFactory; /** Whether we already lazily initialized our local storage directories. */ private transient boolean isInitialized = false; - /** True if incremental checkpointing is enabled */ + /** True if incremental checkpointing is enabled. */ private boolean enableIncrementalCheckpointing; @@ -183,10 +184,10 @@ public class RocksDBStateBackend extends AbstractStateBackend { * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its * checkpoint data streams. Typically, one would supply a filesystem or database state backend * here where the snapshots from RocksDB would be stored. - * + * * <p>The snapshots of the RocksDB state will be stored using the given backend's - * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}. - * + * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}. + * * @param checkpointStreamBackend The backend to store the */ public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) { http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java index bd9bcaa..f0569b8 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java @@ -39,13 +39,13 @@ public class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBSt private static final long serialVersionUID = 4906988360901930371L; - /** The key under which the config stores the directory where checkpoints should be stored */ + /** The key under which the config stores the directory where checkpoints should be stored. */ public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir"; - /** The key under which the config stores the directory where RocksDB should be stored */ + /** The key under which the config stores the directory where RocksDB should be stored. */ public static final String ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.rocksdb.checkpointdir"; @Override - public RocksDBStateBackend createFromConfig(Configuration config) + public RocksDBStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException, IOException { final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null); http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java index b2a4fba..da21e8a 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.internal.InternalValueState; + import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; import org.rocksdb.WriteOptions; @@ -42,7 +43,7 @@ public class RocksDBValueState<K, N, V> extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, V> implements InternalValueState<N, V> { - /** Serializer for the values */ + /** Serializer for the values. */ private final TypeSerializer<V> valueSerializer; /** @@ -80,7 +81,7 @@ public class RocksDBValueState<K, N, V> return stateDesc.getDefaultValue(); } return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); - } catch (IOException|RocksDBException e) { + } catch (IOException | RocksDBException e) { throw new RuntimeException("Error while retrieving data from RocksDB.", e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java index 695aa12..024d12e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java @@ -68,6 +68,12 @@ public class RocksDBStateBackend extends AbstractStateBackend { } } + /** + * This class exists to provide a good error message if a user attempts to restore from a semi async snapshot. + * + * <p>see FLINK-5468 + */ + @Deprecated public static class FinalSemiAsyncSnapshot { static { @@ -75,7 +81,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { } private static void throwExceptionOnLoadingThisClass() { - throw new RuntimeException("Attempt to requiresMigration RocksDB state created with semi async snapshot mode failed. " + throw new RuntimeException("Attempt to migrate RocksDB state created with semi async snapshot mode failed. " + "Unfortunately, this is not supported. Please create a new savepoint for the job using fully " + "async mode in Flink 1.1 and run migration again with the new savepoint."); } http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java index 1b65466..f3065ab 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java @@ -36,8 +36,9 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import static java.util.Arrays.asList; -import static org.mockito.Mockito.*; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; /** * Tests for the {@link InternalAggregatingState} implementation on top of RocksDB. http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 812babb..d2edf0e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -19,7 +19,6 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -61,12 +60,10 @@ import org.apache.flink.util.FutureUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; - import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; - import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -408,7 +405,7 @@ public class RocksDBAsyncSnapshotTest { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - if(closed) { + if (closed) { throw new IOException("Stream closed."); } super.write(b); @@ -422,7 +419,7 @@ public class RocksDBAsyncSnapshotTest { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - if(closed) { + if (closed) { throw new IOException("Stream closed."); } super.write(b, off, len); @@ -439,7 +436,7 @@ public class RocksDBAsyncSnapshotTest { } } - public static class AsyncCheckpointOperator + private static class AsyncCheckpointOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, StreamCheckpointedOperator { @@ -480,9 +477,4 @@ public class RocksDBAsyncSnapshotTest { } } - - public static class DummyMapFunction<T> implements MapFunction<T, T> { - @Override - public T map(T value) { return value; } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java index 7343b56..565f27d 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java @@ -21,7 +21,7 @@ package org.apache.flink.contrib.streaming.state; import org.junit.Test; /** - * This test checks that the RocksDB native code loader still responds to resetting the + * This test checks that the RocksDB native code loader still responds to resetting the init flag. */ public class RocksDBInitResetTest { http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java index e7efcfa..c6ccd5d 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java @@ -64,9 +64,9 @@ public class RocksDBListStateTest { backend.setDbStoragePath(tmp.newFolder().getAbsolutePath()); final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend); - + try { - InternalListState<VoidNamespace, Long> state = + InternalListState<VoidNamespace, Long> state = keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr); state.setCurrentNamespace(VoidNamespace.INSTANCE); http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java index f5bcf86..1d14f6e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java @@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; + import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -38,6 +39,9 @@ import java.util.Collections; import java.util.List; import java.util.Random; +/** + * Tests for the RocksDBMergeIterator. + */ public class RocksDBMergeIteratorTest { private static final int NUM_KEY_VAL_STATES = 50; http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java index a8b4535..0733dce 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java @@ -54,7 +54,7 @@ public class RocksDBReducingStateTest { @Test public void testAddAndGet() throws Exception { - final ReducingStateDescriptor<Long> stateDescr = + final ReducingStateDescriptor<Long> stateDescr = new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class); stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); @@ -62,9 +62,9 @@ public class RocksDBReducingStateTest { backend.setDbStoragePath(tmp.newFolder().getAbsolutePath()); final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend); - + try { - InternalReducingState<VoidNamespace, Long> state = + InternalReducingState<VoidNamespace, Long> state = keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr); state.setCurrentNamespace(VoidNamespace.INSTANCE); @@ -126,7 +126,7 @@ public class RocksDBReducingStateTest { final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend); try { - final InternalReducingState<TimeWindow, Long> state = + final InternalReducingState<TimeWindow, Long> state = keyedBackend.createReducingState(new TimeWindow.Serializer(), stateDescr); // populate the different namespaces http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 463dd44..ff433ad 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -18,7 +18,6 @@ package org.apache.flink.contrib.streaming.state; -import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; @@ -32,6 +31,8 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; + +import org.apache.commons.io.FileUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -59,7 +60,7 @@ import static org.mockito.Mockito.when; /** - * Tests for configuring the RocksDB State Backend + * Tests for configuring the RocksDB State Backend. */ @SuppressWarnings("serial") public class RocksDBStateBackendConfigTest { @@ -102,7 +103,6 @@ public class RocksDBStateBackendConfigTest { new KeyGroupRange(0, 0), env.getTaskKvStateRegistry()); - File instanceBasePath = keyedBackend.getInstanceBasePath(); assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath()))); @@ -158,7 +158,6 @@ public class RocksDBStateBackendConfigTest { new KeyGroupRange(0, 0), env.getTaskKvStateRegistry()); - File instanceBasePath = keyedBackend.getInstanceBasePath(); assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath()))); } http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java index 9eb662a..5a937c4 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java @@ -22,6 +22,9 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +/** + * Tests for the RocksDBStateBackendFactory. + */ public class RocksDBStateBackendFactoryTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 8d0db69..8b44a47 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -18,8 +18,6 @@ package org.apache.flink.contrib.streaming.state; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.filefilter.IOFileFilter; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ValueState; @@ -42,6 +40,9 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.IOFileFilter; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -372,7 +373,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); - Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>(); SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry()); for (int checkpointId = 0; checkpointId < 3; ++checkpointId) { http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java index c53fa3e..4ec6532 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java @@ -27,7 +27,7 @@ import java.lang.reflect.Method; import java.net.URL; import java.net.URLClassLoader; -import static org.junit.Assert.*; +import static org.junit.Assert.assertNotEquals; /** * This test validates that the RocksDB JNI library loading works properly @@ -60,7 +60,7 @@ public class RocksDbMultiClassLoaderTest { final String tempDir = tmp.newFolder().getAbsolutePath(); - final Method meth1 = clazz1.getDeclaredMethod("ensureRocksDBIsLoaded", String.class); + final Method meth1 = clazz1.getDeclaredMethod("ensureRocksDBIsLoaded", String.class); final Method meth2 = clazz2.getDeclaredMethod("ensureRocksDBIsLoaded", String.class); meth1.setAccessible(true); meth2.setAccessible(true); http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java index 7147583..3231e96 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java @@ -26,7 +26,6 @@ import org.apache.flink.util.TestLogger; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; - import org.rocksdb.CompactionStyle; import org.rocksdb.NativeLibraryLoader; import org.rocksdb.Options; @@ -47,7 +46,7 @@ import java.util.Arrays; public class RocksDBPerformanceTest extends TestLogger { @Rule - public final TemporaryFolder TMP = new TemporaryFolder(); + public final TemporaryFolder tmp = new TemporaryFolder(); @Rule public final RetryRule retry = new RetryRule(); @@ -55,7 +54,7 @@ public class RocksDBPerformanceTest extends TestLogger { @Test(timeout = 2000) @RetryOnFailure(times = 3) public void testRocksDbMergePerformance() throws Exception { - final File rocksDir = TMP.newFolder(); + final File rocksDir = tmp.newFolder(); // ensure the RocksDB library is loaded to a distinct location each retry NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath()); @@ -83,8 +82,8 @@ public class RocksDBPerformanceTest extends TestLogger { .setSync(false) .setDisableWAL(true); - final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) - { + final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) { + // ----- insert ----- log.info("begin insert"); @@ -133,7 +132,7 @@ public class RocksDBPerformanceTest extends TestLogger { @Test(timeout = 2000) @RetryOnFailure(times = 3) public void testRocksDbRangeGetPerformance() throws Exception { - final File rocksDir = TMP.newFolder(); + final File rocksDir = tmp.newFolder(); // ensure the RocksDB library is loaded to a distinct location each retry NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath()); @@ -161,8 +160,8 @@ public class RocksDBPerformanceTest extends TestLogger { .setSync(false) .setDisableWAL(true); - final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) - { + final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) { + final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4); final Unsafe unsafe = MemoryUtils.UNSAFE; @@ -205,7 +204,6 @@ public class RocksDBPerformanceTest extends TestLogger { } } - private static boolean samePrefix(byte[] prefix, byte[] key) { for (int i = 0; i < prefix.length; i++) { if (prefix[i] != key [i]) {