hudi-agent commented on code in PR #18829:
URL: https://github.com/apache/hudi/pull/18829#discussion_r3292374894


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java:
##########
@@ -35,13 +34,15 @@
 public class SparkAllowUpdateStrategy<T> extends BaseSparkUpdateStrategy<T> {
 
   public SparkAllowUpdateStrategy(
-      HoodieEngineContext engineContext, HoodieTable table, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
-    super(engineContext, table, fileGroupsInPendingClustering);
+      HoodieEngineContext engineContext, HoodieTable table,
+      Set<HoodieFileGroupId> fileGroupsInPendingClustering,
+      Set<HoodieFileGroupId> fileGroupsToBeReplaced) {
+    super(engineContext, table, fileGroupsInPendingClustering, fileGroupsToBeReplaced);
   }
 
   @Override
   public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
-    List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
+    Set<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
     Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = fileGroupIdsWithRecordUpdate.stream()
         .filter(fileGroupsInPendingClustering::contains)

Review Comment:
   🤖 Since `fileGroupsToBeReplaced` is now part of the constructor contract but 
`handleUpdate` here ignores it, INSERT_OVERWRITE against a partition with 
pending clustering will still silently win the race for any user who sets 
`SparkAllowUpdateStrategy` (e.g. with 
`PreferWriterConflictResolutionStrategy`). The PR description notes this is an 
intentional follow-up. Could you add a brief TODO comment here referencing the 
issue so future readers (and follow-up authors) see the trail? The same applies 
to `SparkConsistentBucketDuplicateUpdateStrategy`.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java:
##########
@@ -101,6 +103,26 @@ protected List<String> getAllExistingFileIds(String partitionPath) {
     return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
   }
 
+  @Override
+  protected Set<HoodieFileGroupId> getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {
+    String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+    List<String> partitionPaths;
+
+    if (StringUtils.nonEmpty(staticOverwritePartition)) {
+      // Static insert overwrite: use the configured partitions
+      partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
+    } else {
+      // Dynamic insert overwrite: determine partitions from input records

Review Comment:
   🤖 Small consistency concern: dynamic-overwrite here uses 
`inputRecords.map(HoodieRecord::getPartitionPath)` (the input records' 
partition paths), while `getPartitionToReplacedFileIds` (line 94) uses 
`writeMetadata.getWriteStatuses().map(... getStat().getPartitionPath())` (the 
output partition paths). For INSERT_OVERWRITE these should match in practice, 
but is there any path (e.g. with a key generator that derives a different 
partition path) where they could diverge and we'd reject the wrong partitions 
or miss a real overlap?
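
   To make the question concrete, here is a minimal, self-contained sketch of a 
guard that compares the two sets of partition paths. It is not taken from this 
PR: the class `PartitionOverlapCheckSketch` and the method `divergingPartitions` 
are hypothetical names, and plain strings stand in for the partition paths 
derived from the input records and from the write statuses.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, for illustration only; not part of Hudi.
public class PartitionOverlapCheckSketch {

  /**
   * Returns the partition paths that appear in exactly one of the two sets.
   * An empty result means the input-derived and output-derived partitions agree.
   */
  public static Set<String> divergingPartitions(Set<String> inputPartitions,
                                                Set<String> writtenPartitions) {
    // Paths seen on input records but never written.
    Set<String> diverging = new HashSet<>(inputPartitions);
    diverging.removeAll(writtenPartitions);

    // Paths written but not present on any input record.
    Set<String> onlyWritten = new HashSet<>(writtenPartitions);
    onlyWritten.removeAll(inputPartitions);

    diverging.addAll(onlyWritten);
    return diverging;
  }
}
```

   A non-empty result would indicate the kind of divergence asked about above. 
Whether such a check belongs in the executor or only in a test is a judgment 
call for the author.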
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java:
##########
@@ -34,11 +34,15 @@ public abstract class UpdateStrategy<T, I> implements Serializable {
   protected final transient HoodieEngineContext engineContext;
   protected HoodieTable table;
   protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;
+  protected Set<HoodieFileGroupId> fileGroupsToBeReplaced;
 
-  public UpdateStrategy(HoodieEngineContext engineContext, HoodieTable table, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
+  public UpdateStrategy(HoodieEngineContext engineContext, HoodieTable table,

Review Comment:
   🤖 This is a breaking change for users with custom strategy classes set via 
`hoodie.clustering.updates.strategy`. The 3-arg constructor is replaced (not 
added alongside) — so `ReflectionUtils.loadClass(..., new Class<?>[] 
{HoodieEngineContext.class, HoodieTable.class, Set.class, Set.class}, ...)` in 
`BaseSparkCommitActionExecutor` will now throw `NoSuchMethodException` for any 
existing custom strategy with the old 3-arg constructor. Could we either keep 
the 3-arg constructor for backward compatibility (delegating to the 4-arg with 
`Collections.emptySet()`), or have the executor fall back to the 3-arg 
signature via `ReflectionUtils.hasConstructor`? @bvaradar / @yihua any 
preference here for the migration path?
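
   As a rough illustration of the second option, the sketch below tries the 
4-arg constructor first and falls back to the 3-arg one. It uses plain 
`java.lang.reflect` rather than Hudi's `ReflectionUtils`, and every class in it 
(`StrategyLoaderSketch`, `LegacyStrategy`, `CurrentStrategy`) is a hypothetical 
stand-in, with `Object` and `Set<String>` replacing the real Hudi types.

```java
import java.lang.reflect.Constructor;
import java.util.Set;

// Hypothetical stand-ins, for illustration only; not Hudi classes.
public class StrategyLoaderSketch {

  /** Mimics a user-defined strategy that only has the old 3-arg constructor. */
  public static class LegacyStrategy {
    public final Set<String> pending;

    public LegacyStrategy(Object engineContext, Object table, Set<String> pending) {
      this.pending = pending;
    }
  }

  /** Mimics a strategy already migrated to the new 4-arg constructor. */
  public static class CurrentStrategy {
    public final Set<String> pending;
    public final Set<String> toBeReplaced;

    public CurrentStrategy(Object engineContext, Object table,
                           Set<String> pending, Set<String> toBeReplaced) {
      this.pending = pending;
      this.toBeReplaced = toBeReplaced;
    }
  }

  /** Prefers the 4-arg constructor; falls back to the 3-arg one if it is absent. */
  public static Object load(Class<?> clazz, Object engineContext, Object table,
                            Set<String> pending, Set<String> toBeReplaced) {
    try {
      try {
        Constructor<?> fourArg =
            clazz.getConstructor(Object.class, Object.class, Set.class, Set.class);
        return fourArg.newInstance(engineContext, table, pending, toBeReplaced);
      } catch (NoSuchMethodException e) {
        // Old-style strategy: the replaced file groups are silently dropped here.
        Constructor<?> threeArg =
            clazz.getConstructor(Object.class, Object.class, Set.class);
        return threeArg.newInstance(engineContext, table, pending);
      }
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Could not instantiate " + clazz.getName(), e);
    }
  }
}
```

   One caveat with the fallback: a legacy strategy never sees 
`fileGroupsToBeReplaced`, so it keeps the pre-PR behaviour. That may be 
acceptable for compatibility, but it is probably worth a warning log.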
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestInsertOverwriteWithClustering.java:
##########
@@ -0,0 +1,835 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.HoodieWriteResult;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
+import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, and DELETE_PARTITION operations
+ * when there are pending clustering operations on the file groups being replaced.
+ */
+public class TestInsertOverwriteWithClustering extends HoodieClientTestBase {
+
+  private HoodieTestDataGenerator dataGen;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    initTestDataGenerator();
+    initMetaClient(HoodieTableType.COPY_ON_WRITE);
+    dataGen = new HoodieTestDataGenerator();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilder(boolean rollbackPendingClustering) {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .withFinalizeWriteParallelism(2)
+        .withDeleteParallelism(2)
+        .withRollbackParallelism(2)
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
+            .withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
+            .withClusteringMaxNumGroups(10)
+            .withRollbackPendingClustering(rollbackPendingClustering)
+            .build());
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilderWithPartitionFilter(boolean rollbackPendingClustering, String partitionFilter) {

Review Comment:
   🤖 nit: `getConfigBuilderWithPartitionFilter` duplicates the entire builder 
chain from `getConfigBuilder` — could you extract the shared clustering config 
into a small private helper (e.g. `baseClusteringConfigBuilder(boolean 
rollback)`) that both methods delegate to? Also, 
`testInsertOverwriteNonOverlappingPartitionWithPendingClustering` (~line 340) 
rebuilds the same clustering config inline when 
`getConfigBuilderWithPartitionFilter(true, "partition1")` already exists for 
exactly this.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java:
##########
@@ -39,18 +38,23 @@
 @Slf4j
 public class SparkRejectUpdateStrategy<T> extends BaseSparkUpdateStrategy<T> {
 
-  public SparkRejectUpdateStrategy(HoodieEngineContext engineContext, HoodieTable table, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
-    super(engineContext, table, fileGroupsInPendingClustering);
+  public SparkRejectUpdateStrategy(HoodieEngineContext engineContext, HoodieTable table,
+                                   Set<HoodieFileGroupId> fileGroupsInPendingClustering,
+                                   Set<HoodieFileGroupId> fileGroupsToBeReplaced) {
+    super(engineContext, table, fileGroupsInPendingClustering, fileGroupsToBeReplaced);
   }
 
   @Override
   public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
-    List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
-    fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
-      if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) {
+    Set<HoodieFileGroupId> allAffectedFileGroups = getGroupIdsWithUpdate(taggedRecordsRDD);
+    // Combine file groups with updates and file groups to be replaced

Review Comment:
   🤖 nit: the comments on lines 50 and 52 just restate what the following 
statements say — `allAffectedFileGroups.addAll(...)` and `forEach(... 
contains(...))` are already self-explanatory through variable names. Could you 
drop them, or replace both with a single intent-level comment like `// also 
treat replaced file groups as potential conflict targets`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.