aiborodin commented on code in PR #16011:
URL: https://github.com/apache/iceberg/pull/16011#discussion_r3121044384
##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java:
##########
@@ -326,6 +326,112 @@ void testSkipsCommitRequestsForPreviousCheckpoints() throws Exception {
.build());
}
+ @Test
+ void testDedupsCommittablesTaggedWithPreviousJobIdAfterRestart() throws Exception {
Review Comment:
How about renaming this test to match the intended restart scenario?
Something like `testSkipsAlreadyCommittedDataAfterJobIdChanges`.
##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java:
##########
@@ -326,6 +326,112 @@ void testSkipsCommitRequestsForPreviousCheckpoints() throws Exception {
.build());
}
+ @Test
+ void testDedupsCommittablesTaggedWithPreviousJobIdAfterRestart() throws Exception {
Review Comment:
Could we please add an integration test reproducing this bug? It should be
possible using committer hooks, such as in
`TestDynamicIcebergSink.testCommitsOnceWhenConcurrentDuplicateCommit()`.
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##########
@@ -119,28 +119,28 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
DynamicWriteResultAggregator. Iceberg 1.12 will remove this, and users should upgrade to the 1.11 release first
to migrate their state to a single commit request per checkpoint.
*/
- Map<TableKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>> commitRequestMap =
- Maps.newHashMap();
+ Map<CommitGroupKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>
+ commitRequestMap = Maps.newHashMap();
for (CommitRequest<DynamicCommittable> request : commitRequests) {
NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> committables =
commitRequestMap.computeIfAbsent(
- new TableKey(request.getCommittable()), unused -> Maps.newTreeMap());
+ new CommitGroupKey(request.getCommittable()), unused -> Maps.newTreeMap());
committables
.computeIfAbsent(request.getCommittable().checkpointId(), unused -> Lists.newArrayList())
.add(request);
}
- for (Map.Entry<TableKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>> entry :
Review Comment:
Let's keep the `TableKey` here and make the `CommitGroupKey` group by
`jobId` and `operatorId` within each table. That would let us load each table
and its ancestors only once, rather than once per (jobId, operatorId) pair. It
would also make the intended behaviour clearer in the code.
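A minimal sketch of the suggested layout (table first, then (jobId, operatorId), then checkpoint id), under simplified assumptions: `TableKey`, `CommitGroupKey`, `Committable` and `SnapshotInfo` below are hypothetical stand-in records, not the real Iceberg/Flink classes, and the table's snapshot history is modelled as a plain list instead of being read from table metadata. It also assumes, as in the existing committer, that already-committed work is detected via the highest checkpoint id recorded per (jobId, operatorId) in the table's history. The point it illustrates is that one history scan per table yields that value for every group at once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class CommitGrouping {
  // Hypothetical, simplified stand-ins for the real committer types.
  public record TableKey(String tableName, String branch) {}
  public record CommitGroupKey(String jobId, String operatorId) {}
  public record Committable(TableKey table, String jobId, String operatorId, long checkpointId) {}
  // Hypothetical view of one ancestor snapshot: which (jobId, operatorId) committed which checkpoint.
  public record SnapshotInfo(String jobId, String operatorId, long checkpointId) {}

  // Table -> (jobId, operatorId) -> checkpointId -> committables.
  public static Map<TableKey, Map<CommitGroupKey, NavigableMap<Long, List<Committable>>>> group(
      List<Committable> committables) {
    Map<TableKey, Map<CommitGroupKey, NavigableMap<Long, List<Committable>>>> result = new HashMap<>();
    for (Committable c : committables) {
      result
          .computeIfAbsent(c.table(), unused -> new HashMap<>())
          .computeIfAbsent(new CommitGroupKey(c.jobId(), c.operatorId()), unused -> new TreeMap<>())
          .computeIfAbsent(c.checkpointId(), unused -> new ArrayList<>())
          .add(c);
    }
    return result;
  }

  // A single pass over one table's ancestors gives the max committed checkpoint for every group.
  public static Map<CommitGroupKey, Long> maxCommittedCheckpoints(List<SnapshotInfo> ancestors) {
    Map<CommitGroupKey, Long> max = new HashMap<>();
    for (SnapshotInfo s : ancestors) {
      max.merge(new CommitGroupKey(s.jobId(), s.operatorId()), s.checkpointId(), Math::max);
    }
    return max;
  }

  // For one table: checkpoint ids that still need committing, per (jobId, operatorId) group.
  public static Map<CommitGroupKey, List<Long>> pendingCheckpoints(
      Map<CommitGroupKey, NavigableMap<Long, List<Committable>>> byGroup, List<SnapshotInfo> ancestors) {
    Map<CommitGroupKey, Long> committed = maxCommittedCheckpoints(ancestors); // history read once
    Map<CommitGroupKey, List<Long>> pending = new HashMap<>();
    for (Map.Entry<CommitGroupKey, NavigableMap<Long, List<Committable>>> e : byGroup.entrySet()) {
      long maxCommitted = committed.getOrDefault(e.getKey(), -1L);
      // tailMap(k, false) keeps only checkpoints strictly greater than maxCommitted.
      pending.put(e.getKey(), new ArrayList<>(e.getValue().tailMap(maxCommitted, false).keySet()));
    }
    return pending;
  }

  public static void main(String[] args) {
    TableKey table = new TableKey("db.tbl", "main");
    List<Committable> requests = List.of(
        new Committable(table, "jobA", "op1", 5L),
        new Committable(table, "jobA", "op1", 6L),
        new Committable(table, "jobB", "op1", 1L));
    // "jobA" already committed checkpoint 5 before the restart. A single table is used here,
    // so one shared history list stands in for that table's ancestors.
    List<SnapshotInfo> ancestors = List.of(new SnapshotInfo("jobA", "op1", 5L));
    for (Map.Entry<TableKey, Map<CommitGroupKey, NavigableMap<Long, List<Committable>>>> entry :
        group(requests).entrySet()) {
      System.out.println(entry.getKey() + " -> " + pendingCheckpoints(entry.getValue(), ancestors));
    }
  }
}
```

In this example the restored `jobA` checkpoint 5 is skipped, while `jobA` checkpoint 6 and `jobB` checkpoint 1 stay pending; `maxCommittedCheckpoints` runs once per table no matter how many (jobId, operatorId) groups the table has.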
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.