danny0405 commented on code in PR #19023:
URL: https://github.com/apache/hudi/pull/19023#discussion_r3433204884


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -603,9 +610,23 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) {
    */
   private boolean commitInstants(long checkpointId) {
     // use < instead of <= because the write metadata event sends the last known checkpoint id which is smaller than the current one.
-    List<Boolean> result = this.eventBuffers.getEventBufferStream().filter(entry -> entry.getKey() < checkpointId)
-        .map(entry -> commitInstant(entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight())).collect(Collectors.toList());
-    return result.stream().anyMatch(i -> i);
+    List<Map.Entry<Long, Pair<String, EventBuffer>>> entries = this.eventBuffers.getEventBufferStream()
+        .filter(entry -> entry.getKey() < checkpointId)
+        .collect(Collectors.toList());
+    boolean anyCommitted = false;
+    for (int i = 0; i < entries.size(); i++) {
+      if (i > 0) {
+        // Refresh the baseline for subsequent instants so that OCC conflict resolution
+        // sees the just-committed instant as completed, not as a concurrent conflict.
+        this.metaClient.reloadActiveTimeline();
+        this.writeClient.preTxn(tableState.operationType, this.metaClient);

Review Comment:
   This does not really work. preTxn needs to be invoked before each 
instant starts, so that we can figure out which instants completed during the 
current write window.
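
To illustrate why the baseline has to be captured when the instant starts, here is a minimal, self-contained sketch. It does not use Hudi's API: `Timeline`, `lastCompleted`, and `completedAfter` are hypothetical stand-ins, and it assumes, for simplicity, that instants are ordered by their instant time string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class WriteWindowSketch {
  // Hypothetical stand-in for a table timeline: completed instant times, sorted.
  static final class Timeline {
    private final TreeSet<String> completed = new TreeSet<>();

    void complete(String instant) {
      completed.add(instant);
    }

    // Latest completed instant, or "" when nothing has completed yet.
    String lastCompleted() {
      return completed.isEmpty() ? "" : completed.last();
    }

    // Instants that completed strictly after the given baseline.
    List<String> completedAfter(String baseline) {
      return new ArrayList<>(completed.tailSet(baseline, false));
    }
  }

  public static void main(String[] args) {
    Timeline timeline = new Timeline();
    timeline.complete("001");

    // Baseline captured when the new instant starts writing.
    String baselineAtStart = timeline.lastCompleted();

    // Another writer completes 002 while our instant is still writing.
    timeline.complete("002");

    // Baseline captured only at commit time, after the write window has passed.
    String baselineAtCommit = timeline.lastCompleted();

    // The early baseline exposes 002 as a conflict candidate; the late one hides it.
    System.out.println(timeline.completedAfter(baselineAtStart));  // prints [002]
    System.out.println(timeline.completedAfter(baselineAtCommit)); // prints []
  }
}
```

With the baseline taken at commit time, the set of instants to check against is empty, so a writer that completed during the write window is never examined for conflicts.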
   
   Another problem is that Flink OCC does not work as expected: even if a 
conflict is detected and we throw, the Flink checkpoint has already succeeded, 
and by default there is no way to roll back to the preceding checkpoint.


