This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 85eeda8d72 [spark] Restrict bucket creation to not exceed max bucket limit (#5661)
85eeda8d72 is described below
commit 85eeda8d726e8a7ead2313ad3d4ad2dd08c89fd3
Author: Zouxxyy <[email protected]>
AuthorDate: Tue May 27 07:52:30 2025 +0800
[spark] Restrict bucket creation to not exceed max bucket limit (#5661)
---
.../org/apache/paimon/index/PartitionIndex.java | 21 +++++--------
.../paimon/spark/commands/PaimonSparkWriter.scala | 35 +++++++++++++---------
.../paimon/spark/sql/DynamicBucketTableTest.scala | 22 ++++++++++++++
3 files changed, 51 insertions(+), 27 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index decc1f12f1..8241270a87 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -91,21 +91,16 @@ public class PartitionIndex {
}
}
- if (-1 == maxBucketsNum || totalBucketSet.isEmpty() || maxBucketId < maxBucketsNum - 1) {
+ int globalMaxBucketId = (maxBucketsNum == -1 ? Short.MAX_VALUE : maxBucketsNum) - 1;
+ if (totalBucketSet.isEmpty() || maxBucketId < globalMaxBucketId) {
// 3. create a new bucket
- for (int i = 0; i < Short.MAX_VALUE; i++) {
+ for (int i = 0; i <= globalMaxBucketId; i++) {
if (bucketFilter.test(i) && !totalBucketSet.contains(i)) {
- // The new bucketId may still be larger than the upper bound
- if (-1 == maxBucketsNum || i <= maxBucketsNum - 1) {
- nonFullBucketInformation.put(i, 1L);
- totalBucketSet.add(i);
- totalBucketArray.add(i);
- hash2Bucket.put(hash, (short) i);
- return i;
- } else {
- // No need to enter the next iteration when upper bound exceeded
- break;
- }
+ nonFullBucketInformation.put(i, 1L);
+ totalBucketSet.add(i);
+ totalBucketArray.add(i);
+ hash2Bucket.put(hash, (short) i);
+ return i;
}
}
if (-1 == maxBucketsNum) {
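For readers skimming the hunk above: the rewrite folds the old in-loop bound check into the loop condition itself. Below is a minimal standalone sketch of that logic (written in Scala rather than the actual Java of PartitionIndex; BucketCapSketch and assignNewBucket are hypothetical names used only for illustration). With 'dynamic-bucket.max-buckets' = N, newly created bucket ids stay within [0, N - 1]; -1 keeps the previous Short.MAX_VALUE ceiling.

    import scala.collection.mutable

    object BucketCapSketch {
      // Mirrors the capped loop above: a new bucket id is only created while it stays
      // within [0, globalMaxBucketId]. Returning -1 just signals that every permitted
      // bucket already exists; the real method falls back to other logic at that point.
      def assignNewBucket(
          existing: mutable.Set[Int],
          bucketFilter: Int => Boolean,
          maxBucketsNum: Int): Int = {
        val globalMaxBucketId =
          (if (maxBucketsNum == -1) Short.MaxValue.toInt else maxBucketsNum) - 1
        var i = 0
        while (i <= globalMaxBucketId) {
          if (bucketFilter(i) && !existing.contains(i)) {
            existing += i
            return i
          }
          i += 1
        }
        -1
      }

      def main(args: Array[String]): Unit = {
        val buckets = mutable.Set.empty[Int]
        (1 to 4).foreach(_ => println(assignNewBucket(buckets, _ => true, maxBucketsNum = 2)))
        // prints 0, 1, -1, -1: no bucket id beyond 'dynamic-bucket.max-buckets' - 1 is created
      }
    }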
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 23e90df92b..ca1158e69a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -58,12 +58,13 @@ case class PaimonSparkWriter(table: FileStoreTable) {
private lazy val bucketMode = table.bucketMode
+ private lazy val coreOptions = table.coreOptions()
+
private lazy val disableReportStats = {
- val options = table.coreOptions()
- val config = options.toConfiguration
+ val config = coreOptions.toConfiguration
config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis <= 0 ||
table.partitionKeys.isEmpty ||
- !options.partitionedTableInMetastore ||
+ !coreOptions.partitionedTableInMetastore ||
table.catalogEnvironment.partitionHandler() == null
}
@@ -163,7 +164,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
case CROSS_PARTITION =>
// Topology: input -> bootstrap -> shuffle by key hash -> bucket-assigner -> shuffle by partition & bucket
val rowType = SparkTypeUtils.toPaimonType(withInitBucketCol.schema).asInstanceOf[RowType]
- val assignerParallelism = Option(table.coreOptions.dynamicBucketAssignerParallelism)
+ val assignerParallelism = Option(coreOptions.dynamicBucketAssignerParallelism)
.map(_.toInt)
.getOrElse(sparkParallelism)
val bootstrapped = bootstrapAndRepartitionByKeyHash(
@@ -186,10 +187,17 @@ case class PaimonSparkWriter(table: FileStoreTable) {
writeWithBucket(repartitioned)
case HASH_DYNAMIC =>
- val assignerParallelism = Option(table.coreOptions.dynamicBucketAssignerParallelism)
- .map(_.toInt)
- .getOrElse(sparkParallelism)
- val numAssigners = Option(table.coreOptions.dynamicBucketInitialBuckets)
+ val assignerParallelism = {
+ val parallelism = Option(coreOptions.dynamicBucketAssignerParallelism)
+ .map(_.toInt)
+ .getOrElse(sparkParallelism)
+ if (coreOptions.dynamicBucketMaxBuckets() != -1) {
+ Math.min(coreOptions.dynamicBucketMaxBuckets().toInt, parallelism)
+ } else {
+ parallelism
+ }
+ }
+ val numAssigners = Option(coreOptions.dynamicBucketInitialBuckets)
.map(initialBuckets => Math.min(initialBuckets.toInt, assignerParallelism))
.getOrElse(assignerParallelism)
@@ -212,8 +220,8 @@ case class PaimonSparkWriter(table: FileStoreTable) {
new SimpleHashBucketAssigner(
numAssigners,
TaskContext.getPartitionId(),
- table.coreOptions.dynamicBucketTargetRowNum,
- table.coreOptions.dynamicBucketMaxBuckets
+ coreOptions.dynamicBucketTargetRowNum,
+ coreOptions.dynamicBucketMaxBuckets
)
row => {
val sparkRow = new SparkRow(rowType, row)
@@ -242,7 +250,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
case HASH_FIXED =>
if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
// Topology: input -> shuffle by partition & bucket
- val bucketNumber = table.coreOptions().bucket()
+ val bucketNumber = coreOptions.bucket()
val bucketKeyCol = tableSchema
.bucketKeys()
.asScala
@@ -333,12 +341,11 @@ case class PaimonSparkWriter(table: FileStoreTable) {
return
}
- val options = table.coreOptions()
val partitionComputer = new InternalRowPartitionComputer(
- options.partitionDefaultName,
+ coreOptions.partitionDefaultName,
table.schema.logicalPartitionType,
table.partitionKeys.toArray(new Array[String](0)),
- options.legacyPartitionName()
+ coreOptions.legacyPartitionName()
)
val hmsReporter = new PartitionStatisticsReporter(
table,
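On the Spark side, the HASH_DYNAMIC branch above now also caps the bucket-assigner parallelism by 'dynamic-bucket.max-buckets' (when it is not -1), so the number of assigner tasks can never exceed the bucket limit. A small sketch of just that arithmetic follows, under the assumption that -1 means "no limit" as in the checks shown in the hunk (AssignerParallelismSketch and cappedParallelism are hypothetical names used only for illustration):

    object AssignerParallelismSketch {
      // -1 means "no limit", matching the '-1' checks in the hunk above.
      def cappedParallelism(configured: Option[Int], sparkParallelism: Int, maxBuckets: Long): Int = {
        val parallelism = configured.getOrElse(sparkParallelism)
        if (maxBuckets != -1) math.min(maxBuckets, parallelism.toLong).toInt else parallelism
      }

      def main(args: Array[String]): Unit = {
        // With max-buckets = 2 and 16 Spark tasks, only 2 assigner tasks are used.
        println(cappedParallelism(configured = None, sparkParallelism = 16, maxBuckets = 2)) // 2
        // With no bucket limit, the configured assigner parallelism wins as before.
        println(cappedParallelism(configured = Some(8), sparkParallelism = 16, maxBuckets = -1)) // 8
      }
    }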
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
index 023ab16646..f005156182 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
@@ -86,6 +86,28 @@ class DynamicBucketTableTest extends PaimonSparkTestBase {
Row(0) :: Row(1) :: Row(2) :: Nil)
}
+ test(s"Paimon dynamic bucket table: write with max buckets") {
+ spark.sql(s"""
+ |CREATE TABLE T (
+ | pk STRING,
+ | v STRING,
+ | pt STRING)
+ |TBLPROPERTIES (
+ | 'primary-key' = 'pk, pt',
+ | 'bucket' = '-1',
+ | 'dynamic-bucket.max-buckets'='2'
+ |)
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ spark.sql(
+ "INSERT INTO T VALUES ('1', 'a', 'p'), ('2', 'b', 'p'), ('3', 'c', 'p'),
('4', 'd', 'p'), ('5', 'e', 'p'), ('6', 'e', 'p')")
+ spark.sql(
+ "INSERT INTO T VALUES ('11', 'a', 'p'), ('22', 'b', 'p'), ('33', 'c',
'p'), ('44', 'd', 'p'), ('55', 'e', 'p'), ('66', 'e', 'p')")
+
+ checkAnswer(spark.sql("SELECT DISTINCT bucket FROM `T$FILES`"), Seq(Row(0), Row(1)))
+ }
+
test(s"Paimon cross partition table: write with partition change") {
sql(s"""
|CREATE TABLE T (