mxm commented on code in PR #15913:
URL: https://github.com/apache/iceberg/pull/15913#discussion_r3066078886


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -423,21 +423,53 @@ public void endInput() throws IOException {
   }
 
  private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
-    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+
+    if (!dataFilesPerCheckpoint.containsKey(checkpointId)) {
       dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
     }
 
-    for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint :
-        writeResultsSinceLastSnapshot.entrySet()) {
-      dataFilesPerCheckpoint.put(
-          writeResultsOfCheckpoint.getKey(),
-          writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
+    Map<Long, List<WriteResult>> pendingWriteResults = Maps.newHashMap();
+    for (Map.Entry<Long, List<WriteResult>> entry : writeResultsSinceLastSnapshot.entrySet()) {
+      long assignedCheckpointId = computeCheckpointId(checkpointId, entry);
+      pendingWriteResults
+          .computeIfAbsent(assignedCheckpointId, k -> Lists.newArrayList())
+          .addAll(entry.getValue());
+    }
+

Review Comment:
   When post-barrier `WriteResult`s include equality deletes (V2 tables), they 
get merged with the next checkpoint's data and committed in a single 
`RowDelta`. This changes the sequence number of those deletes compared to 
aligned mode. The behavior should still be correct (deletes end up with a 
higher sequence and still apply to earlier data), but it would be good to have 
an explicit test for this scenario, e.g. `testPostBarrierEqualityDeleteFiles` 
with format v2.
   
   Also: since `commitDeltaTxn` commits each checkpoint's `RowDelta` separately 
(#10526), post-barrier equality deletes that get redirected here end up in the 
same `RowDelta` as the current checkpoint's data+deletes. Worth confirming this 
doesn't cause duplication issues similar to the ones #10526 fixed.
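
   A tiny standalone model may help make the sequence-number argument concrete. This is not Iceberg code (names are invented); it only encodes the visibility rule that an equality delete applies to rows whose data sequence number is strictly lower than the delete's:

   ```java
   // Toy model of the equality-delete visibility rule: a delete applies to a
   // data file iff the file's data sequence number is strictly lower than the
   // delete's. Illustrative only; not Iceberg code.
   public class EqDeleteSequenceSketch {

     static boolean applies(long deleteSeq, long dataSeq) {
       return dataSeq < deleteSeq;
     }

     public static void main(String[] args) {
       // Unaligned mode: a post-barrier equality delete from checkpoint 1 is
       // merged into checkpoint 2's RowDelta and gets sequence 2. Checkpoint 1's
       // data was committed at sequence 1, so the delete still applies to it.
       System.out.println(applies(2, 1)); // true
       // It does not apply to data files committed in the same RowDelta (equal
       // sequence), which is the usual same-commit behavior.
       System.out.println(applies(2, 2)); // false
     }
   }
   ```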



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -423,21 +423,53 @@ public void endInput() throws IOException {
   }
 
  private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
-    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+

Review Comment:
   Nit: remove the blank line after the opening brace (inconsistent with the 
rest of the file).



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -423,21 +423,53 @@ public void endInput() throws IOException {
   }
 
  private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
-    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+
+    if (!dataFilesPerCheckpoint.containsKey(checkpointId)) {
       dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
     }
 
-    for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint :
-        writeResultsSinceLastSnapshot.entrySet()) {
-      dataFilesPerCheckpoint.put(
-          writeResultsOfCheckpoint.getKey(),
-          writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
+    Map<Long, List<WriteResult>> pendingWriteResults = Maps.newHashMap();
+    for (Map.Entry<Long, List<WriteResult>> entry : writeResultsSinceLastSnapshot.entrySet()) {
+      long assignedCheckpointId = computeCheckpointId(checkpointId, entry);
+      pendingWriteResults
+          .computeIfAbsent(assignedCheckpointId, k -> Lists.newArrayList())
+          .addAll(entry.getValue());
+    }
+
+    for (Map.Entry<Long, List<WriteResult>> entry : pendingWriteResults.entrySet()) {
+      dataFilesPerCheckpoint.put(entry.getKey(), writeToManifest(entry.getKey(), entry.getValue()));
     }
 
     // Clear the local buffer for current checkpoint.
     writeResultsSinceLastSnapshot.clear();
   }
 
+  /**
+   * in case of unaligned checkpoints, data files that were part of checkpoint N in the writer may
+   * have to become part of a later checkpoint in the committer if:
+   *
+   * <ul>
+   *   <li>previous files were already committed for checkpoint N. We have to keep the manifests for
+   *       new files under a later key, otherwise they are discarded during recovery after a crash
+   *   <li>we already have a manifest of files to be committed for checkpoint N, even though it
+   *       might not have been committed yet. In this case, we must not overwrite the manifests we
+   *       already have, and we must keep them consistent with our checkpoint
+   * </ul>
+   */
+  private long computeCheckpointId(long checkpointId, Map.Entry<Long, List<WriteResult>> entry) {
+    long sourceCheckpointId = entry.getKey();
+
+    boolean sourceCheckpointIdAlreadyCommitted = sourceCheckpointId <= maxCommittedCheckpointId;
+    boolean sourceCheckpointIdHasDataInSnapshot =
+        dataFilesPerCheckpoint.containsKey(sourceCheckpointId);
+    // for aligned checkpoints, both conditions will be false and the upstream operator's checkpoint
+    // ID
+    // will be chosen.
+    return sourceCheckpointIdAlreadyCommitted || sourceCheckpointIdHasDataInSnapshot

Review Comment:
   Nit: reflow the comment to avoid the orphaned `// ID` line:
   ```suggestion
       // For aligned checkpoints, both conditions will be false and the upstream operator's
       // checkpoint ID will be chosen.
   ```
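
   To make the reassignment rule itself easier to follow, here is a standalone sketch (plain Java with invented names and ordinary collections, not the committer's actual code) that models the decision in `computeCheckpointId`:

   ```java
   import java.util.Set;

   // Standalone model of the reassignment rule: a write result whose source
   // checkpoint was already committed, or already has a manifest recorded, is
   // moved under the current checkpoint; otherwise it keeps its own checkpoint
   // id. Names are illustrative, not the real committer fields.
   public class CheckpointReassignSketch {

     static long computeCheckpointId(
         long currentCheckpointId,
         long sourceCheckpointId,
         long maxCommittedCheckpointId,
         Set<Long> checkpointsWithManifest) {
       boolean alreadyCommitted = sourceCheckpointId <= maxCommittedCheckpointId;
       boolean alreadyHasManifest = checkpointsWithManifest.contains(sourceCheckpointId);
       // For aligned checkpoints both conditions are false, so the upstream
       // operator's checkpoint id is kept.
       return (alreadyCommitted || alreadyHasManifest) ? currentCheckpointId : sourceCheckpointId;
     }

     public static void main(String[] args) {
       // Aligned case: checkpoint 2's own data stays under checkpoint 2.
       System.out.println(computeCheckpointId(2, 2, 1, Set.of())); // 2
       // Post-barrier data for already-committed checkpoint 1 moves to checkpoint 2.
       System.out.println(computeCheckpointId(2, 1, 1, Set.of())); // 2
       // Post-barrier data for a snapshotted-but-uncommitted checkpoint 1 also moves.
       System.out.println(computeCheckpointId(2, 1, 0, Set.of(1L))); // 2
     }
   }
   ```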



##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -1076,6 +1076,220 @@ public void testSpecEvolution() throws Exception {
     }
   }
 
+  /**
+   * (unaligned checkpoints) With unaligned checkpoints a writer subtask may deliver its write
+   * result after {@code snapshotState(N)} has already fired. ("post-barrier data"). This test
+   * verifies that post-barrier data for a checkpoint that never completes is not lost, it must be
+   * committed together with the next successful checkpoint. Also covers the case where the
+   * successful checkpoint itself has post-barrier data, which must then wait for the checkpoint
+   * after that.
+   *
+   * <pre>
+   *   processElement(dataA, checkpointId=1)
+   *   snapshotState(1)
+   *   processElement(dataB, checkpointId=1)
+   *   // checkpoint 1 never completes
+   *   processElement(dataC, checkpointId=2)
+   *   snapshotState(2)
+   *   processElement(dataD, checkpointId=2)
+   *   notifyCheckpointComplete(2)   // commits dataA, dataB, dataC
+   *   snapshotState(3)
+   *   notifyCheckpointComplete(3)   // commits dataD
+   * </pre>
+   */
+  @TestTemplate
+  public void testPostBarrierDataSurvivesFailedCheckpoint() throws Exception {
+    long timestamp = 0;
+    JobID jobId = new JobID();
+    OperatorID operatorId;
+    try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
+        createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
+
+      RowData rowA = SimpleDataUtil.createRowData(1, "early-checkpoint1");
+      DataFile dataFileA = writeDataFile("data-A", ImmutableList.of(rowA));
+      RowData rowB = SimpleDataUtil.createRowData(2, "post-barrier-checkpoint1");
+      DataFile dataFileB = writeDataFile("data-B", ImmutableList.of(rowB));
+      RowData rowC = SimpleDataUtil.createRowData(3, "early-checkpoint2");
+      DataFile dataFileC = writeDataFile("data-C", ImmutableList.of(rowC));
+      RowData rowD = SimpleDataUtil.createRowData(4, "post-barrier-checkpoint2");
+      DataFile dataFileD = writeDataFile("data-D", ImmutableList.of(rowD));
+
+      long checkpoint1 = 1;
+      long checkpoint2 = 2;
+      long checkpoint3 = 3;
+
+      harness.processElement(of(checkpoint1, dataFileA), ++timestamp);
+      harness.snapshot(checkpoint1, ++timestamp);
+      assertFlinkManifests(1);
+
+      // post-barrier, arrives after snapshotState(1); checkpoint1 then FAILS (no notify)
+      harness.processElement(of(checkpoint1, dataFileB), ++timestamp);
+
+      harness.processElement(of(checkpoint2, dataFileC), ++timestamp);
+      harness.snapshot(checkpoint2, ++timestamp);

Review Comment:
   Nit: existing tests consistently verify manifest counts after `snapshot()` 
and `notifyOfCompletedCheckpoint()`. Consider adding `assertFlinkManifests(2)` 
here (one manifest from checkpoint 1's snapshot, one new manifest for 
checkpoint 2) for consistency.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -423,21 +423,53 @@ public void endInput() throws IOException {
   }
 
  private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
-    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+
+    if (!dataFilesPerCheckpoint.containsKey(checkpointId)) {
       dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
     }
 
-    for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint :
-        writeResultsSinceLastSnapshot.entrySet()) {
-      dataFilesPerCheckpoint.put(
-          writeResultsOfCheckpoint.getKey(),
-          writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
+    Map<Long, List<WriteResult>> pendingWriteResults = Maps.newHashMap();
+    for (Map.Entry<Long, List<WriteResult>> entry : writeResultsSinceLastSnapshot.entrySet()) {
+      long assignedCheckpointId = computeCheckpointId(checkpointId, entry);
+      pendingWriteResults
+          .computeIfAbsent(assignedCheckpointId, k -> Lists.newArrayList())
+          .addAll(entry.getValue());
+    }
+
+    for (Map.Entry<Long, List<WriteResult>> entry : pendingWriteResults.entrySet()) {
+      dataFilesPerCheckpoint.put(entry.getKey(), writeToManifest(entry.getKey(), entry.getValue()));
     }
 
     // Clear the local buffer for current checkpoint.
     writeResultsSinceLastSnapshot.clear();
   }
 
+  /**
+   * in case of unaligned checkpoints, data files that were part of checkpoint N in the writer may

Review Comment:
   Nit: Javadoc should start with a capital letter.
   ```suggestion
   * In case of unaligned checkpoints, data files that were part of checkpoint N in the writer may
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

