This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new be4a84917a [spark] Add bucket function and write benchmark (#5418)
be4a84917a is described below
commit be4a84917adc68f9fa23fa17f1ef82f287bd0684
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Apr 8 18:29:43 2025 +0800
[spark] Add bucket function and write benchmark (#5418)
---
.../spark/benchmark/BucketFunctionBenchmark.scala | 44 ++++++++++++
.../spark/benchmark/PaimonSqlBasedBenchmark.scala | 63 +++++++++++++++++
.../paimon/spark/benchmark/WriteBenchmark.scala | 80 ++++++++++++++++++++++
.../apache/spark/sql/paimon/PaimonBenchmark.scala | 42 ++++++++++++
4 files changed, 229 insertions(+)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala
new file mode 100644
index 0000000000..1ba618c6a1
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.benchmark
+
+import org.apache.spark.sql.paimon.PaimonBenchmark
+
+object BucketFunctionBenchmark extends PaimonSqlBasedBenchmark {
+
+ private val N = 20L * 1000 * 1000
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ val benchmark = PaimonBenchmark(s"Bucket function", N, output = output)
+
+ benchmark.addCase("Single int column", 3) {
+ _ => spark.range(N).selectExpr("fixed_bucket(10, id)").noop()
+ }
+
+ benchmark.addCase("Single string column", 3) {
+ _ => spark.range(N).selectExpr("fixed_bucket(10, uuid())").noop()
+ }
+
+ benchmark.addCase("Multiple columns", 3) {
+ _ => spark.range(N).selectExpr("fixed_bucket(10, id, uuid(), uuid())").noop()
+ }
+
+ benchmark.run()
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/PaimonSqlBasedBenchmark.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/PaimonSqlBasedBenchmark.scala
new file mode 100644
index 0000000000..d1301f144d
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/PaimonSqlBasedBenchmark.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.benchmark
+
+import org.apache.paimon.spark.SparkCatalog
+import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+import org.apache.spark.sql.paimon.Utils
+
+import java.io.File
+
+trait PaimonSqlBasedBenchmark extends SqlBasedBenchmark {
+
+ protected lazy val tempDBDir: File = Utils.createTempDir
+
+ protected lazy val sql: String => DataFrame = spark.sql
+
+ override def getSparkSession: SparkSession = {
+ SparkSession
+ .builder()
+ .master("local[1]")
+ .appName(this.getClass.getCanonicalName)
+ .config("spark.ui.enabled", value = false)
+ .config("spark.sql.warehouse.dir", tempDBDir.getCanonicalPath)
+ .config("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
+ .config("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath)
+ .config("spark.sql.defaultCatalog", "paimon")
+ .config("spark.sql.extensions", classOf[PaimonSparkSessionExtensions].getName)
+ .getOrCreate()
+ }
+
+ def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+ try f
+ finally {
+ tableNames.foreach(spark.catalog.dropTempView)
+ }
+ }
+
+ def withTable(tableNames: String*)(f: => Unit): Unit = {
+ try f
+ finally {
+ tableNames.foreach(name => sql(s"DROP TABLE IF EXISTS $name"))
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/WriteBenchmark.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/WriteBenchmark.scala
new file mode 100644
index 0000000000..c27c626c0b
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/WriteBenchmark.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.benchmark
+
+import org.apache.spark.sql.paimon.PaimonBenchmark
+
+object WriteBenchmark extends PaimonSqlBasedBenchmark {
+
+ private val N = 10L * 1000 * 1000
+ private val sourceTable = "source"
+
+ def writeNonPKTable(keys: String, i: Int, benchmark: PaimonBenchmark): Unit = {
+ writeTable(isPKTable = false, keys, i, benchmark)
+ }
+
+ def writeTable(
+ isPKTable: Boolean,
+ keys: String,
+ bucket: Int,
+ benchmark: PaimonBenchmark): Unit = {
+ benchmark.addCase(s"write table, isPKTable: $isPKTable, keys: $keys, bucket: $bucket", 3) {
+ _ =>
+ val tableName = "targetTable"
+ val keyProp = if (keys.isEmpty) {
+ ""
+ } else if (isPKTable) {
+ s"'primary-key' = '$keys',"
+ } else {
+ s"'bucket-key' = '$keys',"
+ }
+ withTable(tableName) {
+ sql(s"""
+ |CREATE TABLE $tableName (id INT, s1 STRING, s2 STRING) USING paimon
+ |TBLPROPERTIES (
+ | $keyProp
+ | 'bucket' = '$bucket'
+ |)
+ |""".stripMargin)
+ sql(s"INSERT INTO $tableName SELECT * FROM $sourceTable")
+ }
+ }
+ }
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ val benchmark = PaimonBenchmark(s"Write paimon table", N, output = output)
+
+ withTempTable(sourceTable) {
+ spark
+ .range(N)
+ .selectExpr("id", "uuid() as s1", "uuid() as s2")
+ .createOrReplaceTempView(sourceTable)
+
+ // fixed bucket
+ writeNonPKTable("id", 10, benchmark)
+ writeNonPKTable("s1", 10, benchmark)
+ writeNonPKTable("id,s1", 10, benchmark)
+
+ // unaware bucket
+ writeNonPKTable("", -1, benchmark)
+
+ benchmark.run()
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/PaimonBenchmark.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/PaimonBenchmark.scala
new file mode 100644
index 0000000000..1fd795367b
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/PaimonBenchmark.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.paimon
+
+import org.apache.spark.benchmark.Benchmark
+
+import java.io.OutputStream
+
+import scala.concurrent.duration._
+
+case class PaimonBenchmark(
+ name: String,
+ valuesPerIteration: Long,
+ minNumIters: Int = 2,
+ warmupTime: FiniteDuration = 2.seconds,
+ minTime: FiniteDuration = 2.seconds,
+ outputPerIteration: Boolean = false,
+ output: Option[OutputStream] = None)
+ extends Benchmark(
+ name,
+ valuesPerIteration,
+ minNumIters,
+ warmupTime,
+ minTime,
+ outputPerIteration,
+ output)