[FLINK-6695] Activate strict checkstyle for flink-statebackend-rocksDB

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60721e07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60721e07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60721e07

Branch: refs/heads/master
Commit: 60721e07dca50b268c0509703d69f66b03ca6d3a
Parents: a84ce0b
Author: zentol <ches...@apache.org>
Authored: Tue May 23 22:05:19 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Thu Jun 1 11:14:11 2017 +0200

----------------------------------------------------------------------
 .../flink-statebackend-rocksdb/pom.xml          | 39 ++++++++++
 .../streaming/state/AbstractRocksDBState.java   | 16 ++---
 .../contrib/streaming/state/OptionsFactory.java | 14 ++--
 .../streaming/state/PredefinedOptions.java      | 28 ++++----
 .../state/RocksDBAggregatingState.java          | 12 ++--
 .../streaming/state/RocksDBFoldingState.java    |  7 +-
 .../state/RocksDBKeyedStateBackend.java         | 52 +++++++-------
 .../streaming/state/RocksDBListState.java       |  6 +-
 .../streaming/state/RocksDBMapState.java        | 75 ++++++++++----------
 .../streaming/state/RocksDBReducingState.java   |  8 +--
 .../streaming/state/RocksDBStateBackend.java    | 19 ++---
 .../state/RocksDBStateBackendFactory.java       |  6 +-
 .../streaming/state/RocksDBValueState.java      |  5 +-
 .../streaming/state/RocksDBStateBackend.java    |  8 ++-
 .../state/RocksDBAggregatingStateTest.java      |  5 +-
 .../state/RocksDBAsyncSnapshotTest.java         | 14 +---
 .../streaming/state/RocksDBInitResetTest.java   |  2 +-
 .../streaming/state/RocksDBListStateTest.java   |  4 +-
 .../state/RocksDBMergeIteratorTest.java         |  4 ++
 .../state/RocksDBReducingStateTest.java         |  8 +--
 .../state/RocksDBStateBackendConfigTest.java    |  7 +-
 .../state/RocksDBStateBackendFactoryTest.java   |  3 +
 .../state/RocksDBStateBackendTest.java          |  6 +-
 .../state/RocksDbMultiClassLoaderTest.java      |  4 +-
 .../state/benchmark/RocksDBPerformanceTest.java | 16 ++---
 25 files changed, 205 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index 527ca18..f3d9da5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -92,4 +92,43 @@ under the License.
                        <scope>test</scope>
                </dependency>
        </dependencies>
+       
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-checkstyle-plugin</artifactId>
+                               <version>2.17</version>
+                               <dependencies>
+                                       <dependency>
+                                               <groupId>com.puppycrawl.tools</groupId>
+                                               <artifactId>checkstyle</artifactId>
+                                               <version>6.19</version>
+                                       </dependency>
+                               </dependencies>
+                               <configuration>
+                                       <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+                                       <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+                                       <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                       <logViolationsToConsole>true</logViolationsToConsole>
+                                       <failOnViolation>true</failOnViolation>
+                               </configuration>
+                               <executions>
+                                       <!--
+                                       Execute checkstyle after compilation but before tests.
+
+                                       This ensures that any parsing or type checking errors are from
+                                       javac, so they look as expected. Beyond that, we want to
+                                       fail as early as possible.
+                                       -->
+                                       <execution>
+                                               <phase>test-compile</phase>
+                                               <goals>
+                                                       <goal>check</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index ba7fb28..c061835 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -28,8 +28,8 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
 import org.rocksdb.ColumnFamilyHandle;
@@ -52,19 +52,19 @@ import java.io.IOException;
 public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V>
                implements InternalKvState<N>, State {
 
-       /** Serializer for the namespace */
+       /** Serializer for the namespace. */
        final TypeSerializer<N> namespaceSerializer;
 
-       /** The current namespace, which the next value methods will refer to */
+       /** The current namespace, which the next value methods will refer to. */
        private N currentNamespace;
 
-       /** Backend that holds the actual RocksDB instance where we store state */
+       /** Backend that holds the actual RocksDB instance where we store state. */
        protected RocksDBKeyedStateBackend<K> backend;
 
-       /** The column family of this particular instance of state */
+       /** The column family of this particular instance of state. */
        protected ColumnFamilyHandle columnFamily;
 
-       /** State descriptor from which to create this state instance */
+       /** State descriptor from which to create this state instance. */
        protected final SD stateDesc;
 
        /**
@@ -110,7 +110,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
                        writeCurrentKeyWithGroupAndNamespace();
                        byte[] key = keySerializationStream.toByteArray();
                        backend.db.remove(columnFamily, writeOptions, key);
-               } catch (IOException|RocksDBException e) {
+               } catch (IOException | RocksDBException e) {
                        throw new RuntimeException("Error while removing entry 
from RocksDB", e);
                }
        }
@@ -220,7 +220,7 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
                        value >>>= 8;
                } while (value != 0);
        }
-       
+
        protected Tuple3<Integer, K, N> 
readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, 
DataInputView inputView) throws IOException {
                int keyGroup = readKeyGroup(inputView);
                K key = readKey(inputStream, inputView);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
index 863c5da..34f7f62 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -25,18 +25,18 @@ import org.rocksdb.DBOptions;
  * A factory for {@link DBOptions} to be passed to the {@link 
RocksDBStateBackend}.
  * Options have to be created lazily by this factory, because the {@code 
Options}
  * class is not serializable and holds pointers to native code.
- * 
+ *
  * <p>A typical pattern to use this OptionsFactory is as follows:
- * 
+ *
  * <h3>Java 8:</h3>
  * <pre>{@code
  * rocksDbBackend.setOptions( (currentOptions) -> 
currentOptions.setMaxOpenFiles(1024) );
  * }</pre>
- * 
+ *
  * <h3>Java 7:</h3>
  * <pre>{@code
  * rocksDbBackend.setOptions(new OptionsFactory() {
- *     
+ *
  *     public Options setOptions(Options currentOptions) {
  *         return currentOptions.setMaxOpenFiles(1024);
  *     }
@@ -49,11 +49,11 @@ public interface OptionsFactory extends 
java.io.Serializable {
         * This method should set the additional options on top of the current 
options object.
         * The current options object may contain pre-defined options based on 
flags that have
         * been configured on the state backend.
-        * 
+        *
         * <p>It is important to set the options on the current object and 
return the result from
         * the setter methods, otherwise the pre-defined options may get lost.
-        * 
-        * @param currentOptions The options object with the pre-defined 
options. 
+        *
+        * @param currentOptions The options object with the pre-defined 
options.
         * @return The options object on which the additional options are set.
         */
        DBOptions createDBOptions(DBOptions currentOptions);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index 93aac85..f606131 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -25,10 +25,10 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.StringAppendOperator;
 
 /**
- * The {@code PredefinedOptions} are configuration settings for the {@link 
RocksDBStateBackend}. 
+ * The {@code PredefinedOptions} are configuration settings for the {@link 
RocksDBStateBackend}.
  * The various pre-defined choices are configurations that have been 
empirically
  * determined to be beneficial for performance under different settings.
- * 
+ *
  * <p>Some of these settings are based on experiments by the Flink community, 
some follow
  * guides from the RocksDB project.
  */
@@ -37,12 +37,12 @@ public enum PredefinedOptions {
        /**
         * Default options for all settings, except that writes are not forced 
to the
         * disk.
-        * 
+        *
         * <p>Note: Because Flink does not rely on RocksDB data on disk for 
recovery,
         * there is no need to sync data to stable storage.
         */
        DEFAULT {
-               
+
                @Override
                public DBOptions createDBOptions() {
                        return new DBOptions()
@@ -60,11 +60,11 @@ public enum PredefinedOptions {
 
        /**
         * Pre-defined options for regular spinning hard disks.
-        * 
+        *
         * <p>This constant configures RocksDB with some options that lead 
empirically
         * to better performance when the machines executing the system use
         * regular spinning hard disks.
-        * 
+        *
         * <p>The following options are set:
         * <ul>
         *     <li>setCompactionStyle(CompactionStyle.LEVEL)</li>
@@ -74,7 +74,7 @@ public enum PredefinedOptions {
         *     <li>setDisableDataSync(true)</li>
         *     <li>setMaxOpenFiles(-1)</li>
         * </ul>
-        * 
+        *
         * <p>Note: Because Flink does not rely on RocksDB data on disk for 
recovery,
         * there is no need to sync data to stable storage.
         */
@@ -121,7 +121,7 @@ public enum PredefinedOptions {
         *     <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes)</li>
         *     <li>BlockBasedTableConfigsetBlockSize(128 KBytes)</li>
         * </ul>
-        * 
+        *
         * <p>Note: Because Flink does not rely on RocksDB data on disk for 
recovery,
         * there is no need to sync data to stable storage.
         */
@@ -161,13 +161,13 @@ public enum PredefinedOptions {
                                        );
                }
        },
-       
+
        /**
         * Pre-defined options for Flash SSDs.
         *
         * <p>This constant configures RocksDB with some options that lead 
empirically
         * to better performance when the machines executing the system use 
SSDs.
-        * 
+        *
         * <p>The following options are set:
         * <ul>
         *     <li>setIncreaseParallelism(4)</li>
@@ -175,7 +175,7 @@ public enum PredefinedOptions {
         *     <li>setDisableDataSync(true)</li>
         *     <li>setMaxOpenFiles(-1)</li>
         * </ul>
-        * 
+        *
         * <p>Note: Because Flink does not rely on RocksDB data on disk for 
recovery,
         * there is no need to sync data to stable storage.
         */
@@ -196,13 +196,13 @@ public enum PredefinedOptions {
                                        .setMergeOperator(new 
StringAppendOperator());
                }
        };
-       
+
        // 
------------------------------------------------------------------------
 
        /**
         * Creates the {@link DBOptions}for this pre-defined setting.
-        * 
-        * @return The pre-defined options object. 
+        *
+        * @return The pre-defined options object.
         */
        public abstract DBOptions createDBOptions();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 1f306b4..fc84456 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
@@ -47,10 +47,10 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
        extends AbstractRocksDBState<K, N, AggregatingState<T, R>, 
AggregatingStateDescriptor<T, ACC, R>, ACC>
        implements InternalAggregatingState<N, T, R> {
 
-       /** Serializer for the values */
+       /** Serializer for the values. */
        private final TypeSerializer<ACC> valueSerializer;
 
-       /** User-specified aggregation function */
+       /** User-specified aggregation function. */
        private final AggregateFunction<T, ACC, R> aggFunction;
 
        /**
@@ -64,7 +64,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
         *
         * @param namespaceSerializer
         *             The serializer for the namespace.
-        * @param stateDesc              
+        * @param stateDesc
         *             The state identifier for the state. This contains the 
state name and aggregation function.
         */
        public RocksDBAggregatingState(
@@ -154,7 +154,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
                                        writeKeyWithGroupAndNamespace(
                                                        keyGroup, key, source,
                                                        keySerializationStream, 
keySerializationDataOutputView);
-                                       
+
                                        final byte[] sourceKey = 
keySerializationStream.toByteArray();
                                        final byte[] valueBytes = 
backend.db.get(columnFamily, sourceKey);
 
@@ -174,7 +174,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 
                        // if something came out of merging the sources, merge 
it or write it to the target
                        if (current != null) {
-                               // create the target full-binary-key 
+                               // create the target full-binary-key
                                writeKeyWithGroupAndNamespace(
                                                keyGroup, key, target,
                                                keySerializationStream, 
keySerializationDataOutputView);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index d5d9fce..479565e 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
+
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
@@ -47,10 +48,10 @@ public class RocksDBFoldingState<K, N, T, ACC>
        extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, ACC>
        implements InternalFoldingState<N, T, ACC> {
 
-       /** Serializer for the values */
+       /** Serializer for the values. */
        private final TypeSerializer<ACC> valueSerializer;
 
-       /** User-specified fold function */
+       /** User-specified fold function. */
        private final FoldFunction<T, ACC> foldFunction;
 
        /**
@@ -90,7 +91,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
                                return null;
                        }
                        return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-               } catch (IOException|RocksDBException e) {
+               } catch (IOException | RocksDBException e) {
                        throw new RuntimeException("Error while retrieving data 
from RocksDB", e);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 053c820..241c0b3 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
@@ -53,7 +54,6 @@ import 
org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -79,6 +79,7 @@ import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+
 import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -125,16 +126,16 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
        private final String operatorIdentifier;
 
-       /** The column family options from the options factory */
+       /** The column family options from the options factory. */
        private final ColumnFamilyOptions columnOptions;
 
-       /** The DB options from the options factory */
+       /** The DB options from the options factory. */
        private final DBOptions dbOptions;
 
-       /** Path where this configured instance stores its data directory */
+       /** Path where this configured instance stores its data directory. */
        private final File instanceBasePath;
 
-       /** Path where this configured instance stores its RocksDB data base */
+       /** Path where this configured instance stores its RocksDB data base. */
        private final File instanceRocksDBPath;
 
        /**
@@ -160,7 +161,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /**
         * Map of state names to their corresponding restored state meta info.
         *
-        * TODO this map can be removed when eager-state registration is in 
place.
+        * <p>TODO this map can be removed when eager-state registration is in 
place.
         * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
         */
        private final Map<String, 
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
@@ -168,13 +169,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /** Number of bytes required to prefix the key groups. */
        private final int keyGroupPrefixBytes;
 
-       /** True if incremental checkpointing is enabled */
+       /** True if incremental checkpointing is enabled. */
        private final boolean enableIncrementalCheckpointing;
 
-       /** The state handle ids of all sst files materialized in snapshots for 
previous checkpoints */
+       /** The state handle ids of all sst files materialized in snapshots for 
previous checkpoints. */
        private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
 
-       /** The identifier of the last completed checkpoint */
+       /** The identifier of the last completed checkpoint. */
        private long lastCompletedCheckpointId = -1;
 
        private static final String SST_FILE_SUFFIX = ".sst";
@@ -711,22 +712,22 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
        private static final class RocksDBIncrementalSnapshotOperation<K> {
 
-               /** The backend which we snapshot */
+               /** The backend which we snapshot. */
                private final RocksDBKeyedStateBackend<K> stateBackend;
 
-               /** Stream factory that creates the outpus streams to DFS */
+               /** Stream factory that creates the outpus streams to DFS. */
                private final CheckpointStreamFactory checkpointStreamFactory;
 
-               /** Id for the current checkpoint */
+               /** Id for the current checkpoint. */
                private final long checkpointId;
 
-               /** Timestamp for the current checkpoint */
+               /** Timestamp for the current checkpoint. */
                private final long checkpointTimestamp;
 
-               /** All sst files that were part of the last previously 
completed checkpoint */
+               /** All sst files that were part of the last previously 
completed checkpoint. */
                private Set<StateHandleID> baseSstFiles;
 
-               /** The state meta data */
+               /** The state meta data. */
                private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
= new ArrayList<>();
 
                private FileSystem backupFileSystem;
@@ -888,8 +889,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                }
                        }
 
-
-
                        synchronized (stateBackend.materializedSstFiles) {
                                
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
                        }
@@ -1036,13 +1035,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                private final RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend;
 
-               /** Current key-groups state handle from which we restore 
key-groups */
+               /** Current key-groups state handle from which we restore 
key-groups. */
                private KeyGroupsStateHandle currentKeyGroupsStateHandle;
-               /** Current input stream we obtained from 
currentKeyGroupsStateHandle */
+               /** Current input stream we obtained from 
currentKeyGroupsStateHandle. */
                private FSDataInputStream currentStateHandleInStream;
-               /** Current data input view that wraps 
currentStateHandleInStream */
+               /** Current data input view that wraps 
currentStateHandleInStream. */
                private DataInputView currentStateHandleInView;
-               /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle */
+               /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle. */
                private List<ColumnFamilyHandle> 
currentStateHandleKVStateColumnFamilies;
 
                /**
@@ -1082,7 +1081,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                /**
-                * Restore one key groups state handle
+                * Restore one key groups state handle.
                 *
                 * @throws IOException
                 * @throws RocksDBException
@@ -1105,7 +1104,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                /**
-                * Restore the KV-state / ColumnFamily meta data for all 
key-groups referenced by the current state handle
+                * Restore the KV-state / ColumnFamily meta data for all 
key-groups referenced by the current state handle.
                 *
                 * @throws IOException
                 * @throws ClassNotFoundException
@@ -1169,7 +1168,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                /**
-                * Restore the KV-state / ColumnFamily data for all key-groups 
referenced by the current state handle
+                * Restore the KV-state / ColumnFamily data for all key-groups 
referenced by the current state handle.
                 *
                 * @throws IOException
                 * @throws RocksDBException
@@ -1376,7 +1375,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                                int 
startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
                                                                byte[] 
startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
                                                                for (int j = 0; 
j < stateBackend.keyGroupPrefixBytes; ++j) {
-                                                                       
startKeyGroupPrefixBytes[j] = (byte)(startKeyGroup >>> 
((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
+                                                                       
startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> 
((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
                                                                }
 
                                                                
iterator.seek(startKeyGroupPrefixBytes);
@@ -1430,7 +1429,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                                
columnFamilyHandle, stateMetaInfo));
                                        }
 
-
                                        // use the restore sst files as the 
base for succeeding checkpoints
                                        synchronized 
(stateBackend.materializedSstFiles) {
                                                
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), 
sstFiles.keySet());
@@ -1480,7 +1478,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        for (KeyedStateHandle rawStateHandle : 
restoreStateHandles) {
 
-                               if (! (rawStateHandle instanceof IncrementalKeyedStateHandle)) {
+                               if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) {
                                        throw new 
IllegalStateException("Unexpected state handle type, " +
                                                "expected " + 
IncrementalKeyedStateHandle.class +
                                                ", but found " + 
rawStateHandle.getClass());

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index a8b20d1..9d3e97e 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -50,7 +50,7 @@ public class RocksDBListState<K, N, V>
        extends AbstractRocksDBState<K, N, ListState<V>, 
ListStateDescriptor<V>, List<V>>
        implements InternalListState<N, V> {
 
-       /** Serializer for the values */
+       /** Serializer for the values. */
        private final TypeSerializer<V> valueSerializer;
 
        /**
@@ -100,7 +100,7 @@ public class RocksDBListState<K, N, V>
                                }
                        }
                        return result;
-               } catch (IOException|RocksDBException e) {
+               } catch (IOException | RocksDBException e) {
                        throw new RuntimeException("Error while retrieving data 
from RocksDB", e);
                }
        }
@@ -131,7 +131,7 @@ public class RocksDBListState<K, N, V>
                final int keyGroup = backend.getCurrentKeyGroupIndex();
 
                try {
-                       // create the target full-binary-key 
+                       // create the target full-binary-key
                        writeKeyWithGroupAndNamespace(
                                        keyGroup, key, target,
                                        keySerializationStream, 
keySerializationDataOutputView);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 5125240..75c1651 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.Preconditions;
+
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -45,7 +46,7 @@ import java.util.Map;
 
 /**
  * {@link MapState} implementation that stores state in RocksDB.
- * <p>
+ *
  * <p>{@link RocksDBStateBackend} must ensure that we set the
  * {@link org.rocksdb.StringAppendOperator} on the column family that we use 
for our state since
  * we use the {@code merge()} call.
@@ -58,10 +59,10 @@ import java.util.Map;
 public class RocksDBMapState<K, N, UK, UV>
        extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
        implements InternalMapState<N, UK, UV> {
-       
-       private static Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
 
-       /** Serializer for the keys and values */
+       private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
+
+       /** Serializer for the keys and values. */
        private final TypeSerializer<UK> userKeySerializer;
        private final TypeSerializer<UV> userValueSerializer;
 
@@ -105,19 +106,19 @@ public class RocksDBMapState<K, N, UK, UV>
 
        @Override
        public void put(UK userKey, UV userValue) throws IOException, 
RocksDBException {
-               
+
                byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
                byte[] rawValueBytes = serializeUserValue(userValue);
 
                backend.db.put(columnFamily, writeOptions, rawKeyBytes, 
rawValueBytes);
        }
-       
+
        @Override
        public void putAll(Map<UK, UV> map) throws IOException, 
RocksDBException {
                if (map == null) {
                        return;
                }
-               
+
                for (Map.Entry<UK, UV> entry : map.entrySet()) {
                        put(entry.getKey(), entry.getValue());
                }
@@ -137,7 +138,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
                return (rawValueBytes != null);
        }
-       
+
        @Override
        public Iterable<Map.Entry<UK, UV>> entries() throws IOException, 
RocksDBException {
                final Iterator<Map.Entry<UK, UV>> iterator = iterator();
@@ -158,7 +159,7 @@ public class RocksDBMapState<K, N, UK, UV>
        @Override
        public Iterable<UK> keys() throws IOException, RocksDBException {
                final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
-               
+
                return new Iterable<UK>() {
                        @Override
                        public Iterator<UK> iterator() {
@@ -176,7 +177,7 @@ public class RocksDBMapState<K, N, UK, UV>
        @Override
        public Iterable<UV> values() throws IOException, RocksDBException {
                final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
-               
+
                return new Iterable<UV>() {
                        @Override
                        public Iterator<UV> iterator() {
@@ -202,7 +203,7 @@ public class RocksDBMapState<K, N, UK, UV>
                        }
                };
        }
-       
+
        @Override
        public void clear() {
                try {
@@ -216,7 +217,7 @@ public class RocksDBMapState<K, N, UK, UV>
                        LOG.warn("Error while cleaning the state.", e);
                }
        }
-       
+
        @Override
        @SuppressWarnings("unchecked")
        public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) 
throws Exception {
@@ -229,7 +230,7 @@ public class RocksDBMapState<K, N, UK, UV>
                                namespaceSerializer);
 
                int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, 
backend.getNumberOfKeyGroups());
-               
+
                ByteArrayOutputStreamWithPos outputStream = new 
ByteArrayOutputStreamWithPos(128);
                DataOutputViewStreamWrapper outputView = new 
DataOutputViewStreamWrapper(outputStream);
                writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, 
outputStream, outputView);
@@ -246,7 +247,7 @@ public class RocksDBMapState<K, N, UK, UV>
                if (!iterator.hasNext()) {
                        return null;
                }
-               
+
                return KvStateRequestSerializer.serializeMap(new 
Iterable<Map.Entry<UK, UV>>() {
                        @Override
                        public Iterator<Map.Entry<UK, UV>> iterator() {
@@ -254,21 +255,21 @@ public class RocksDBMapState<K, N, UK, UV>
                        }
                }, userKeySerializer, userValueSerializer);
        }
-       
+
        // 
------------------------------------------------------------------------
        //  Serialization Methods
        // 
------------------------------------------------------------------------
-       
+
        private byte[] serializeCurrentKeyAndNamespace() throws IOException {
                writeCurrentKeyWithGroupAndNamespace();
-               
+
                return keySerializationStream.toByteArray();
        }
 
        private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) 
throws IOException {
                writeCurrentKeyWithGroupAndNamespace();
                userKeySerializer.serialize(userKey, 
keySerializationDataOutputView);
-               
+
                return keySerializationStream.toByteArray();
        }
 
@@ -282,7 +283,6 @@ public class RocksDBMapState<K, N, UK, UV>
                        userValueSerializer.serialize(userValue, 
keySerializationDataOutputView);
                }
 
-               
                return keySerializationStream.toByteArray();
        }
 
@@ -291,7 +291,7 @@ public class RocksDBMapState<K, N, UK, UV>
                DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
 
                readKeyWithGroupAndNamespace(bais, in);
-       
+
                return userKeySerializer.deserialize(in);
        }
 
@@ -303,20 +303,20 @@ public class RocksDBMapState<K, N, UK, UV>
 
                return isNull ? null : userValueSerializer.deserialize(in);
        }
-       
+
        // 
------------------------------------------------------------------------
        //  Internal Classes
        // 
------------------------------------------------------------------------
-       
-       /** A map entry in RocksDBMapState */
+
+       /** A map entry in RocksDBMapState. */
        private class RocksDBMapEntry implements Map.Entry<UK, UV> {
                private final RocksDB db;
-               
+
                /** The raw bytes of the key stored in RocksDB. Each user key 
is stored in RocksDB
                 * with the format #KeyGroup#Key#Namespace#UserKey. */
                private final byte[] rawKeyBytes;
-               
-               /** The raw bytes of the value stored in RocksDB */
+
+               /** The raw bytes of the value stored in RocksDB. */
                private byte[] rawValueBytes;
 
                /** True if the entry has been deleted. */
@@ -329,7 +329,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
                RocksDBMapEntry(final RocksDB db, final byte[] rawKeyBytes, 
final byte[] rawValueBytes) {
                        this.db = db;
-                       
+
                        this.rawKeyBytes = rawKeyBytes;
                        this.rawValueBytes = rawValueBytes;
                        this.deleted = false;
@@ -383,7 +383,7 @@ public class RocksDBMapState<K, N, UK, UV>
                        }
 
                        UV oldValue = getValue();
-                       
+
                        try {
                                userValue = value;
                                rawValueBytes = serializeUserValue(value);
@@ -400,22 +400,22 @@ public class RocksDBMapState<K, N, UK, UV>
        /** An auxiliary utility to scan all entries under the given key. */
        private abstract class RocksDBMapIterator<T> implements Iterator<T> {
 
-               final static int CACHE_SIZE_BASE = 1;
-               final static int CACHE_SIZE_LIMIT = 128;
+               static final int CACHE_SIZE_BASE = 1;
+               static final int CACHE_SIZE_LIMIT = 128;
 
                /** The db where data resides. */
                private final RocksDB db;
 
-               /** 
+               /**
                 * The prefix bytes of the key being accessed. All entries 
under the same key
                 * has the same prefix, hence we can stop the iterating once 
coming across an
-                * entry with a different prefix. 
+                * entry with a different prefix.
                 */
                private final byte[] keyPrefixBytes;
 
                /**
                 * True if all entries have been accessed or the iterator has 
come across an
-                * entry with a different prefix. 
+                * entry with a different prefix.
                 */
                private boolean expired = false;
 
@@ -423,7 +423,6 @@ public class RocksDBMapState<K, N, UK, UV>
                private ArrayList<RocksDBMapEntry> cacheEntries = new 
ArrayList<>();
                private int cacheIndex = 0;
 
-
                RocksDBMapIterator(final RocksDB db, final byte[] 
keyPrefixBytes) {
                        this.db = db;
                        this.keyPrefixBytes = keyPrefixBytes;
@@ -440,7 +439,7 @@ public class RocksDBMapState<K, N, UK, UV>
                public void remove() {
                        if (cacheIndex == 0 || cacheIndex > 
cacheEntries.size()) {
                                throw new IllegalStateException("The remove 
operation must be called after an valid next operation.");
-                       } 
+                       }
 
                        RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex 
- 1);
                        lastEntry.remove();
@@ -489,7 +488,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
                        iterator.seek(startBytes);
 
-                       /* 
+                       /*
                         * If the last returned entry is not deleted, it will 
be the first entry in the
                         * iterating. Skip it to avoid redundant access in such 
cases.
                         */
@@ -515,7 +514,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
                        iterator.close();
                }
-               
+
                private boolean underSameKey(byte[] rawKeyBytes) {
                        if (rawKeyBytes.length < keyPrefixBytes.length) {
                                return false;
@@ -530,4 +529,4 @@ public class RocksDBMapState<K, N, UK, UV>
                        return true;
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index ccc98a7..b5fe95f 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -46,10 +46,10 @@ public class RocksDBReducingState<K, N, V>
        extends AbstractRocksDBState<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>, V>
        implements InternalReducingState<N, V> {
 
-       /** Serializer for the values */
+       /** Serializer for the values. */
        private final TypeSerializer<V> valueSerializer;
 
-       /** User-specified reduce function */
+       /** User-specified reduce function. */
        private final ReduceFunction<V> reduceFunction;
 
        /**
@@ -88,7 +88,7 @@ public class RocksDBReducingState<K, N, V>
                                return null;
                        }
                        return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
-               } catch (IOException|RocksDBException e) {
+               } catch (IOException | RocksDBException e) {
                        throw new RuntimeException("Error while retrieving data 
from RocksDB", e);
                }
        }
@@ -157,7 +157,7 @@ public class RocksDBReducingState<K, N, V>
 
                        // if something came out of merging the sources, merge 
it or write it to the target
                        if (current != null) {
-                               // create the target full-binary-key 
+                               // create the target full-binary-key
                                writeKeyWithGroupAndNamespace(
                                                keyGroup, key, target,
                                                keySerializationStream, 
keySerializationDataOutputView);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 2b70dcd..4a30489 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.util.AbstractID;
+
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.NativeLibraryLoader;
@@ -69,10 +70,10 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBStateBackend.class);
 
-       /** The number of (re)tries for loading the RocksDB JNI library */
+       /** The number of (re)tries for loading the RocksDB JNI library. */
        private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
 
-       
+
        private static boolean rocksDbInitialized = false;
 
        // 
------------------------------------------------------------------------
@@ -93,23 +94,23 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        /** Base paths for RocksDB directory, as configured. May be null. */
        private Path[] configuredDbBasePaths;
 
-       /** Base paths for RocksDB directory, as initialized */
+       /** Base paths for RocksDB directory, as initialized. */
        private File[] initializedDbBasePaths;
 
        private int nextDirectory;
 
        // RocksDB options
 
-       /** The pre-configured option settings */
+       /** The pre-configured option settings. */
        private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
 
-       /** The options factory to create the RocksDB options in the cluster */
+       /** The options factory to create the RocksDB options in the cluster. */
        private OptionsFactory optionsFactory;
 
        /** Whether we already lazily initialized our local storage 
directories. */
        private transient boolean isInitialized = false;
 
-       /** True if incremental checkpointing is enabled */
+       /** True if incremental checkpointing is enabled. */
        private boolean enableIncrementalCheckpointing;
 
 
@@ -183,10 +184,10 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
         * Creates a new {@code RocksDBStateBackend} that uses the given state 
backend to store its
         * checkpoint data streams. Typically, one would supply a filesystem or 
database state backend
         * here where the snapshots from RocksDB would be stored.
-        * 
+        *
         * <p>The snapshots of the RocksDB state will be stored using the given 
backend's
-        * {@link AbstractStateBackend#createStreamFactory(JobID, String) 
checkpoint stream}. 
-        * 
+        * {@link AbstractStateBackend#createStreamFactory(JobID, String) 
checkpoint stream}.
+        *
         * @param checkpointStreamBackend The backend to store the
         */
        public RocksDBStateBackend(AbstractStateBackend 
checkpointStreamBackend) {

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
index bd9bcaa..f0569b8 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
@@ -39,13 +39,13 @@ public class RocksDBStateBackendFactory implements 
StateBackendFactory<RocksDBSt
 
        private static final long serialVersionUID = 4906988360901930371L;
 
-       /** The key under which the config stores the directory where 
checkpoints should be stored */
+       /** The key under which the config stores the directory where 
checkpoints should be stored. */
        public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = 
"state.backend.fs.checkpointdir";
-       /** The key under which the config stores the directory where RocksDB 
should be stored */
+       /** The key under which the config stores the directory where RocksDB 
should be stored. */
        public static final String ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY = 
"state.backend.rocksdb.checkpointdir";
 
        @Override
-       public RocksDBStateBackend createFromConfig(Configuration config) 
+       public RocksDBStateBackend createFromConfig(Configuration config)
                        throws IllegalConfigurationException, IOException {
 
                final String checkpointDirURI = 
config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index b2a4fba..da21e8a 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalValueState;
+
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
@@ -42,7 +43,7 @@ public class RocksDBValueState<K, N, V>
        extends AbstractRocksDBState<K, N, ValueState<V>, 
ValueStateDescriptor<V>, V>
        implements InternalValueState<N, V> {
 
-       /** Serializer for the values */
+       /** Serializer for the values. */
        private final TypeSerializer<V> valueSerializer;
 
        /**
@@ -80,7 +81,7 @@ public class RocksDBValueState<K, N, V>
                                return stateDesc.getDefaultValue();
                        }
                        return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
-               } catch (IOException|RocksDBException e) {
+               } catch (IOException | RocksDBException e) {
                        throw new RuntimeException("Error while retrieving data 
from RocksDB.", e);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
index 695aa12..024d12e 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
@@ -68,6 +68,12 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                }
        }
 
+       /**
+        * This class exists to provide a good error message if a user attempts 
to restore from a semi async snapshot.
+        *
+        * <p>see FLINK-5468
+        */
+       @Deprecated
        public static class FinalSemiAsyncSnapshot {
 
                static {
@@ -75,7 +81,7 @@ public class RocksDBStateBackend extends AbstractStateBackend 
{
                }
 
                private static void throwExceptionOnLoadingThisClass() {
-                       throw new RuntimeException("Attempt to requiresMigration RocksDB state created with semi async snapshot mode failed. "
+                       throw new RuntimeException("Attempt to migrate RocksDB state created with semi async snapshot mode failed. "
                                        + "Unfortunately, this is not 
supported. Please create a new savepoint for the job using fully "
                                        + "async mode in Flink 1.1 and run 
migration again with the new savepoint.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
index 1b65466..f3065ab 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
@@ -36,8 +36,9 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static java.util.Arrays.asList;
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the {@link InternalAggregatingState} implementation on top of 
RocksDB.

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 812babb..d2edf0e 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -61,12 +60,10 @@ import org.apache.flink.util.FutureUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
-
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -408,7 +405,7 @@ public class RocksDBAsyncSnapshotTest {
                                                        } catch 
(InterruptedException e) {
                                                                
Thread.currentThread().interrupt();
                                                        }
-                                                       if(closed) {
+                                                       if (closed) {
                                                                throw new 
IOException("Stream closed.");
                                                        }
                                                        super.write(b);
@@ -422,7 +419,7 @@ public class RocksDBAsyncSnapshotTest {
                                                        } catch 
(InterruptedException e) {
                                                                
Thread.currentThread().interrupt();
                                                        }
-                                                       if(closed) {
+                                                       if (closed) {
                                                                throw new 
IOException("Stream closed.");
                                                        }
                                                        super.write(b, off, 
len);
@@ -439,7 +436,7 @@ public class RocksDBAsyncSnapshotTest {
                }
        }
 
-       public static class AsyncCheckpointOperator
+       private static class AsyncCheckpointOperator
                extends AbstractStreamOperator<String>
                implements OneInputStreamOperator<String, String>, 
StreamCheckpointedOperator {
 
@@ -480,9 +477,4 @@ public class RocksDBAsyncSnapshotTest {
                }
 
        }
-
-       public static class DummyMapFunction<T> implements MapFunction<T, T> {
-               @Override
-               public T map(T value) { return value; }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
index 7343b56..565f27d 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.junit.Test;
 
 /**
- * This test checks that the RocksDB native code loader still responds to resetting the
+ * This test checks that the RocksDB native code loader still responds to resetting the init flag.
  */
 public class RocksDBInitResetTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
index e7efcfa..c6ccd5d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
@@ -64,9 +64,9 @@ public class RocksDBListStateTest {
                backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
 
                final RocksDBKeyedStateBackend<String> keyedBackend = 
createKeyedBackend(backend);
-               
+
                try {
-                       InternalListState<VoidNamespace, Long> state = 
+                       InternalListState<VoidNamespace, Long> state =
                                        
keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr);
                        state.setCurrentNamespace(VoidNamespace.INSTANCE);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index f5bcf86..1d14f6e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -38,6 +39,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
+/**
+ * Tests for the RocksDBMergeIterator.
+ */
 public class RocksDBMergeIteratorTest {
 
        private static final int NUM_KEY_VAL_STATES = 50;

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
index a8b4535..0733dce 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
@@ -54,7 +54,7 @@ public class RocksDBReducingStateTest {
        @Test
        public void testAddAndGet() throws Exception {
 
-               final ReducingStateDescriptor<Long> stateDescr = 
+               final ReducingStateDescriptor<Long> stateDescr =
                                new ReducingStateDescriptor<>("my-state", new 
AddingFunction(), Long.class);
                stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
 
@@ -62,9 +62,9 @@ public class RocksDBReducingStateTest {
                backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
 
                final RocksDBKeyedStateBackend<String> keyedBackend = 
createKeyedBackend(backend);
-               
+
                try {
-                       InternalReducingState<VoidNamespace, Long> state = 
+                       InternalReducingState<VoidNamespace, Long> state =
                                        
keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
                        state.setCurrentNamespace(VoidNamespace.INSTANCE);
 
@@ -126,7 +126,7 @@ public class RocksDBReducingStateTest {
                final RocksDBKeyedStateBackend<String> keyedBackend = 
createKeyedBackend(backend);
 
                try {
-                       final InternalReducingState<TimeWindow, Long> state = 
+                       final InternalReducingState<TimeWindow, Long> state =
                                        keyedBackend.createReducingState(new 
TimeWindow.Serializer(), stateDescr);
 
                        // populate the different namespaces

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 463dd44..ff433ad 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -32,6 +31,8 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+
+import org.apache.commons.io.FileUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -59,7 +60,7 @@ import static org.mockito.Mockito.when;
 
 
 /**
- * Tests for configuring the RocksDB State Backend
+ * Tests for configuring the RocksDB State Backend.
  */
 @SuppressWarnings("serial")
 public class RocksDBStateBackendConfigTest {
@@ -102,7 +103,6 @@ public class RocksDBStateBackendConfigTest {
                                                new KeyGroupRange(0, 0),
                                                env.getTaskKvStateRegistry());
 
-
                File instanceBasePath = keyedBackend.getInstanceBasePath();
                assertThat(instanceBasePath.getAbsolutePath(), 
anyOf(startsWith(testDir1.getAbsolutePath()), 
startsWith(testDir2.getAbsolutePath())));
 
@@ -158,7 +158,6 @@ public class RocksDBStateBackendConfigTest {
                                                new KeyGroupRange(0, 0),
                                                env.getTaskKvStateRegistry());
 
-
                File instanceBasePath = keyedBackend.getInstanceBasePath();
                assertThat(instanceBasePath.getAbsolutePath(), 
anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
index 9eb662a..5a937c4 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
@@ -22,6 +22,9 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the RocksDBStateBackendFactory.
+ */
 public class RocksDBStateBackendFactoryTest {
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 8d0db69..8b44a47 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.filefilter.IOFileFilter;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ValueState;
@@ -42,6 +40,9 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.IOFileFilter;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -372,7 +373,6 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                        ValueState<String> state =
                                
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
-
                        Queue<IncrementalKeyedStateHandle> previousStateHandles 
= new LinkedList<>();
                        SharedStateRegistry sharedStateRegistry = spy(new 
SharedStateRegistry());
                        for (int checkpointId = 0; checkpointId < 3; 
++checkpointId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
index c53fa3e..4ec6532 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
@@ -27,7 +27,7 @@ import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotEquals;
 
 /**
  * This test validates that the RocksDB JNI library loading works properly
@@ -60,7 +60,7 @@ public class RocksDbMultiClassLoaderTest {
 
                final String tempDir = tmp.newFolder().getAbsolutePath();
 
-               final Method meth1 = 
clazz1.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
+               final Method meth1 = 
clazz1.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
                final Method meth2 = 
clazz2.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
                meth1.setAccessible(true);
                meth2.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index 7147583..3231e96 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.Options;
@@ -47,7 +46,7 @@ import java.util.Arrays;
 public class RocksDBPerformanceTest extends TestLogger {
 
        @Rule
-       public final TemporaryFolder TMP = new TemporaryFolder();
+       public final TemporaryFolder tmp = new TemporaryFolder();
 
        @Rule
        public final RetryRule retry = new RetryRule();
@@ -55,7 +54,7 @@ public class RocksDBPerformanceTest extends TestLogger {
        @Test(timeout = 2000)
        @RetryOnFailure(times = 3)
        public void testRocksDbMergePerformance() throws Exception {
-               final File rocksDir = TMP.newFolder();
+               final File rocksDir = tmp.newFolder();
 
                // ensure the RocksDB library is loaded to a distinct location 
each retry
                
NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
@@ -83,8 +82,8 @@ public class RocksDBPerformanceTest extends TestLogger {
                                        .setSync(false)
                                        .setDisableWAL(true);
 
-                       final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath()))
-               {
+                       final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath())) {
+
                        // ----- insert -----
                        log.info("begin insert");
 
@@ -133,7 +132,7 @@ public class RocksDBPerformanceTest extends TestLogger {
        @Test(timeout = 2000)
        @RetryOnFailure(times = 3)
        public void testRocksDbRangeGetPerformance() throws Exception {
-               final File rocksDir = TMP.newFolder();
+               final File rocksDir = tmp.newFolder();
 
                // ensure the RocksDB library is loaded to a distinct location 
each retry
                
NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
@@ -161,8 +160,8 @@ public class RocksDBPerformanceTest extends TestLogger {
                                        .setSync(false)
                                        .setDisableWAL(true);
 
-                       final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath()))
-               {
+                       final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath())) {
+
                        final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);
 
                        final Unsafe unsafe = MemoryUtils.UNSAFE;
@@ -205,7 +204,6 @@ public class RocksDBPerformanceTest extends TestLogger {
                }
        }
 
-
        private static boolean samePrefix(byte[] prefix, byte[] key) {
                for (int i = 0; i < prefix.length; i++) {
                        if (prefix[i] != key [i]) {

Reply via email to