wombatu-kun commented on code in PR #19007:
URL: https://github.com/apache/hudi/pull/19007#discussion_r3411302127


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestClusteringWithCustomMerger.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.DefaultSparkRecordMerger;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Reproduction for HUDI issue #18980: standalone/inline clustering on a MOR 
table configured with
+ * {@code hoodie.write.record.merge.mode=CUSTOM} fails with
+ * "No valid spark merger implementation set for 
`hoodie.write.record.merge.custom.implementation.classes`".
+ *
+ * <p>The failing path is the RDD-based clustering execution
+ * ({@code MultipleSparkJobExecutionStrategy} -> {@code 
ClusteringExecutionStrategy.getFileGroupReader}),
+ * so row writer is disabled to exercise it.
+ */
+@Tag("functional")
+class TestClusteringWithCustomMerger extends SparkClientFunctionalTestHarness {
+
+  public static class CustomMerger extends DefaultSparkRecordMerger {
+    @Override
+    public String getMergingStrategy() {
+      return HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID;
+    }
+  }
+
+  @Test
+  void clusteringWithCustomMergerShouldSucceed() throws Exception {
+    HoodieWriteConfig cfg = getHoodieWriteConfig(false);
+    HoodieTableMetaClient metaClient = 
getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, cfg.getProps());
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String commit1 = "001";
+      WriteClientTestUtils.startCommitWithTime(client, commit1);
+      List<HoodieRecord> records = dataGen.generateInserts(commit1, 400);
+      Stream<HoodieBaseFile> dataFiles =
+          insertRecordsToMORTable(metaClient, records.subList(0, 200), client, 
cfg, commit1);
+      assertTrue(dataFiles.findAny().isPresent(), "should list the base files 
we wrote in the delta commit");
+
+      String commit2 = "002";
+      WriteClientTestUtils.startCommitWithTime(client, commit2);
+      dataFiles = insertRecordsToMORTable(metaClient, records.subList(200, 
400), client, cfg, commit2);
+      assertTrue(dataFiles.findAny().isPresent(), "should list the base files 
we wrote in the delta commit");
+
+      String commit3 = "003";
+      WriteClientTestUtils.startCommitWithTime(client, commit3);
+      updateRecordsInMORTable(metaClient, dataGen.generateUpdates(commit3, 
100), client, cfg, commit3, false);
+    }
+
+    HoodieWriteConfig cfgClustering = getHoodieWriteConfig(true);
+    String clusteringCommitTime;
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfgClustering)) {
+      clusteringCommitTime = 
client.scheduleClustering(Option.empty()).get().toString();
+      // Issue #18980 surfaces here: the clustering execution strategy reads 
the source file groups
+      // through HoodieFileGroupReader but does not propagate the custom 
merger impl classes, so
+      // initRecordMerger throws "No valid spark merger implementation set".
+      client.cluster(clusteringCommitTime, true);
+    }
+
+    // Clustering must have produced a completed replace commit at the 
scheduled instant.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTimeline timeline = 
metaClient.getCommitTimeline().filterCompletedInstants();
+    HoodieInstant lastInstant = timeline.lastInstant().get();
+    assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, lastInstant.getAction(),
+        "Clustering should complete as a replace commit");
+    assertEquals(clusteringCommitTime, lastInstant.requestedTime(),
+        "The last commit should be the clustering instant");
+    // And all 400 distinct records must still be readable after clustering.
+    assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(
+        jsc(), basePath(), sqlContext(), timeline, Option.empty()),
+        "All records should be readable after clustering");
+  }
+
+  private HoodieWriteConfig getHoodieWriteConfig(boolean 
enableInlineClustering) {
+    Properties props = new Properties();
+    // Force the RDD-based clustering execution path (the one that reproduces 
the bug).

Review Comment:
   The default row-writer clustering path 
(MultipleSparkJobExecutionStrategy.readRecordsForGroupAsRow:301) also calls 
this same getReaderProperties() and then getFileGroupReader, so it hits the 
identical initRecordMerger failure - the bug is not unique to the RDD path. 
Since hoodie.datasource.write.row.writer.enable defaults to true 
(MultipleSparkJobExecutionStrategy:109), the row-writer path is the one the 
issue reporter hit with default configs, yet this test forces it off and leaves 
it uncovered. Consider parameterizing the test over row.writer.enable (true and 
false) so the default path is exercised too, and softening "the one that 
reproduces the bug" here and in the class javadoc, since both paths reproduce 
it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to