aiborodin commented on code in PR #16011:
URL: https://github.com/apache/iceberg/pull/16011#discussion_r3121044384


##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java:
##########
@@ -326,6 +326,112 @@ void testSkipsCommitRequestsForPreviousCheckpoints() throws Exception {
                 .build());
   }
 
+  @Test
+  void testDedupsCommittablesTaggedWithPreviousJobIdAfterRestart() throws Exception {

Review Comment:
   How about renaming this test to match the intended restart scenario?
   `testSkipsAlreadyCommittedDataAfterJobIdChanges` or similar.



##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java:
##########
@@ -326,6 +326,112 @@ void testSkipsCommitRequestsForPreviousCheckpoints() throws Exception {
                 .build());
   }
 
+  @Test
+  void testDedupsCommittablesTaggedWithPreviousJobIdAfterRestart() throws Exception {

Review Comment:
   Could we please add an integration test reproducing this bug? It should be
   possible using committer hooks, such as in
   `TestDynamicIcebergSink.testCommitsOnceWhenConcurrentDuplicateCommit()`.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##########
@@ -119,28 +119,28 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
       DynamicWriteResultAggregator. Iceberg 1.12 will remove this, and users should upgrade to the 1.11 release first
       to migrate their state to a single commit request per checkpoint.
     */
-    Map<TableKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>> commitRequestMap =
-        Maps.newHashMap();
+    Map<CommitGroupKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>
+        commitRequestMap = Maps.newHashMap();
     for (CommitRequest<DynamicCommittable> request : commitRequests) {
       NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> committables =
           commitRequestMap.computeIfAbsent(
-              new TableKey(request.getCommittable()), unused -> Maps.newTreeMap());
+              new CommitGroupKey(request.getCommittable()), unused -> Maps.newTreeMap());
       committables
           .computeIfAbsent(request.getCommittable().checkpointId(), unused -> Lists.newArrayList())
           .add(request);
     }
 
-    for (Map.Entry<TableKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>> entry :

Review Comment:
   Let's keep the `TableKey` here and make the `CommitGroupKey` group by
   `jobId` and `operatorId`. That would let us load a table and its ancestors
   only once, rather than once for each (jobId, operatorId) pair. It would also
   make the intended behaviour clearer in the code.
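
   A minimal sketch of the two-level grouping suggested above, not the actual
   Iceberg code. It assumes hypothetical stand-in records: `Committable`,
   `TableKey` (table name plus branch) and `CommitGroupKey` (`jobId` plus
   `operatorId`); the field names and the `group` helper are illustrative only.
   The outer map is keyed by table, so any per-table work can happen once per
   outer entry, and the inner map separates commit requests by
   (jobId, operatorId) and orders them by checkpoint ID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class TwoLevelGrouping {
  // Hypothetical stand-ins for the real Flink/Iceberg types (assumptions).
  record Committable(
      String table, String branch, String jobId, String operatorId, long checkpointId) {}

  record TableKey(String table, String branch) {}

  record CommitGroupKey(String jobId, String operatorId) {}

  // Outer level: one entry per table. Inner level: one entry per
  // (jobId, operatorId) pair, with committables sorted by checkpoint ID.
  static Map<TableKey, Map<CommitGroupKey, NavigableMap<Long, List<Committable>>>> group(
      List<Committable> committables) {
    Map<TableKey, Map<CommitGroupKey, NavigableMap<Long, List<Committable>>>> result =
        new HashMap<>();
    for (Committable c : committables) {
      result
          .computeIfAbsent(new TableKey(c.table(), c.branch()), unused -> new HashMap<>())
          .computeIfAbsent(
              new CommitGroupKey(c.jobId(), c.operatorId()), unused -> new TreeMap<>())
          .computeIfAbsent(c.checkpointId(), unused -> new ArrayList<>())
          .add(c);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Committable> input =
        List.of(
            new Committable("db.t1", "main", "job-a", "op-1", 1L),
            new Committable("db.t1", "main", "job-b", "op-1", 2L),
            new Committable("db.t2", "main", "job-a", "op-1", 1L));
    group(input)
        .forEach(
            (tableKey, groups) -> {
              // A real committer would load the table here, once per TableKey.
              System.out.println(tableKey + " -> " + groups.keySet());
            });
  }
}
```

   With this shape, the loop over the outer map is the natural place for
   per-table work, while deduplication against already committed checkpoints
   can still run separately for each inner (jobId, operatorId) group.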


