This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
commit 9af64a13fa94a5ca2c4767948823ed3ea8513805 Author: 愚鲤 <yuli....@alibaba-inc.com> AuthorDate: Sun Nov 21 16:14:05 2021 +0800 [FLINK-24918][Runtime/State Backends]Support to specify the data dir for state benchmark --- README.md | 28 +++++- .../java/org/apache/flink/config/ConfigUtil.java | 102 +++++++++++++++++++++ .../apache/flink/config/StateBenchmarkOptions.java | 32 +++++++ .../flink/state/benchmark/ListStateBenchmark.java | 3 +- .../flink/state/benchmark/MapStateBenchmark.java | 3 +- .../flink/state/benchmark/StateBenchmarkBase.java | 21 ++++- .../flink/state/benchmark/ValueStateBenchmark.java | 3 +- src/main/resources/benchmark-conf.yaml | 21 +++++ .../org/apache/flink/config/ConfigUtilTest.java | 34 +++++++ src/test/resources/benchmark-conf.yaml | 19 ++++ 10 files changed, 257 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 9dc583c..3af9581 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ mvn clean package exec:exec \ ``` If you want to execute just one benchmark, the best approach is to execute selected main function manually. -There're mainly two ways: +There are three main ways: 1. From your IDE (hint there is a plugin for Intellij IDEA). * In this case don't forget about selecting `flink.version`, default value for the property is defined in pom.xml. @@ -31,7 +31,14 @@ There're mainly two ways: mvn -Dflink.version=<FLINK_VERSION> clean package exec:exec \ -Dbenchmarks="<benchmark_class>" ``` -An example flink version can be -Dflink.version=1.12-SNAPSHOT. + + An example flink version can be -Dflink.version=1.12-SNAPSHOT. + +3. 
Run the uber jar directly: + + ``` + java -jar target/benchmarks.jar -rf csv "<benchmark_class>" + ``` We also support to run each benchmark once (with only one fork and one iteration) for testing, with below command: @@ -39,6 +46,23 @@ We also support to run each benchmark once (with only one fork and one iteration mvn test -P test ``` +## Parameters + +There are some built-in parameters for running different benchmarks; they can be listed or overridden from the command line. + +``` +# list all parameter combinations for the <benchmark_class> +java -jar target/benchmarks.jar "<benchmark_class>" -lp + +# run the state benchmarks with the RocksDB state backend +java -jar target/benchmarks.jar "org.apache.flink.state.benchmark.*" -p "backendType=ROCKSDB" +``` + +## Configuration + +Besides these parameters, there is also a benchmark config file, `benchmark-conf.yaml`, for tuning some basic settings. +For example, the state data directory can be changed by adding `benchmark.state.data-dir: /data` to the config file. For more options, refer to the code in the `org.apache.flink.config` package. + ## Prerequisites The recent addition of OpenSSL-based benchmarks require one of two modes to be active: diff --git a/src/main/java/org/apache/flink/config/ConfigUtil.java b/src/main/java/org/apache/flink/config/ConfigUtil.java new file mode 100644 index 0000000..b4e670f --- /dev/null +++ b/src/main/java/org/apache/flink/config/ConfigUtil.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.config; + +import org.apache.flink.configuration.Configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +public class ConfigUtil { + + private static final Logger LOG = LoggerFactory.getLogger(ConfigUtil.class); + + private static final String BENCHMARK_CONF = "benchmark-conf.yaml"; + + /** Load benchmark conf from classpath. */ + public static Configuration loadBenchMarkConf() { + InputStream inputStream = + ConfigUtil.class.getClassLoader().getResourceAsStream(BENCHMARK_CONF); + return loadYAMLResource(inputStream); + } + + /** + * This is copied from {@code GlobalConfiguration#loadYAMLResource} to avoid depending + * on @Internal api. + */ + private static Configuration loadYAMLResource(InputStream inputStream) { + final Configuration config = new Configuration(); + + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + + String line; + int lineNo = 0; + while ((line = reader.readLine()) != null) { + lineNo++; + // 1. check for comments + String[] comments = line.split("#", 2); + String conf = comments[0].trim(); + + // 2. 
get key and value + if (conf.length() > 0) { + String[] kv = conf.split(": ", 2); + + // skip line with no valid key-value pair + if (kv.length == 1) { + LOG.warn( + "Error while trying to split key and value in configuration file " + + ":" + + lineNo + + ": \"" + + line + + "\""); + continue; + } + + String key = kv[0].trim(); + String value = kv[1].trim(); + + // sanity check + if (key.length() == 0 || value.length() == 0) { + LOG.warn( + "Error after splitting key and value in configuration file " + + ":" + + lineNo + + ": \"" + + line + + "\""); + continue; + } + + LOG.info("Loading configuration property: {}, {}", key, value); + config.setString(key, value); + } + } + } catch (IOException e) { + throw new RuntimeException("Error parsing YAML configuration.", e); + } + + return config; + } +} diff --git a/src/main/java/org/apache/flink/config/StateBenchmarkOptions.java b/src/main/java/org/apache/flink/config/StateBenchmarkOptions.java new file mode 100644 index 0000000..d54e31a --- /dev/null +++ b/src/main/java/org/apache/flink/config/StateBenchmarkOptions.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.config; + +import org.apache.flink.configuration.ConfigOption; + +import static org.apache.flink.configuration.ConfigOptions.key; + +public class StateBenchmarkOptions { + + public static final ConfigOption<String> STATE_DATA_DIR = + key("benchmark.state.data-dir") + .stringType() + .noDefaultValue() + .withDescription("The dir to put state data."); +} diff --git a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java index 0690eb2..7fea1eb 100644 --- a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java +++ b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java @@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys; import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.compactState; -import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.createKeyedStateBackend; import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.getListState; import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount; import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; @@ -64,7 +63,7 @@ public class ListStateBenchmark extends StateBenchmarkBase { @Setup public void setUp() throws Exception { - keyedStateBackend = createKeyedStateBackend(backendType); + keyedStateBackend = createKeyedStateBackend(); listState = getListState(keyedStateBackend, STATE_DESC); dummyLists = new ArrayList<>(listValueCount); for (int i = 0; i < listValueCount; ++i) { diff --git a/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java index 52494c0..044d6b1 100644 --- 
a/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java +++ b/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java @@ -36,7 +36,6 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.createKeyedStateBackend; import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.getMapState; import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount; import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys; @@ -59,7 +58,7 @@ public class MapStateBenchmark extends StateBenchmarkBase { @Setup public void setUp() throws Exception { - keyedStateBackend = createKeyedStateBackend(backendType); + keyedStateBackend = createKeyedStateBackend(); mapState = getMapState( keyedStateBackend, diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java index c053f29..61f1c2c 100644 --- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java +++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java @@ -18,6 +18,9 @@ package org.apache.flink.state.benchmark; import org.apache.flink.benchmark.BenchmarkBase; +import org.apache.flink.config.ConfigUtil; +import org.apache.flink.config.StateBenchmarkOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils; import org.apache.flink.runtime.state.KeyedStateBackend; @@ -28,7 +31,10 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; +import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Collections; import java.util.List; import 
java.util.concurrent.ThreadLocalRandom; @@ -52,10 +58,23 @@ public class StateBenchmarkBase extends BenchmarkBase { final ThreadLocalRandom random = ThreadLocalRandom.current(); @Param({"HEAP", "ROCKSDB"}) - protected StateBackendBenchmarkUtils.StateBackendType backendType; + private StateBackendBenchmarkUtils.StateBackendType backendType; KeyedStateBackend<Long> keyedStateBackend; + protected KeyedStateBackend<Long> createKeyedStateBackend() throws Exception { + Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf(); + String stateDataDirPath = benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR); + File dataDir = null; + if (stateDataDirPath != null) { + dataDir = new File(stateDataDirPath); + if (!dataDir.exists()) { + Files.createDirectories(Paths.get(stateDataDirPath)); + } + } + return StateBackendBenchmarkUtils.createKeyedStateBackend(backendType, dataDir); + } + private static int getCurrentIndex() { int currentIndex = keyIndex.getAndIncrement(); if (currentIndex == Integer.MAX_VALUE) { diff --git a/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java index 92350d0..eacd12c 100644 --- a/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java +++ b/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java @@ -32,7 +32,6 @@ import org.openjdk.jmh.runner.options.VerboseMode; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.createKeyedStateBackend; import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.getValueState; import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; @@ -52,7 +51,7 @@ public class ValueStateBenchmark extends StateBenchmarkBase { @Setup public void setUp() throws Exception { - keyedStateBackend = 
createKeyedStateBackend(backendType); + keyedStateBackend = createKeyedStateBackend(); valueState = getValueState(keyedStateBackend, new ValueStateDescriptor<>("kvState", Long.class)); for (int i = 0; i < setupKeyCount; ++i) { diff --git a/src/main/resources/benchmark-conf.yaml b/src/main/resources/benchmark-conf.yaml new file mode 100644 index 0000000..174acc5 --- /dev/null +++ b/src/main/resources/benchmark-conf.yaml @@ -0,0 +1,21 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# The dir to put state data during test. If not set, the system default temp dir will be used. + +#benchmark.state.data-dir: /tmp/flink-benchmark \ No newline at end of file diff --git a/src/test/java/org/apache/flink/config/ConfigUtilTest.java b/src/test/java/org/apache/flink/config/ConfigUtilTest.java new file mode 100644 index 0000000..9f3cbaa --- /dev/null +++ b/src/test/java/org/apache/flink/config/ConfigUtilTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.config; + +import org.apache.flink.configuration.Configuration; + +import org.junit.Assert; +import org.junit.Test; + +public class ConfigUtilTest { + + @Test + public void testLoadConf() { + Configuration cfg = ConfigUtil.loadBenchMarkConf(); + String dir = cfg.getString(StateBenchmarkOptions.STATE_DATA_DIR); + Assert.assertEquals("/tmp/data", dir); + } +} diff --git a/src/test/resources/benchmark-conf.yaml b/src/test/resources/benchmark-conf.yaml new file mode 100644 index 0000000..267da86 --- /dev/null +++ b/src/test/resources/benchmark-conf.yaml @@ -0,0 +1,19 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +benchmark.state.data-dir: /tmp/data \ No newline at end of file