danny0405 commented on code in PR #19023:
URL: https://github.com/apache/hudi/pull/19023#discussion_r3425167181


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java:
##########
@@ -141,6 +144,28 @@ public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedT
     return getHoodieInstantAndMetaDataPair(metaClient, hoodieInstantOption);
   }
 
+  /**
+   * Get the last completed transaction hoodie instant before the given instant time.
+   * The returned instant has both requested time and completion time less than the given instant time,
+   * ensuring it was fully completed before the given instant was created.
+   *
+   * @param metaClient table meta client
+   * @param currentInstantTime the requested time of the current inflight instant
+   * @return the last completed instant before the given instant, with its extra metadata
+   */
+  public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedTxnInstantAndMetadata(
+      HoodieTableMetaClient metaClient, String currentInstantTime) {
+    Option<HoodieInstant> hoodieInstantOption = Option.fromJavaOptional(
+        metaClient.getActiveTimeline().getCommitsTimeline()
+            .filterCompletedInstants()
+            .findInstantsBefore(currentInstantTime)
+            .getInstantsAsStream()
+            .filter(instant -> instant.getCompletionTime() != null
+                && compareTimestamps(instant.getCompletionTime(), LESSER_THAN, currentInstantTime))
+            .max(Comparator.comparing(HoodieInstant::getCompletionTime)));

Review Comment:
   This should probably choose the max `requestedTime`, not max 
`completionTime`. The OCC strategy later uses 
`findInstantsAfter(lastSuccessful.requestedTime())` as the candidate cutoff, so 
choosing an older slow commit by completion time can still leave newer, 
already-completed-before-current instants in the candidate set. For example, if 
A has req=T1/complete=T5 and B has req=T2/complete=T3, and both complete before 
current=T6, this picks A and conflict resolution still checks B even though B 
was not concurrent with the recommit. Picking the latest requested instant that 
also satisfies completionTime < currentInstantTime seems to match the cutoff 
semantics.
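A minimal, self-contained sketch of the A/B example above, assuming timestamps can be modeled as plain `long`s. `CompletedInstant` is a hypothetical stand-in for `HoodieInstant`, and `candidatesAfter` is a simplified stand-in for the `findInstantsAfter(lastSuccessful.requestedTime())` cutoff, not Hudi's actual conflict-resolution code.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Simplified model of OCC baseline selection. CompletedInstant is a hypothetical
// stand-in for HoodieInstant; timestamps are modeled as plain longs.
class BaselineSelection {

  static final class CompletedInstant {
    final String name;
    final long requestedTime;
    final long completionTime;

    CompletedInstant(String name, long requestedTime, long completionTime) {
      this.name = name;
      this.requestedTime = requestedTime;
      this.completionTime = completionTime;
    }
  }

  // Eligible baselines: requested and completed strictly before the current instant.
  private static boolean eligible(CompletedInstant i, long current) {
    return i.requestedTime < current && i.completionTime < current;
  }

  // Selection as written in the diff: the latest completion time wins.
  static Optional<CompletedInstant> byMaxCompletionTime(List<CompletedInstant> completed, long current) {
    return completed.stream()
        .filter(i -> eligible(i, current))
        .max(Comparator.comparingLong(i -> i.completionTime));
  }

  // Selection suggested in the review: the latest requested time wins.
  static Optional<CompletedInstant> byMaxRequestedTime(List<CompletedInstant> completed, long current) {
    return completed.stream()
        .filter(i -> eligible(i, current))
        .max(Comparator.comparingLong(i -> i.requestedTime));
  }

  // Simplified cutoff: instants requested after the baseline's requested time
  // remain conflict candidates (mirrors findInstantsAfter(baseline.requestedTime)).
  static List<String> candidatesAfter(List<CompletedInstant> completed, CompletedInstant baseline) {
    return completed.stream()
        .filter(i -> i.requestedTime > baseline.requestedTime)
        .map(i -> i.name)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<CompletedInstant> timeline = List.of(
        new CompletedInstant("A", 1, 5),   // slow commit: req=T1, complete=T5
        new CompletedInstant("B", 2, 3));  // fast commit: req=T2, complete=T3
    long current = 6;                      // the recommitted instant, T6

    CompletedInstant byCompletion = byMaxCompletionTime(timeline, current).get();
    CompletedInstant byRequested = byMaxRequestedTime(timeline, current).get();
    System.out.println("max completionTime -> " + byCompletion.name
        + ", candidates " + candidatesAfter(timeline, byCompletion));
    // prints "max completionTime -> A, candidates [B]"
    System.out.println("max requestedTime -> " + byRequested.name
        + ", candidates " + candidatesAfter(timeline, byRequested));
    // prints "max requestedTime -> B, candidates []"
  }
}
```

Picking by completion time selects A, so B (requested after T1) stays in the candidate set; picking by requested time selects B and leaves no candidates, which is the behavior the comment argues for.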



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -547,6 +547,11 @@ private boolean recommitInstant(HoodieTimeline completedTimeline, long checkpoin
       if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
         writeClient.getHeartbeatClient().start(instant);
       }
+      // Initialize the transaction state so that OCC conflict resolution uses the correct
+      // baseline: the last completed instant before this inflight instant was created.
+      // Without this, lastCompletedTxnAndMetadata is empty and conflict resolution checks
+      // against all completed instants on the timeline, causing false conflicts.
+      writeClient.preTxnForRecommit(tableState.operationType, this.metaClient, instant);

Review Comment:
   This still uses the coordinator metaClient/timeline captured before the 
restore loop. `restoreEvents()` builds one `completedTimeline` and then 
recommits every buffered checkpoint against it; after the first recommit 
succeeds, this metaClient is not reloaded until after the whole loop. If there 
are multiple recovered checkpoint buffers, `preTxnForRecommit(...)` for the 
second instant may not see the first instant as completed, so OCC can treat 
that just-recommitted instant as a conflict candidate and fail with the same 
kind of false positive. Can we reload the active timeline before each recommit, 
or move the reload into this path before computing the completed timeline and 
initializing `preTxnForRecommit`?
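A minimal sketch of the stale-snapshot problem described above, under simplifying assumptions: instants are plain `long` requested times, completion times are ignored, and `Timeline`, `baselineFor` and `recommitAll` are hypothetical names rather than Hudi APIs. The `reloadEachTime` flag stands in for reloading the active timeline before each recommit (in Hudi this would presumably be something like `metaClient.reloadActiveTimeline()`, but where exactly it belongs in `restoreEvents()` is an assumption here). The model only shows the visibility issue: a snapshot taken once before the loop never sees earlier recommits.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of restoreEvents(): several buffered checkpoints are recommitted
// in order. Timeline, baselineFor and recommitAll are hypothetical names, not Hudi APIs.
class RecommitLoop {

  // Stand-in for the timeline on storage; holds requested times of completed instants.
  static final class Timeline {
    private final List<Long> completed = new ArrayList<>();

    void complete(long instant) {
      completed.add(instant);
    }

    // Returns a detached copy, like a timeline loaded once into memory.
    List<Long> snapshot() {
      return new ArrayList<>(completed);
    }
  }

  // OCC baseline: latest completed instant requested before `current`, or -1 if none.
  static long baselineFor(List<Long> snapshot, long current) {
    return snapshot.stream()
        .mapToLong(Long::longValue)
        .filter(t -> t < current)
        .max()
        .orElse(-1L);
  }

  // Recommits each pending instant in order and records the baseline used for it.
  static List<Long> recommitAll(Timeline timeline, List<Long> pending, boolean reloadEachTime) {
    List<Long> baselines = new ArrayList<>();
    List<Long> snapshot = timeline.snapshot(); // loaded once, before the loop
    for (long instant : pending) {
      if (reloadEachTime) {
        snapshot = timeline.snapshot(); // analogous to reloading the active timeline
      }
      baselines.add(baselineFor(snapshot, instant));
      timeline.complete(instant); // this recommit succeeds and becomes visible on storage
    }
    return baselines;
  }

  public static void main(String[] args) {
    Timeline stale = new Timeline();
    stale.complete(10);
    System.out.println(recommitAll(stale, List.of(20L, 30L), false)); // prints [10, 10]

    Timeline fresh = new Timeline();
    fresh.complete(10);
    System.out.println(recommitAll(fresh, List.of(20L, 30L), true));  // prints [10, 20]
  }
}
```

With the snapshot taken once, the second recommit still computes baseline 10, so instant 20 falls after the cutoff and stays a conflict candidate; taking a fresh snapshot per iteration moves the baseline to 20.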


