laskoviymishka commented on code in PR #16434:
URL: https://github.com/apache/iceberg/pull/16434#discussion_r3302257622


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -150,16 +152,33 @@ protected boolean receive(Envelope envelope) {
   private void commit(boolean partialCommit) {
     try {
       doCommit(partialCommit);
-    } catch (RuntimeException e) {
+      if (!partialCommit) {
+        consecutiveCommitFailures = 0;
+      }
+    } catch (CommitFailedException e) {

Review Comment:
   I think this catch will not match in production, so the retry feature 
will silently do nothing.
   
   The mechanism: `doCommit` runs the per-table commits through 
`Tasks.foreach(...).executeWith(exec).stopOnFailure().run(...)`. Once 
`commitThreads > 1` (the production default is `availableProcessors() * 2`) 
`Tasks` takes the parallel path. A `CommitFailedException` thrown inside the 
submitted `Runnable` comes back from `Future.get()` wrapped in an 
`ExecutionException`; `Tasks.waitFor` adds that to its suppressed list; 
`ExceptionUtil.castAndThrow` then rewraps it as a plain `RuntimeException` 
because `ExecutionException` is checked. So what reaches `commit()` is:
   
   ```
   RuntimeException
     └─ caused by ExecutionException
          └─ caused by CommitFailedException
   ```
   
   The new `catch (CommitFailedException e)` only matches if the thrown object 
IS-A `CommitFailedException` — Java's catch doesn't peer into causes. So on 
every multi-threaded deployment: the catch is skipped, 
`consecutiveCommitFailures` never increments, the coordinator terminates on the 
first transient blip, and the configured `max-consecutive-failures` does 
nothing. The single-threaded test fixture (`ChannelTestBase` mocks 
`commitThreads=1`) takes the inline `runSingleThreaded` path where no wrapping 
happens, so CI passes green while production is broken.
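   
   To make the mismatch concrete, here is a minimal JDK-only sketch of the two paths. `CommitFailedException` below is a local stand-in for the Iceberg class, and `commit(int threads)` is a hypothetical simplification of the inline versus pooled execution described above, not the actual `Tasks` code.
   
   ```java
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   public class WrappedCatchDemo {
     // Local stand-in for the Iceberg exception class (assumption for this sketch).
     static class CommitFailedException extends RuntimeException {
       CommitFailedException(String message) {
         super(message);
       }
     }

     // Hypothetical simplification: one thread runs the task inline,
     // more than one submits it to a pool and rewraps the checked exception.
     static void commit(int threads) {
       Runnable task = () -> {
         throw new CommitFailedException("transient error");
       };
       if (threads == 1) {
         task.run(); // inline path: the exception propagates unwrapped
         return;
       }
       ExecutorService exec = Executors.newFixedThreadPool(threads);
       try {
         Future<?> future = exec.submit(task);
         future.get(); // throws ExecutionException caused by CommitFailedException
       } catch (ExecutionException e) {
         throw new RuntimeException(e); // checked exception rewrapped as unchecked
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException(e);
       } finally {
         exec.shutdownNow();
       }
     }

     // Reports which catch clause handled the failure.
     static String whichCatch(int threads) {
       try {
         commit(threads);
         return "none";
       } catch (CommitFailedException e) {
         return "narrow";
       } catch (RuntimeException e) {
         return "wide";
       }
     }

     public static void main(String[] args) {
       System.out.println(whichCatch(1)); // prints "narrow"
       System.out.println(whichCatch(2)); // prints "wide"
     }
   }
   ```
   
   With one thread the narrow clause handles the failure; with two, only the wide clause sees it, which is the gap between the test fixture and a multi-threaded deployment.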
   
   The fix that survives the wrapping is to keep the catch wide and walk the 
cause chain:
   
   ```java
   } catch (RuntimeException e) {
     if (partialCommit) {
       LOG.warn("Partial commit {} failed for task {}, will retry",
           commitState.currentCommitId(), taskId, e);
       return;
     }
   
     if (!isCommitFailedException(e)) {
       // CommitStateUnknownException, ValidationException, ForbiddenException,
       // NPE, anything else — not retryable, terminate immediately
       throw e;
     }
   
     consecutiveCommitFailures++;
     if (consecutiveCommitFailures >= config.commitMaxConsecutiveFailures()) {
     LOG.error("Commit {} failed for task {} ({} consecutive failures, terminating)",
           commitState.currentCommitId(), taskId, consecutiveCommitFailures, e);
       throw e;
     }
     LOG.warn("Commit {} failed for task {} ({} consecutive failures, will retry)",
         commitState.currentCommitId(), taskId, consecutiveCommitFailures, e);
   }
   
   private static boolean isCommitFailedException(Throwable t) {
     while (t != null) {
       if (t instanceof CommitFailedException) {
         return true;
       }
       t = t.getCause();
     }
     return false;
   }
   ```
   
   One change, two things land:
   
   1. The retry path actually engages in production (multi-threaded 
`CommitFailedException` is unwrapped and counted).
   2. The wide catch restores the safety net on the partial-commit path that 
the narrowing accidentally removed — anything weird during a partial commit 
(NPE, `UncheckedIOException`, `ConnectException` from the offset write) gets 
logged and retries next cycle, instead of tearing down the task on first 
occurrence.
   
   Worth adding one test with `commitThreads >= 2` that throws 
`CommitFailedException` and asserts the counter still increments through the 
parallel path — otherwise this exact bug stays invisible to CI. wdyt?
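   
   As a rough JDK-only analogue of that test, the sketch below drives a two-thread pool through the wide catch and checks the counter. Everything here is a stand-in: `CommitFailedException` is a local class, and `ParallelPathRetrySketch` with its `doCommit` is hypothetical scaffolding that always fails; only `isCommitFailedException` and the counter logic follow the snippet above.
   
   ```java
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   public class ParallelPathRetrySketch {
     // Local stand-in for the Iceberg exception class (assumption for this sketch).
     static class CommitFailedException extends RuntimeException {
       CommitFailedException(String message) {
         super(message);
       }
     }

     private final int maxConsecutiveFailures;
     private int consecutiveCommitFailures = 0;

     ParallelPathRetrySketch(int maxConsecutiveFailures) {
       this.maxConsecutiveFailures = maxConsecutiveFailures;
     }

     int consecutiveCommitFailures() {
       return consecutiveCommitFailures;
     }

     // Hypothetical doCommit: always fails on a two-thread pool, then rewraps
     // the checked ExecutionException as a plain RuntimeException.
     private void doCommit() {
       ExecutorService exec = Executors.newFixedThreadPool(2);
       try {
         Runnable failing = () -> {
           throw new CommitFailedException("transient error");
         };
         exec.submit(failing).get();
       } catch (ExecutionException e) {
         throw new RuntimeException(e);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException(e);
       } finally {
         exec.shutdownNow();
       }
     }

     // Wide catch plus cause-chain check, as in the suggested fix.
     void commit() {
       try {
         doCommit();
         consecutiveCommitFailures = 0;
       } catch (RuntimeException e) {
         if (!isCommitFailedException(e)) {
           throw e; // not retryable
         }
         consecutiveCommitFailures++;
         if (consecutiveCommitFailures >= maxConsecutiveFailures) {
           throw e; // limit reached, terminate
         }
       }
     }

     static boolean isCommitFailedException(Throwable t) {
       while (t != null) {
         if (t instanceof CommitFailedException) {
           return true;
         }
         t = t.getCause();
       }
       return false;
     }

     public static void main(String[] args) {
       ParallelPathRetrySketch coordinator = new ParallelPathRetrySketch(3);
       coordinator.commit();
       coordinator.commit();
       System.out.println(coordinator.consecutiveCommitFailures()); // prints 2
       try {
         coordinator.commit();
       } catch (RuntimeException e) {
         System.out.println(isCommitFailedException(e)); // prints true
       }
     }
   }
   ```
   
   Note that on the pooled path the exception that finally terminates is the wrapper, not a `CommitFailedException` itself, so a real test would probably need to assert on the cause chain rather than on the thrown type.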



##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -170,6 +174,103 @@ public void testCommitFailedExceptionPropagates() {
         .hasMessageContaining("Glue detected concurrent update");
   }
 
+  @Test
+  public void testCommitBoundedRetry() {
+    when(config.commitMaxConsecutiveFailures()).thenReturn(3);
+    when(config.commitIntervalMs()).thenReturn(0);
+    when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);
+
+    Table spiedTable = spy(table);
+    AppendFiles spiedAppend = spy(table.newAppend());
+    doThrow(new CommitFailedException("transient error")).when(spiedAppend).commit();
+    when(spiedTable.newAppend()).thenReturn(spiedAppend);
+    when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable);
+
+    SinkTaskContext context = mock(SinkTaskContext.class);
+    Coordinator coordinator =
+        new Coordinator(catalog, config, ImmutableList.of(), clientFactory, context);
+    coordinator.start();
+    initConsumer();
+
+    // first two failures should not throw
+    triggerCommitCycle(coordinator);
+    triggerCommitCycle(coordinator);
+
+    // third consecutive failure should terminate
+    assertThatThrownBy(() -> triggerCommitCycle(coordinator))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("transient error");
+  }
+
+  @Test
+  public void testCommitCounterResetsOnSuccess() {
+    when(config.commitMaxConsecutiveFailures()).thenReturn(3);
+    when(config.commitIntervalMs()).thenReturn(0);
+    when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);
+
+    Table spiedTable = spy(table);
+    AppendFiles failingAppend = spy(table.newAppend());
+    doThrow(new CommitFailedException("transient error")).when(failingAppend).commit();
+
+    // fail, fail, succeed, fail, fail — should NOT terminate
+    when(spiedTable.newAppend())
+        .thenReturn(failingAppend)
+        .thenReturn(failingAppend)
+        .thenCallRealMethod()

Review Comment:
   I don't think this test actually exercises the reset.
   
   Cycle 3 calls the real `newAppend()` and writes a real snapshot to the 
in-memory table. From cycle 4 onward, `lastCommittedOffsetsForTable` sees the 
test's data files as already-committed and filters them out, so `dataFiles` is 
empty, `doCommit` produces no Iceberg operation, and `failingAppend.commit()` 
is never reached. The counter doesn't increment in cycles 4–5 — not because 
reset works, but because there's nothing left to fail on. There's also no 
explicit assertion; "didn't throw across 5 cycles" is the only signal.
   
   I'd either replace `thenCallRealMethod()` with a mock that returns without 
writing a snapshot, or advance the offset above the committed watermark each 
cycle so cycles 4–5 still hit `failingAppend.commit()`. Then an explicit 
`assertThat(consecutiveCommitFailures).isZero()` after cycle 3 (or a sixth 
cycle that DOES throw to prove the counter actually restarted from zero) would 
pin the invariant.
   
   This one matters because once the catch fix above lands, this is the test 
that proves it works — it needs to actually fail when the reset is broken.
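   
   A small model of the counter shows why the current sequence cannot tell a working reset from a broken one. This is a hypothetical simulation, not Coordinator code: `Outcome`, `terminates`, and the `resetOnSuccess` flag are names invented for the sketch, and a `NO_OP` cycle is assumed to return normally just like a successful one.
   
   ```java
   import java.util.Arrays;
   import java.util.List;

   public class ResetCoverageSketch {
     enum Outcome { FAIL, SUCCEED, NO_OP }

     // Returns true if the consecutive-failure limit is reached during the run.
     // resetOnSuccess=false models a broken reset.
     static boolean terminates(List<Outcome> cycles, int limit, boolean resetOnSuccess) {
       int consecutive = 0;
       for (Outcome outcome : cycles) {
         if (outcome == Outcome.FAIL) {
           consecutive++;
           if (consecutive >= limit) {
             return true;
           }
         } else if (resetOnSuccess) {
           // SUCCEED and NO_OP both return normally, so both reset the counter
           consecutive = 0;
         }
       }
       return false;
     }

     // What the test effectively runs today: cycles 4 and 5 have nothing to commit.
     static final List<Outcome> AS_WRITTEN =
         Arrays.asList(Outcome.FAIL, Outcome.FAIL, Outcome.SUCCEED, Outcome.NO_OP, Outcome.NO_OP);

     // What the test intends: cycles 4 and 5 really fail.
     static final List<Outcome> INTENDED =
         Arrays.asList(Outcome.FAIL, Outcome.FAIL, Outcome.SUCCEED, Outcome.FAIL, Outcome.FAIL);

     public static void main(String[] args) {
       System.out.println(terminates(AS_WRITTEN, 3, true));  // prints false
       System.out.println(terminates(AS_WRITTEN, 3, false)); // prints false
       System.out.println(terminates(INTENDED, 3, true));    // prints false
       System.out.println(terminates(INTENDED, 3, false));   // prints true
     }
   }
   ```
   
   Only the intended sequence gives different results for the working and broken reset, so only that sequence can fail when the reset regresses.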



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

