This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new be4a84917a [spark] Add bucket function and write benchmark (#5418)
be4a84917a is described below

commit be4a84917adc68f9fa23fa17f1ef82f287bd0684
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Apr 8 18:29:43 2025 +0800

    [spark] Add bucket function and write benchmark (#5418)
---
 .../spark/benchmark/BucketFunctionBenchmark.scala  | 44 ++++++++++++
 .../spark/benchmark/PaimonSqlBasedBenchmark.scala  | 63 +++++++++++++++++
 .../paimon/spark/benchmark/WriteBenchmark.scala    | 80 ++++++++++++++++++++++
 .../apache/spark/sql/paimon/PaimonBenchmark.scala  | 42 ++++++++++++
 4 files changed, 229 insertions(+)
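
For readers browsing the change: BucketFunctionBenchmark below exercises the fixed_bucket SQL expression that Paimon's Spark extensions register. A minimal sketch of calling it interactively, assuming a session configured with the Paimon catalog and extensions as in PaimonSqlBasedBenchmark (the warehouse path here is illustrative only):

    // Hypothetical interactive usage; the session setup mirrors PaimonSqlBasedBenchmark below.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.sql.extensions",
        "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
      .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon.warehouse", "/tmp/paimon-warehouse") // illustrative path
      .config("spark.sql.defaultCatalog", "paimon")
      .getOrCreate()

    // fixed_bucket(numBuckets, col, ...) maps each row to a bucket id, as in the benchmark cases.
    spark.range(10).selectExpr("id", "fixed_bucket(10, id) AS bucket").show()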

diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala
new file mode 100644
index 0000000000..1ba618c6a1
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.benchmark
+
+import org.apache.spark.sql.paimon.PaimonBenchmark
+
+object BucketFunctionBenchmark extends PaimonSqlBasedBenchmark {
+
+  private val N = 20L * 1000 * 1000
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val benchmark = PaimonBenchmark(s"Bucket function", N, output = output)
+
+    benchmark.addCase("Single int column", 3) {
+      _ => spark.range(N).selectExpr("fixed_bucket(10, id)").noop()
+    }
+
+    benchmark.addCase("Single string column", 3) {
+      _ => spark.range(N).selectExpr("fixed_bucket(10, uuid())").noop()
+    }
+
+    benchmark.addCase("Multiple columns", 3) {
+      _ => spark.range(N).selectExpr("fixed_bucket(10, id, uuid(), 
uuid())").noop()
+    }
+
+    benchmark.run()
+  }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/PaimonSqlBasedBenchmark.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/PaimonSqlBasedBenchmark.scala
new file mode 100644
index 0000000000..d1301f144d
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/PaimonSqlBasedBenchmark.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.benchmark
+
+import org.apache.paimon.spark.SparkCatalog
+import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+import org.apache.spark.sql.paimon.Utils
+
+import java.io.File
+
+trait PaimonSqlBasedBenchmark extends SqlBasedBenchmark {
+
+  protected lazy val tempDBDir: File = Utils.createTempDir
+
+  protected lazy val sql: String => DataFrame = spark.sql
+
+  override def getSparkSession: SparkSession = {
+    SparkSession
+      .builder()
+      .master("local[1]")
+      .appName(this.getClass.getCanonicalName)
+      .config("spark.ui.enabled", value = false)
+      .config("spark.sql.warehouse.dir", tempDBDir.getCanonicalPath)
+      .config("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
+      .config("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath)
+      .config("spark.sql.defaultCatalog", "paimon")
+      .config("spark.sql.extensions", 
classOf[PaimonSparkSessionExtensions].getName)
+      .getOrCreate()
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+    try f
+    finally {
+      tableNames.foreach(spark.catalog.dropTempView)
+    }
+  }
+
+  def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try f
+    finally {
+      tableNames.foreach(name => sql(s"DROP TABLE IF EXISTS $name"))
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/WriteBenchmark.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/WriteBenchmark.scala
new file mode 100644
index 0000000000..c27c626c0b
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/WriteBenchmark.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.benchmark
+
+import org.apache.spark.sql.paimon.PaimonBenchmark
+
+object WriteBenchmark extends PaimonSqlBasedBenchmark {
+
+  private val N = 10L * 1000 * 1000
+  private val sourceTable = "source"
+
+  def writeNonPKTable(keys: String, i: Int, benchmark: PaimonBenchmark): Unit = {
+    writeTable(isPKTable = false, keys, i, benchmark)
+  }
+
+  def writeTable(
+      isPKTable: Boolean,
+      keys: String,
+      bucket: Int,
+      benchmark: PaimonBenchmark): Unit = {
+    benchmark.addCase(s"write table, isPKTable: $isPKTable, keys: $keys, 
bucket: $bucket", 3) {
+      _ =>
+        val tableName = "targetTable"
+        val keyProp = if (keys.isEmpty) {
+          ""
+        } else if (isPKTable) {
+          s"'primary-key' = '$keys',"
+        } else {
+          s"'bucket-key' = '$keys',"
+        }
+        withTable(tableName) {
+          sql(s"""
+                 |CREATE TABLE $tableName (id INT, s1 STRING, s2 STRING) USING paimon
+                 |TBLPROPERTIES (
+                 | $keyProp
+                 | 'bucket' = '$bucket'
+                 |)
+                 |""".stripMargin)
+          sql(s"INSERT INTO $tableName SELECT * FROM $sourceTable")
+        }
+    }
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val benchmark = PaimonBenchmark(s"Write paimon table", N, output = output)
+
+    withTempTable(sourceTable) {
+      spark
+        .range(N)
+        .selectExpr("id", "uuid() as s1", "uuid() as s2")
+        .createOrReplaceTempView(sourceTable)
+
+      // fixed bucket
+      writeNonPKTable("id", 10, benchmark)
+      writeNonPKTable("s1", 10, benchmark)
+      writeNonPKTable("id,s1", 10, benchmark)
+
+      // unaware bucket
+      writeNonPKTable("", -1, benchmark)
+
+      benchmark.run()
+    }
+  }
+}
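
Both benchmark objects inherit a main method from Spark's BenchmarkBase (via SqlBasedBenchmark), so one plausible way to run them is to invoke that main from a JVM with the paimon-spark-ut test classpath. A minimal sketch; the runner object and the classpath assumption are hypothetical and not part of this commit:

    // Hypothetical runner; assumes Spark's benchmark test classes are on the classpath.
    object RunPaimonBenchmarks {
      def main(args: Array[String]): Unit = {
        org.apache.paimon.spark.benchmark.BucketFunctionBenchmark.main(Array.empty[String]) // "Bucket function" suite
        org.apache.paimon.spark.benchmark.WriteBenchmark.main(Array.empty[String])          // "Write paimon table" suite
      }
    }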
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/PaimonBenchmark.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/PaimonBenchmark.scala
new file mode 100644
index 0000000000..1fd795367b
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/PaimonBenchmark.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.paimon
+
+import org.apache.spark.benchmark.Benchmark
+
+import java.io.OutputStream
+
+import scala.concurrent.duration._
+
+case class PaimonBenchmark(
+    name: String,
+    valuesPerIteration: Long,
+    minNumIters: Int = 2,
+    warmupTime: FiniteDuration = 2.seconds,
+    minTime: FiniteDuration = 2.seconds,
+    outputPerIteration: Boolean = false,
+    output: Option[OutputStream] = None)
+  extends Benchmark(
+    name,
+    valuesPerIteration,
+    minNumIters,
+    warmupTime,
+    minTime,
+    outputPerIteration,
+    output)
