This is an automated email from the ASF dual-hosted git repository.

wombatu-kun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a4f0e261b950 fix(spark): Propagate merge configs to file group reader 
during clustering (#19007)
a4f0e261b950 is described below

commit a4f0e261b9501aed3221dde3ef7b09323ea22918
Author: Aditya Goenka <[email protected]>
AuthorDate: Tue Jun 16 18:59:00 2026 +0530

    fix(spark): Propagate merge configs to file group reader during clustering 
(#19007)
---
 .../strategy/ClusteringExecutionStrategy.java      |   6 +-
 .../procedure/TestClusteringWithCustomMerger.scala | 133 +++++++++++++++++++++
 2 files changed, 138 insertions(+), 1 deletion(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
index cc3c4a02eb22..9610f728f8c7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
@@ -107,7 +107,11 @@ public abstract class ClusteringExecutionStrategy<T, I, K, 
O> implements Seriali
 
   protected TypedProperties getReaderProperties(long maxMemory) {
     HoodieWriteConfig config = getWriteConfig();
-    TypedProperties props = new TypedProperties();
+    // Seed from the full write config so that merge-related properties (e.g. 
the custom record
+    // merger impl classes, merge mode and strategy id) are propagated to the 
file group reader.
+    // Without this, reading source file groups with a CUSTOM merge mode fails 
to resolve the
+    // configured merger (HUDI-18980). This mirrors the compaction read path.
+    TypedProperties props = TypedProperties.copy(config.getProps());
     props.setProperty(SPILLABLE_MAP_BASE_PATH.key(), 
config.getSpillableMapBasePath());
     props.setProperty(SPILLABLE_DISK_MAP_TYPE.key(), 
config.getCommonConfig().getSpillableDiskMapType().toString());
     props.setProperty(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), 
Boolean.toString(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()));
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringWithCustomMerger.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringWithCustomMerger.scala
new file mode 100644
index 000000000000..7319bba9768c
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringWithCustomMerger.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.{DefaultSparkRecordMerger, HoodieDataSourceHelpers}
+import org.apache.hudi.common.config.HoodieReaderConfig
+import org.apache.hudi.common.model.HoodieRecordMerger
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+
+import org.apache.hadoop.fs.Path
+
+import scala.collection.JavaConverters._
+
+/**
+ * Regression test for <a 
href="https://github.com/apache/hudi/issues/18980">HUDI issue #18980</a>:
+ * clustering on a table configured with {@code 
hoodie.write.record.merge.mode=CUSTOM} and a custom
+ * Spark merger failed with
+ * "No valid spark merger implementation set for 
`hoodie.write.record.merge.custom.implementation.classes`".
+ *
+ * <p>The merge mode and strategy id are persisted as table config, but the 
custom merger impl classes
+ * are a write-side config that is not persisted. Before the fix,
+ * {@code ClusteringExecutionStrategy.getReaderProperties} built a fresh 
property set containing only
+ * the spill/memory keys, dropping the impl classes, so the file group reader 
could not resolve the
+ * configured merger. The fix seeds the reader properties from the full write 
config.
+ *
+ * <p>Both clustering execution paths call {@code getReaderProperties} and 
reproduce the bug:
+ * the row-writer path ({@code 
MultipleSparkJobExecutionStrategy#readRecordsForGroupAsRow}, used when
+ * {@code hoodie.datasource.write.row.writer.enable} is true — the default the 
reporter hit) and the
+ * RDD path ({@code MultipleSparkJobExecutionStrategy#readRecordsForGroup} 
when it is false). The test
+ * is parameterized over both.
+ *
+ * <p>This lives in {@code hudi-spark} (not {@code hudi-spark-client}): a 
CUSTOM/SPARK-typed merger
+ * forces the InternalRow write path, which needs a concrete {@code 
SparkXXXAdapter} and real Spark
+ * records — neither is available to {@code hudi-spark-client}'s test 
classpath / Avro test-data path.
+ */
+class TestClusteringWithCustomMerger extends HoodieSparkProcedureTestBase {
+
+  test("Test clustering with CUSTOM record merge mode and a custom Spark 
merger") {
+    // hoodie.datasource.write.row.writer.enable: true exercises the 
row-writer clustering path
+    // (the default the reporter hit), false exercises the RDD path. Both call 
getReaderProperties.
+    Seq("true", "false").foreach { rowWriterEnabled =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        val mergerClass = classOf[CustomSparkRecordMergerForClustering].getName
+
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  part long
+             |) using hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  type = 'mor',
+             |  orderingFields = 'ts',
+             |  "hoodie.write.record.merge.mode" = "CUSTOM",
+             |  "hoodie.write.record.merge.strategy.id" = 
"${HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID}",
+             |  
"${HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY}" = 
"$mergerClass",
+             |  "hoodie.parquet.small.file.limit" = "0"
+             | )
+             | partitioned by (part)
+             | location '$basePath'
+       """.stripMargin)
+
+        withSQLConf(
+          "hoodie.compact.inline" -> "false",
+          "hoodie.compact.schedule.inline" -> "false") {
+          // Several commits into the same partition create multiple file 
groups for clustering to
+          // combine (small.file.limit=0 keeps every insert in a new base 
file).
+          spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000, 
100)")
+          spark.sql(s"insert into $tableName values (2, 'a2', 20.0, 1001, 
100)")
+          spark.sql(s"insert into $tableName values (3, 'a3', 30.0, 1002, 
100)")
+          // An update lands in a log file, so clustering must merge base + 
log records through the
+          // file group reader using the CUSTOM Spark merger.
+          spark.sql(s"update $tableName set price = 99.0, ts = 1003 where id = 
1")
+
+          // Pin the row-writer setting and propagate the custom merger impl 
classes (write-side,
+          // non-persisted) to the clustering write client. Before HUDI-18980 
the reader properties
+          // dropped these and clustering failed with "No valid spark merger 
implementation set".
+          val clusteringOptions =
+            s"hoodie.datasource.write.row.writer.enable=$rowWriterEnabled," +
+              
s"${HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY}=$mergerClass"
+          spark.sql(s"call run_clustering(table => '$tableName', options => 
'$clusteringOptions')").show()
+
+          // Clustering must have produced exactly one completed replace 
commit.
+          val fs = new 
Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
+          val replaceCommits = 
HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
+            .getInstants.iterator().asScala
+            .filter(_.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
+            .toSeq
+          assertResult(1)(replaceCommits.size)
+
+          // All records remain readable after clustering, with the update 
applied.
+          checkAnswer(s"select id, name, price, ts, part from $tableName order 
by id")(
+            Seq(1, "a1", 99.0, 1003, 100),
+            Seq(2, "a2", 20.0, 1001, 100),
+            Seq(3, "a3", 30.0, 1002, 100)
+          )
+        }
+      }
+    }
+  }
+}
+
+/**
+ * A custom Spark record merger whose only purpose is to advertise the CUSTOM 
merge strategy id, so the
+ * table is configured with {@code RecordMergeMode.CUSTOM} and a custom merger 
impl class. It otherwise
+ * reuses the default Spark merge behavior.
+ */
+class CustomSparkRecordMergerForClustering extends DefaultSparkRecordMerger {
+  override def getMergingStrategy: String = 
HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID
+}

Reply via email to