This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
     new 8af95e62  [FLINK-29702] Add micro benchmarks module and merge tree reader/writer benchmarks
8af95e62 is described below

commit 8af95e62a9286e78f135b2ad24f2d61b54d6f7e9
Author: shammon <zjur...@gmail.com>
AuthorDate: Wed Oct 26 19:34:01 2022 +0800

    [FLINK-29702] Add micro benchmarks module and merge tree reader/writer benchmarks

    This closes #326
---
 .../flink-table-store-cluster-benchmark/pom.xml    |   2 +-
 .../flink-table-store-micro-benchmarks/README.md   |  41 +++
 .../flink-table-store-micro-benchmarks/pom.xml     | 287 +++++++++++++++++++++
 .../table/store/benchmark/config/ConfigUtil.java   | 119 +++++++++
 .../benchmark/config/FileBenchmarkOptions.java     |  50 ++++
 .../file/mergetree/MergeTreeBenchmark.java         | 269 +++++++++++++++++++
 .../file/mergetree/MergeTreeReaderBenchmark.java   | 114 ++++++++
 .../file/mergetree/MergeTreeWriterBenchmark.java   |  66 +++++
 .../src/main/resources/benchmark-conf.yaml         |  25 ++
 .../src/main/resources/log4j.properties            |  25 ++
 flink-table-store-benchmark/pom.xml                |   6 +
 11 files changed, 1003 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-benchmark/flink-table-store-cluster-benchmark/pom.xml b/flink-table-store-benchmark/flink-table-store-cluster-benchmark/pom.xml
index 856a666a..16526808 100644
--- a/flink-table-store-benchmark/flink-table-store-cluster-benchmark/pom.xml
+++ b/flink-table-store-benchmark/flink-table-store-cluster-benchmark/pom.xml
@@ -47,7 +47,7 @@ under the License.
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>4.5.13</version>
+            <version>${httpclient.version}</version>
         </dependency>

         <!-- Flink dependencies -->
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/README.md b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/README.md
new file mode 100644
index 00000000..5790f188
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/README.md
@@ -0,0 +1,41 @@
+# flink-table-store-micro-benchmarks
+
+This project contains sets of micro benchmarks designed to run on a single machine to
+help Flink Table Store developers assess the performance implications of their changes.
+
+The main methods defined in the various classes use the [jmh](http://openjdk.java.net/projects/code-tools/jmh/)
+micro benchmark suite to define runners that execute those cases. You can execute the
+default benchmark suite at once as follows:
+
+1. Build and install the `flink-table-store` project:
+```
+mvn clean install -DskipTests
+```
+2. Build and run the micro benchmarks in the `flink-table-store-micro-benchmarks` project:
+```
+mvn clean install -DskipTests exec:exec
+```
+
+If you want to execute just one benchmark, the best approach is to run the selected main method manually.
+There are three ways:
+
+1. From your IDE (hint: there is a plugin for IntelliJ IDEA).
+
+2. From the command line, using a command like:
+   ```
+   mvn clean install -DskipTests exec:exec \
+   -Dbenchmarks="<benchmark_class>"
+   ```
+
+   An example `benchmark_class` is `MergeTreeReaderBenchmark`, which measures the performance of the merge tree reader.
+
+3. Run the uber jar directly, like:
+
+   ```
+   java -jar target/benchmarks.jar -rf csv "<benchmark_class>"
+   ```
+
+## Configuration
+
+Besides the JMH parameters, there is also a benchmark config file, `benchmark-conf.yaml`, for tuning
+some basic parameters. For example, we can change the file data dir by putting
+`benchmark.file.base-data-dir: /data` in the config file.
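+
+A small config that overrides the data dir and the writer workload might look like the following
+sketch (the keys below are defined in `FileBenchmarkOptions`; the values are only illustrative):
+```
+benchmark.file.base-data-dir: /data
+benchmark.writer.batch-count: 50
+benchmark.writer.record-count-per-batch: 50000
+```
+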
+For more options, you can refer to `FileBenchmarkOptions` and the Flink Table Store options.
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml
new file mode 100644
index 00000000..b5991a42
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml
@@ -0,0 +1,287 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-benchmark</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.3-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-micro-benchmarks</artifactId>
+    <name>Flink Table Store : Micro Benchmark</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-1.2-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <!-- Orc and parquet dependencies -->
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>${snappy.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.avro</groupId> +
<artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + + <!-- JMH dependencies --> + + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-core</artifactId> + <version>${jmh.version}</version> + </dependency> + + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + <version>${jmh.version}</version> + </dependency> + + <!-- Flink Table Store dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-store-shade</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>${flink.sql.parquet}</artifactId> + <version>${flink.version}</version> + </dependency> + + </dependencies> + + <!-- This is copied from flink-benchmarks and updated for flink-table-store-micro-benchmarks. --> + + <profiles> + <profile> + <id>test</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>${maven.exec.version}</version> + <executions> + <execution> + <id>test-benchmarks</id> + <phase>test</phase> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <skip>${skipTests}</skip> + <classpathScope>test</classpathScope> + <executable>${executableJava}</executable> + <arguments> + <argument>-Xmx6g</argument> + <argument>-classpath</argument> + <classpath/> + <argument>org.openjdk.jmh.Main</argument> + <!--shouldFailOnError--> + <argument>-foe</argument> + <argument>true</argument> + <!--speed up tests--> + <argument>-f</argument> + <argument>1</argument> + <argument>-i</argument> + <argument>1</argument> + <argument>-wi</argument> + <argument>0</argument> + <argument>-rf</argument> + <argument>csv</argument> + <argument>.*</argument> + </arguments> + </configuration> + </plugin> + </plugins> + </build> + </profile> + + <profile> + <id>benchmark</id> + <activation> + <activeByDefault>true</activeByDefault> + <property> + <name>benchmarks</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>${maven.exec.version}</version> + <executions> + <execution> + <id>run-benchmarks</id> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <classpathScope>test</classpathScope> + <executable>${executableJava}</executable> + <arguments> + <argument>-classpath</argument> + <classpath/> + <argument>org.openjdk.jmh.Main</argument> + <!--shouldFailOnError--> + <argument>-foe</argument> + <argument>true</argument> + <argument>-rf</argument> + <argument>csv</argument> + <argument>${benchmarks}</argument> + </arguments> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.2.0</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>benchmarks</finalName> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.openjdk.jmh.Main</mainClass> + </transformer> + <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>reference.conf</resource> + </transformer> + <!-- The service transformer is needed to 
merge META-INF/services files -->
+                                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+                                            <projectName>Flink Table Store</projectName>
+                                        </transformer>
+                                    </transformers>
+                                    <filters>
+                                        <filter>
+                                            <artifact>*</artifact>
+                                            <excludes>
+                                                <exclude>META-INF/*.SF</exclude>
+                                                <exclude>META-INF/*.DSA</exclude>
+                                                <exclude>META-INF/*.RSA</exclude>
+                                            </excludes>
+                                        </filter>
+                                    </filters>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>custom-benchmark</id>
+            <activation>
+                <property>
+                    <name>benchmarks</name>
+                </property>
+            </activation>
+            <properties>
+                <benchmarkExcludes>""</benchmarkExcludes>
+            </properties>
+        </profile>
+    </profiles>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/config/ConfigUtil.java b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/config/ConfigUtil.java
new file mode 100644
index 00000000..dec6b714
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/config/ConfigUtil.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.benchmark.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.UUID;
+
+/**
+ * Config utils to load the benchmark config from a YAML file on the classpath; the main logic is
+ * copied from {@code ConfigUtil} in flink-benchmarks.
+ */
+public class ConfigUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigUtil.class);
+
+    private static final String BENCHMARK_CONF = "benchmark-conf.yaml";
+
+    /** Load benchmark conf from classpath. */
+    public static Configuration loadBenchMarkConf() {
+        InputStream inputStream =
+                ConfigUtil.class.getClassLoader().getResourceAsStream(BENCHMARK_CONF);
+        return loadYAMLResource(inputStream);
+    }
+
+    /**
+     * This is copied from {@code GlobalConfiguration#loadYAMLResource} to avoid depending
+     * on an @Internal API.
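+     *
+     * <p>The parsing is intentionally simple: each line is split on {@code "#"} to strip
+     * comments and on {@code ": "} to separate key and value; lines without a valid
+     * key-value pair are logged with a warning and skipped.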
+ */ + private static Configuration loadYAMLResource(InputStream inputStream) { + final Configuration config = new Configuration(); + + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + + String line; + int lineNo = 0; + while ((line = reader.readLine()) != null) { + lineNo++; + // 1. check for comments + String[] comments = line.split("#", 2); + String conf = comments[0].trim(); + + // 2. get key and value + if (conf.length() > 0) { + String[] kv = conf.split(": ", 2); + + // skip line with no valid key-value pair + if (kv.length == 1) { + LOG.warn( + "Error while trying to split key and value in configuration file " + + ":" + + lineNo + + ": \"" + + line + + "\""); + continue; + } + + String key = kv[0].trim(); + String value = kv[1].trim(); + + // sanity check + if (key.length() == 0 || value.length() == 0) { + LOG.warn( + "Error after splitting key and value in configuration file " + + ":" + + lineNo + + ": \"" + + line + + "\""); + continue; + } + + LOG.info("Loading configuration property: {}, {}", key, value); + config.setString(key, value); + } + } + } catch (IOException e) { + throw new RuntimeException("Error parsing YAML configuration.", e); + } + + return config; + } + + /** + * Create file data dir from given configuration. + * + * @param configuration the configuration + * @return the file data dir + */ + public static String createFileDataDir(Configuration configuration) { + return configuration.get(FileBenchmarkOptions.FILE_DATA_BASE_DIR) + + "/" + + UUID.randomUUID().toString(); + } +} diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/config/FileBenchmarkOptions.java b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/config/FileBenchmarkOptions.java new file mode 100644 index 00000000..d27bd8b9 --- /dev/null +++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/config/FileBenchmarkOptions.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.benchmark.config; + +import org.apache.flink.configuration.ConfigOption; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** Benchmark options for file. 
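+ * <p>All keys use the {@code benchmark.} prefix and can be overridden in
+ * {@code benchmark-conf.yaml}, which {@link ConfigUtil} loads from the classpath.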
+ */
+public class FileBenchmarkOptions {
+
+    public static final ConfigOption<String> FILE_DATA_BASE_DIR =
+            key("benchmark.file.base-data-dir")
+                    .stringType()
+                    .defaultValue("/tmp/flink-table-store-micro-benchmarks")
+                    .withDescription("The dir to put file data.");
+
+    public static final ConfigOption<Long> TARGET_FILE_SIZE =
+            key("benchmark.file.target-size")
+                    .longType()
+                    .defaultValue(1024 * 1024L)
+                    .withDescription("The target file size.");
+
+    public static final ConfigOption<Integer> WRITER_BATCH_COUNT =
+            key("benchmark.writer.batch-count")
+                    .intType()
+                    .defaultValue(50)
+                    .withDescription("The number of batches to be written by the writer.");
+
+    public static final ConfigOption<Integer> WRITER_RECORD_COUNT_PER_BATCH =
+            key("benchmark.writer.record-count-per-batch")
+                    .intType()
+                    .defaultValue(50000)
+                    .withDescription("The number of records to be written in one batch by the writer.");
+}
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java
new file mode 100644
index 00000000..dd9e17de
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.table.store.benchmark.file.mergetree; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowDataUtil; +import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.benchmark.config.ConfigUtil; +import org.apache.flink.table.store.benchmark.config.FileBenchmarkOptions; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.io.DataFileMeta; +import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory; +import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory; +import org.apache.flink.table.store.file.io.RollingFileWriter; +import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool; +import org.apache.flink.table.store.file.mergetree.Levels; +import org.apache.flink.table.store.file.mergetree.MergeTreeReader; +import org.apache.flink.table.store.file.mergetree.MergeTreeWriter; +import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter; +import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy; +import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction; +import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager; +import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction; +import org.apache.flink.table.store.file.schema.SchemaManager; +import org.apache.flink.table.store.file.utils.FileStorePathFactory; +import org.apache.flink.table.store.file.utils.RecordReaderIterator; +import org.apache.flink.table.store.file.utils.RecordWriter; +import org.apache.flink.table.store.format.FileFormat; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import org.apache.commons.io.FileUtils; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static java.util.Collections.singletonList; +import static org.apache.flink.table.store.file.utils.FileStorePathFactory.PARTITION_DEFAULT_NAME; +import static org.apache.flink.util.Preconditions.checkState; + +/** Base class for merge tree benchmark. 
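+ * <p>{@link #createRecordWriter()} builds a {@link MergeTreeWriter} over a fresh data dir
+ * resolved from the benchmark config, and {@link #mergeCompacted} mirrors
+ * {@code MergeTreeWriter.updateCompactResult} to keep the list of compacted files in sync
+ * with compaction results.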
*/ +@SuppressWarnings("MethodMayBeStatic") +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Thread) +@BenchmarkMode(Mode.Throughput) +@Fork(3) +@Threads(1) +@Warmup(iterations = 10) +@Measurement(iterations = 10) +public class MergeTreeBenchmark { + @Param({"avro", "orc", "parquet"}) + protected String format; + + protected ExecutorService service; + protected File file; + protected Comparator<RowData> comparator; + protected CoreOptions options; + protected KeyValueFileReaderFactory readerFactory; + private KeyValueFileReaderFactory compactReaderFactory; + protected KeyValueFileWriterFactory writerFactory; + private KeyValueFileWriterFactory compactWriterFactory; + protected List<DataFileMeta> compactedFiles; + + protected RecordWriter<KeyValue> writer; + protected int batchCount; + protected int countPerBatch; + protected KeyValue kv = new KeyValue(); + protected long sequenceNumber = 0; + + protected void createRecordWriter() { + Configuration configuration = ConfigUtil.loadBenchMarkConf(); + batchCount = configuration.get(FileBenchmarkOptions.WRITER_BATCH_COUNT); + countPerBatch = configuration.get(FileBenchmarkOptions.WRITER_RECORD_COUNT_PER_BATCH); + service = Executors.newSingleThreadExecutor(); + file = new File(ConfigUtil.createFileDataDir(configuration)); + comparator = Comparator.comparingInt(o -> o.getInt(0)); + compactedFiles = new ArrayList<>(); + + Path path = new org.apache.flink.core.fs.Path(file.toURI()); + FileStorePathFactory pathFactory = + new FileStorePathFactory( + path, RowType.of(), configuration.get(PARTITION_DEFAULT_NAME), format); + writer = recreateMergeTree(configuration, path, pathFactory); + } + + private RecordWriter<KeyValue> recreateMergeTree( + Configuration configuration, Path path, FileStorePathFactory pathFactory) { + options = new CoreOptions(configuration); + RowType keyType = new RowType(singletonList(new RowType.RowField("k", new IntType()))); + RowType valueType = new RowType(singletonList(new RowType.RowField("v", new IntType()))); + FileFormat flushingFormat = FileFormat.fromIdentifier(format, new Configuration()); + KeyValueFileReaderFactory.Builder readerBuilder = + KeyValueFileReaderFactory.builder( + new SchemaManager(path), + 0, + keyType, + valueType, + flushingFormat, + pathFactory); + readerFactory = readerBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0); + compactReaderFactory = readerBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0); + KeyValueFileWriterFactory.Builder writerBuilder = + KeyValueFileWriterFactory.builder( + 0, + keyType, + valueType, + flushingFormat, + pathFactory, + options.targetFileSize()); + writerFactory = writerBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0); + compactWriterFactory = writerBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0); + return createMergeTreeWriter(Collections.emptyList()); + } + + private MergeTreeWriter createMergeTreeWriter(List<DataFileMeta> files) { + long maxSequenceNumber = + files.stream().map(DataFileMeta::maxSequenceNumber).max(Long::compare).orElse(-1L); + MergeTreeWriter writer = + new MergeTreeWriter( + false, + 128, + null, + createCompactManager(service, files), + maxSequenceNumber, + comparator, + new DeduplicateMergeFunction(), + writerFactory, + options.commitForceCompact(), + CoreOptions.ChangelogProducer.NONE); + writer.setMemoryPool( + new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); + return writer; + } + + private MergeTreeCompactManager createCompactManager( + ExecutorService compactExecutor, List<DataFileMeta> files) { + CompactStrategy strategy = 
+ new UniversalCompaction( + options.maxSizeAmplificationPercent(), + options.sortedRunSizeRatio(), + options.numSortedRunCompactionTrigger(), + options.maxSortedRunNum()); + CompactRewriter rewriter = + (outputLevel, dropDelete, sections) -> { + RollingFileWriter<KeyValue, DataFileMeta> writer = + compactWriterFactory.createRollingMergeTreeFileWriter(outputLevel); + writer.write( + new RecordReaderIterator<>( + new MergeTreeReader( + sections, + dropDelete, + compactReaderFactory, + comparator, + new DeduplicateMergeFunction()))); + writer.close(); + return writer.result(); + }; + return new MergeTreeCompactManager( + compactExecutor, + new Levels(comparator, files, options.numLevels()), + strategy, + comparator, + options.targetFileSize(), + options.numSortedRunStopTrigger(), + rewriter); + } + + protected void mergeCompacted( + Set<String> newFileNames, + List<DataFileMeta> compactedFiles, + RecordWriter.CommitIncrement increment) { + increment.newFilesIncrement().newFiles().stream() + .map(DataFileMeta::fileName) + .forEach(newFileNames::add); + compactedFiles.addAll(increment.newFilesIncrement().newFiles()); + Set<String> afterFiles = + increment.compactIncrement().compactAfter().stream() + .map(DataFileMeta::fileName) + .collect(Collectors.toSet()); + for (DataFileMeta file : increment.compactIncrement().compactBefore()) { + checkState(compactedFiles.remove(file)); + // See MergeTreeWriter.updateCompactResult + if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) { + compactWriterFactory.deleteFile(file.fileName()); + } + } + compactedFiles.addAll(increment.compactIncrement().compactAfter()); + } + + protected void cleanUp() throws Exception { + if (file != null) { + FileUtils.forceDeleteOnExit(file); + file = null; + } + if (service != null) { + service.shutdown(); + service = null; + } + compactedFiles.clear(); + } + + /** Key value data to be written in {@link MergeTreeWriterBenchmark}. */ + @State(Scope.Thread) + public static class KeyValueData { + GenericRowData key; + RowKind kind; + GenericRowData value; + + @Setup(Level.Invocation) + public void kvSetup() { + key = new GenericRowData(1); + key.setField(0, ThreadLocalRandom.current().nextInt()); + + kind = RowKind.INSERT; + + value = new GenericRowData(1); + value.setField(0, ThreadLocalRandom.current().nextInt()); + } + + @TearDown(Level.Invocation) + public void kvTearDown() { + key = null; + kind = null; + value = null; + } + } +} diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeReaderBenchmark.java b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeReaderBenchmark.java new file mode 100644 index 00000000..62dd8646 --- /dev/null +++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeReaderBenchmark.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.benchmark.file.mergetree; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.mergetree.MergeTreeReader; +import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction; +import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition; +import org.apache.flink.table.store.file.utils.RecordReader; +import org.apache.flink.table.store.file.utils.RecordReaderIterator; +import org.apache.flink.table.store.file.utils.RecordWriter; +import org.apache.flink.types.RowKind; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +/** Benchmark for merge tree reader. */ +public class MergeTreeReaderBenchmark extends MergeTreeBenchmark { + + public static void main(String[] args) throws Exception { + Options opt = + new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include(".*" + MergeTreeReaderBenchmark.class.getCanonicalName() + ".*") + .build(); + + new Runner(opt).run(); + } + + @Setup + public void setUp() throws Exception { + createRecordWriter(); + try { + KeyValue kv = new KeyValue(); + Set<String> newFileNames = new HashSet<>(); + Random random = new Random(); + for (int k = 0; k < batchCount; k++) { + for (int i = 0; i < countPerBatch; i++) { + GenericRowData key = new GenericRowData(1); + key.setField(0, random.nextInt()); + key.setRowKind(RowKind.INSERT); + + GenericRowData value = new GenericRowData(1); + value.setField(0, random.nextInt()); + + kv.replace(key, sequenceNumber++, RowKind.INSERT, value); + writer.write(kv); + } + + RecordWriter.CommitIncrement increment = writer.prepareCommit(false); + mergeCompacted(newFileNames, compactedFiles, increment); + } + + writer.close(); + } catch (Exception e) { + cleanUp(); + throw e; + } + } + + @TearDown + public void tearDown() throws Exception { + cleanUp(); + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + public long scanAll() throws Exception { + try (RecordReader<KeyValue> reader = + new MergeTreeReader( + new IntervalPartition(compactedFiles, comparator).partition(), + true, + readerFactory, + comparator, + new DeduplicateMergeFunction())) { + long sum = 0; + try (RecordReaderIterator<KeyValue> iterator = new RecordReaderIterator<>(reader)) { + while (iterator.hasNext()) { + KeyValue kv = iterator.next(); + sum += kv.value().getInt(0); + } + } + return sum; + } + } +} diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeWriterBenchmark.java 
b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeWriterBenchmark.java new file mode 100644 index 00000000..a24ca0ad --- /dev/null +++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeWriterBenchmark.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.benchmark.file.mergetree; + +import org.apache.flink.table.store.file.utils.RecordWriter; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.util.HashSet; + +/** Benchmark for merge tree writer with compaction. */ +public class MergeTreeWriterBenchmark extends MergeTreeBenchmark { + + public static void main(String[] args) throws Exception { + Options opt = + new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include(".*" + MergeTreeWriterBenchmark.class.getCanonicalName() + ".*") + .build(); + + new Runner(opt).run(); + } + + @Setup + public void setUp() { + createRecordWriter(); + } + + @Benchmark + public void write(KeyValueData data) throws Exception { + kv.replace(data.key, sequenceNumber++, data.kind, data.value); + writer.write(kv); + if (sequenceNumber % countPerBatch == 0) { + RecordWriter.CommitIncrement increment = writer.prepareCommit(false); + mergeCompacted(new HashSet<>(), compactedFiles, increment); + } + } + + @TearDown + public void tearDown() throws Exception { + writer.close(); + cleanUp(); + } +} diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/resources/benchmark-conf.yaml b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/resources/benchmark-conf.yaml new file mode 100644 index 00000000..d3245de5 --- /dev/null +++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/resources/benchmark-conf.yaml @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# The base dir to put file data during test. If not set, the system default temp dir will be used. + +#benchmark.file.base-data-dir: /tmp/flink-table-store-micro-benchmark +page-size: 4kb +write-buffer-size: 16kb +target-file-size: 128kb + diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/resources/log4j.properties b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/resources/log4j.properties new file mode 100644 index 00000000..b077baca --- /dev/null +++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/resources/log4j.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=ERROR, logger +log4j.appender.logger=org.apache.log4j.ConsoleAppender +log4j.appender.logger.target = System.out +log4j.appender.logger.layout=org.apache.log4j.PatternLayout +log4j.appender.logger.layout.ConversionPattern=%-5p - %m%n + + diff --git a/flink-table-store-benchmark/pom.xml b/flink-table-store-benchmark/pom.xml index 9d38e215..fc879b9c 100644 --- a/flink-table-store-benchmark/pom.xml +++ b/flink-table-store-benchmark/pom.xml @@ -35,11 +35,17 @@ under the License. <modules> <module>flink-table-store-cluster-benchmark</module> + <module>flink-table-store-micro-benchmarks</module> </modules> <properties> <commons.io.version>2.7</commons.io.version> <commons.cli.version>1.3.1</commons.cli.version> + <httpclient.version>4.5.13</httpclient.version> + <avro.version>1.10.0</avro.version> + <jmh.version>1.35</jmh.version> + <maven.exec.version>1.6.0</maven.exec.version> + <executableJava>java</executableJava> </properties> </project> \ No newline at end of file