This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 8af95e62 [FLINK-29702] Add micro benchmarks module and merge tree reader/writer benchmarks
8af95e62 is described below

commit 8af95e62a9286e78f135b2ad24f2d61b54d6f7e9
Author: shammon <zjur...@gmail.com>
AuthorDate: Wed Oct 26 19:34:01 2022 +0800

    [FLINK-29702] Add micro benchmarks module and merge tree reader/writer benchmarks
    
    This closes #326
---
 .../flink-table-store-cluster-benchmark/pom.xml    |   2 +-
 .../flink-table-store-micro-benchmarks/README.md   |  41 +++
 .../flink-table-store-micro-benchmarks/pom.xml     | 287 +++++++++++++++++++++
 .../table/store/benchmark/config/ConfigUtil.java   | 119 +++++++++
 .../benchmark/config/FileBenchmarkOptions.java     |  50 ++++
 .../file/mergetree/MergeTreeBenchmark.java         | 269 +++++++++++++++++++
 .../file/mergetree/MergeTreeReaderBenchmark.java   | 114 ++++++++
 .../file/mergetree/MergeTreeWriterBenchmark.java   |  66 +++++
 .../src/main/resources/benchmark-conf.yaml         |  25 ++
 .../src/main/resources/log4j.properties            |  25 ++
 flink-table-store-benchmark/pom.xml                |   6 +
 11 files changed, 1003 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-benchmark/flink-table-store-cluster-benchmark/pom.xml b/flink-table-store-benchmark/flink-table-store-cluster-benchmark/pom.xml
index 856a666a..16526808 100644
--- a/flink-table-store-benchmark/flink-table-store-cluster-benchmark/pom.xml
+++ b/flink-table-store-benchmark/flink-table-store-cluster-benchmark/pom.xml
@@ -47,7 +47,7 @@ under the License.
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>4.5.13</version>
+            <version>${httpclient.version}</version>
         </dependency>
 
         <!-- Flink dependencies -->
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/README.md b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/README.md
new file mode 100644
index 00000000..5790f188
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/README.md
@@ -0,0 +1,41 @@
+# flink-table-store-micro-benchmarks
+
+This project contains sets of micro benchmarks designed to run on a single machine to
+help Flink Table Store developers assess the performance implications of their changes.
+
+The main methods defined in the various classes use the [jmh](http://openjdk.java.net/projects/code-tools/jmh/) micro
+benchmark suite to define runners that execute those cases. You can execute the
+default benchmark suite at once as follows:
+
+1. Build and install the `flink-table-store` project.
+```
+mvn clean install -DskipTests
+```
+2. Build and run the micro benchmarks in the `flink-table-store-micro-benchmarks` project.
+```
+mvn clean install -DskipTests exec:exec
+```
+
+If you want to execute just one benchmark, the best approach is to execute the selected main function manually.
+There are three main ways:
+
+1. From your IDE (hint: there is a plugin for IntelliJ IDEA).
+
+2. From the command line, using a command like:
+   ```
+   mvn clean install -DskipTests exec:exec \
+    -Dbenchmarks="<benchmark_class>"
+   ```
+
+   An example benchmark_class is `MergeTreeReaderBenchmark`, which measures the performance of the merge tree reader.
+
+3. Run the uber jar directly like:
+
+    ```
+    java -jar target/benchmarks.jar -rf csv "<benchmark_class>"
+    ```
+
+## Configuration
+
+Besides the parameters, there is also a benchmark config file `benchmark-conf.yaml` to tune some basic parameters.
+For example, we can change the file data dir by putting `benchmark.file.base-data-dir: /data` in the config file. For more options, refer to `FileBenchmarkOptions` and the Flink Table Store options.
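
For illustration, a minimal `benchmark-conf.yaml` might look as follows (the keys come from `FileBenchmarkOptions` in this commit; the values are arbitrary examples, not recommendations):

```
benchmark.file.base-data-dir: /data
benchmark.writer.batch-count: 100
benchmark.writer.record-count-per-batch: 10000
```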
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml
new file mode 100644
index 00000000..b5991a42
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml
@@ -0,0 +1,287 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-benchmark</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.3-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-micro-benchmarks</artifactId>
+    <name>Flink Table Store : Micro Benchmark</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-1.2-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <!-- Orc and parquet dependencies -->
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>${snappy.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
+
+        <!-- JMH dependencies -->
+
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-core</artifactId>
+            <version>${jmh.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-generator-annprocess</artifactId>
+            <version>${jmh.version}</version>
+        </dependency>
+
+        <!-- Flink Table Store dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-shade</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>${flink.sql.parquet}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+    </dependencies>
+
+    <!-- This is copied from flink-benchmarks and updated for flink-table-store-micro-benchmarks. -->
+
+    <profiles>
+        <profile>
+            <id>test</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>exec-maven-plugin</artifactId>
+                        <version>${maven.exec.version}</version>
+                        <executions>
+                            <execution>
+                                <id>test-benchmarks</id>
+                                <phase>test</phase>
+                                <goals>
+                                    <goal>exec</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <skip>${skipTests}</skip>
+                            <classpathScope>test</classpathScope>
+                            <executable>${executableJava}</executable>
+                            <arguments>
+                                <argument>-Xmx6g</argument>
+                                <argument>-classpath</argument>
+                                <classpath/>
+                                <argument>org.openjdk.jmh.Main</argument>
+                                <!--shouldFailOnError-->
+                                <argument>-foe</argument>
+                                <argument>true</argument>
+                                <!--speed up tests-->
+                                <argument>-f</argument>
+                                <argument>1</argument>
+                                <argument>-i</argument>
+                                <argument>1</argument>
+                                <argument>-wi</argument>
+                                <argument>0</argument>
+                                <argument>-rf</argument>
+                                <argument>csv</argument>
+                                <argument>.*</argument>
+                            </arguments>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+
+        <profile>
+            <id>benchmark</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+                <property>
+                    <name>benchmarks</name>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>exec-maven-plugin</artifactId>
+                        <version>${maven.exec.version}</version>
+                        <executions>
+                            <execution>
+                                <id>run-benchmarks</id>
+                                <goals>
+                                    <goal>exec</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <classpathScope>test</classpathScope>
+                            <executable>${executableJava}</executable>
+                            <arguments>
+                                <argument>-classpath</argument>
+                                <classpath/>
+                                <argument>org.openjdk.jmh.Main</argument>
+                                <!--shouldFailOnError-->
+                                <argument>-foe</argument>
+                                <argument>true</argument>
+                                <argument>-rf</argument>
+                                <argument>csv</argument>
+                                <argument>${benchmarks}</argument>
+                            </arguments>
+                        </configuration>
+                    </plugin>
+
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-shade-plugin</artifactId>
+                        <version>3.2.0</version>
+                        <executions>
+                            <execution>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>shade</goal>
+                                </goals>
+                                <configuration>
+                                    <finalName>benchmarks</finalName>
+                                    <transformers>
+                                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                            <mainClass>org.openjdk.jmh.Main</mainClass>
+                                        </transformer>
+                                        <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                            <resource>reference.conf</resource>
+                                        </transformer>
+                                        <!-- The service transformer is needed to merge META-INF/services files -->
+                                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+                                            <projectName>Flink Table Store</projectName>
+                                        </transformer>
+                                    </transformers>
+                                    <filters>
+                                        <filter>
+                                            <artifact>*</artifact>
+                                            <excludes>
+                                                <exclude>META-INF/*.SF</exclude>
+                                                <exclude>META-INF/*.DSA</exclude>
+                                                <exclude>META-INF/*.RSA</exclude>
+                                            </excludes>
+                                        </filter>
+                                    </filters>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>custom-benchmark</id>
+            <activation>
+                <property>
+                    <name>benchmarks</name>
+                </property>
+            </activation>
+            <properties>
+                <benchmarkExcludes>""</benchmarkExcludes>
+            </properties>
+        </profile>
+    </profiles>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/config/ConfigUtil.java b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/config/ConfigUtil.java
new file mode 100644
index 00000000..dec6b714
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/config/ConfigUtil.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.benchmark.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.UUID;
+
+/**
+ * Config utils to load the benchmark config from a YAML file on the classpath. The main class is
+ * copied from {@code ConfigUtil} in flink-benchmarks.
+ */
+public class ConfigUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigUtil.class);
+
+    private static final String BENCHMARK_CONF = "benchmark-conf.yaml";
+
+    /** Load benchmark conf from classpath. */
+    public static Configuration loadBenchMarkConf() {
+        InputStream inputStream =
+                ConfigUtil.class.getClassLoader().getResourceAsStream(BENCHMARK_CONF);
+        return loadYAMLResource(inputStream);
+    }
+
+    /**
+     * This is copied from {@code GlobalConfiguration#loadYAMLResource} to avoid depending
+     * on @Internal api.
+     */
+    private static Configuration loadYAMLResource(InputStream inputStream) {
+        final Configuration config = new Configuration();
+
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
+
+            String line;
+            int lineNo = 0;
+            while ((line = reader.readLine()) != null) {
+                lineNo++;
+                // 1. check for comments
+                String[] comments = line.split("#", 2);
+                String conf = comments[0].trim();
+
+                // 2. get key and value
+                if (conf.length() > 0) {
+                    String[] kv = conf.split(": ", 2);
+
+                    // skip line with no valid key-value pair
+                    if (kv.length == 1) {
+                        LOG.warn(
+                                "Error while trying to split key and value in configuration file "
+                                        + ":"
+                                        + lineNo
+                                        + ": \""
+                                        + line
+                                        + "\"");
+                        continue;
+                    }
+
+                    String key = kv[0].trim();
+                    String value = kv[1].trim();
+
+                    // sanity check
+                    if (key.length() == 0 || value.length() == 0) {
+                        LOG.warn(
+                                "Error after splitting key and value in configuration file "
+                                        + ":"
+                                        + lineNo
+                                        + ": \""
+                                        + line
+                                        + "\"");
+                        continue;
+                    }
+
+                    LOG.info("Loading configuration property: {}, {}", key, value);
+                    config.setString(key, value);
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error parsing YAML configuration.", e);
+        }
+
+        return config;
+    }
+
+    /**
+     * Create file data dir from given configuration.
+     *
+     * @param configuration the configuration
+     * @return the file data dir
+     */
+    public static String createFileDataDir(Configuration configuration) {
+        return configuration.get(FileBenchmarkOptions.FILE_DATA_BASE_DIR)
+                + "/"
+                + UUID.randomUUID().toString();
+    }
+}
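
As an example of the parsing above: a config line such as `benchmark.file.base-data-dir: /data # local disk` yields the key `benchmark.file.base-data-dir` and the value `/data`, while a non-empty line without the `": "` separator is logged with a warning and skipped rather than failing the run.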
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/config/FileBenchmarkOptions.java b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/config/FileBenchmarkOptions.java
new file mode 100644
index 00000000..d27bd8b9
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/config/FileBenchmarkOptions.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.benchmark.config;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Benchmark options for file. */
+public class FileBenchmarkOptions {
+    public static final ConfigOption<String> FILE_DATA_BASE_DIR =
+            key("benchmark.file.base-data-dir")
+                    .stringType()
+                    .defaultValue("/tmp/flink-table-store-micro-benchmarks")
+                    .withDescription("The dir to put file data.");
+
+    public static final ConfigOption<Long> TARGET_FILE_SIZE =
+            key("benchmark.file.target-size")
+                    .longType()
+                    .defaultValue(1024 * 1024L)
+                    .withDescription("The target file size.");
+
+    public static final ConfigOption<Integer> WRITER_BATCH_COUNT =
+            key("benchmark.writer.batch-count")
+                    .intType()
+                    .defaultValue(50)
+                    .withDescription("The number of batches to be written by the writer.");
+
+    public static final ConfigOption<Integer> WRITER_RECORD_COUNT_PER_BATCH =
+            key("benchmark.writer.record-count-per-batch")
+                    .intType()
+                    .defaultValue(50000)
+                    .withDescription("The number of records to be written in one batch by the writer.");
+}
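
As a minimal usage sketch (not part of this commit; the class name is hypothetical), these options are resolved through the `Configuration` returned by `ConfigUtil.loadBenchMarkConf()`, falling back to their declared defaults when a key is absent from `benchmark-conf.yaml`:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.benchmark.config.ConfigUtil;
import org.apache.flink.table.store.benchmark.config.FileBenchmarkOptions;

public class OptionsSketch {
    public static void main(String[] args) {
        // Reads benchmark-conf.yaml from the classpath.
        Configuration conf = ConfigUtil.loadBenchMarkConf();
        // Returns the default of 50 unless benchmark.writer.batch-count is set.
        int batchCount = conf.get(FileBenchmarkOptions.WRITER_BATCH_COUNT);
        // Base data dir plus a random UUID, so each run writes to a fresh directory.
        String dataDir = ConfigUtil.createFileDataDir(conf);
        System.out.println(batchCount + " batches under " + dataDir);
    }
}
```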
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java
new file mode 100644
index 00000000..dd9e17de
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.benchmark.file.mergetree;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowDataUtil;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.benchmark.config.ConfigUtil;
+import org.apache.flink.table.store.benchmark.config.FileBenchmarkOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
+import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
+import org.apache.flink.table.store.file.io.RollingFileWriter;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
+import org.apache.flink.table.store.file.mergetree.Levels;
+import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
+import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
+import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
+import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.commons.io.FileUtils;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.table.store.file.utils.FileStorePathFactory.PARTITION_DEFAULT_NAME;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Base class for merge tree benchmark. */
+@SuppressWarnings("MethodMayBeStatic")
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Thread)
+@BenchmarkMode(Mode.Throughput)
+@Fork(3)
+@Threads(1)
+@Warmup(iterations = 10)
+@Measurement(iterations = 10)
+public class MergeTreeBenchmark {
+    @Param({"avro", "orc", "parquet"})
+    protected String format;
+
+    protected ExecutorService service;
+    protected File file;
+    protected Comparator<RowData> comparator;
+    protected CoreOptions options;
+    protected KeyValueFileReaderFactory readerFactory;
+    private KeyValueFileReaderFactory compactReaderFactory;
+    protected KeyValueFileWriterFactory writerFactory;
+    private KeyValueFileWriterFactory compactWriterFactory;
+    protected List<DataFileMeta> compactedFiles;
+
+    protected RecordWriter<KeyValue> writer;
+    protected int batchCount;
+    protected int countPerBatch;
+    protected KeyValue kv = new KeyValue();
+    protected long sequenceNumber = 0;
+
+    protected void createRecordWriter() {
+        Configuration configuration = ConfigUtil.loadBenchMarkConf();
+        batchCount = configuration.get(FileBenchmarkOptions.WRITER_BATCH_COUNT);
+        countPerBatch = configuration.get(FileBenchmarkOptions.WRITER_RECORD_COUNT_PER_BATCH);
+        service = Executors.newSingleThreadExecutor();
+        file = new File(ConfigUtil.createFileDataDir(configuration));
+        comparator = Comparator.comparingInt(o -> o.getInt(0));
+        compactedFiles = new ArrayList<>();
+
+        Path path = new org.apache.flink.core.fs.Path(file.toURI());
+        FileStorePathFactory pathFactory =
+                new FileStorePathFactory(
+                        path, RowType.of(), configuration.get(PARTITION_DEFAULT_NAME), format);
+        writer = recreateMergeTree(configuration, path, pathFactory);
+    }
+
+    private RecordWriter<KeyValue> recreateMergeTree(
+            Configuration configuration, Path path, FileStorePathFactory pathFactory) {
+        options = new CoreOptions(configuration);
+        RowType keyType = new RowType(singletonList(new RowType.RowField("k", new IntType())));
+        RowType valueType = new RowType(singletonList(new RowType.RowField("v", new IntType())));
+        FileFormat flushingFormat = FileFormat.fromIdentifier(format, new Configuration());
+        KeyValueFileReaderFactory.Builder readerBuilder =
+                KeyValueFileReaderFactory.builder(
+                        new SchemaManager(path),
+                        0,
+                        keyType,
+                        valueType,
+                        flushingFormat,
+                        pathFactory);
+        readerFactory = readerBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0);
+        compactReaderFactory = readerBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0);
+        KeyValueFileWriterFactory.Builder writerBuilder =
+                KeyValueFileWriterFactory.builder(
+                        0,
+                        keyType,
+                        valueType,
+                        flushingFormat,
+                        pathFactory,
+                        options.targetFileSize());
+        writerFactory = writerBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0);
+        compactWriterFactory = writerBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0);
+        return createMergeTreeWriter(Collections.emptyList());
+    }
+
+    private MergeTreeWriter createMergeTreeWriter(List<DataFileMeta> files) {
+        long maxSequenceNumber =
+                files.stream().map(DataFileMeta::maxSequenceNumber).max(Long::compare).orElse(-1L);
+        MergeTreeWriter writer =
+                new MergeTreeWriter(
+                        false,
+                        128,
+                        null,
+                        createCompactManager(service, files),
+                        maxSequenceNumber,
+                        comparator,
+                        new DeduplicateMergeFunction(),
+                        writerFactory,
+                        options.commitForceCompact(),
+                        CoreOptions.ChangelogProducer.NONE);
+        writer.setMemoryPool(
+                new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
+        return writer;
+    }
+
+    private MergeTreeCompactManager createCompactManager(
+            ExecutorService compactExecutor, List<DataFileMeta> files) {
+        CompactStrategy strategy =
+                new UniversalCompaction(
+                        options.maxSizeAmplificationPercent(),
+                        options.sortedRunSizeRatio(),
+                        options.numSortedRunCompactionTrigger(),
+                        options.maxSortedRunNum());
+        CompactRewriter rewriter =
+                (outputLevel, dropDelete, sections) -> {
+                    RollingFileWriter<KeyValue, DataFileMeta> writer =
+                            compactWriterFactory.createRollingMergeTreeFileWriter(outputLevel);
+                    writer.write(
+                            new RecordReaderIterator<>(
+                                    new MergeTreeReader(
+                                            sections,
+                                            dropDelete,
+                                            compactReaderFactory,
+                                            comparator,
+                                            new DeduplicateMergeFunction())));
+                    writer.close();
+                    return writer.result();
+                };
+        return new MergeTreeCompactManager(
+                compactExecutor,
+                new Levels(comparator, files, options.numLevels()),
+                strategy,
+                comparator,
+                options.targetFileSize(),
+                options.numSortedRunStopTrigger(),
+                rewriter);
+    }
+
+    protected void mergeCompacted(
+            Set<String> newFileNames,
+            List<DataFileMeta> compactedFiles,
+            RecordWriter.CommitIncrement increment) {
+        increment.newFilesIncrement().newFiles().stream()
+                .map(DataFileMeta::fileName)
+                .forEach(newFileNames::add);
+        compactedFiles.addAll(increment.newFilesIncrement().newFiles());
+        Set<String> afterFiles =
+                increment.compactIncrement().compactAfter().stream()
+                        .map(DataFileMeta::fileName)
+                        .collect(Collectors.toSet());
+        for (DataFileMeta file : increment.compactIncrement().compactBefore()) {
+            checkState(compactedFiles.remove(file));
+            // See MergeTreeWriter.updateCompactResult
+            if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) {
+                compactWriterFactory.deleteFile(file.fileName());
+            }
+        }
+        compactedFiles.addAll(increment.compactIncrement().compactAfter());
+    }
+
+    protected void cleanUp() throws Exception {
+        if (file != null) {
+            FileUtils.forceDeleteOnExit(file);
+            file = null;
+        }
+        if (service != null) {
+            service.shutdown();
+            service = null;
+        }
+        compactedFiles.clear();
+    }
+
+    /** Key value data to be written in {@link MergeTreeWriterBenchmark}. */
+    @State(Scope.Thread)
+    public static class KeyValueData {
+        GenericRowData key;
+        RowKind kind;
+        GenericRowData value;
+
+        @Setup(Level.Invocation)
+        public void kvSetup() {
+            key = new GenericRowData(1);
+            key.setField(0, ThreadLocalRandom.current().nextInt());
+
+            kind = RowKind.INSERT;
+
+            value = new GenericRowData(1);
+            value.setField(0, ThreadLocalRandom.current().nextInt());
+        }
+
+        @TearDown(Level.Invocation)
+        public void kvTearDown() {
+            key = null;
+            kind = null;
+            value = null;
+        }
+    }
+}
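
With the class-level JMH settings above, each benchmark runs in 3 forked JVMs with 10 warmup and 10 measurement iterations per fork, sweeping the `format` parameter over avro, orc, and parquet, and reports throughput in operations per millisecond. `KeyValueData` uses `@Setup(Level.Invocation)` so that a fresh random key and value are generated before every single invocation.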
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeReaderBenchmark.java b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeReaderBenchmark.java
new file mode 100644
index 00000000..62dd8646
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeReaderBenchmark.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.benchmark.file.mergetree;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.types.RowKind;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+/** Benchmark for merge tree reader. */
+public class MergeTreeReaderBenchmark extends MergeTreeBenchmark {
+
+    public static void main(String[] args) throws Exception {
+        Options opt =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(".*" + MergeTreeReaderBenchmark.class.getCanonicalName() + ".*")
+                        .build();
+
+        new Runner(opt).run();
+    }
+
+    @Setup
+    public void setUp() throws Exception {
+        createRecordWriter();
+        try {
+            KeyValue kv = new KeyValue();
+            Set<String> newFileNames = new HashSet<>();
+            Random random = new Random();
+            for (int k = 0; k < batchCount; k++) {
+                for (int i = 0; i < countPerBatch; i++) {
+                    GenericRowData key = new GenericRowData(1);
+                    key.setField(0, random.nextInt());
+                    key.setRowKind(RowKind.INSERT);
+
+                    GenericRowData value = new GenericRowData(1);
+                    value.setField(0, random.nextInt());
+
+                    kv.replace(key, sequenceNumber++, RowKind.INSERT, value);
+                    writer.write(kv);
+                }
+
+                RecordWriter.CommitIncrement increment = writer.prepareCommit(false);
+                mergeCompacted(newFileNames, compactedFiles, increment);
+            }
+
+            writer.close();
+        } catch (Exception e) {
+            cleanUp();
+            throw e;
+        }
+    }
+
+    @TearDown
+    public void tearDown() throws Exception {
+        cleanUp();
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.SingleShotTime)
+    public long scanAll() throws Exception {
+        try (RecordReader<KeyValue> reader =
+                new MergeTreeReader(
+                        new IntervalPartition(compactedFiles, comparator).partition(),
+                        true,
+                        readerFactory,
+                        comparator,
+                        new DeduplicateMergeFunction())) {
+            long sum = 0;
+            try (RecordReaderIterator<KeyValue> iterator = new RecordReaderIterator<>(reader)) {
+                while (iterator.hasNext()) {
+                    KeyValue kv = iterator.next();
+                    sum += kv.value().getInt(0);
+                }
+            }
+            return sum;
+        }
+    }
+}
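
Unlike the default throughput mode, `scanAll` is measured in `Mode.SingleShotTime`: each invocation performs one full merge-sorted scan over all the compacted files written during `@Setup`, and the running sum of values is returned so that JMH's consumption of the result keeps the read path from being dead-code-eliminated.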
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeWriterBenchmark.java b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeWriterBenchmark.java
new file mode 100644
index 00000000..a24ca0ad
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeWriterBenchmark.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.benchmark.file.mergetree;
+
+import org.apache.flink.table.store.file.utils.RecordWriter;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.HashSet;
+
+/** Benchmark for merge tree writer with compaction. */
+public class MergeTreeWriterBenchmark extends MergeTreeBenchmark {
+
+    public static void main(String[] args) throws Exception {
+        Options opt =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(".*" + MergeTreeWriterBenchmark.class.getCanonicalName() + ".*")
+                        .build();
+
+        new Runner(opt).run();
+    }
+
+    @Setup
+    public void setUp() {
+        createRecordWriter();
+    }
+
+    @Benchmark
+    public void write(KeyValueData data) throws Exception {
+        kv.replace(data.key, sequenceNumber++, data.kind, data.value);
+        writer.write(kv);
+        if (sequenceNumber % countPerBatch == 0) {
+            RecordWriter.CommitIncrement increment = writer.prepareCommit(false);
+            mergeCompacted(new HashSet<>(), compactedFiles, increment);
+        }
+    }
+
+    @TearDown
+    public void tearDown() throws Exception {
+        writer.close();
+        cleanUp();
+    }
+}
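
Note that `write` calls `prepareCommit(false)` once every `countPerBatch` records, so the measured throughput includes the periodic flush and compaction work rather than in-memory writes alone. Either benchmark can also be run on its own through its `main` method or the `-Dbenchmarks` switch described in the README.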
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/resources/benchmark-conf.yaml b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/resources/benchmark-conf.yaml
new file mode 100644
index 00000000..d3245de5
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/resources/benchmark-conf.yaml
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# The base dir to put file data during the test. If not set, a default under the system temp dir is used.
+
+#benchmark.file.base-data-dir: /tmp/flink-table-store-micro-benchmark
+page-size: 4kb
+write-buffer-size: 16kb
+target-file-size: 128kb
+
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/resources/log4j.properties b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/resources/log4j.properties
new file mode 100644
index 00000000..b077baca
--- /dev/null
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/resources/log4j.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=ERROR, logger
+log4j.appender.logger=org.apache.log4j.ConsoleAppender
+log4j.appender.logger.target = System.out
+log4j.appender.logger.layout=org.apache.log4j.PatternLayout
+log4j.appender.logger.layout.ConversionPattern=%-5p - %m%n
+
+
diff --git a/flink-table-store-benchmark/pom.xml b/flink-table-store-benchmark/pom.xml
index 9d38e215..fc879b9c 100644
--- a/flink-table-store-benchmark/pom.xml
+++ b/flink-table-store-benchmark/pom.xml
@@ -35,11 +35,17 @@ under the License.
 
     <modules>
         <module>flink-table-store-cluster-benchmark</module>
+        <module>flink-table-store-micro-benchmarks</module>
     </modules>
 
     <properties>
         <commons.io.version>2.7</commons.io.version>
         <commons.cli.version>1.3.1</commons.cli.version>
+        <httpclient.version>4.5.13</httpclient.version>
+        <avro.version>1.10.0</avro.version>
+        <jmh.version>1.35</jmh.version>
+        <maven.exec.version>1.6.0</maven.exec.version>
+        <executableJava>java</executableJava>
     </properties>
 
 </project>
\ No newline at end of file

