hudi-agent commented on code in PR #18828:
URL: https://github.com/apache/hudi/pull/18828#discussion_r3292358448
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java:
##########
@@ -323,16 +328,30 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
.withTableVersion(tableVersion)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION);
- // Supply the pre-computed latest log version and its write token so that
- // WriterBuilder.build() skips the per-request FSUtils.getLatestLogVersion() listing.
- // This produces the same result: build() would discover (N, T_existing), construct
- // path (N, T_existing), find it exists, and roll over to N+1. Pre-computation
- // feeds the same (N, T_existing), triggering the identical rollover in getOutputStream().
+ // Apply pre-computed log version if available. Always keep the per-task write token
+ // generated above (via CommonClientUtils.generateWriteToken) so that retried/repeated
+ // rollbacks do not collide on UNKNOWN_WRITE_TOKEN or inherit a prior log's write token.
+ //
+ // When doDelete=true, we actually create a new rollback log file: explicitly bump the
+ // version (latest + 1) so the new file is written with the per-task write token instead
Review Comment:
🤖 Out of curiosity — when `preComputeLogVersions` fails for a partition
(IOException caught/logged on line 147), `preComputedVersion` is null for all
requests in that partition, so this whole `if` block is skipped. Then
`WriterBuilder.build()` falls back to `FSUtils.getLatestLogVersion`, which (for
table v6, where `useBaseVersion=false`) overrides the per-task token set on
line 317 with the existing log's token — the exact bug this PR fixes. Is this
listing-failure fallback considered rare enough to leave as-is, or worth
handling here too?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
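One way to cover the listing-failure case raised above is to treat the listing fallback exactly like the pre-computed path: take only the version from it and always keep the per-task token. The stdlib-only sketch below models that decision under stated assumptions. `LogVersion`, `chooseRollbackLog`, and `BASE_VERSION` are hypothetical names, not Hudi APIs. `BASE_VERSION = 1` is assumed to match `HoodieLogFile.LOGFILE_BASE_VERSION`. The sketch also assumes that the new rollback log is always written at latest + 1 (the `doDelete=true` case in the diff) and that a null token marks a "no log file" sentinel.

```java
import java.util.function.Supplier;

/**
 * Hypothetical, stdlib-only model of choosing the (version, writeToken) for a new
 * rollback log file. None of these names are Hudi APIs.
 */
final class RollbackLogVersionSketch {

  /** Assumed to match HoodieLogFile.LOGFILE_BASE_VERSION. */
  static final int BASE_VERSION = 1;

  /** A (log version, write token) pair; a null token marks a "no log file" sentinel. */
  static final class LogVersion {
    final int version;
    final String writeToken;

    LogVersion(int version, String writeToken) {
      this.version = version;
      this.writeToken = writeToken;
    }
  }

  /**
   * @param preComputed     entry from the per-partition pre-listing, or null if that listing failed
   * @param listingFallback per-request listing used only when preComputed is null; returns the
   *                        latest existing (version, token), or null if there is no log file
   * @param perTaskToken    write token generated for the current task
   */
  static LogVersion chooseRollbackLog(
      LogVersion preComputed, Supplier<LogVersion> listingFallback, String perTaskToken) {
    // Use the pre-computed entry when present; otherwise fall back to listing.
    LogVersion latest = (preComputed != null) ? preComputed : listingFallback.get();
    if (latest == null || latest.writeToken == null) {
      // No real log file (missing, or a null-token sentinel): start at the base version.
      return new LogVersion(BASE_VERSION, perTaskToken);
    }
    // Bump past the latest existing log and keep the per-task token on BOTH paths,
    // so the fallback can never inherit the existing file's token.
    return new LogVersion(latest.version + 1, perTaskToken);
  }
}
```

The design choice is that the fallback listing only influences the version number. A transient listing failure then degrades to "one extra listing call" rather than silently reintroducing the shared-token collision.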
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackHelper.java:
##########
@@ -681,7 +680,9 @@ void testPreComputeLogVersionsSentinelForMissingFileGroups() throws Exception {
String missingKey = RollbackHelperV1.logVersionLookupKey(partition, "fileId-no-logs", baseInstant);
assertTrue(result.containsKey(missingKey));
assertEquals(HoodieLogFile.LOGFILE_BASE_VERSION, (int) result.get(missingKey).getLeft());
- assertEquals(HoodieLogFormat.UNKNOWN_WRITE_TOKEN, result.get(missingKey).getRight());
+ // Sentinel entries (no real log file) carry a null write token so they cannot be confused
+ // with a real log file that happens to use UNKNOWN_WRITE_TOKEN.
+ assertEquals(null, result.get(missingKey).getRight());
Review Comment:
🤖 nit: `assertEquals(null, ...)` works but
`assertNull(result.get(missingKey).getRight())` is the idiomatic JUnit form and
reads more naturally here.
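The reason for switching the sentinel to a null token can be shown with a small stdlib-only model. It is a sketch, not the real `preComputeLogVersions` result type. The lookup keys are hypothetical, `LOGFILE_BASE_VERSION = 1` is assumed, and `"1-0-1"` is the `UNKNOWN_WRITE_TOKEN` value quoted in the test Javadoc. With the old convention, both entries below would be identical `(1, "1-0-1")` pairs. With a null token, a simple null check separates them, which is exactly what `assertNull(...getRight())` expresses in the test.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the pre-computed (version, writeToken) lookup; not a Hudi API. */
final class SentinelTokenSketch {

  /** Value quoted in the PR's test Javadoc. */
  static final String UNKNOWN_WRITE_TOKEN = "1-0-1";

  /** Assumed to match HoodieLogFile.LOGFILE_BASE_VERSION. */
  static final int LOGFILE_BASE_VERSION = 1;

  /** Under the null-token convention, a null token means "no log file exists". */
  static boolean isSentinel(Map.Entry<Integer, String> entry) {
    return entry.getValue() == null;
  }

  static Map<String, Map.Entry<Integer, String>> example() {
    Map<String, Map.Entry<Integer, String>> lookup = new HashMap<>();
    // A real log file that happens to carry the default token.
    lookup.put("p1/fileId-with-logs",
        new SimpleImmutableEntry<>(LOGFILE_BASE_VERSION, UNKNOWN_WRITE_TOKEN));
    // Sentinel for a file group with no log files. SimpleImmutableEntry permits a null
    // value, unlike Map.entry(...), which rejects nulls.
    lookup.put("p1/fileId-no-logs", new SimpleImmutableEntry<>(LOGFILE_BASE_VERSION, null));
    return lookup;
  }
}
```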
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -487,4 +503,221 @@ public void testRollbackWhenFirstCommitFail() {
client.rollback(newCommitTime);
}
}
+
+ /**
+ * Tests that rollback operations generate unique write tokens for log files, preventing collisions
+ * during repeated rollback attempts.
+ *
+ * <p>This test validates the fix for write token generation in metadata table rollbacks. Previously,
+ * rollback log files used the default UNKNOWN_WRITE_TOKEN ("1-0-1"), causing collisions when rollback
+ * was retried. Now, each rollback generates explicit write tokens based on Spark task context
+ * (format: {partitionId}-{stageId}-{attemptId}).
+ *
+ * <p>Test flow:
+ * <ol>
+ * <li>Create initial commit with inserts to establish base files</li>
+ * <li>Create second commit with updates to generate log files (MOR table)</li>
+ * <li>Backup commit timeline files and marker directory for repeated rollback simulation</li>
+ * <li>Execute first rollback and validate write tokens are NOT "1-0-1"</li>
+ * <li>Restore commit state (timeline files + markers) to simulate rollback retry scenario</li>
+ * <li>Execute second rollback and validate unique write tokens prevent collisions</li>
+ * <li>Verify exactly one new rollback log file per file group from second attempt</li>
+ * </ol>
Review Comment:
🤖 nit: the `@param` name here is `enableFileSliceOptimization` but the
method parameter is `enableMetadataTable` — could you update the Javadoc to
match and also fix the description, which describes file-slice caching rather
than the metadata table toggle?
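The test Javadoc quoted above describes write tokens of the form `{partitionId}-{stageId}-{attemptId}`, and the test checks that rollback log tokens are not `"1-0-1"`. The stdlib-only sketch below shows both halves of that check: building a token and pulling it back out of a log file name. It assumes that log file names end in `.log.<version>_<writeToken>` with no extra suffix (for example `.f1_002.log.2_0-12-34`). `WriteTokenSketch`, `makeWriteToken`, and `writeTokenOf` are hypothetical helpers, not the PR's code.

```java
/** Hypothetical helpers for the write-token format described in the test Javadoc. */
final class WriteTokenSketch {

  /** Default token quoted in the test Javadoc. */
  static final String UNKNOWN_WRITE_TOKEN = "1-0-1";

  /** Builds a token in the {partitionId}-{stageId}-{attemptId} format. */
  static String makeWriteToken(int partitionId, int stageId, long attemptId) {
    return partitionId + "-" + stageId + "-" + attemptId;
  }

  /**
   * Extracts the write token, assuming the name ends in ".log.<version>_<writeToken>".
   * Uses the LAST ".log." so that underscores or dots earlier in the name are ignored.
   */
  static String writeTokenOf(String logFileName) {
    int logIdx = logFileName.lastIndexOf(".log.");
    if (logIdx < 0) {
      throw new IllegalArgumentException("not a log file name: " + logFileName);
    }
    String suffix = logFileName.substring(logIdx + ".log.".length()); // "<version>_<token>"
    int underscore = suffix.indexOf('_');
    if (underscore < 0) {
      throw new IllegalArgumentException("missing write token: " + logFileName);
    }
    return suffix.substring(underscore + 1);
  }
}
```

In a test, the per-file check then reduces to something like `assertNotEquals(UNKNOWN_WRITE_TOKEN, writeTokenOf(name))`. However, this is only meaningful when `name` really is the rollback log, which is the point raised in the last review comment.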
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -487,4 +503,221 @@ public void testRollbackWhenFirstCommitFail() {
client.rollback(newCommitTime);
}
}
+
+ /**
+ * Tests that rollback operations generate unique write tokens for log files, preventing collisions
+ * during repeated rollback attempts.
+ *
+ * <p>This test validates the fix for write token generation in metadata table rollbacks. Previously,
+ * rollback log files used the default UNKNOWN_WRITE_TOKEN ("1-0-1"), causing collisions when rollback
+ * was retried. Now, each rollback generates explicit write tokens based on Spark task context
+ * (format: {partitionId}-{stageId}-{attemptId}).
+ *
+ * <p>Test flow:
+ * <ol>
+ * <li>Create initial commit with inserts to establish base files</li>
+ * <li>Create second commit with updates to generate log files (MOR table)</li>
+ * <li>Backup commit timeline files and marker directory for repeated rollback simulation</li>
+ * <li>Execute first rollback and validate write tokens are NOT "1-0-1"</li>
+ * <li>Restore commit state (timeline files + markers) to simulate rollback retry scenario</li>
+ * <li>Execute second rollback and validate unique write tokens prevent collisions</li>
+ * <li>Verify exactly one new rollback log file per file group from second attempt</li>
+ * </ol>
+ *
+ * @param enableFileSliceOptimization tests both with and without file slice caching optimization
+ *                                    to ensure write tokens work correctly in both code paths
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testRollbackWriteTokenGeneration(boolean enableMetadataTable) throws Exception {
+ // 1. Setup: Create a table-version-6 MOR table so the rollback exercises RollbackHelperV1
+ // (which is what this test targets). On v8+ rollbacks delete files directly and don't
+ // produce rollback log files.
+ Properties props = new Properties();
+ props.put(HoodieTableConfig.VERSION.key(), HoodieTableVersion.SIX.versionCode());
+ tearDown();
+ initPath();
+ initSparkContexts();
+ dataGen = new HoodieTestDataGenerator(
+ new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
+ initHoodieStorage();
+ initMetaClient(HoodieTableType.MERGE_ON_READ, props);
+
+ HoodieWriteConfig cfg = getConfigBuilder()
+ .withRollbackUsingMarkers(true)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withWriteTableVersion(HoodieTableVersion.SIX.versionCode())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0).build())
+ .build();
+
+ HoodieTestDataGenerator.writePartitionMetadataDeprecated(
+ storage, new String[] {DEFAULT_FIRST_PARTITION_PATH}, basePath);
+ FileSystem fs = (FileSystem) storage.getFileSystem();
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+ // Write 1: Initial inserts
+ String commitTime1 = "001";
+ WriteClientTestUtils.startCommitWithTime(client, commitTime1);
+ List<HoodieRecord> records = dataGen.generateInsertsForPartition(commitTime1, 100, DEFAULT_FIRST_PARTITION_PATH);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ List<WriteStatus> statusList = client.upsert(writeRecords, commitTime1).collect();
+ Assertions.assertNoWriteErrors(statusList);
+ client.commit(commitTime1, jsc.parallelize(statusList));
+
+ // Write 2: Updates to same partition to create log files. Use multiple Spark partitions to
+ // exercise multiple task contexts (so write tokens vary across tasks).
+ String commitTime2 = "002";
+ WriteClientTestUtils.startCommitWithTime(client, commitTime2);
+ List<HoodieRecord> updateRecords = dataGen.generateUpdates(commitTime2, records);
+ writeRecords = jsc.parallelize(updateRecords, 2);
+ statusList = client.upsert(writeRecords, commitTime2).collect();
+ Assertions.assertNoWriteErrors(statusList);
+ // Intentionally leave commit 002 in inflight state so rollback exercises the inflight path.
+
+ HoodieTable table = this.getHoodieTable(metaClient, cfg);
+ Map<String, List<String>> logFileNames = collectLogFileNamesByFileId(fs, DEFAULT_FIRST_PARTITION_PATH);
+ assertFalse(logFileNames.isEmpty());
+
+ // Backup commit 002 timeline files + marker dir so the rollback retry below can replay the same input.
+ Path commit2RequestedPath = new Path(metaClient.getMetaPath().toString(),
+ commitTime2 + HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION);
+ Path commit2InflightPath = new Path(metaClient.getMetaPath().toString(),
+ commitTime2 + HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
+ Path commit2MarkerDir = new Path(metaClient.getMarkerFolderPath(commitTime2));
+ Path backupDir = new Path(basePath, ".backup_test");
+ Path backupMarkerDir = new Path(backupDir, commitTime2);
+ fs.mkdirs(backupDir);
+
+ boolean requestedExists = fs.exists(commit2RequestedPath);
+ boolean inflightExists = fs.exists(commit2InflightPath);
+ boolean markerDirExists = fs.exists(commit2MarkerDir);
+
+ if (requestedExists) {
+ FileUtil.copy(fs, commit2RequestedPath, fs,
+ new Path(backupDir, commitTime2 + HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION),
+ false, fs.getConf());
+ }
Review Comment:
🤖 I think this is indexing in the wrong direction. `FileSlice.logFiles` is a
`TreeSet` constructed with `getReverseLogFileComparator()`, so the stream emits
log files from highest-version (the rollback log) to lowest-version (the
original log from commit 002). `FileSlice.getLatestLogFile()` confirms this —
it uses `stream().findFirst()`. So `logFiles.get(logFiles.size() - 1)` here
returns the *original* commit-002 log file (which was already written by Spark
with a per-task token), not the rollback log file. The subsequent
`assertNotEquals("1-0-1", writeToken)` would pass even if the fix weren't
applied. Could you switch to `logFiles.get(0)` to actually validate the
rollback log's write token?
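The indexing concern can be reproduced with plain `java.util` types. The sketch below is a simplified stand-in, not Hudi code. `LogFileOrderSketch` and `REVERSE_BY_VERSION` are hypothetical, and the comparator orders by version only, whereas the real `getReverseLogFileComparator()` may compare additional fields. Under that assumption, a `TreeSet` built with a reverse comparator streams the highest version first. So index `0` is the newer rollback log and `size() - 1` is the original commit-002 log, consistent with `getLatestLogFile()` using `findFirst()`.

```java
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;

/** Hypothetical model of a file slice's reverse-ordered log files; not Hudi code. */
final class LogFileOrderSketch {

  static final class LogFile {
    final int version;
    final String writeToken;

    LogFile(int version, String writeToken) {
      this.version = version;
      this.writeToken = writeToken;
    }
  }

  // Stand-in for getReverseLogFileComparator(): highest version first. Because it compares
  // version only, two files with the same version would be treated as duplicates here.
  static final Comparator<LogFile> REVERSE_BY_VERSION =
      Comparator.comparingInt((LogFile f) -> f.version).reversed();

  /** Mirrors "stream the TreeSet into a list": iteration follows the comparator order. */
  static List<LogFile> orderedLogFiles(List<LogFile> files) {
    TreeSet<LogFile> set = new TreeSet<>(REVERSE_BY_VERSION);
    set.addAll(files);
    return set.stream().collect(Collectors.toList());
  }
}
```

With this ordering, `ordered.get(0)` is the file whose token the test should check. Reading `ordered.get(ordered.size() - 1)` inspects the original Spark-written log, which already carries a per-task token and so passes the `"1-0-1"` check regardless of the fix.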
--
This is an automated message from the Apache Git Service.