lrsb commented on code in PR #16011:
URL: https://github.com/apache/iceberg/pull/16011#discussion_r3117448212


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##########
@@ -128,8 +159,8 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws IOException {
               new DynamicCommittable(
                   entries.getKey(),
                   writeToManifests(entries.getKey().tableName(), 
entries.getValue(), checkpointId),
-                  getContainingTask().getEnvironment().getJobID().toString(),
-                  getRuntimeContext().getOperatorUniqueID(),
+                  flinkJobId,
+                  operatorId,
                   checkpointId),

Review Comment:
   Refactored the fix, I do agree with you that prev implementation was not 
addressing the underlying issue, but I still believe that `DynamicCommitter` 
logic is faulty.
   
   More specifically we check last checkpoint id 
with`getMaxCommittedCheckpointId` but only against the last entry in the map 
which might contains multiple jobids making the re-commit of the same 
committable possible.
   
   I've also added a test that reproduces the issue and fails in the prev 
implementation.
   
   Let me know your thoughts!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to