This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch test1.14
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/test1.14 by this push:
     new 90849eb  fix compile error
90849eb is described below

commit 90849eb002022b078be7ac7397d4a0bcc1679517
Author: Yun Gao <gaoyunhen...@gmail.com>
AuthorDate: Wed Sep 22 23:01:00 2021 +0800

    fix compile error
---
 .../benchmark/BlockingPartitionBenchmark.java      | 256 ++++++++++-----------
 .../BlockingPartitionRemoteChannelBenchmark.java   | 172 +++++++-------
 .../apache/flink/benchmark/StreamGraphUtils.java   |  90 ++++----
 ...DownstreamTasksInBatchJobBenchmarkExecutor.java | 132 +++++------
 ...loyingTasksInStreamingJobBenchmarkExecutor.java | 132 +++++------
 .../e2e/CreateSchedulerBenchmarkExecutor.java      |   2 +-
 .../SchedulingAndDeployingBenchmarkExecutor.java   |   2 +-
 ...RegionToRestartInBatchJobBenchmarkExecutor.java | 130 +++++------
 ...onToRestartInStreamingJobBenchmarkExecutor.java | 130 +++++------
 ...artitionReleaseInBatchJobBenchmarkExecutor.java | 130 +++++------
 .../InitSchedulingStrategyBenchmarkExecutor.java   | 132 +++++------
 ...DownstreamTasksInBatchJobBenchmarkExecutor.java | 130 +++++------
 .../BuildExecutionGraphBenchmarkExecutor.java      | 130 +++++------
 13 files changed, 784 insertions(+), 784 deletions(-)

diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
index 425debe..487c58e 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
@@ -1,128 +1,128 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.benchmark;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.util.FileUtils;
-
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.OperationsPerInvocation;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.runner.Runner;
-import org.openjdk.jmh.runner.RunnerException;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
-import org.openjdk.jmh.runner.options.VerboseMode;
-
-import java.io.IOException;
-
-/**
- * JMH throughput benchmark runner.
- */
-@OperationsPerInvocation(value = 
BlockingPartitionBenchmark.RECORDS_PER_INVOCATION)
-public class BlockingPartitionBenchmark extends BenchmarkBase {
-
-       public static final int RECORDS_PER_INVOCATION = 15_000_000;
-
-       public static void main(String[] args)
-                       throws RunnerException {
-               Options options = new OptionsBuilder()
-                               .verbosity(VerboseMode.NORMAL)
-                               .include(".*" + 
BlockingPartitionBenchmark.class.getCanonicalName() + ".*")
-                               .build();
-
-               new Runner(options).run();
-       }
-
-       @Benchmark
-       public void 
uncompressedFilePartition(UncompressedFileEnvironmentContext context) throws 
Exception {
-               executeBenchmark(context.env);
-       }
-
-       @Benchmark
-       public void compressedFilePartition(CompressedFileEnvironmentContext 
context) throws Exception {
-               executeBenchmark(context.env);
-       }
-
-       @Benchmark
-       public void 
uncompressedMmapPartition(UncompressedMmapEnvironmentContext context) throws 
Exception {
-               executeBenchmark(context.env);
-       }
-
-       private void executeBenchmark(StreamExecutionEnvironment env) throws 
Exception {
-               StreamGraph streamGraph = 
StreamGraphUtils.buildGraphForBatchJob(env, RECORDS_PER_INVOCATION);
-               env.execute(streamGraph);
-       }
-
-       /**
-        * Setup for the benchmark(s).
-        */
-       public static class BlockingPartitionEnvironmentContext extends 
FlinkEnvironmentContext {
-
-               /**
-                * Parallelism of 1 causes the reads/writes to be always 
sequential and only covers the case
-                * of one reader. More parallelism should be more suitable for 
finding performance regressions
-                * of the code. Considering that the benchmarking machine has 4 
CPU cores, we set the parallelism
-                * to 4.
-                */
-               private final int parallelism = 4;
-
-               @Override
-               public void setUp() throws IOException {
-                       super.setUp();
-
-                       env.setParallelism(parallelism);
-                       env.setBufferTimeout(-1);
-               }
-
-               protected Configuration createConfiguration(boolean 
compressionEnabled, String subpartitionType) {
-                       Configuration configuration = 
super.createConfiguration();
-
-                       
configuration.setBoolean(NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED,
 compressionEnabled);
-                       
configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE,
 subpartitionType);
-                       configuration.setString(CoreOptions.TMP_DIRS, 
FileUtils.getCurrentWorkingDirectory().toAbsolutePath().toString());
-                       return configuration;
-               }
-       }
-
-       public static class UncompressedFileEnvironmentContext extends 
BlockingPartitionEnvironmentContext {
-               @Override
-               protected Configuration createConfiguration() {
-                       return createConfiguration(false, "file");
-               }
-       }
-
-       public static class CompressedFileEnvironmentContext extends 
BlockingPartitionEnvironmentContext {
-               @Override
-               protected Configuration createConfiguration() {
-                       return createConfiguration(true, "file");
-               }
-       }
-
-       public static class UncompressedMmapEnvironmentContext extends 
BlockingPartitionEnvironmentContext {
-               @Override
-               protected Configuration createConfiguration() {
-                       return createConfiguration(false, "mmap");
-               }
-       }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.benchmark;
+//
+//import org.apache.flink.configuration.Configuration;
+//import org.apache.flink.configuration.CoreOptions;
+//import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+//import org.apache.flink.streaming.api.graph.StreamGraph;
+//import org.apache.flink.util.FileUtils;
+//
+//import org.openjdk.jmh.annotations.Benchmark;
+//import org.openjdk.jmh.annotations.OperationsPerInvocation;
+//import org.openjdk.jmh.annotations.Setup;
+//import org.openjdk.jmh.runner.Runner;
+//import org.openjdk.jmh.runner.RunnerException;
+//import org.openjdk.jmh.runner.options.Options;
+//import org.openjdk.jmh.runner.options.OptionsBuilder;
+//import org.openjdk.jmh.runner.options.VerboseMode;
+//
+//import java.io.IOException;
+//
+///**
+// * JMH throughput benchmark runner.
+// */
+//@OperationsPerInvocation(value = 
BlockingPartitionBenchmark.RECORDS_PER_INVOCATION)
+//public class BlockingPartitionBenchmark extends BenchmarkBase {
+//
+//     public static final int RECORDS_PER_INVOCATION = 15_000_000;
+//
+//     public static void main(String[] args)
+//                     throws RunnerException {
+//             Options options = new OptionsBuilder()
+//                             .verbosity(VerboseMode.NORMAL)
+//                             .include(".*" + 
BlockingPartitionBenchmark.class.getCanonicalName() + ".*")
+//                             .build();
+//
+//             new Runner(options).run();
+//     }
+//
+//     @Benchmark
+//     public void 
uncompressedFilePartition(UncompressedFileEnvironmentContext context) throws 
Exception {
+//             executeBenchmark(context.env);
+//     }
+//
+//     @Benchmark
+//     public void compressedFilePartition(CompressedFileEnvironmentContext 
context) throws Exception {
+//             executeBenchmark(context.env);
+//     }
+//
+//     @Benchmark
+//     public void 
uncompressedMmapPartition(UncompressedMmapEnvironmentContext context) throws 
Exception {
+//             executeBenchmark(context.env);
+//     }
+//
+//     private void executeBenchmark(StreamExecutionEnvironment env) throws 
Exception {
+//             StreamGraph streamGraph = 
StreamGraphUtils.buildGraphForBatchJob(env, RECORDS_PER_INVOCATION);
+//             env.execute(streamGraph);
+//     }
+//
+//     /**
+//      * Setup for the benchmark(s).
+//      */
+//     public static class BlockingPartitionEnvironmentContext extends 
FlinkEnvironmentContext {
+//
+//             /**
+//              * Parallelism of 1 causes the reads/writes to be always 
sequential and only covers the case
+//              * of one reader. More parallelism should be more suitable for 
finding performance regressions
+//              * of the code. Considering that the benchmarking machine has 4 
CPU cores, we set the parallelism
+//              * to 4.
+//              */
+//             private final int parallelism = 4;
+//
+//             @Override
+//             public void setUp() throws IOException {
+//                     super.setUp();
+//
+//                     env.setParallelism(parallelism);
+//                     env.setBufferTimeout(-1);
+//             }
+//
+//             protected Configuration createConfiguration(boolean 
compressionEnabled, String subpartitionType) {
+//                     Configuration configuration = 
super.createConfiguration();
+//
+//                     
configuration.setBoolean(NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED,
 compressionEnabled);
+//                     
configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE,
 subpartitionType);
+//                     configuration.setString(CoreOptions.TMP_DIRS, 
FileUtils.getCurrentWorkingDirectory().toAbsolutePath().toString());
+//                     return configuration;
+//             }
+//     }
+//
+//     public static class UncompressedFileEnvironmentContext extends 
BlockingPartitionEnvironmentContext {
+//             @Override
+//             protected Configuration createConfiguration() {
+//                     return createConfiguration(false, "file");
+//             }
+//     }
+//
+//     public static class CompressedFileEnvironmentContext extends 
BlockingPartitionEnvironmentContext {
+//             @Override
+//             protected Configuration createConfiguration() {
+//                     return createConfiguration(true, "file");
+//             }
+//     }
+//
+//     public static class UncompressedMmapEnvironmentContext extends 
BlockingPartitionEnvironmentContext {
+//             @Override
+//             protected Configuration createConfiguration() {
+//                     return createConfiguration(false, "mmap");
+//             }
+//     }
+//}
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index 21e7cc1..8ebf4b8 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -1,86 +1,86 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.benchmark;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
-import org.apache.flink.util.FileUtils;
-
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.OperationsPerInvocation;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.runner.Runner;
-import org.openjdk.jmh.runner.RunnerException;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
-import org.openjdk.jmh.runner.options.VerboseMode;
-
-import java.io.IOException;
-
-@OperationsPerInvocation(value = 
BlockingPartitionRemoteChannelBenchmark.RECORDS_PER_INVOCATION)
-public class BlockingPartitionRemoteChannelBenchmark extends 
RemoteBenchmarkBase {
-
-    private static final int NUM_VERTICES = 2;
-
-    public static void main(String[] args) throws RunnerException {
-        Options options = new OptionsBuilder()
-            .verbosity(VerboseMode.NORMAL)
-            
.include(BlockingPartitionRemoteChannelBenchmark.class.getCanonicalName())
-            .build();
-
-        new Runner(options).run();
-    }
-
-    @Override
-    public int getNumberOfVertexes() {
-        return NUM_VERTICES;
-    }
-
-    @Benchmark
-    public void remoteFilePartition(BlockingPartitionEnvironmentContext 
context) throws Exception {
-        StreamGraph streamGraph = 
StreamGraphUtils.buildGraphForBatchJob(context.env, RECORDS_PER_INVOCATION);
-        
miniCluster.executeJobBlocking(StreamingJobGraphGenerator.createJobGraph(streamGraph));
-    }
-
-    /**
-     * Environment context for specific file based bounded blocking partition.
-     */
-    public static class BlockingPartitionEnvironmentContext extends 
FlinkEnvironmentContext {
-
-        @Override
-        public void setUp() throws IOException {
-            super.setUp();
-
-            env.setParallelism(PARALLELISM);
-            env.setBufferTimeout(-1);
-        }
-
-        @Override
-        protected Configuration createConfiguration() {
-            Configuration configuration = super.createConfiguration();
-
-            
configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE,
 "file");
-            configuration.setString(CoreOptions.TMP_DIRS, 
FileUtils.getCurrentWorkingDirectory().toAbsolutePath().toString());
-            return configuration;
-        }
-    }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.benchmark;
+//
+//import org.apache.flink.configuration.Configuration;
+//import org.apache.flink.configuration.CoreOptions;
+//import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+//import org.apache.flink.streaming.api.graph.StreamGraph;
+//import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+//import org.apache.flink.util.FileUtils;
+//
+//import org.openjdk.jmh.annotations.Benchmark;
+//import org.openjdk.jmh.annotations.OperationsPerInvocation;
+//import org.openjdk.jmh.annotations.Setup;
+//import org.openjdk.jmh.runner.Runner;
+//import org.openjdk.jmh.runner.RunnerException;
+//import org.openjdk.jmh.runner.options.Options;
+//import org.openjdk.jmh.runner.options.OptionsBuilder;
+//import org.openjdk.jmh.runner.options.VerboseMode;
+//
+//import java.io.IOException;
+//
+//@OperationsPerInvocation(value = 
BlockingPartitionRemoteChannelBenchmark.RECORDS_PER_INVOCATION)
+//public class BlockingPartitionRemoteChannelBenchmark extends 
RemoteBenchmarkBase {
+//
+//    private static final int NUM_VERTICES = 2;
+//
+//    public static void main(String[] args) throws RunnerException {
+//        Options options = new OptionsBuilder()
+//            .verbosity(VerboseMode.NORMAL)
+//            
.include(BlockingPartitionRemoteChannelBenchmark.class.getCanonicalName())
+//            .build();
+//
+//        new Runner(options).run();
+//    }
+//
+//    @Override
+//    public int getNumberOfVertexes() {
+//        return NUM_VERTICES;
+//    }
+//
+//    @Benchmark
+//    public void remoteFilePartition(BlockingPartitionEnvironmentContext 
context) throws Exception {
+//        StreamGraph streamGraph = 
StreamGraphUtils.buildGraphForBatchJob(context.env, RECORDS_PER_INVOCATION);
+//        
miniCluster.executeJobBlocking(StreamingJobGraphGenerator.createJobGraph(streamGraph));
+//    }
+//
+//    /**
+//     * Environment context for specific file based bounded blocking 
partition.
+//     */
+//    public static class BlockingPartitionEnvironmentContext extends 
FlinkEnvironmentContext {
+//
+//        @Override
+//        public void setUp() throws IOException {
+//            super.setUp();
+//
+//            env.setParallelism(PARALLELISM);
+//            env.setBufferTimeout(-1);
+//        }
+//
+//        @Override
+//        protected Configuration createConfiguration() {
+//            Configuration configuration = super.createConfiguration();
+//
+//            
configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE,
 "file");
+//            configuration.setString(CoreOptions.TMP_DIRS, 
FileUtils.getCurrentWorkingDirectory().toAbsolutePath().toString());
+//            return configuration;
+//        }
+//    }
+//}
diff --git a/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java b/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java
index 0746193..3c6aa93 100644
--- a/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java
+++ b/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java
@@ -1,45 +1,45 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.benchmark;
-
-import org.apache.flink.benchmark.functions.LongSource;
-import org.apache.flink.runtime.jobgraph.JobType;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-
-/**
- * Utilities for building respective graph for performing in benchmark.
- */
-public class StreamGraphUtils {
-
-       public static StreamGraph 
buildGraphForBatchJob(StreamExecutionEnvironment env, int numRecords) {
-               DataStreamSource<Long> source = env.addSource(new 
LongSource(numRecords));
-               source.addSink(new DiscardingSink<>());
-
-               StreamGraph streamGraph = env.getStreamGraph();
-               streamGraph.setChaining(false);
-               
streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
-               streamGraph.setJobType(JobType.BATCH);
-
-               return streamGraph;
-       }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.benchmark;
+//
+//import org.apache.flink.benchmark.functions.LongSource;
+//import org.apache.flink.runtime.jobgraph.JobType;
+//import org.apache.flink.streaming.api.datastream.DataStreamSource;
+//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+//import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+//import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
+//import org.apache.flink.streaming.api.graph.StreamGraph;
+//
+///**
+// * Utilities for building respective graph for performing in benchmark.
+// */
+//public class StreamGraphUtils {
+//
+//     public static StreamGraph 
buildGraphForBatchJob(StreamExecutionEnvironment env, int numRecords) {
+//             DataStreamSource<Long> source = env.addSource(new 
LongSource(numRecords));
+//             source.addSink(new DiscardingSink<>());
+//
+//             StreamGraph streamGraph = env.getStreamGraph();
+//             streamGraph.setChaining(false);
+//             
streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
+//             streamGraph.setJobType(JobType.BATCH);
+//
+//             return streamGraph;
+//     }
+//}
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingDownstreamTasksInBatchJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingDownstreamTasksInBatchJobBenchmarkExecutor.java
index 89a46f7..e264190 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingDownstreamTasksInBatchJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingDownstreamTasksInBatchJobBenchmarkExecutor.java
@@ -1,66 +1,66 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.scheduler.benchmark.deploying;
-
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
-import 
org.apache.flink.runtime.scheduler.benchmark.deploying.DeployingDownstreamTasksInBatchJobBenchmark;
-import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
-
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.runner.RunnerException;
-
-/**
- * The benchmark of deploying downstream tasks in a BATCH job.
- * The related method is {@link Execution#deploy}.
- */
-public class DeployingDownstreamTasksInBatchJobBenchmarkExecutor extends 
SchedulerBenchmarkExecutorBase {
-
-       @Param("BATCH")
-       private JobConfiguration jobConfiguration;
-
-       private DeployingDownstreamTasksInBatchJobBenchmark benchmark;
-
-       public static void main(String[] args) throws RunnerException {
-               
runBenchmark(DeployingDownstreamTasksInBatchJobBenchmarkExecutor.class);
-       }
-
-       @Setup(Level.Trial)
-       public void setup() throws Exception {
-               benchmark = new DeployingDownstreamTasksInBatchJobBenchmark();
-               benchmark.setup(jobConfiguration);
-       }
-
-       @Benchmark
-       @BenchmarkMode(Mode.SingleShotTime)
-       public void deployDownstreamTasks() throws Exception {
-               benchmark.deployDownstreamTasks();
-       }
-
-       @TearDown(Level.Trial)
-       public void teardown() {
-               benchmark.teardown();
-       }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.scheduler.benchmark.deploying;
+//
+//import org.apache.flink.runtime.executiongraph.Execution;
+//import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+//import 
org.apache.flink.runtime.scheduler.benchmark.deploying.DeployingDownstreamTasksInBatchJobBenchmark;
+//import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
+//
+//import org.openjdk.jmh.annotations.Benchmark;
+//import org.openjdk.jmh.annotations.BenchmarkMode;
+//import org.openjdk.jmh.annotations.Level;
+//import org.openjdk.jmh.annotations.Mode;
+//import org.openjdk.jmh.annotations.Param;
+//import org.openjdk.jmh.annotations.Setup;
+//import org.openjdk.jmh.annotations.TearDown;
+//import org.openjdk.jmh.runner.RunnerException;
+//
+///**
+// * The benchmark of deploying downstream tasks in a BATCH job.
+// * The related method is {@link Execution#deploy}.
+// */
+//public class DeployingDownstreamTasksInBatchJobBenchmarkExecutor extends 
SchedulerBenchmarkExecutorBase {
+//
+//     @Param("BATCH")
+//     private JobConfiguration jobConfiguration;
+//
+//     private DeployingDownstreamTasksInBatchJobBenchmark benchmark;
+//
+//     public static void main(String[] args) throws RunnerException {
+//             
runBenchmark(DeployingDownstreamTasksInBatchJobBenchmarkExecutor.class);
+//     }
+//
+//     @Setup(Level.Trial)
+//     public void setup() throws Exception {
+//             benchmark = new DeployingDownstreamTasksInBatchJobBenchmark();
+//             benchmark.setup(jobConfiguration);
+//     }
+//
+//     @Benchmark
+//     @BenchmarkMode(Mode.SingleShotTime)
+//     public void deployDownstreamTasks() throws Exception {
+//             benchmark.deployDownstreamTasks();
+//     }
+//
+//     @TearDown(Level.Trial)
+//     public void teardown() {
+//             benchmark.teardown();
+//     }
+//}
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingTasksInStreamingJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingTasksInStreamingJobBenchmarkExecutor.java
index bbabaf4..e96e2d6 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingTasksInStreamingJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingTasksInStreamingJobBenchmarkExecutor.java
@@ -1,66 +1,66 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.scheduler.benchmark.deploying;
-
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
-import org.apache.flink.runtime.scheduler.benchmark.deploying.DeployingTasksInStreamingJobBenchmark;
-import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
-
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.runner.RunnerException;
-
-/**
- * The benchmark of deploying tasks in a STREAMING job.
- * The related method is {@link Execution#deploy}.
- */
-public class DeployingTasksInStreamingJobBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
-
-       @Param("STREAMING")
-       private JobConfiguration jobConfiguration;
-
-       private DeployingTasksInStreamingJobBenchmark benchmark;
-
-       public static void main(String[] args) throws RunnerException {
-               runBenchmark(DeployingTasksInStreamingJobBenchmark.class);
-       }
-
-       @Setup(Level.Trial)
-       public void setup() throws Exception {
-               benchmark = new DeployingTasksInStreamingJobBenchmark();
-               benchmark.setup(jobConfiguration);
-       }
-
-       @Benchmark
-       @BenchmarkMode(Mode.SingleShotTime)
-       public void deployAllTasks() throws Exception {
-               benchmark.deployAllTasks();
-       }
-
-       @TearDown(Level.Trial)
-       public void teardown() {
-               benchmark.teardown();
-       }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.scheduler.benchmark.deploying;
+//
+//import org.apache.flink.runtime.executiongraph.Execution;
+//import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+//import org.apache.flink.runtime.scheduler.benchmark.deploying.DeployingTasksInStreamingJobBenchmark;
+//import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
+//
+//import org.openjdk.jmh.annotations.Benchmark;
+//import org.openjdk.jmh.annotations.BenchmarkMode;
+//import org.openjdk.jmh.annotations.Level;
+//import org.openjdk.jmh.annotations.Mode;
+//import org.openjdk.jmh.annotations.Param;
+//import org.openjdk.jmh.annotations.Setup;
+//import org.openjdk.jmh.annotations.TearDown;
+//import org.openjdk.jmh.runner.RunnerException;
+//
+///**
+// * The benchmark of deploying tasks in a STREAMING job.
+// * The related method is {@link Execution#deploy}.
+// */
+//public class DeployingTasksInStreamingJobBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
+//
+//     @Param("STREAMING")
+//     private JobConfiguration jobConfiguration;
+//
+//     private DeployingTasksInStreamingJobBenchmark benchmark;
+//
+//     public static void main(String[] args) throws RunnerException {
+//             runBenchmark(DeployingTasksInStreamingJobBenchmark.class);
+//     }
+//
+//     @Setup(Level.Trial)
+//     public void setup() throws Exception {
+//             benchmark = new DeployingTasksInStreamingJobBenchmark();
+//             benchmark.setup(jobConfiguration);
+//     }
+//
+//     @Benchmark
+//     @BenchmarkMode(Mode.SingleShotTime)
+//     public void deployAllTasks() throws Exception {
+//             benchmark.deployAllTasks();
+//     }
+//
+//     @TearDown(Level.Trial)
+//     public void teardown() {
+//             benchmark.teardown();
+//     }
+//}
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/CreateSchedulerBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/CreateSchedulerBenchmarkExecutor.java
index b14a6df..8191d00 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/CreateSchedulerBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/CreateSchedulerBenchmarkExecutor.java
@@ -60,6 +60,6 @@ public class CreateSchedulerBenchmarkExecutor extends SchedulerBenchmarkExecutor
 
        @TearDown(Level.Trial)
        public void teardown() {
-               benchmark.teardown();
+               // benchmark.teardown();
        }
 }
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/SchedulingAndDeployingBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/SchedulingAndDeployingBenchmarkExecutor.java
index 153c3d8..c8df10d 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/SchedulingAndDeployingBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/SchedulingAndDeployingBenchmarkExecutor.java
@@ -59,6 +59,6 @@ public class SchedulingAndDeployingBenchmarkExecutor extends SchedulerBenchmarkE
 
        @TearDown(Level.Trial)
        public void teardown() {
-               benchmark.teardown();
+               //benchmark.teardown();
        }
 }
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInBatchJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInBatchJobBenchmarkExecutor.java
index c8504b9..bfa0ce2 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInBatchJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInBatchJobBenchmarkExecutor.java
@@ -1,65 +1,65 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.scheduler.benchmark.failover;
-
-import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
-import org.apache.flink.runtime.scheduler.benchmark.failover.RegionToRestartInBatchJobBenchmark;
-import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
-
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.infra.Blackhole;
-import org.openjdk.jmh.runner.RunnerException;
-
-/**
- * The benchmark of calculating the regions to restart when failover occurs in a BATCH job.
- */
-public class RegionToRestartInBatchJobBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
-
-       @Param("BATCH")
-       private JobConfiguration jobConfiguration;
-
-       private RegionToRestartInBatchJobBenchmark benchmark;
-
-       public static void main(String[] args) throws RunnerException {
-               runBenchmark(RegionToRestartInBatchJobBenchmarkExecutor.class);
-       }
-
-       @Setup(Level.Trial)
-       public void setup() throws Exception {
-               benchmark = new RegionToRestartInBatchJobBenchmark();
-               benchmark.setup(jobConfiguration);
-       }
-
-       @Benchmark
-       @BenchmarkMode(Mode.SingleShotTime)
-       public void calculateRegionToRestart(Blackhole blackhole) {
-               blackhole.consume(benchmark.calculateRegionToRestart());
-       }
-
-       @TearDown(Level.Trial)
-       public void teardown() {
-               benchmark.teardown();
-       }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.scheduler.benchmark.failover;
+//
+//import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+//import org.apache.flink.runtime.scheduler.benchmark.failover.RegionToRestartInBatchJobBenchmark;
+//import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
+//
+//import org.openjdk.jmh.annotations.Benchmark;
+//import org.openjdk.jmh.annotations.BenchmarkMode;
+//import org.openjdk.jmh.annotations.Level;
+//import org.openjdk.jmh.annotations.Mode;
+//import org.openjdk.jmh.annotations.Param;
+//import org.openjdk.jmh.annotations.Setup;
+//import org.openjdk.jmh.annotations.TearDown;
+//import org.openjdk.jmh.infra.Blackhole;
+//import org.openjdk.jmh.runner.RunnerException;
+//
+///**
+// * The benchmark of calculating the regions to restart when failover occurs in a BATCH job.
+// */
+//public class RegionToRestartInBatchJobBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
+//
+//     @Param("BATCH")
+//     private JobConfiguration jobConfiguration;
+//
+//     private RegionToRestartInBatchJobBenchmark benchmark;
+//
+//     public static void main(String[] args) throws RunnerException {
+//             runBenchmark(RegionToRestartInBatchJobBenchmarkExecutor.class);
+//     }
+//
+//     @Setup(Level.Trial)
+//     public void setup() throws Exception {
+//             benchmark = new RegionToRestartInBatchJobBenchmark();
+//             benchmark.setup(jobConfiguration);
+//     }
+//
+//     @Benchmark
+//     @BenchmarkMode(Mode.SingleShotTime)
+//     public void calculateRegionToRestart(Blackhole blackhole) {
+//             blackhole.consume(benchmark.calculateRegionToRestart());
+//     }
+//
+//     @TearDown(Level.Trial)
+//     public void teardown() {
+//             benchmark.teardown();
+//     }
+//}
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInStreamingJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInStreamingJobBenchmarkExecutor.java
index 2b1543e..e874c4c 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInStreamingJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInStreamingJobBenchmarkExecutor.java
@@ -1,65 +1,65 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.scheduler.benchmark.failover;
-
-import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
-import org.apache.flink.runtime.scheduler.benchmark.failover.RegionToRestartInStreamingJobBenchmark;
-import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
-
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.infra.Blackhole;
-import org.openjdk.jmh.runner.RunnerException;
-
-/**
- * The benchmark of calculating region to restart when failover occurs in a STREAMING job.
- */
-public class RegionToRestartInStreamingJobBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
-
-       @Param("STREAMING")
-       private JobConfiguration jobConfiguration;
-
-       private RegionToRestartInStreamingJobBenchmark benchmark;
-
-       public static void main(String[] args) throws RunnerException {
-               runBenchmark(RegionToRestartInStreamingJobBenchmarkExecutor.class);
-       }
-
-       @Setup(Level.Trial)
-       public void setup() throws Exception {
-               benchmark = new RegionToRestartInStreamingJobBenchmark();
-               benchmark.setup(jobConfiguration);
-       }
-
-       @Benchmark
-       @BenchmarkMode(Mode.SingleShotTime)
-       public void calculateRegionToRestart(Blackhole blackhole) {
-               blackhole.consume(benchmark.calculateRegionToRestart());
-       }
-
-       @TearDown(Level.Trial)
-       public void teardown() {
-               benchmark.teardown();
-       }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.scheduler.benchmark.failover;
+//
+//import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+//import org.apache.flink.runtime.scheduler.benchmark.failover.RegionToRestartInStreamingJobBenchmark;
+//import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
+//
+//import org.openjdk.jmh.annotations.Benchmark;
+//import org.openjdk.jmh.annotations.BenchmarkMode;
+//import org.openjdk.jmh.annotations.Level;
+//import org.openjdk.jmh.annotations.Mode;
+//import org.openjdk.jmh.annotations.Param;
+//import org.openjdk.jmh.annotations.Setup;
+//import org.openjdk.jmh.annotations.TearDown;
+//import org.openjdk.jmh.infra.Blackhole;
+//import org.openjdk.jmh.runner.RunnerException;
+//
+///**
+// * The benchmark of calculating region to restart when failover occurs in a STREAMING job.
+// */
+//public class RegionToRestartInStreamingJobBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
+//
+//     @Param("STREAMING")
+//     private JobConfiguration jobConfiguration;
+//
+//     private RegionToRestartInStreamingJobBenchmark benchmark;
+//
+//     public static void main(String[] args) throws RunnerException {
+//             runBenchmark(RegionToRestartInStreamingJobBenchmarkExecutor.class);
+//     }
+//
+//     @Setup(Level.Trial)
+//     public void setup() throws Exception {
+//             benchmark = new RegionToRestartInStreamingJobBenchmark();
+//             benchmark.setup(jobConfiguration);
+//     }
+//
+//     @Benchmark
+//     @BenchmarkMode(Mode.SingleShotTime)
+//     public void calculateRegionToRestart(Blackhole blackhole) {
+//             blackhole.consume(benchmark.calculateRegionToRestart());
+//     }
+//
+//     @TearDown(Level.Trial)
+//     public void teardown() {
+//             benchmark.teardown();
+//     }
+//}
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/partitionrelease/PartitionReleaseInBatchJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/partitionrelease/PartitionReleaseInBatchJobBenchmarkExecutor.java
index 3657874..573a6b4 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/partitionrelease/PartitionReleaseInBatchJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/partitionrelease/PartitionReleaseInBatchJobBenchmarkExecutor.java
@@ -1,65 +1,65 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.scheduler.benchmark.partitionrelease;
-
-import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
-import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils;
-import org.apache.flink.runtime.scheduler.benchmark.partitionrelease.PartitionReleaseInBatchJobBenchmark;
-import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
-
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.runner.RunnerException;
-
-/**
- * The benchmark of releasing partitions in a BATCH job.
- */
-public class PartitionReleaseInBatchJobBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
-
-       @Param("BATCH")
-       private JobConfiguration jobConfiguration;
-
-       private PartitionReleaseInBatchJobBenchmark benchmark;
-
-       public static void main(String[] args) throws RunnerException {
-               runBenchmark(PartitionReleaseInBatchJobBenchmarkExecutor.class);
-       }
-
-       @Setup(Level.Trial)
-       public void setup() throws Exception {
-               benchmark = new PartitionReleaseInBatchJobBenchmark();
-               benchmark.setup(jobConfiguration);
-       }
-
-       @Benchmark
-       @BenchmarkMode(Mode.SingleShotTime)
-       public void partitionRelease() {
-               benchmark.partitionRelease();
-       }
-
-       @TearDown(Level.Trial)
-       public void teardown() {
-               benchmark.teardown();
-       }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.scheduler.benchmark.partitionrelease;
+//
+//import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+//import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils;
+//import org.apache.flink.runtime.scheduler.benchmark.partitionrelease.PartitionReleaseInBatchJobBenchmark;
+//import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
+//
+//import org.openjdk.jmh.annotations.Benchmark;
+//import org.openjdk.jmh.annotations.BenchmarkMode;
+//import org.openjdk.jmh.annotations.Level;
+//import org.openjdk.jmh.annotations.Mode;
+//import org.openjdk.jmh.annotations.Param;
+//import org.openjdk.jmh.annotations.Setup;
+//import org.openjdk.jmh.annotations.TearDown;
+//import org.openjdk.jmh.runner.RunnerException;
+//
+///**
+// * The benchmark of releasing partitions in a BATCH job.
+// */
+//public class PartitionReleaseInBatchJobBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
+//
+//     @Param("BATCH")
+//     private JobConfiguration jobConfiguration;
+//
+//     private PartitionReleaseInBatchJobBenchmark benchmark;
+//
+//     public static void main(String[] args) throws RunnerException {
+//             runBenchmark(PartitionReleaseInBatchJobBenchmarkExecutor.class);
+//     }
+//
+//     @Setup(Level.Trial)
+//     public void setup() throws Exception {
+//             benchmark = new PartitionReleaseInBatchJobBenchmark();
+//             benchmark.setup(jobConfiguration);
+//     }
+//
+//     @Benchmark
+//     @BenchmarkMode(Mode.SingleShotTime)
+//     public void partitionRelease() {
+//             benchmark.partitionRelease();
+//     }
+//
+//     @TearDown(Level.Trial)
+//     public void teardown() {
+//             benchmark.teardown();
+//     }
+//}
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/InitSchedulingStrategyBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/InitSchedulingStrategyBenchmarkExecutor.java
index 21c66d8..75a0a2d 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/InitSchedulingStrategyBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/InitSchedulingStrategyBenchmarkExecutor.java
@@ -1,66 +1,66 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.scheduler.benchmark.scheduling;
-
-import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
-import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils;
-import org.apache.flink.runtime.scheduler.benchmark.scheduling.InitSchedulingStrategyBenchmark;
-import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
-
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.infra.Blackhole;
-import org.openjdk.jmh.runner.RunnerException;
-
-/**
- * The benchmark of initializing the scheduling strategy in a STREAMING/BATCH job.
- */
-public class InitSchedulingStrategyBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
-
-       @Param({"BATCH", "STREAMING"})
-       private JobConfiguration jobConfiguration;
-
-       private InitSchedulingStrategyBenchmark benchmark;
-
-       public static void main(String[] args) throws RunnerException {
-               runBenchmark(InitSchedulingStrategyBenchmarkExecutor.class);
-       }
-
-       @Setup(Level.Trial)
-       public void setup() throws Exception {
-               benchmark = new InitSchedulingStrategyBenchmark();
-               benchmark.setup(jobConfiguration);
-       }
-
-       @Benchmark
-       @BenchmarkMode(Mode.SingleShotTime)
-       public void initSchedulingStrategy(Blackhole blackhole) {
-               blackhole.consume(benchmark.initSchedulingStrategy());
-       }
-
-       @TearDown(Level.Trial)
-       public void teardown() {
-               benchmark.teardown();
-       }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.scheduler.benchmark.scheduling;
+//
+//import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+//import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils;
+//import org.apache.flink.runtime.scheduler.benchmark.scheduling.InitSchedulingStrategyBenchmark;
+//import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
+//
+//import org.openjdk.jmh.annotations.Benchmark;
+//import org.openjdk.jmh.annotations.BenchmarkMode;
+//import org.openjdk.jmh.annotations.Level;
+//import org.openjdk.jmh.annotations.Mode;
+//import org.openjdk.jmh.annotations.Param;
+//import org.openjdk.jmh.annotations.Setup;
+//import org.openjdk.jmh.annotations.TearDown;
+//import org.openjdk.jmh.infra.Blackhole;
+//import org.openjdk.jmh.runner.RunnerException;
+//
+///**
+// * The benchmark of initializing the scheduling strategy in a STREAMING/BATCH job.
+// */
+//public class InitSchedulingStrategyBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
+//
+//     @Param({"BATCH", "STREAMING"})
+//     private JobConfiguration jobConfiguration;
+//
+//     private InitSchedulingStrategyBenchmark benchmark;
+//
+//     public static void main(String[] args) throws RunnerException {
+//             runBenchmark(InitSchedulingStrategyBenchmarkExecutor.class);
+//     }
+//
+//     @Setup(Level.Trial)
+//     public void setup() throws Exception {
+//             benchmark = new InitSchedulingStrategyBenchmark();
+//             benchmark.setup(jobConfiguration);
+//     }
+//
+//     @Benchmark
+//     @BenchmarkMode(Mode.SingleShotTime)
+//     public void initSchedulingStrategy(Blackhole blackhole) {
+//             blackhole.consume(benchmark.initSchedulingStrategy());
+//     }
+//
+//     @TearDown(Level.Trial)
+//     public void teardown() {
+//             benchmark.teardown();
+//     }
+//}
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
index 7968ff9..a8447ad 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
@@ -1,65 +1,65 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.scheduler.benchmark.scheduling;
-
-import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
-import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils;
-import org.apache.flink.runtime.scheduler.benchmark.scheduling.SchedulingDownstreamTasksInBatchJobBenchmark;
-import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
-
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.runner.RunnerException;
-
-/**
- * The benchmark of scheduling downstream task in a BATCH job.
- */
-public class SchedulingDownstreamTasksInBatchJobBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
-
-       @Param({"BATCH"})
-       private JobConfiguration jobConfiguration;
-
-       private SchedulingDownstreamTasksInBatchJobBenchmark benchmark;
-
-       public static void main(String[] args) throws RunnerException {
-               runBenchmark(SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.class);
-       }
-
-       @Setup(Level.Trial)
-       public void setup() throws Exception {
-               benchmark = new SchedulingDownstreamTasksInBatchJobBenchmark();
-               benchmark.setup(jobConfiguration);
-       }
-
-       @Benchmark
-       @BenchmarkMode(Mode.SingleShotTime)
-       public void schedulingDownstreamTasks() {
-               benchmark.schedulingDownstreamTasks();
-       }
-
-       @TearDown(Level.Trial)
-       public void teardown() {
-               benchmark.teardown();
-       }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.scheduler.benchmark.scheduling;
+//
+//import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+//import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils;
+//import org.apache.flink.runtime.scheduler.benchmark.scheduling.SchedulingDownstreamTasksInBatchJobBenchmark;
+//import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
+//
+//import org.openjdk.jmh.annotations.Benchmark;
+//import org.openjdk.jmh.annotations.BenchmarkMode;
+//import org.openjdk.jmh.annotations.Level;
+//import org.openjdk.jmh.annotations.Mode;
+//import org.openjdk.jmh.annotations.Param;
+//import org.openjdk.jmh.annotations.Setup;
+//import org.openjdk.jmh.annotations.TearDown;
+//import org.openjdk.jmh.runner.RunnerException;
+//
+///**
+// * The benchmark of scheduling downstream task in a BATCH job.
+// */
+//public class SchedulingDownstreamTasksInBatchJobBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
+//
+//     @Param({"BATCH"})
+//     private JobConfiguration jobConfiguration;
+//
+//     private SchedulingDownstreamTasksInBatchJobBenchmark benchmark;
+//
+//     public static void main(String[] args) throws RunnerException {
+//             runBenchmark(SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.class);
+//     }
+//
+//     @Setup(Level.Trial)
+//     public void setup() throws Exception {
+//             benchmark = new SchedulingDownstreamTasksInBatchJobBenchmark();
+//             benchmark.setup(jobConfiguration);
+//     }
+//
+//     @Benchmark
+//     @BenchmarkMode(Mode.SingleShotTime)
+//     public void schedulingDownstreamTasks() {
+//             benchmark.schedulingDownstreamTasks();
+//     }
+//
+//     @TearDown(Level.Trial)
+//     public void teardown() {
+//             benchmark.teardown();
+//     }
+//}
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/topology/BuildExecutionGraphBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/topology/BuildExecutionGraphBenchmarkExecutor.java
index cf97809..2c4498d 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/topology/BuildExecutionGraphBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/topology/BuildExecutionGraphBenchmarkExecutor.java
@@ -1,65 +1,65 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.scheduler.benchmark.topology;
-
-import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
-import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils;
-import org.apache.flink.runtime.scheduler.benchmark.topology.BuildExecutionGraphBenchmark;
-import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
-
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.runner.RunnerException;
-
-/**
- * The benchmark of building the topology of ExecutionGraph in a STREAMING/BATCH job.
- */
-public class BuildExecutionGraphBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
-
-       @Param({"BATCH", "STREAMING"})
-       private JobConfiguration jobConfiguration;
-
-       private BuildExecutionGraphBenchmark benchmark;
-
-       public static void main(String[] args) throws RunnerException {
-               runBenchmark(BuildExecutionGraphBenchmarkExecutor.class);
-       }
-
-       @Setup(Level.Trial)
-       public void setup() throws Exception {
-               benchmark = new BuildExecutionGraphBenchmark();
-               benchmark.setup(jobConfiguration);
-       }
-
-       @Benchmark
-       @BenchmarkMode(Mode.SingleShotTime)
-       public void buildTopology() throws Exception {
-               benchmark.buildTopology();
-       }
-
-       @TearDown(Level.Trial)
-       public void teardown() {
-               benchmark.teardown();
-       }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.scheduler.benchmark.topology;
+//
+//import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+//import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils;
+//import org.apache.flink.runtime.scheduler.benchmark.topology.BuildExecutionGraphBenchmark;
+//import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
+//
+//import org.openjdk.jmh.annotations.Benchmark;
+//import org.openjdk.jmh.annotations.BenchmarkMode;
+//import org.openjdk.jmh.annotations.Level;
+//import org.openjdk.jmh.annotations.Mode;
+//import org.openjdk.jmh.annotations.Param;
+//import org.openjdk.jmh.annotations.Setup;
+//import org.openjdk.jmh.annotations.TearDown;
+//import org.openjdk.jmh.runner.RunnerException;
+//
+///**
+// * The benchmark of building the topology of ExecutionGraph in a STREAMING/BATCH job.
+// */
+//public class BuildExecutionGraphBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
+//
+//     @Param({"BATCH", "STREAMING"})
+//     private JobConfiguration jobConfiguration;
+//
+//     private BuildExecutionGraphBenchmark benchmark;
+//
+//     public static void main(String[] args) throws RunnerException {
+//             runBenchmark(BuildExecutionGraphBenchmarkExecutor.class);
+//     }
+//
+//     @Setup(Level.Trial)
+//     public void setup() throws Exception {
+//             benchmark = new BuildExecutionGraphBenchmark();
+//             benchmark.setup(jobConfiguration);
+//     }
+//
+//     @Benchmark
+//     @BenchmarkMode(Mode.SingleShotTime)
+//     public void buildTopology() throws Exception {
+//             benchmark.buildTopology();
+//     }
+//
+//     @TearDown(Level.Trial)
+//     public void teardown() {
+//             benchmark.teardown();
+//     }
+//}

Reply via email to