hudi-agent commented on code in PR #18829:
URL: https://github.com/apache/hudi/pull/18829#discussion_r3292374894
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java:
##########
@@ -35,13 +34,15 @@
public class SparkAllowUpdateStrategy<T> extends BaseSparkUpdateStrategy<T> {
public SparkAllowUpdateStrategy(
- HoodieEngineContext engineContext, HoodieTable table, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
- super(engineContext, table, fileGroupsInPendingClustering);
+ HoodieEngineContext engineContext, HoodieTable table,
+ Set<HoodieFileGroupId> fileGroupsInPendingClustering,
+ Set<HoodieFileGroupId> fileGroupsToBeReplaced) {
+ super(engineContext, table, fileGroupsInPendingClustering, fileGroupsToBeReplaced);
}
@Override
public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
- List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
+ Set<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = fileGroupIdsWithRecordUpdate.stream()
.filter(fileGroupsInPendingClustering::contains)
Review Comment:
🤖 Since `fileGroupsToBeReplaced` is now part of the constructor contract but
`handleUpdate` here ignores it, an INSERT_OVERWRITE against a partition with
pending clustering will still silently win the race for any user who configures
`SparkAllowUpdateStrategy` (e.g. together with
`PreferWriterConflictResolutionStrategy`). The PR description notes that this is
an intentional follow-up; could you add a brief TODO comment here referencing the
tracking issue so that future readers (and follow-up authors) can see the trail?
The same applies to `SparkConsistentBucketDuplicateUpdateStrategy`.
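For illustration, a minimal, self-contained sketch of where such a TODO could
sit. It assumes String file group ids and a hypothetical `AllowStrategySketch`
class in place of the real Hudi types; `<follow-up issue>` is a placeholder for
whatever issue the PR description points to:

```java
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for SparkAllowUpdateStrategy; file group ids are plain Strings.
public final class AllowStrategySketch {

  private final Set<String> fileGroupsInPendingClustering;
  private final Set<String> fileGroupsToBeReplaced;

  public AllowStrategySketch(Set<String> fileGroupsInPendingClustering,
                             Set<String> fileGroupsToBeReplaced) {
    this.fileGroupsInPendingClustering = fileGroupsInPendingClustering;
    this.fileGroupsToBeReplaced = fileGroupsToBeReplaced;
  }

  // Mirrors the filter shown in the hunk: updated file groups that are also under pending clustering.
  public Set<String> fileGroupsWithUpdatesAndPendingClustering(Set<String> fileGroupIdsWithRecordUpdate) {
    // TODO(<follow-up issue>): fileGroupsToBeReplaced is intentionally not consulted yet, so an
    // INSERT_OVERWRITE that replaces a file group under pending clustering is not reported here.
    return fileGroupIdsWithRecordUpdate.stream()
        .filter(fileGroupsInPendingClustering::contains)
        .collect(Collectors.toSet());
  }
}
```

With `fileGroupsInPendingClustering = {fg1, fg2}` and `fileGroupsToBeReplaced = {fg1}`,
an update touching only `fg2` yields `{fg2}`; `fg1` is not reported even though it is
both replaced and under pending clustering, which is the gap the TODO should flag.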
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java:
##########
@@ -101,6 +103,26 @@ protected List<String> getAllExistingFileIds(String partitionPath) {
return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
}
+ @Override
+ protected Set<HoodieFileGroupId> getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {
+ String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+ List<String> partitionPaths;
+
+ if (StringUtils.nonEmpty(staticOverwritePartition)) {
+ // Static insert overwrite: use the configured partitions
+ partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
+ } else {
+ // Dynamic insert overwrite: determine partitions from input records
Review Comment:
🤖 Small consistency concern: the dynamic-overwrite branch here uses
`inputRecords.map(HoodieRecord::getPartitionPath)` (the input records'
partition paths), while `getPartitionToReplacedFileIds` (line 94) uses
`writeMetadata.getWriteStatuses().map(... getStat().getPartitionPath())` (the
output partition paths). For INSERT_OVERWRITE these should match in practice,
but is there any code path (e.g. a key generator that derives a different
partition path) where they could diverge, so that we reject the wrong
partitions or miss a real overlap?
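One way to probe this in a test is to compute both partition sets and compare
them. The sketch below is a hypothetical helper (`OverwritePartitionSketch`,
plain Strings for partition paths), not the PR's code; it mirrors the
static/dynamic split in the hunk and reports partitions seen on only one side:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical helper; partition paths are plain Strings.
public final class OverwritePartitionSketch {

  private OverwritePartitionSketch() {}

  // Static overwrite: the configured comma-separated partitions.
  // Dynamic overwrite: the distinct partition paths carried by the input records.
  public static List<String> partitionsToReplace(String staticOverwritePartitions,
                                                 Collection<String> inputRecordPartitions) {
    if (staticOverwritePartitions != null && !staticOverwritePartitions.isEmpty()) {
      return Arrays.asList(staticOverwritePartitions.split(","));
    }
    return inputRecordPartitions.stream().distinct().collect(Collectors.toList());
  }

  // Partitions present on only one side (symmetric difference), sorted for stable output.
  // A non-empty result means the input-record view and the write-status view disagree.
  public static Set<String> divergentPartitions(Collection<String> inputRecordPartitions,
                                                Collection<String> writeStatusPartitions) {
    Set<String> common = new HashSet<>(inputRecordPartitions);
    common.retainAll(new HashSet<>(writeStatusPartitions));
    Set<String> divergent = new TreeSet<>(inputRecordPartitions);
    divergent.addAll(writeStatusPartitions);
    divergent.removeAll(common);
    return divergent;
  }
}
```

A non-empty `divergentPartitions(...)` result for some key generator would be
the concrete case where the pre-write check and the replace metadata disagree.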
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java:
##########
@@ -34,11 +34,15 @@ public abstract class UpdateStrategy<T, I> implements Serializable {
protected final transient HoodieEngineContext engineContext;
protected HoodieTable table;
protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;
+ protected Set<HoodieFileGroupId> fileGroupsToBeReplaced;
- public UpdateStrategy(HoodieEngineContext engineContext, HoodieTable table, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
+ public UpdateStrategy(HoodieEngineContext engineContext, HoodieTable table,
Review Comment:
🤖 This is a breaking change for users with custom strategy classes set via
`hoodie.clustering.updates.strategy`. The 3-arg constructor is replaced rather
than overloaded, so
`ReflectionUtils.loadClass(..., new Class<?>[] {HoodieEngineContext.class, HoodieTable.class, Set.class, Set.class}, ...)`
in `BaseSparkCommitActionExecutor` will now fail (with an underlying
`NoSuchMethodException`) for any existing custom strategy that only has the old
3-arg constructor. Could we keep the 3-arg constructor for backward
compatibility (delegating to the 4-arg one with `Collections.emptySet()`) so
that existing subclasses still link, and also have the executor fall back to
the 3-arg signature via `ReflectionUtils.hasConstructor`? Keeping the
base-class constructor alone is not enough: constructors are not inherited, so
the 4-arg lookup on the custom class itself would still fail. @bvaradar /
@yihua, any preference here for the migration path?
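A minimal sketch of the combined approach, using hypothetical stand-ins
(`EngineContext`, `Table`, `BaseStrategy`, String file group ids) and plain
`java.lang.reflect` instead of Hudi's `ReflectionUtils`:

```java
import java.util.Collections;
import java.util.Set;

public final class StrategyLoaderSketch {

  private StrategyLoaderSketch() {}

  // Hypothetical stand-ins for HoodieEngineContext and HoodieTable.
  public static final class EngineContext {}
  public static final class Table {}

  // Hypothetical stand-in for UpdateStrategy; file group ids are modelled as Strings.
  public abstract static class BaseStrategy {
    protected final Set<String> fileGroupsInPendingClustering;
    protected final Set<String> fileGroupsToBeReplaced;

    // Legacy 3-arg constructor, kept so existing subclasses still compile and link.
    protected BaseStrategy(EngineContext ctx, Table table, Set<String> pending) {
      this(ctx, table, pending, Collections.emptySet());
    }

    protected BaseStrategy(EngineContext ctx, Table table, Set<String> pending, Set<String> toBeReplaced) {
      this.fileGroupsInPendingClustering = pending;
      this.fileGroupsToBeReplaced = toBeReplaced;
    }
  }

  // A custom strategy written against the old 3-arg contract.
  public static final class LegacyCustomStrategy extends BaseStrategy {
    public LegacyCustomStrategy(EngineContext ctx, Table table, Set<String> pending) {
      super(ctx, table, pending);
    }
  }

  // A strategy written against the new 4-arg contract.
  public static final class NewStyleStrategy extends BaseStrategy {
    public NewStyleStrategy(EngineContext ctx, Table table, Set<String> pending, Set<String> toBeReplaced) {
      super(ctx, table, pending, toBeReplaced);
    }
  }

  // Prefer the 4-arg constructor, then fall back to the legacy 3-arg one. Constructors are
  // not inherited, so the lookup has to succeed on the configured class itself.
  public static BaseStrategy load(String className, EngineContext ctx, Table table,
                                  Set<String> pending, Set<String> toBeReplaced)
      throws ReflectiveOperationException {
    Class<?> clazz = Class.forName(className);
    try {
      return (BaseStrategy) clazz
          .getConstructor(EngineContext.class, Table.class, Set.class, Set.class)
          .newInstance(ctx, table, pending, toBeReplaced);
    } catch (NoSuchMethodException e) {
      return (BaseStrategy) clazz
          .getConstructor(EngineContext.class, Table.class, Set.class)
          .newInstance(ctx, table, pending);
    }
  }
}
```

One trade-off to be aware of: a legacy strategy loaded through the fallback sees
an empty `fileGroupsToBeReplaced`, so it keeps its old behavior rather than
opting into the new replace check.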
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestInsertOverwriteWithClustering.java:
##########
@@ -0,0 +1,835 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.HoodieWriteResult;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
+import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, and DELETE_PARTITION operations
+ * when there are pending clustering operations on the file groups being replaced.
+ */
+public class TestInsertOverwriteWithClustering extends HoodieClientTestBase {
+
+ private HoodieTestDataGenerator dataGen;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ initPath();
+ initSparkContexts();
+ initTestDataGenerator();
+ initMetaClient(HoodieTableType.COPY_ON_WRITE);
+ dataGen = new HoodieTestDataGenerator();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ cleanupResources();
+ }
+
+ private HoodieWriteConfig.Builder getConfigBuilder(boolean rollbackPendingClustering) {
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+ .withBulkInsertParallelism(2)
+ .withFinalizeWriteParallelism(2)
+ .withDeleteParallelism(2)
+ .withRollbackParallelism(2)
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
+ .withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
+ .withClusteringMaxNumGroups(10)
+ .withRollbackPendingClustering(rollbackPendingClustering)
+ .build());
+ }
+
+ private HoodieWriteConfig.Builder getConfigBuilderWithPartitionFilter(boolean rollbackPendingClustering, String partitionFilter) {
Review Comment:
🤖 nit: `getConfigBuilderWithPartitionFilter` duplicates the entire builder
chain from `getConfigBuilder`. Could you extract the shared clustering config
into a small private helper (e.g. `baseClusteringConfigBuilder(boolean rollback)`)
that both methods delegate to? Also,
`testInsertOverwriteNonOverlappingPartitionWithPendingClustering` (~line 340)
rebuilds the same clustering config inline, even though
`getConfigBuilderWithPartitionFilter(true, "partition1")` already covers exactly
this case.
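Roughly what the extraction could look like, sketched with a toy fluent builder:
the hypothetical `ClusteringConfigBuilder` and its `partitionFilter` setter stand
in for `HoodieClusteringConfig.Builder` and whatever filter option the test
actually sets, strategy classes are plain strings, and the surrounding
`HoodieWriteConfig` chain (path, schema, parallelism) is left out:

```java
// Toy model of the refactor; not the real Hudi builder API.
public final class ConfigHelperSketch {

  private ConfigHelperSketch() {}

  // Hypothetical, minimal stand-in for HoodieClusteringConfig.Builder.
  public static final class ClusteringConfigBuilder {
    public String planStrategyClass;
    public String executionStrategyClass;
    public int maxNumGroups;
    public boolean rollbackPendingClustering;
    public String partitionFilter; // null means no partition filter

    public ClusteringConfigBuilder planStrategy(String c) { planStrategyClass = c; return this; }
    public ClusteringConfigBuilder executionStrategy(String c) { executionStrategyClass = c; return this; }
    public ClusteringConfigBuilder maxNumGroups(int n) { maxNumGroups = n; return this; }
    public ClusteringConfigBuilder rollbackPendingClustering(boolean b) { rollbackPendingClustering = b; return this; }
    public ClusteringConfigBuilder partitionFilter(String p) { partitionFilter = p; return this; }
  }

  // The one place the shared clustering settings live.
  public static ClusteringConfigBuilder baseClusteringConfigBuilder(boolean rollback) {
    return new ClusteringConfigBuilder()
        .planStrategy("SparkSingleFileSortPlanStrategy")
        .executionStrategy("SparkSingleFileSortExecutionStrategy")
        .maxNumGroups(10)
        .rollbackPendingClustering(rollback);
  }

  public static ClusteringConfigBuilder getConfigBuilder(boolean rollback) {
    return baseClusteringConfigBuilder(rollback);
  }

  // Adds only the partition-filter delta on top of the shared settings.
  public static ClusteringConfigBuilder getConfigBuilderWithPartitionFilter(boolean rollback, String partitionFilter) {
    return baseClusteringConfigBuilder(rollback).partitionFilter(partitionFilter);
  }
}
```

The non-overlapping-partition test could then call
`getConfigBuilderWithPartitionFilter(true, "partition1")` instead of repeating the chain.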
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java:
##########
@@ -39,18 +38,23 @@
@Slf4j
public class SparkRejectUpdateStrategy<T> extends BaseSparkUpdateStrategy<T> {
- public SparkRejectUpdateStrategy(HoodieEngineContext engineContext, HoodieTable table, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
- super(engineContext, table, fileGroupsInPendingClustering);
+ public SparkRejectUpdateStrategy(HoodieEngineContext engineContext, HoodieTable table,
+ Set<HoodieFileGroupId> fileGroupsInPendingClustering,
+ Set<HoodieFileGroupId> fileGroupsToBeReplaced) {
+ super(engineContext, table, fileGroupsInPendingClustering, fileGroupsToBeReplaced);
}
@Override
public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
- List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
- fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
- if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) {
+ Set<HoodieFileGroupId> allAffectedFileGroups = getGroupIdsWithUpdate(taggedRecordsRDD);
+ // Combine file groups with updates and file groups to be replaced
Review Comment:
🤖 nit: the comments on lines 50 and 52 just restate what the following
statements do; `allAffectedFileGroups.addAll(...)` and
`forEach(... contains(...))` are already self-explanatory from their variable
names. Could you drop them, or replace both with a single intent-level comment
such as `// also treat replaced file groups as potential conflict targets`?
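For reference, a self-contained sketch of the check with one intent-level
comment. It uses String file group ids, a hypothetical `RejectStrategySketch`
class, and `IllegalStateException` as stand-ins for `HoodieFileGroupId`, the
real strategy, and the exception it actually throws:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the SparkRejectUpdateStrategy check; ids are plain Strings.
public final class RejectStrategySketch {

  private RejectStrategySketch() {}

  // Throws if any updated or to-be-replaced file group is under pending clustering.
  public static void checkConflicts(Set<String> fileGroupIdsWithRecordUpdate,
                                    Set<String> fileGroupsToBeReplaced,
                                    Set<String> fileGroupsInPendingClustering) {
    Set<String> allAffectedFileGroups = new HashSet<>(fileGroupIdsWithRecordUpdate);
    // Also treat replaced file groups as potential conflict targets.
    allAffectedFileGroups.addAll(fileGroupsToBeReplaced);
    allAffectedFileGroups.forEach(fileGroupId -> {
      if (fileGroupsInPendingClustering.contains(fileGroupId)) {
        throw new IllegalStateException("File group " + fileGroupId + " is under pending clustering");
      }
    });
  }
}
```

Copying into a new `HashSet` before `addAll` also avoids mutating the set
returned by `getGroupIdsWithUpdate`, in case that set is shared elsewhere.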
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.