bhavya-ganatra opened a new issue, #18980:
URL: https://github.com/apache/hudi/issues/18980
### Bug Description
**What happened:**
When using `hoodie.write.record.merge.mode=CUSTOM` with a custom merger
class specified via `hoodie.write.record.merge.custom.implementation.classes`,
inline clustering (and standalone clustering) fails with:
```
java.lang.IllegalArgumentException: No valid spark merger implementation set for `hoodie.write.record.merge.custom.implementation.classes`
```
The error is thrown during the clustering execution phase when Hudi attempts
to read source file groups to produce the clustered output. Every clustering
task fails on every executor with the same error. The same custom merger class
works correctly for inline compaction on the same table without any issues.
Additionally, the merger is successfully used on other COW tables and all MOR
read paths function correctly with the same implementation. Therefore, the
issue appears to be specific to the clustering code path rather than the custom
merger implementation itself.
**What you expected:**
Clustering should successfully read file groups using the configured custom
merger class, the same way inline compaction does. Since
`hoodie.write.record.merge.custom.implementation.classes` is explicitly set in
the write config, clustering should be able to resolve and instantiate the
custom merger implementation.
**Steps to reproduce:**
1. Create a MOR table with `hoodie.write.record.merge.mode=CUSTOM` and a
custom merger class that extends `HoodieSparkRecordMerger`, e.g.:
```
hoodie.write.record.merge.mode=CUSTOM
hoodie.write.record.merge.custom.implementation.classes=com.example.CustomHoodieSparkRecordMerger
hoodie.write.record.merge.strategy.id=<custom-uuid>
```
2. Write data to the table (upserts) — these succeed.
3. Enable inline clustering:
```
hoodie.clustering.inline=true
hoodie.clustering.inline.max.commits=1
hoodie.clustering.plan.strategy.target.file.max.bytes=10485760 // low threshold for easy reproduction
hoodie.clustering.plan.strategy.small.file.limit=5242880 // low threshold for easy reproduction
hoodie.clustering.plan.strategy.partition.selected="<list of partitions - comma separated>"
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
```
4. After the first successful delta commit, clustering is triggered
automatically. Observe that all clustering tasks fail with
`IllegalArgumentException: No valid spark merger implementation set`.
5. Note that inline compaction (`hoodie.compact.inline=true`) on the same
table with the same configs succeeds without error.
### Additional Notes / Investigation Hunch
> **Note:** The following is our working hypothesis based on code inspection
— not a confirmed root cause. We may be wrong; please treat this as a starting
point for investigation.
Looking at the stack trace, the failure path is:
`MultipleSparkJobExecutionStrategy` →
`ClusteringExecutionStrategy.getFileGroupReader()` →
`HoodieFileGroupReader.<init>` → `HoodieReaderContext.initRecordMerger()` →
`BaseSparkInternalRowReaderContext.getRecordMerger()`
The key question is: **why does `initRecordMerger` not find the custom class
when it is present in the write config?**
One possible explanation we noticed from looking at
`ClusteringExecutionStrategy`:
The clustering execution strategy builds its own `TypedProperties` to pass
to `HoodieFileGroupReader` (via something like `getReaderProperties()`). If
this method only copies a limited set of properties (e.g., spill-map and
memory-related keys) and does **not** propagate
`hoodie.write.record.merge.custom.implementation.classes`, then
`initRecordMerger` would receive an empty/missing value for that key.
Meanwhile, compaction works because it goes through a different code path
(`SparkMergeHelper` / `HoodieSparkMergeOnReadTableCompactor`) which has access
to the full write config props.
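The suspected property drop can be illustrated with a minimal, self-contained sketch. It does not use Hudi classes: plain `java.util.Properties` stands in for `TypedProperties`, and the class name `ReaderPropsSketch`, the `copyAllowed` helper, and the `example.reader.*` keys are hypothetical names chosen for illustration. Whether the real `getReaderProperties()` behaves this way is exactly what remains unverified.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Illustration only. java.util.Properties stands in for Hudi's TypedProperties,
// and the allow-list is an assumption, not the actual behavior of getReaderProperties().
public class ReaderPropsSketch {
    static final String MERGER_IMPL_KEY =
        "hoodie.write.record.merge.custom.implementation.classes";

    // Hypothetical placeholder keys, not real Hudi config names.
    static final List<String> ALLOWED = Arrays.asList(
        "example.reader.memory.max.bytes",
        "example.reader.spill.path");

    // Copies only the allow-listed keys; every other key is silently dropped.
    static Properties copyAllowed(Properties writeProps, List<String> allowed) {
        Properties out = new Properties();
        for (String key : allowed) {
            String value = writeProps.getProperty(key);
            if (value != null) {
                out.setProperty(key, value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties writeProps = new Properties();
        writeProps.setProperty("hoodie.write.record.merge.mode", "CUSTOM");
        writeProps.setProperty(MERGER_IMPL_KEY, "com.example.CustomHoodieSparkRecordMerger");
        writeProps.setProperty("example.reader.memory.max.bytes", "1048576");

        Properties readerProps = copyAllowed(writeProps, ALLOWED);

        System.out.println("write config has merger key:  " + writeProps.containsKey(MERGER_IMPL_KEY));
        System.out.println("reader props have merger key: " + readerProps.containsKey(MERGER_IMPL_KEY));
    }
}
```

If the clustering path filters properties in a comparable way, the merger lookup would see no value for the key even though the write config sets it, which would match the observed exception.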
A secondary possible issue: even if `ConfigUtils.getMergeProps()` enriches
the props with table config (`hoodie.properties`), `initRecordMerger` may be
called with the **original** un-enriched `TypedProperties` rather than the
enriched copy.
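This second hypothesis can also be sketched without Hudi classes. In the example below, `EnrichedPropsSketch`, `enrich`, and `resolveMerger` are hypothetical stand-ins, and the assumption that the table config carries the merger implementation key is made only for illustration. The point is that an enrichment step that returns a new copy does not help if the merger lookup is still handed the original object.

```java
import java.util.Properties;

// Illustration only. These helpers are hypothetical stand-ins and do not
// reproduce ConfigUtils.getMergeProps() or initRecordMerger().
public class EnrichedPropsSketch {
    static final String MERGER_IMPL_KEY =
        "hoodie.write.record.merge.custom.implementation.classes";

    // Returns a new Properties object with table-level settings layered on top.
    // The input is left untouched.
    static Properties enrich(Properties original, Properties tableConfig) {
        Properties merged = new Properties();
        merged.putAll(original);
        merged.putAll(tableConfig);
        return merged;
    }

    // Fails in the same style as the reported error when the key is absent.
    static String resolveMerger(Properties props) {
        String impl = props.getProperty(MERGER_IMPL_KEY);
        if (impl == null || impl.isEmpty()) {
            throw new IllegalArgumentException(
                "No valid merger implementation set for `" + MERGER_IMPL_KEY + "`");
        }
        return impl;
    }

    public static void main(String[] args) {
        Properties readerProps = new Properties(); // key missing, as in the first hypothesis
        Properties tableConfig = new Properties(); // assumed to carry the key
        tableConfig.setProperty(MERGER_IMPL_KEY, "com.example.CustomHoodieSparkRecordMerger");

        Properties enriched = enrich(readerProps, tableConfig);
        System.out.println("enriched copy resolves: " + resolveMerger(enriched));

        try {
            resolveMerger(readerProps); // the original object was never enriched
        } catch (IllegalArgumentException e) {
            System.out.println("original props fail: " + e.getMessage());
        }
    }
}
```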
We have not confirmed this by stepping through the code — if the Hudi team
can verify whether `getReaderProperties()` in `ClusteringExecutionStrategy`
explicitly propagates the custom merge implementation class key, that would
either confirm or rule out this hypothesis.
### Environment
**Hudi version:** 1.1.0 (`hudi-spark3.5-bundle_2.12-1.1.0.jar`)
**Query engine:** Spark 3.5 (AWS EMR-flavored, `spark-sql_2.12-3.5.6-amzn-1`)
**Relevant configs:**
```
hoodie.write.record.merge.mode=CUSTOM
hoodie.write.record.merge.custom.implementation.classes=com.example.CustomHoodieSparkRecordMerger
hoodie.write.record.merge.strategy.id=<custom-uuid>
hoodie.datasource.write.table.type=MERGE_ON_READ
hoodie.clustering.inline=true
hoodie.clustering.inline.max.commits=1
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.clean.policy.failed.writes=LAZY
```
### Logs and Stack Trace
Error thrown on each executor task (all tasks fail, Spark retries 4 times
per task before aborting):
```
WARN [task-result-getter] - Lost task 0.0 in stage 39.0:
java.lang.IllegalArgumentException: No valid spark merger implementation set for `hoodie.write.record.merge.custom.implementation.classes`
    at org.apache.hudi.BaseSparkInternalRowReaderContext.getRecordMerger(BaseSparkInternalRowReaderContext.java:74)
    at org.apache.hudi.common.engine.HoodieReaderContext.initRecordMerger(HoodieReaderContext.java:331)
    at org.apache.hudi.common.engine.HoodieReaderContext.initRecordMerger(HoodieReaderContext.java:289)
    at org.apache.hudi.common.table.read.HoodieFileGroupReader.<init>(HoodieFileGroupReader.java:111)
    at org.apache.hudi.common.table.read.HoodieFileGroupReader.<init>(HoodieFileGroupReader.java:70)
    at org.apache.hudi.common.table.read.HoodieFileGroupReader$Builder.build(HoodieFileGroupReader.java:534)
    at org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy.getFileGroupReader(ClusteringExecutionStrategy.java:156)
    at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.access$100(MultipleSparkJobExecutionStrategy.java:95)
    at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy$1.call(MultipleSparkJobExecutionStrategy.java:323)
    at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy$1.call(MultipleSparkJobExecutionStrategy.java:314)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    ...
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
```
Driver-level error after all tasks fail:
```
ERROR [stream execution thread] (BaseHoodieWriteClient.java:641) - Inline compaction or clustering failed for table s3://...
java.util.concurrent.CompletionException: java.util.concurrent.CancellationException
    at org.apache.hudi.common.util.FutureUtils.lambda$null$0(FutureUtils.java:52)
    ...
Caused by: java.util.concurrent.CancellationException
```