This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit c05ca114382ba48a3b4d27d268bc753ed6844f8c
Author: panyuepeng <[email protected]>
AuthorDate: Wed Dec 17 13:13:18 2025 +0800

    [FLINK-38801][benchmark] Adjust code line formatting and corresponding
naming to follow Google AOSP style.
---
 .../benchmark/BlockingPartitionBenchmark.java      |   7 +-
 .../BlockingPartitionRemoteChannelBenchmark.java   |   1 -
 .../benchmark/CheckpointEnvironmentContext.java    |   6 +-
 .../ContinuousFileReaderOperatorBenchmark.java     |  28 ++--
 .../flink/benchmark/FlinkEnvironmentContext.java   |   5 +-
 .../HighAvailabilityServiceBenchmark.java          | 177 +++++++++++----------
 .../apache/flink/benchmark/KeyByBenchmarks.java    |   7 +-
 .../flink/benchmark/MultipleInputBenchmark.java    |   3 +-
 .../flink/benchmark/ProcessingTimerBenchmark.java  |  12 +-
 .../flink/benchmark/RemoteBenchmarkBase.java       |   4 +-
 .../SerializationFrameworkMiniBenchmarks.java      |  15 +-
 .../flink/benchmark/StateBackendBenchmarkBase.java |   7 +-
 .../benchmark/WatermarkAggregationBenchmark.java   |  13 +-
 .../benchmark/full/PojoSerializationBenchmark.java |   4 +-
 .../full/SerializationFrameworkAllBenchmarks.java  |   6 +-
 .../full/StringSerializationBenchmark.java         |  18 ++-
 .../benchmark/functions/IntegerLongSource.java     |   1 +
 .../flink/benchmark/functions/LongSourceType.java  |   2 +-
 ...FailureAndRestartAllTasksBenchmarkExecutor.java |   3 +-
 ...DownstreamTasksInBatchJobBenchmarkExecutor.java |   7 +-
 ...hMapStateBackendRescalingBenchmarkExecutor.java |  26 ++-
 .../flink/state/benchmark/ListStateBenchmark.java  |  24 +--
 .../flink/state/benchmark/MapStateBenchmark.java   |  26 +--
 .../state/benchmark/RescalingBenchmarkBase.java    |  12 +-
 ...ksdbStateBackendRescalingBenchmarkExecutor.java |  21 ++-
 .../flink/state/benchmark/StateBenchmarkBase.java  |  39 ++---
 .../state/benchmark/StateBenchmarkConstants.java   |  56 +++----
 .../flink/state/benchmark/ValueStateBenchmark.java |   4 +-
 .../state/benchmark/ttl/TtlListStateBenchmark.java |  18 ++-
 .../state/benchmark/ttl/TtlMapStateBenchmark.java  |  21 +--
 .../state/benchmark/ttl/TtlStateBenchmarkBase.java |   2 +
 .../benchmark/ttl/TtlValueStateBenchmark.java      |  10 +-
 32 files changed, 316 insertions(+), 269 deletions(-)

diff --git 
a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java 
b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
index 0159de1..db3962d 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
@@ -85,14 +85,13 @@ public class BlockingPartitionBenchmark extends 
BenchmarkBase {
             env.setBufferTimeout(-1);
         }
 
-        protected Configuration createConfiguration(
-                boolean compressionEnabled) {
+        protected Configuration createConfiguration(boolean 
compressionEnabled) {
             Configuration configuration = super.createConfiguration();
 
             configuration.set(
                     NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC,
-                    compressionEnabled ?
-                            NettyShuffleEnvironmentOptions.CompressionCodec.LZ4
+                    compressionEnabled
+                            ? 
NettyShuffleEnvironmentOptions.CompressionCodec.LZ4
                             : 
NettyShuffleEnvironmentOptions.CompressionCodec.NONE);
             configuration.set(
                     CoreOptions.TMP_DIRS,
diff --git 
a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
 
b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index da411bb..9df44e3 100644
--- 
a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -20,7 +20,6 @@ package org.apache.flink.benchmark;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.util.FileUtils;
diff --git 
a/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java 
b/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java
index 0295cdf..d7e5a29 100644
--- a/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java
+++ b/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java
@@ -98,8 +98,7 @@ public abstract class CheckpointEnvironmentContext extends 
FlinkEnvironmentConte
                             TaskManagerOptions.MEMORY_SEGMENT_SIZE,
                             
CheckpointEnvironmentContext.START_MEMORY_SEGMENT_SIZE);
                     config.set(
-                            CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
-                            Duration.ofMillis(0));
+                            CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, 
Duration.ofMillis(0));
                     config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, 
false);
                     return config;
                 }),
@@ -110,8 +109,7 @@ public abstract class CheckpointEnvironmentContext extends 
FlinkEnvironmentConte
                             TaskManagerOptions.MEMORY_SEGMENT_SIZE,
                             
CheckpointEnvironmentContext.START_MEMORY_SEGMENT_SIZE);
                     config.set(
-                            CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
-                            Duration.ofMillis(1));
+                            CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, 
Duration.ofMillis(1));
                     config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, 
false);
                     return config;
                 }),
diff --git 
a/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
 
b/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
index be13691..c6d0ea1 100644
--- 
a/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
@@ -27,9 +27,9 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
 import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 
 import joptsimple.internal.Strings;
-import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
 import org.openjdk.jmh.runner.Runner;
@@ -43,19 +43,19 @@ import java.util.concurrent.TimeoutException;
 
 @OperationsPerInvocation(value = 
ContinuousFileReaderOperatorBenchmark.RECORDS_PER_INVOCATION)
 public class ContinuousFileReaderOperatorBenchmark extends BenchmarkBase {
-    private static final int SPLITS_PER_INVOCATION = 100;
-    private static final int LINES_PER_SPLIT = 175_000;
-    public static final int RECORDS_PER_INVOCATION = SPLITS_PER_INVOCATION * 
LINES_PER_SPLIT;
+    private static final int splitsPerInvocation = 100;
+    private static final int linesPerSplit = 175_000;
+    public static final int RECORDS_PER_INVOCATION = splitsPerInvocation * 
linesPerSplit;
 
-    private static final TimestampedFileInputSplit SPLIT =
+    private static final TimestampedFileInputSplit split =
             new TimestampedFileInputSplit(0, 0, new Path("."), 0, 0, new 
String[] {});
-    private static final String LINE = Strings.repeat('0', 10);
+    private static final String line = Strings.repeat('0', 10);
 
     // Source should wait until all elements reach sink. Otherwise, 
END_OF_INPUT is sent once all
     // splits are emitted.
     // Thus, all subsequent reads in ContinuousFileReaderOperator would be 
made in CLOSING state in
     // a simple while-true loop (MailboxExecutor.isIdle is always true).
-    private static OneShotLatch TARGET_COUNT_REACHED_LATCH = new 
OneShotLatch();
+    private static OneShotLatch targetCountReachedLatch = new OneShotLatch();
 
     public static void main(String[] args) throws RunnerException {
         Options options =
@@ -73,7 +73,7 @@ public class ContinuousFileReaderOperatorBenchmark extends 
BenchmarkBase {
 
     @Benchmark
     public void readFileSplit(FlinkEnvironmentContext context) throws 
Exception {
-        TARGET_COUNT_REACHED_LATCH.reset();
+        targetCountReachedLatch.reset();
         StreamExecutionEnvironment env = context.env;
         env.enableCheckpointing(100)
                 .setParallelism(1)
@@ -93,15 +93,15 @@ public class ContinuousFileReaderOperatorBenchmark extends 
BenchmarkBase {
 
         @Override
         public void run(SourceContext<TimestampedFileInputSplit> ctx) {
-            while (isRunning && count < SPLITS_PER_INVOCATION) {
+            while (isRunning && count < splitsPerInvocation) {
                 count++;
                 synchronized (ctx.getCheckpointLock()) {
-                    ctx.collect(SPLIT);
+                    ctx.collect(split);
                 }
             }
             while (isRunning) {
                 try {
-                    TARGET_COUNT_REACHED_LATCH.await(100, 
TimeUnit.MILLISECONDS);
+                    targetCountReachedLatch.await(100, TimeUnit.MILLISECONDS);
                     return;
                 } catch (InterruptedException e) {
                     if (!isRunning) {
@@ -124,13 +124,13 @@ public class ContinuousFileReaderOperatorBenchmark 
extends BenchmarkBase {
 
         @Override
         public boolean reachedEnd() {
-            return count >= 
ContinuousFileReaderOperatorBenchmark.LINES_PER_SPLIT;
+            return count >= 
ContinuousFileReaderOperatorBenchmark.linesPerSplit;
         }
 
         @Override
         public String nextRecord(String s) {
             count++;
-            return LINE;
+            return line;
         }
 
         @Override
@@ -151,7 +151,7 @@ public class ContinuousFileReaderOperatorBenchmark extends 
BenchmarkBase {
         @Override
         public void invoke(String value, Context context) {
             if (++count == RECORDS_PER_INVOCATION) {
-                TARGET_COUNT_REACHED_LATCH.trigger();
+                targetCountReachedLatch.trigger();
             }
         }
     }
diff --git 
a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java 
b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
index b51de81..c3fa19c 100644
--- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
+++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
@@ -103,11 +103,12 @@ public class FlinkEnvironmentContext {
         final Configuration configuration = new Configuration();
         configuration.set(RestOptions.BIND_PORT, "0");
         // no equivalent config available.
-        //configuration.setInteger(
+        // configuration.setInteger(
         //        NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 
NUM_NETWORK_BUFFERS);
         configuration.set(DeploymentOptions.TARGET, 
MiniClusterPipelineExecutorServiceLoader.NAME);
         configuration.set(DeploymentOptions.ATTACHED, true);
-        // It doesn't make sense to wait for the final checkpoint in 
benchmarks since it only prolongs
+        // It doesn't make sense to wait for the final checkpoint in 
benchmarks since it only
+        // prolongs
         // the test but doesn't give any advantages.
         
configuration.set(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, 
false);
         // TODO: remove this line after FLINK-28243 will be done
diff --git 
a/src/main/java/org/apache/flink/benchmark/HighAvailabilityServiceBenchmark.java
 
b/src/main/java/org/apache/flink/benchmark/HighAvailabilityServiceBenchmark.java
index e49c61e..c91f693 100644
--- 
a/src/main/java/org/apache/flink/benchmark/HighAvailabilityServiceBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/HighAvailabilityServiceBenchmark.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.benchmark;
 
-import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -27,6 +26,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.FileUtils;
+
+import org.apache.curator.test.TestingServer;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
 import org.openjdk.jmh.annotations.Param;
@@ -54,88 +55,94 @@ import static org.openjdk.jmh.annotations.Scope.Thread;
  */
 @OutputTimeUnit(SECONDS)
 public class HighAvailabilityServiceBenchmark extends BenchmarkBase {
-       public static void main(String[] args) throws RunnerException {
-               Options options =
-                               new OptionsBuilder()
-                                               .verbosity(VerboseMode.NORMAL)
-                                               .include(".*" + 
HighAvailabilityServiceBenchmark.class.getCanonicalName() + ".*")
-                                               .build();
-
-               new Runner(options).run();
-       }
-
-       @Benchmark
-       public void submitJobThroughput(HighAvailabilityContext context) throws 
Exception {
-               context.miniCluster.executeJobBlocking(buildNoOpJob());
-       }
-
-       private static JobGraph buildNoOpJob() {
-               JobGraph jobGraph = new JobGraph(JobID.generate(), 
UUID.randomUUID().toString());
-               jobGraph.addVertex(createNoOpVertex());
-               return jobGraph;
-       }
-
-       private static JobVertex createNoOpVertex() {
-               JobVertex vertex = new JobVertex("v");
-               vertex.setInvokableClass(NoOpInvokable.class);
-               vertex.setParallelism(1);
-               vertex.setMaxParallelism(1);
-               return vertex;
-       }
-
-       @State(Thread)
-       public static class HighAvailabilityContext extends 
FlinkEnvironmentContext {
-               private TestingServer testingServer;
-               public final File haDir;
-
-               @Param({"ZOOKEEPER", "NONE"})
-               public HighAvailabilityMode highAvailabilityMode;
-
-               public HighAvailabilityContext() {
-                       try {
-                               haDir = 
Files.createTempDirectory("bench-ha-").toFile();
-                       } catch (IOException e) {
-                               throw new RuntimeException(e);
-                       }
-               }
-
-               @Override
-               public void setUp() throws Exception {
-                       if (isZookeeperHighAvailability()) {
-                               testingServer = new TestingServer();
-                               testingServer.start();
-                       }
-
-                       // The method `super.setUp()` will call 
`createConfiguration()` to get Configuration and
-                       // create a `MiniCluster`. We need to start 
TestingServer before `createConfiguration()`,
-                       // then we can add zookeeper quorum in the 
configuration. So we can only start
-                       // `TestingServer` before `super.setUp()`.
-                       super.setUp();
-               }
-
-               private boolean isZookeeperHighAvailability() {
-                       return highAvailabilityMode == 
HighAvailabilityMode.ZOOKEEPER;
-               }
-
-               @Override
-               protected Configuration createConfiguration() {
-                       Configuration configuration = 
super.createConfiguration();
-                       configuration.set(HighAvailabilityOptions.HA_MODE, 
highAvailabilityMode.name());
-                       
configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, 
haDir.toURI().toString());
-                       if (isZookeeperHighAvailability()) {
-                               
configuration.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
-                       }
-                       return configuration;
-               }
-
-               @Override
-               public void tearDown() throws Exception {
-                       super.tearDown();
-                       if (isZookeeperHighAvailability()) {
-                               testingServer.stop();
-                               testingServer.close();
-                       }
-                       FileUtils.deleteDirectory(haDir);
-               }
-       }
+    public static void main(String[] args) throws RunnerException {
+        Options options =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(
+                                ".*"
+                                        + 
HighAvailabilityServiceBenchmark.class.getCanonicalName()
+                                        + ".*")
+                        .build();
+
+        new Runner(options).run();
+    }
+
+    @Benchmark
+    public void submitJobThroughput(HighAvailabilityContext context) throws 
Exception {
+        context.miniCluster.executeJobBlocking(buildNoOpJob());
+    }
+
+    private static JobGraph buildNoOpJob() {
+        JobGraph jobGraph = new JobGraph(JobID.generate(), 
UUID.randomUUID().toString());
+        jobGraph.addVertex(createNoOpVertex());
+        return jobGraph;
+    }
+
+    private static JobVertex createNoOpVertex() {
+        JobVertex vertex = new JobVertex("v");
+        vertex.setInvokableClass(NoOpInvokable.class);
+        vertex.setParallelism(1);
+        vertex.setMaxParallelism(1);
+        return vertex;
+    }
+
+    @State(Thread)
+    public static class HighAvailabilityContext extends 
FlinkEnvironmentContext {
+        private TestingServer testingServer;
+        public final File haDir;
+
+        @Param({"ZOOKEEPER", "NONE"})
+        public HighAvailabilityMode highAvailabilityMode;
+
+        public HighAvailabilityContext() {
+            try {
+                haDir = Files.createTempDirectory("bench-ha-").toFile();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void setUp() throws Exception {
+            if (isZookeeperHighAvailability()) {
+                testingServer = new TestingServer();
+                testingServer.start();
+            }
+
+            // The method `super.setUp()` will call `createConfiguration()` to 
get Configuration and
+            // create a `MiniCluster`. We need to start TestingServer before
+            // `createConfiguration()`,
+            // then we can add zookeeper quorum in the configuration. So we 
can only start
+            // `TestingServer` before `super.setUp()`.
+            super.setUp();
+        }
+
+        private boolean isZookeeperHighAvailability() {
+            return highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER;
+        }
+
+        @Override
+        protected Configuration createConfiguration() {
+            Configuration configuration = super.createConfiguration();
+            configuration.set(HighAvailabilityOptions.HA_MODE, 
highAvailabilityMode.name());
+            configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, 
haDir.toURI().toString());
+            if (isZookeeperHighAvailability()) {
+                configuration.set(
+                        HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+                        testingServer.getConnectString());
+            }
+            return configuration;
+        }
+
+        @Override
+        public void tearDown() throws Exception {
+            super.tearDown();
+            if (isZookeeperHighAvailability()) {
+                testingServer.stop();
+                testingServer.close();
+            }
+            FileUtils.deleteDirectory(haDir);
+        }
+    }
 }
diff --git a/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java 
b/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java
index c4cb907..149cacd 100644
--- a/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java
+++ b/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java
@@ -23,8 +23,8 @@ import 
org.apache.flink.benchmark.functions.BaseSourceWithKeyRange;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
-
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
 import org.openjdk.jmh.runner.Runner;
@@ -68,8 +68,9 @@ public class KeyByBenchmarks extends BenchmarkBase {
         StreamExecutionEnvironment env = context.env;
         env.setParallelism(4);
 
-        DataStreamSource<int[]> source = env.addSource(new 
IncreasingArraySource(ARRAY_RECORDS_PER_INVOCATION, 10));
-        source.keyBy(KeySelectorUtil.getSelectorForArray(new int[]{0}, 
source.getType()))
+        DataStreamSource<int[]> source =
+                env.addSource(new 
IncreasingArraySource(ARRAY_RECORDS_PER_INVOCATION, 10));
+        source.keyBy(KeySelectorUtil.getSelectorForArray(new int[] {0}, 
source.getType()))
                 .addSink(new DiscardingSink<>());
 
         env.execute();
diff --git 
a/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java 
b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
index 72ad6c8..95eca61 100644
--- a/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
@@ -167,7 +167,8 @@ public class MultipleInputBenchmark extends BenchmarkBase {
         @Override
         public SourceReader<Integer, MockSourceSplit> createReader(
                 SourceReaderContext readerContext) {
-            return new 
MockSourceReader(MockSourceReader.WaitingForSplits.WAIT_UNTIL_ALL_SPLITS_ASSIGNED,
 true) {
+            return new MockSourceReader(
+                    
MockSourceReader.WaitingForSplits.WAIT_UNTIL_ALL_SPLITS_ASSIGNED, true) {
                 @Override
                 public InputStatus pollNext(ReaderOutput<Integer> 
sourceOutput) {
                     if (canFinish.isDone() && 
!canFinish.isCompletedExceptionally()) {
diff --git 
a/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java 
b/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java
index 83939b8..21db382 100644
--- a/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
 import 
org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
 import org.apache.flink.util.Collector;
+
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
 import org.openjdk.jmh.runner.Runner;
@@ -41,7 +42,7 @@ public class ProcessingTimerBenchmark extends BenchmarkBase {
 
     private static final int PARALLELISM = 1;
 
-    private static OneShotLatch LATCH = new OneShotLatch();
+    private static OneShotLatch latch = new OneShotLatch();
 
     public static void main(String[] args) throws RunnerException {
         Options options =
@@ -55,7 +56,7 @@ public class ProcessingTimerBenchmark extends BenchmarkBase {
 
     @Benchmark
     public void fireProcessingTimers(FlinkEnvironmentContext context) throws 
Exception {
-        LATCH.reset();
+        latch.reset();
         StreamExecutionEnvironment env = context.env;
         env.setParallelism(PARALLELISM);
 
@@ -84,7 +85,7 @@ public class ProcessingTimerBenchmark extends BenchmarkBase {
                 sourceContext.collect(String.valueOf(random.nextLong()));
             }
 
-            LATCH.await();
+            latch.await();
         }
 
         @Override
@@ -111,7 +112,8 @@ public class ProcessingTimerBenchmark extends BenchmarkBase 
{
                 throws Exception {
             final long currTimestamp = System.currentTimeMillis();
             for (int i = 0; i < timersPerRecord; i++) {
-                
context.timerService().registerProcessingTimeTimer(currTimestamp - 
timersPerRecord + i);
+                context.timerService()
+                        .registerProcessingTimeTimer(currTimestamp - 
timersPerRecord + i);
             }
         }
 
@@ -119,7 +121,7 @@ public class ProcessingTimerBenchmark extends BenchmarkBase 
{
         public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<String> out)
                 throws Exception {
             if (++firedTimesCount == timersPerRecord) {
-                LATCH.trigger();
+                latch.trigger();
             }
         }
     }
diff --git a/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java 
b/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java
index e6fef61..70715e0 100644
--- a/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java
@@ -36,7 +36,9 @@ public abstract class RemoteBenchmarkBase extends 
BenchmarkBase {
             return 1;
         }
 
-        /** @return the number of vertices the respective job graph contains. 
*/
+        /**
+         * @return the number of vertices the respective job graph contains.
+         */
         abstract int getNumberOfVertices();
     }
 }
diff --git 
a/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java
 
b/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java
index dd4a242..61402c0 100644
--- 
a/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java
+++ 
b/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java
@@ -68,7 +68,8 @@ public class SerializationFrameworkMiniBenchmarks extends 
BenchmarkBase {
         StreamExecutionEnvironment env = context.env;
         env.setParallelism(4);
         ExecutionConfig executionConfig = env.getConfig();
-        SerializerConfigImpl serializerConfig = (SerializerConfigImpl) 
executionConfig.getSerializerConfig();
+        SerializerConfigImpl serializerConfig =
+                (SerializerConfigImpl) executionConfig.getSerializerConfig();
         serializerConfig.registerPojoType(MyPojo.class);
         serializerConfig.registerPojoType(MyOperation.class);
 
@@ -85,7 +86,8 @@ public class SerializationFrameworkMiniBenchmarks extends 
BenchmarkBase {
         StreamExecutionEnvironment env = context.env;
         env.setParallelism(1);
         ExecutionConfig executionConfig = env.getConfig();
-        SerializerConfigImpl serializerConfig = (SerializerConfigImpl) 
executionConfig.getSerializerConfig();
+        SerializerConfigImpl serializerConfig =
+                (SerializerConfigImpl) executionConfig.getSerializerConfig();
         serializerConfig.registerPojoType(MyPojo.class);
         serializerConfig.registerPojoType(MyOperation.class);
 
@@ -115,7 +117,8 @@ public class SerializationFrameworkMiniBenchmarks extends 
BenchmarkBase {
         StreamExecutionEnvironment env = context.env;
         env.setParallelism(4);
         ExecutionConfig executionConfig = env.getConfig();
-        SerializerConfigImpl serializerConfig = (SerializerConfigImpl) 
executionConfig.getSerializerConfig();
+        SerializerConfigImpl serializerConfig =
+                (SerializerConfigImpl) executionConfig.getSerializerConfig();
         serializerConfig.setForceKryo(true);
         serializerConfig.registerKryoType(MyPojo.class);
         serializerConfig.registerKryoType(MyOperation.class);
@@ -167,9 +170,9 @@ public class SerializationFrameworkMiniBenchmarks extends 
BenchmarkBase {
             super.init();
             templates =
                     new String[] {
-                        makeString(StringSerializationBenchmark.asciiChars, 
1024),
-                        makeString(StringSerializationBenchmark.russianChars, 
1024),
-                        makeString(StringSerializationBenchmark.chineseChars, 
1024)
+                        makeString(StringSerializationBenchmark.ASCII_CHARS, 
1024),
+                        makeString(StringSerializationBenchmark.RUSSIAN_CHARS, 
1024),
+                        makeString(StringSerializationBenchmark.CHINESE_CHARS, 
1024)
                     };
         }
 
diff --git 
a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java 
b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
index 00d665e..629ee99 100644
--- a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
@@ -21,11 +21,7 @@ package org.apache.flink.benchmark;
 import org.apache.flink.benchmark.functions.IntegerLongSource;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.StateBackendOptions;
-import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.util.FileUtils;
 
@@ -65,7 +61,6 @@ public class StateBackendBenchmarkBase extends BenchmarkBase {
                 e.printStackTrace();
             }
 
-
             Configuration configuration = 
Configuration.fromMap(env.getConfiguration().toMap());
             String checkpointDataUri = "file://" + 
checkpointDir.getAbsolutePath();
             switch (stateBackend) {
@@ -93,7 +88,7 @@ public class StateBackendBenchmarkBase extends BenchmarkBase {
             }
 
             // default character
-            //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+            // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
             source = env.addSource(new IntegerLongSource(numberOfElements, 
recordsPerInvocation));
         }
 
diff --git 
a/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java 
b/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java
index 07d6063..9bdac47 100644
--- 
a/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java
@@ -16,15 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.benchmark;
 
 import 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorAlignmentBenchmark;
 
 import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.TearDown;
@@ -34,7 +31,9 @@ import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 import org.openjdk.jmh.runner.options.VerboseMode;
 
-/** The watermark aggregation benchmark for source coordinator when enabling 
the watermark alignment. */
+/**
+ * The watermark aggregation benchmark for source coordinator when enabling 
the watermark alignment.
+ */
 public class WatermarkAggregationBenchmark extends BenchmarkBase {
 
     private static final int NUM_SUBTASKS = 5000;
@@ -47,7 +46,10 @@ public class WatermarkAggregationBenchmark extends 
BenchmarkBase {
         Options options =
                 new OptionsBuilder()
                         .verbosity(VerboseMode.NORMAL)
-                        .include(".*" + 
WatermarkAggregationBenchmark.class.getCanonicalName() + ".*")
+                        .include(
+                                ".*"
+                                        + 
WatermarkAggregationBenchmark.class.getCanonicalName()
+                                        + ".*")
                         .build();
 
         new Runner(options).run();
@@ -71,5 +73,4 @@ public class WatermarkAggregationBenchmark extends 
BenchmarkBase {
     public void teardown() throws Exception {
         benchmark.teardown();
     }
-
 }
diff --git 
a/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java 
b/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
index f6a961b..98a34b9 100644
--- 
a/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
@@ -61,7 +61,9 @@ public class PojoSerializationBenchmark extends BenchmarkBase 
{
             
TypeInformation.of(SerializationFrameworkMiniBenchmarks.MyPojo.class)
                     .createSerializer(config.getSerializerConfig());
     TypeSerializer<SerializationFrameworkMiniBenchmarks.MyPojo> kryoSerializer 
=
-            new 
KryoSerializer<>(SerializationFrameworkMiniBenchmarks.MyPojo.class, 
config.getSerializerConfig());
+            new KryoSerializer<>(
+                    SerializationFrameworkMiniBenchmarks.MyPojo.class,
+                    config.getSerializerConfig());
     TypeSerializer<org.apache.flink.benchmark.avro.MyPojo> avroSerializer =
             new AvroSerializer<>(org.apache.flink.benchmark.avro.MyPojo.class);
 
diff --git 
a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
 
b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
index dafb4ae..94f7152 100644
--- 
a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
+++ 
b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
@@ -127,7 +127,8 @@ public class SerializationFrameworkAllBenchmarks extends 
SerializationFrameworkM
         StreamExecutionEnvironment env = context.env;
         env.setParallelism(4);
         ExecutionConfig executionConfig = env.getConfig();
-        SerializerConfigImpl serializerConfig = (SerializerConfigImpl) 
executionConfig.getSerializerConfig();
+        SerializerConfigImpl serializerConfig =
+                (SerializerConfigImpl) executionConfig.getSerializerConfig();
         serializerConfig.setForceKryo(true);
         serializerConfig.addDefaultKryoSerializer(
                 org.apache.flink.benchmark.thrift.MyPojo.class, 
TBaseSerializer.class);
@@ -147,7 +148,8 @@ public class SerializationFrameworkAllBenchmarks extends 
SerializationFrameworkM
         StreamExecutionEnvironment env = context.env;
         env.setParallelism(4);
         ExecutionConfig executionConfig = env.getConfig();
-        SerializerConfigImpl serializerConfig = (SerializerConfigImpl) 
executionConfig.getSerializerConfig();
+        SerializerConfigImpl serializerConfig =
+                (SerializerConfigImpl) executionConfig.getSerializerConfig();
         serializerConfig.setForceKryo(true);
         serializerConfig.registerTypeWithKryoSerializer(
                 
org.apache.flink.benchmark.protobuf.MyPojoOuterClass.MyPojo.class,
diff --git 
a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
 
b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
index 6cc032e..dd7c3b2 100644
--- 
a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
@@ -52,20 +52,24 @@ import java.util.concurrent.TimeUnit;
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class StringSerializationBenchmark extends BenchmarkBase {
 
-    public static final char[] asciiChars =
+    public static final char[] ASCII_CHARS =
             
"qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890".toCharArray();
-    public static final char[] russianChars =
+    public static final char[] RUSSIAN_CHARS =
             
"йцукенгшщзхъфывапролджэячсмитьбюЙЦУКЕНГШЩЗХЪФЫВАПРОЛДЖЭЯЧСМИТЬБЮ".toCharArray();
-    public static final char[] chineseChars =
+    public static final char[] CHINESE_CHARS =
             "的是不了人我在有他这为之大来以个中上们到国说和地也子要时道出而于就下得可你年生".toCharArray();
+
     @Param({"ascii", "russian", "chinese"})
     public String type;
+
     @Param({"4", "128", "16384"})
     public String lengthStr;
+
     int length;
     String input;
     ExecutionConfig config = new ExecutionConfig();
-    TypeSerializer<String> serializer = 
TypeInformation.of(String.class).createSerializer(config.getSerializerConfig());
+    TypeSerializer<String> serializer =
+            
TypeInformation.of(String.class).createSerializer(config.getSerializerConfig());
     ByteArrayInputStream serializedBuffer;
     DataInputView serializedStream;
 
@@ -85,13 +89,13 @@ public class StringSerializationBenchmark extends 
BenchmarkBase {
         length = Integer.parseInt(lengthStr);
         switch (type) {
             case "ascii":
-                input = generate(asciiChars, length);
+                input = generate(ASCII_CHARS, length);
                 break;
             case "russian":
-                input = generate(russianChars, length);
+                input = generate(RUSSIAN_CHARS, length);
                 break;
             case "chinese":
-                input = generate(chineseChars, length);
+                input = generate(CHINESE_CHARS, length);
                 break;
             default:
                 throw new IllegalArgumentException(type + "charset is not 
supported");
diff --git 
a/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java 
b/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java
index 22ad2f9..6afba8d 100644
--- a/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java
+++ b/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java
@@ -24,6 +24,7 @@ public class IntegerLongSource extends 
RichParallelSourceFunction<IntegerLongSou
     private volatile boolean running = true;
     private int numberOfKeys;
     private long numberOfElements;
+
     public IntegerLongSource(int numberOfKeys, long numberOfElements) {
         this.numberOfKeys = numberOfKeys;
         this.numberOfElements = numberOfElements;
diff --git 
a/src/main/java/org/apache/flink/benchmark/functions/LongSourceType.java 
b/src/main/java/org/apache/flink/benchmark/functions/LongSourceType.java
index d44eafc..f4c45f8 100644
--- a/src/main/java/org/apache/flink/benchmark/functions/LongSourceType.java
+++ b/src/main/java/org/apache/flink/benchmark/functions/LongSourceType.java
@@ -54,4 +54,4 @@ public enum LongSourceType {
     public DataStreamSource<Long> source(StreamExecutionEnvironment 
environment, long maxValue) {
         return factory.apply(environment, maxValue);
     }
-};
+}
diff --git 
a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor.java
 
b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor.java
index 105ad5e..2b89b55 100644
--- 
a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor.java
+++ 
b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor.java
@@ -32,7 +32,8 @@ import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.runner.RunnerException;
 
 /** The benchmark of handle global failure and restarting tasks in a 
STREAMING/BATCH job. */
-public class HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor extends 
SchedulerBenchmarkExecutorBase {
+public class HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor
+        extends SchedulerBenchmarkExecutorBase {
 
     @Param({"BATCH", "STREAMING", "BATCH_EVENLY", "STREAMING_EVENLY"})
     private JobConfiguration jobConfiguration;
diff --git 
a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
 
b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
index 182776b..b718747 100644
--- 
a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
+++ 
b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
@@ -35,7 +35,12 @@ import org.openjdk.jmh.runner.RunnerException;
 public class SchedulingDownstreamTasksInBatchJobBenchmarkExecutor
         extends SchedulerBenchmarkExecutorBase {
 
-    @Param({"BATCH", "BATCH_HYBRID_DEFAULT", "BATCH_HYBRID_PARTIAL_FINISHED", 
"BATCH_HYBRID_ALL_FINISHED"})
+    @Param({
+        "BATCH",
+        "BATCH_HYBRID_DEFAULT",
+        "BATCH_HYBRID_PARTIAL_FINISHED",
+        "BATCH_HYBRID_ALL_FINISHED"
+    })
     private JobConfiguration jobConfiguration;
 
     private SchedulingDownstreamTasksInBatchJobBenchmark benchmark;
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
 
b/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
index 1ec6032..f630cc8 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
@@ -19,13 +19,16 @@
 package org.apache.flink.state.benchmark;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.config.ConfigUtil;
-import org.apache.flink.config.StateBenchmarkOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.state.benchmark.RescalingBenchmarkBuilder;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
-import org.openjdk.jmh.annotations.*;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.runner.RunnerException;
 
 import java.io.IOException;
@@ -48,7 +51,8 @@ public class HashMapStateBackendRescalingBenchmarkExecutor 
extends RescalingBenc
 
     @Setup(Level.Trial)
     public void setUp() throws Exception {
-        // FsStateBackend is deprecated in favor of HashMapStateBackend with 
setting checkpointStorage.
+        // FsStateBackend is deprecated in favor of HashMapStateBackend with 
setting
+        // checkpointStorage.
         HashMapStateBackend stateBackend = new HashMapStateBackend();
         benchmark =
                 new RescalingBenchmarkBuilder<byte[]>()
@@ -56,10 +60,16 @@ public class HashMapStateBackendRescalingBenchmarkExecutor 
extends RescalingBenc
                         
.setParallelismBefore(rescaleType.getParallelismBefore())
                         .setParallelismAfter(rescaleType.getParallelismAfter())
                         .setCheckpointStorageAccess(
-                                new FileSystemCheckpointStorage(new 
URI("file://" + prepareDirectory("rescaleDb").getAbsolutePath()), 0)
+                                new FileSystemCheckpointStorage(
+                                                new URI(
+                                                        "file://"
+                                                                + 
prepareDirectory("rescaleDb")
+                                                                        
.getAbsolutePath()),
+                                                0)
                                         .createCheckpointStorage(new JobID()))
                         .setStateBackend(stateBackend)
-                        .setStreamRecordGenerator(new 
ByteArrayRecordGenerator(numberOfKeys, keyLen))
+                        .setStreamRecordGenerator(
+                                new ByteArrayRecordGenerator(numberOfKeys, 
keyLen))
                         
.setStateProcessFunctionSupplier(TestKeyedFunction::new)
                         .build();
         benchmark.setUp();
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java 
b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
index c8c6f2d..e350ace 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
@@ -39,14 +39,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys;
 import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState;
 import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.LIST_VALUE_COUNT;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
 
 /** Implementation for list state benchmark testing. */
 public class ListStateBenchmark extends StateBenchmarkBase {
-    private final String STATE_NAME = "listState";
-    private final ListStateDescriptor<Long> STATE_DESC =
-            new ListStateDescriptor<>(STATE_NAME, Long.class);
+    private final String stateName = "listState";
+    private final ListStateDescriptor<Long> stateDesc =
+            new ListStateDescriptor<>(stateName, Long.class);
     private ListState<Long> listState;
     private List<Long> dummyLists;
 
@@ -63,9 +63,9 @@ public class ListStateBenchmark extends StateBenchmarkBase {
     @Setup
     public void setUp() throws Exception {
         keyedStateBackend = createKeyedStateBackend();
-        listState = getListState(keyedStateBackend, STATE_DESC);
-        dummyLists = new ArrayList<>(listValueCount);
-        for (int i = 0; i < listValueCount; ++i) {
+        listState = getListState(keyedStateBackend, stateDesc);
+        dummyLists = new ArrayList<>(LIST_VALUE_COUNT);
+        for (int i = 0; i < LIST_VALUE_COUNT; ++i) {
             dummyLists.add(random.nextLong());
         }
         keyIndex = new AtomicInteger();
@@ -73,27 +73,27 @@ public class ListStateBenchmark extends StateBenchmarkBase {
 
     @Setup(Level.Iteration)
     public void setUpPerIteration() throws Exception {
-        for (int i = 0; i < setupKeyCount; ++i) {
+        for (int i = 0; i < SETUP_KEY_COUNT; ++i) {
             keyedStateBackend.setCurrentKey((long) i);
             listState.add(random.nextLong());
         }
         // make sure only one sst file left, so all get invocation will access 
this single file,
         // to prevent the spike caused by different key distribution in 
multiple sst files,
         // the more access to the older sst file, the lower throughput will be.
-        compactState(keyedStateBackend, STATE_DESC);
+        compactState(keyedStateBackend, stateDesc);
     }
 
     @TearDown(Level.Iteration)
     public void tearDownPerIteration() throws Exception {
         applyToAllKeys(
                 keyedStateBackend,
-                STATE_DESC,
+                stateDesc,
                 (k, state) -> {
                     keyedStateBackend.setCurrentKey(k);
                     state.clear();
                 });
         // make the clearance effective, trigger compaction for RocksDB, and 
GC for heap.
-        if (!compactState(keyedStateBackend, STATE_DESC)) {
+        if (!compactState(keyedStateBackend, stateDesc)) {
             System.gc();
         }
         // wait a while for the clearance to take effect.
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java 
b/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java
index 32dce60..c840b5b 100644
--- a/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java
@@ -37,9 +37,9 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getMapState;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEYS;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEY_COUNT;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
 
 /** Implementation for map state benchmark testing. */
 public class MapStateBenchmark extends StateBenchmarkBase {
@@ -63,14 +63,14 @@ public class MapStateBenchmark extends StateBenchmarkBase {
                 getMapState(
                         keyedStateBackend,
                         new MapStateDescriptor<>("mapState", Long.class, 
Double.class));
-        dummyMaps = new HashMap<>(mapKeyCount);
-        for (int i = 0; i < mapKeyCount; ++i) {
-            dummyMaps.put(mapKeys.get(i), random.nextDouble());
+        dummyMaps = new HashMap<>(MAP_KEY_COUNT);
+        for (int i = 0; i < MAP_KEY_COUNT; ++i) {
+            dummyMaps.put(MAP_KEYS.get(i), random.nextDouble());
         }
-        for (int i = 0; i < setupKeyCount; ++i) {
+        for (int i = 0; i < SETUP_KEY_COUNT; ++i) {
             keyedStateBackend.setCurrentKey((long) i);
-            for (int j = 0; j < mapKeyCount; j++) {
-                mapState.put(mapKeys.get(j), random.nextDouble());
+            for (int j = 0; j < MAP_KEY_COUNT; j++) {
+                mapState.put(MAP_KEYS.get(j), random.nextDouble());
             }
         }
         keyIndex = new AtomicInteger();
@@ -107,7 +107,7 @@ public class MapStateBenchmark extends StateBenchmarkBase {
     }
 
     @Benchmark
-    @OperationsPerInvocation(mapKeyCount)
+    @OperationsPerInvocation(MAP_KEY_COUNT)
     public void mapKeys(KeyValue keyValue, Blackhole bh) throws Exception {
         keyedStateBackend.setCurrentKey(keyValue.setUpKey);
         for (Long key : mapState.keys()) {
@@ -116,7 +116,7 @@ public class MapStateBenchmark extends StateBenchmarkBase {
     }
 
     @Benchmark
-    @OperationsPerInvocation(mapKeyCount)
+    @OperationsPerInvocation(MAP_KEY_COUNT)
     public void mapValues(KeyValue keyValue, Blackhole bh) throws Exception {
         keyedStateBackend.setCurrentKey(keyValue.setUpKey);
         for (Double value : mapState.values()) {
@@ -125,7 +125,7 @@ public class MapStateBenchmark extends StateBenchmarkBase {
     }
 
     @Benchmark
-    @OperationsPerInvocation(mapKeyCount)
+    @OperationsPerInvocation(MAP_KEY_COUNT)
     public void mapEntries(KeyValue keyValue, Blackhole bh) throws Exception {
         keyedStateBackend.setCurrentKey(keyValue.setUpKey);
         Iterable<Map.Entry<Long, Double>> iterable = mapState.entries();
@@ -138,7 +138,7 @@ public class MapStateBenchmark extends StateBenchmarkBase {
     }
 
     @Benchmark
-    @OperationsPerInvocation(mapKeyCount)
+    @OperationsPerInvocation(MAP_KEY_COUNT)
     public void mapIterator(KeyValue keyValue, Blackhole bh) throws Exception {
         keyedStateBackend.setCurrentKey(keyValue.setUpKey);
         Iterator<Map.Entry<Long, Double>> iterator = mapState.iterator();
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java 
b/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
index f5b749d..4b6d68c 100644
--- a/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.state.benchmark;
 
 import org.apache.flink.api.common.functions.OpenContext;
@@ -23,13 +24,10 @@ import 
org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.benchmark.BenchmarkBase;
-import org.apache.flink.config.ConfigUtil;
-import org.apache.flink.config.StateBenchmarkOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.state.benchmark.RescalingBenchmark;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
+
 import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.State;
@@ -41,8 +39,6 @@ import org.openjdk.jmh.runner.options.VerboseMode;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Random;
@@ -105,9 +101,7 @@ public class RescalingBenchmarkBase extends BenchmarkBase {
         private final byte[] fatArray;
         private int count = 0;
 
-
-        protected ByteArrayRecordGenerator(final int numberOfKeys,
-                                           final int keyLen) {
+        protected ByteArrayRecordGenerator(final int numberOfKeys, final int 
keyLen) {
             this.numberOfKeys = numberOfKeys;
             fatArray = new byte[keyLen];
         }
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
 
b/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
index b552ad7..205fc8e 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
@@ -15,17 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.state.benchmark;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.config.ConfigUtil;
-import org.apache.flink.config.StateBenchmarkOptions;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.state.benchmark.RescalingBenchmarkBuilder;
 import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 
-import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.runner.RunnerException;
 
 import java.io.IOException;
@@ -55,10 +58,14 @@ public class RocksdbStateBackendRescalingBenchmarkExecutor 
extends RescalingBenc
                         .setParallelismAfter(rescaleType.getParallelismAfter())
                         .setManagedMemorySize(512 * 1024 * 1024)
                         .setCheckpointStorageAccess(
-                                new FileSystemCheckpointStorage("file://" + 
prepareDirectory("rescaleDb").getAbsolutePath())
+                                new FileSystemCheckpointStorage(
+                                                "file://"
+                                                        + 
prepareDirectory("rescaleDb")
+                                                                
.getAbsolutePath())
                                         .createCheckpointStorage(new JobID()))
                         .setStateBackend(stateBackend)
-                        .setStreamRecordGenerator(new 
ByteArrayRecordGenerator(numberOfKeys, keyLen))
+                        .setStreamRecordGenerator(
+                                new ByteArrayRecordGenerator(numberOfKeys, 
keyLen))
                         
.setStateProcessFunctionSupplier(TestKeyedFunction::new)
                         .build();
         benchmark.setUp();
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java 
b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
index 8c1f970..c61c82e 100644
--- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.state.benchmark;
 
 import org.apache.flink.benchmark.BenchmarkBase;
@@ -22,8 +23,8 @@ import org.apache.flink.config.ConfigUtil;
 import org.apache.flink.config.StateBenchmarkOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.KeyedStateBackend;
-
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
@@ -41,15 +42,15 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.cleanUp;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.mapValues;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.newKeyCount;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.newKeys;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.randomValueCount;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.randomValues;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeys;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEYS;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEY_COUNT;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_VALUES;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.NEW_KEYS;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.NEW_KEY_COUNT;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.RANDOM_VALUES;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.RANDOM_VALUE_COUNT;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEYS;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
 
 /** Base implementation of the state benchmarks. */
 public class StateBenchmarkBase extends BenchmarkBase {
@@ -66,8 +67,10 @@ public class StateBenchmarkBase extends BenchmarkBase {
         return createKeyedStateBackend(TtlTimeProvider.DEFAULT);
     }
 
-    protected KeyedStateBackend<Long> createKeyedStateBackend(TtlTimeProvider 
ttlTimeProvider) throws Exception {
-        return StateBackendBenchmarkUtils.createKeyedStateBackend(backendType, 
createStateDataDir());
+    protected KeyedStateBackend<Long> createKeyedStateBackend(TtlTimeProvider 
ttlTimeProvider)
+            throws Exception {
+        return StateBackendBenchmarkUtils.createKeyedStateBackend(
+                backendType, createStateDataDir());
     }
 
     public static File createStateDataDir() throws IOException {
@@ -108,16 +111,16 @@ public class StateBenchmarkBase extends BenchmarkBase {
         @Setup(Level.Invocation)
         public void kvSetup() {
             int currentIndex = getCurrentIndex();
-            setUpKey = setupKeys.get(currentIndex % setupKeyCount);
-            newKey = newKeys.get(currentIndex % newKeyCount);
-            mapKey = mapKeys.get(currentIndex % mapKeyCount);
-            mapValue = mapValues.get(currentIndex % mapKeyCount);
-            value = randomValues.get(currentIndex % randomValueCount);
+            setUpKey = SETUP_KEYS.get(currentIndex % SETUP_KEY_COUNT);
+            newKey = NEW_KEYS.get(currentIndex % NEW_KEY_COUNT);
+            mapKey = MAP_KEYS.get(currentIndex % MAP_KEY_COUNT);
+            mapValue = MAP_VALUES.get(currentIndex % MAP_KEY_COUNT);
+            value = RANDOM_VALUES.get(currentIndex % RANDOM_VALUE_COUNT);
             // TODO: singletonList is taking 25% of time in mapAdd 
benchmark... This shouldn't be
             // initiated if benchmark is not using it and for the benchmarks 
that are using it,
             // this should also be probably somehow avoided.
             listValue =
-                    Collections.singletonList(randomValues.get(currentIndex % 
randomValueCount));
+                    Collections.singletonList(RANDOM_VALUES.get(currentIndex % 
RANDOM_VALUE_COUNT));
         }
 
         @TearDown(Level.Invocation)
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java 
b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java
index c0a141f..d445a20 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java
@@ -28,54 +28,54 @@ import java.util.Random;
  */
 public class StateBenchmarkConstants {
     // TODO: why all of those static fields? Those should be inside a context 
class
-    public static final int mapKeyCount = 10;
-    public static final int listValueCount = 100;
-    public static final int setupKeyCount = 500_000;
-    public static final String rootDirName = "benchmark";
-    public static final String recoveryDirName = "localRecovery";
-    public static final String dbDirName = "dbPath";
+    public static final int MAP_KEY_COUNT = 10;
+    public static final int LIST_VALUE_COUNT = 100;
+    public static final int SETUP_KEY_COUNT = 500_000;
+    public static final String ROOT_DIR_NAME = "benchmark";
+    public static final String RECOVERY_DIR_NAME = "localRecovery";
+    public static final String DB_DIR_NAME = "dbPath";
 
-    public static final ArrayList<Long> mapKeys = new ArrayList<>(mapKeyCount);
-    public static final ArrayList<Double> mapValues = new 
ArrayList<>(mapKeyCount);
-    public static final ArrayList<Long> setupKeys = new 
ArrayList<>(setupKeyCount);
-    public static final int newKeyCount = 500_000;
-    public static final ArrayList<Long> newKeys = new ArrayList<>(newKeyCount);
-    public static final int randomValueCount = 1_000_000;
-    public static final ArrayList<Long> randomValues = new 
ArrayList<>(randomValueCount);
+    public static final ArrayList<Long> MAP_KEYS = new 
ArrayList<>(MAP_KEY_COUNT);
+    public static final ArrayList<Double> MAP_VALUES = new 
ArrayList<>(MAP_KEY_COUNT);
+    public static final ArrayList<Long> SETUP_KEYS = new 
ArrayList<>(SETUP_KEY_COUNT);
+    public static final int NEW_KEY_COUNT = 500_000;
+    public static final ArrayList<Long> NEW_KEYS = new 
ArrayList<>(NEW_KEY_COUNT);
+    public static final int RANDOM_VALUE_COUNT = 1_000_000;
+    public static final ArrayList<Long> RANDOM_VALUES = new 
ArrayList<>(RANDOM_VALUE_COUNT);
 
     static {
-        for (int i = 0; i < mapKeyCount; i++) {
-            mapKeys.add((long) i);
+        for (int i = 0; i < MAP_KEY_COUNT; i++) {
+            MAP_KEYS.add((long) i);
         }
-        Collections.shuffle(mapKeys);
+        Collections.shuffle(MAP_KEYS);
     }
 
     static {
         Random random = new Random();
-        for (int i = 0; i < mapKeyCount; i++) {
-            mapValues.add(random.nextDouble());
+        for (int i = 0; i < MAP_KEY_COUNT; i++) {
+            MAP_VALUES.add(random.nextDouble());
         }
-        Collections.shuffle(mapValues);
+        Collections.shuffle(MAP_VALUES);
     }
 
     static {
-        for (long i = 0; i < setupKeyCount; i++) {
-            setupKeys.add(i);
+        for (long i = 0; i < SETUP_KEY_COUNT; i++) {
+            SETUP_KEYS.add(i);
         }
-        Collections.shuffle(setupKeys);
+        Collections.shuffle(SETUP_KEYS);
     }
 
     static {
-        for (long i = 0; i < newKeyCount; i++) {
-            newKeys.add(i + setupKeyCount);
+        for (long i = 0; i < NEW_KEY_COUNT; i++) {
+            NEW_KEYS.add(i + SETUP_KEY_COUNT);
         }
-        Collections.shuffle(newKeys);
+        Collections.shuffle(NEW_KEYS);
     }
 
     static {
-        for (long i = 0; i < randomValueCount; i++) {
-            randomValues.add(i);
+        for (long i = 0; i < RANDOM_VALUE_COUNT; i++) {
+            RANDOM_VALUES.add(i);
         }
-        Collections.shuffle(randomValues);
+        Collections.shuffle(RANDOM_VALUES);
     }
 }
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java 
b/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java
index 0be9e14..7c35d50 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java
@@ -33,7 +33,7 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getValueState;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
 
 /** Implementation for listValue state benchmark testing. */
 public class ValueStateBenchmark extends StateBenchmarkBase {
@@ -54,7 +54,7 @@ public class ValueStateBenchmark extends StateBenchmarkBase {
         keyedStateBackend = createKeyedStateBackend();
         valueState =
                 getValueState(keyedStateBackend, new 
ValueStateDescriptor<>("kvState", Long.class));
-        for (int i = 0; i < setupKeyCount; ++i) {
+        for (int i = 0; i < SETUP_KEY_COUNT; ++i) {
             keyedStateBackend.setCurrentKey((long) i);
             valueState.update(random.nextLong());
         }
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
index 829b440..33fdac7 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
@@ -21,6 +21,7 @@ package org.apache.flink.state.benchmark.ttl;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.state.benchmark.StateBenchmarkBase;
+
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Setup;
@@ -39,12 +40,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys;
 import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState;
 import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.LIST_VALUE_COUNT;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
 
 /** Implementation for list state benchmark testing. */
 public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
-    private final String STATE_NAME = "listState";
+    private static final String STATE_NAME = "listState";
     private ListStateDescriptor<Long> stateDesc;
     private ListState<Long> listState;
     private List<Long> dummyLists;
@@ -62,10 +63,10 @@ public class TtlListStateBenchmark extends 
TtlStateBenchmarkBase {
     @Setup
     public void setUp() throws Exception {
         keyedStateBackend = createKeyedStateBackend();
-        stateDesc = configTtl(new ListStateDescriptor<>(STATE_NAME, 
Long.class));
+        stateDesc = configTtl(new ListStateDescriptor<>(STATE_NAME, 
Long.class));
         listState = getListState(keyedStateBackend, stateDesc);
-        dummyLists = new ArrayList<>(listValueCount);
-        for (int i = 0; i < listValueCount; ++i) {
+        dummyLists = new ArrayList<>(LIST_VALUE_COUNT);
+        for (int i = 0; i < LIST_VALUE_COUNT; ++i) {
             dummyLists.add(random.nextLong());
         }
         keyIndex = new AtomicInteger();
@@ -73,7 +74,7 @@ public class TtlListStateBenchmark extends 
TtlStateBenchmarkBase {
 
     @Setup(Level.Iteration)
     public void setUpPerIteration() throws Exception {
-        for (int i = 0; i < setupKeyCount; ++i) {
+        for (int i = 0; i < SETUP_KEY_COUNT; ++i) {
             keyedStateBackend.setCurrentKey((long) i);
             setTtlWhenInitialization();
             listState.add(random.nextLong());
@@ -127,7 +128,8 @@ public class TtlListStateBenchmark extends 
TtlStateBenchmarkBase {
     }
 
     @Benchmark
-    public void listGetAndIterate(StateBenchmarkBase.KeyValue keyValue, 
Blackhole bh) throws Exception {
+    public void listGetAndIterate(StateBenchmarkBase.KeyValue keyValue, 
Blackhole bh)
+            throws Exception {
         keyedStateBackend.setCurrentKey(keyValue.setUpKey);
         Iterable<Long> iterable = listState.get();
         for (Long value : iterable) {
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
index 772a103..8e61ffe 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
@@ -21,6 +21,7 @@ package org.apache.flink.state.benchmark.ttl;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.state.benchmark.StateBenchmarkBase;
+
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
@@ -38,9 +39,9 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getMapState;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEYS;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEY_COUNT;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
 
 /** Implementation for map state benchmark testing. */
 public class TtlMapStateBenchmark extends TtlStateBenchmarkBase {
@@ -64,15 +65,15 @@ public class TtlMapStateBenchmark extends 
TtlStateBenchmarkBase {
                 getMapState(
                         keyedStateBackend,
                         configTtl(new MapStateDescriptor<>("mapState", 
Long.class, Double.class)));
-        dummyMaps = new HashMap<>(mapKeyCount);
-        for (int i = 0; i < mapKeyCount; ++i) {
-            dummyMaps.put(mapKeys.get(i), random.nextDouble());
+        dummyMaps = new HashMap<>(MAP_KEY_COUNT);
+        for (int i = 0; i < MAP_KEY_COUNT; ++i) {
+            dummyMaps.put(MAP_KEYS.get(i), random.nextDouble());
         }
-        for (int i = 0; i < setupKeyCount; ++i) {
+        for (int i = 0; i < SETUP_KEY_COUNT; ++i) {
             keyedStateBackend.setCurrentKey((long) i);
-            for (int j = 0; j < mapKeyCount; j++) {
+            for (int j = 0; j < MAP_KEY_COUNT; j++) {
                 setTtlWhenInitialization();
-                mapState.put(mapKeys.get(j), random.nextDouble());
+                mapState.put(MAP_KEYS.get(j), random.nextDouble());
             }
         }
         keyIndex = new AtomicInteger();
@@ -108,7 +109,7 @@ public class TtlMapStateBenchmark extends 
TtlStateBenchmarkBase {
     }
 
     @Benchmark
-    @OperationsPerInvocation(mapKeyCount)
+    @OperationsPerInvocation(MAP_KEY_COUNT)
     public void mapIterator(StateBenchmarkBase.KeyValue keyValue, Blackhole 
bh) throws Exception {
         keyedStateBackend.setCurrentKey(keyValue.setUpKey);
         Iterator<Map.Entry<Long, Double>> iterator = mapState.iterator();
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
index bfe0017..a5d4fa1 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.state.benchmark.StateBenchmarkBase;
+
 import org.openjdk.jmh.annotations.Param;
 
 import java.time.Duration;
@@ -42,6 +43,7 @@ public class TtlStateBenchmarkBase extends StateBenchmarkBase 
{
         NeverExpired(0);
 
         public long advanceTimePerIteration;
+
         ExpiredTimeOptions(int expirePercentPerIteration) {
             this.advanceTimePerIteration = initialTime * 
expirePercentPerIteration / 100;
         }
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
index ee34cfb..12ed5c2 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
@@ -20,6 +20,7 @@ package org.apache.flink.state.benchmark.ttl;
 
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Setup;
@@ -33,7 +34,7 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getValueState;
-import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
 
 /** Implementation for listValue state benchmark testing. */
 public class TtlValueStateBenchmark extends TtlStateBenchmarkBase {
@@ -52,8 +53,11 @@ public class TtlValueStateBenchmark extends 
TtlStateBenchmarkBase {
     @Setup
     public void setUp() throws Exception {
         keyedStateBackend = createKeyedStateBackend();
-        valueState = getValueState(keyedStateBackend, configTtl(new 
ValueStateDescriptor<>("kvState", Long.class)));
-        for (int i = 0; i < setupKeyCount; ++i) {
+        valueState =
+                getValueState(
+                        keyedStateBackend,
+                        configTtl(new ValueStateDescriptor<>("kvState", 
Long.class)));
+        for (int i = 0; i < SETUP_KEY_COUNT; ++i) {
             setTtlWhenInitialization();
             keyedStateBackend.setCurrentKey((long) i);
             valueState.update(random.nextLong());

Reply via email to