laskoviymishka commented on code in PR #16434:
URL: https://github.com/apache/iceberg/pull/16434#discussion_r3302257622
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -150,16 +152,33 @@ protected boolean receive(Envelope envelope) {
private void commit(boolean partialCommit) {
try {
doCommit(partialCommit);
- } catch (RuntimeException e) {
+ if (!partialCommit) {
+ consecutiveCommitFailures = 0;
+ }
+ } catch (CommitFailedException e) {
Review Comment:
I think this catch is going to miss in production, and the retry feature
will silently do nothing.
The mechanism: `doCommit` runs the per-table commits through
`Tasks.foreach(...).executeWith(exec).stopOnFailure().run(...)`. Once
`commitThreads > 1` (the production default is `availableProcessors() * 2`)
`Tasks` takes the parallel path. A `CommitFailedException` thrown inside the
submitted `Runnable` comes back from `Future.get()` wrapped in an
`ExecutionException`; `Tasks.waitFor` adds that to its suppressed list;
`ExceptionUtil.castAndThrow` then rewraps it as a plain `RuntimeException`
because `ExecutionException` is checked. So what reaches `commit()` is:
```
RuntimeException
└─ caused by ExecutionException
└─ caused by CommitFailedException
```
The new `catch (CommitFailedException e)` only matches if the thrown object
IS-A `CommitFailedException` — Java's catch doesn't peer into causes. So on
every multi-threaded deployment: the catch is skipped,
`consecutiveCommitFailures` never increments, the coordinator terminates on the
first transient blip, and the configured `max-consecutive-failures` does
nothing. The single-threaded test fixture (`ChannelTestBase` mocks
`commitThreads=1`) takes the inline `runSingleThreaded` path where no wrapping
happens, so CI passes green while production is broken.
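The wrapping can be reproduced with nothing but the JDK. This is a hypothetical standalone analogue, not Iceberg's `Tasks`. It submits a `Runnable` that throws a local stand-in `CommitFailedException` to a two-thread `ExecutorService`. It then rewraps the checked `ExecutionException` from `Future.get()` as a `RuntimeException`, mirroring the chain described above, and reports which `catch` clause fires.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WrappedCatchDemo {

  // Local stand-in for org.apache.iceberg.exceptions.CommitFailedException (unchecked).
  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
      super(message);
    }
  }

  // Runs a failing Runnable on a worker thread. Future.get() throws the checked
  // ExecutionException, which is rewrapped as a plain RuntimeException (stand-in
  // for the rewrap the comment attributes to ExceptionUtil.castAndThrow).
  static void runOnPool() {
    ExecutorService exec =
        Executors.newFixedThreadPool(
            2,
            r -> {
              Thread t = new Thread(r);
              t.setDaemon(true); // don't keep the JVM alive
              return t;
            });
    try {
      Runnable task =
          () -> {
            throw new CommitFailedException("transient error");
          };
      Future<?> future = exec.submit(task);
      future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } finally {
      exec.shutdownNow();
    }
  }

  // Reports which catch clause handles the failure: the narrow one or the wide one.
  static String whichCatchFires() {
    try {
      runOnPool();
      return "none";
    } catch (CommitFailedException e) {
      return "narrow";
    } catch (RuntimeException e) {
      return "wide";
    }
  }

  // Renders the cause chain, outermost exception first.
  static String causeChain(Throwable t) {
    StringBuilder sb = new StringBuilder();
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      if (sb.length() > 0) {
        sb.append(" -> ");
      }
      sb.append(cur.getClass().getSimpleName());
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(whichCatchFires());
    try {
      runOnPool();
    } catch (RuntimeException e) {
      System.out.println(causeChain(e));
    }
  }
}
```

Running `main` prints `wide` followed by `RuntimeException -> ExecutionException -> CommitFailedException`: the narrow clause never matches even though the root cause is exactly the type it names.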
The fix that survives the wrapping is to keep the catch wide and walk the
cause chain:
```java
private void commit(boolean partialCommit) {
  try {
    doCommit(partialCommit);
    if (!partialCommit) {
      // a full commit succeeded: reset the failure streak
      consecutiveCommitFailures = 0;
    }
  } catch (RuntimeException e) {
    if (partialCommit) {
      LOG.warn("Partial commit {} failed for task {}, will retry",
          commitState.currentCommitId(), taskId, e);
      return;
    }
    if (!isCommitFailedException(e)) {
      // CommitStateUnknownException, ValidationException, ForbiddenException,
      // NPE, anything else: not retryable, terminate immediately
      throw e;
    }
    consecutiveCommitFailures++;
    if (consecutiveCommitFailures >= config.commitMaxConsecutiveFailures()) {
      LOG.error("Commit {} failed for task {} ({} consecutive failures, terminating)",
          commitState.currentCommitId(), taskId, consecutiveCommitFailures, e);
      throw e;
    }
    LOG.warn("Commit {} failed for task {} ({} consecutive failures, will retry)",
        commitState.currentCommitId(), taskId, consecutiveCommitFailures, e);
  }
}

// Walks the cause chain, so a CommitFailedException wrapped by the parallel
// Tasks path (RuntimeException -> ExecutionException -> ...) is still recognized.
private static boolean isCommitFailedException(Throwable t) {
  while (t != null) {
    if (t instanceof CommitFailedException) {
      return true;
    }
    t = t.getCause();
  }
  return false;
}
```
One change, two things land:
1. The retry path actually engages in production (multi-threaded
`CommitFailedException` is unwrapped and counted).
2. The wide catch restores the safety net on the partial-commit path that
the narrowing accidentally removed — anything weird during a partial commit
(NPE, `UncheckedIOException`, `ConnectException` from the offset write) gets
logged and retries next cycle, instead of tearing down the task on first
occurrence.
Worth adding one test with `commitThreads >= 2` that throws
`CommitFailedException` and asserts the counter still increments through the
parallel path — otherwise this exact bug stays invisible to CI. wdyt?
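The real test would drive `Coordinator` with `commitThreads >= 2` through the existing fixtures. As a standalone analogue of what it should pin, the sketch below uses a hypothetical `RetryingCommitter` (not an Iceberg class). It runs each commit on a two-thread `ExecutorService`, rewraps the failure the same way, and applies the wide catch plus cause walk suggested above. `CommitFailedException` is again a local stand-in.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelRetryCounterDemo {

  // Local stand-in for org.apache.iceberg.exceptions.CommitFailedException.
  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
      super(message);
    }
  }

  // Hypothetical, heavily trimmed coordinator: only the retry bookkeeping is modeled.
  static class RetryingCommitter {
    private final ExecutorService exec;
    private final int maxConsecutiveFailures;
    int consecutiveCommitFailures = 0;

    RetryingCommitter(int commitThreads, int maxConsecutiveFailures) {
      this.exec =
          Executors.newFixedThreadPool(
              commitThreads,
              r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
              });
      this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    // Full (non-partial) commit: reset on success, count wrapped
    // CommitFailedExceptions, rethrow everything else immediately.
    void commit(Runnable tableCommit) {
      try {
        runOnPool(tableCommit);
        consecutiveCommitFailures = 0;
      } catch (RuntimeException e) {
        if (!isCommitFailedException(e)) {
          throw e;
        }
        consecutiveCommitFailures++;
        if (consecutiveCommitFailures >= maxConsecutiveFailures) {
          throw e;
        }
      }
    }

    // Runs the task on a worker thread; the checked ExecutionException from
    // Future.get() is rewrapped as a RuntimeException, as in the parallel path.
    private void runOnPool(Runnable task) {
      Future<?> future = exec.submit(task);
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }

    void close() {
      exec.shutdownNow();
    }

    private static boolean isCommitFailedException(Throwable t) {
      while (t != null) {
        if (t instanceof CommitFailedException) {
          return true;
        }
        t = t.getCause();
      }
      return false;
    }
  }
}
```

Swapping the wide catch in `commit` for `catch (CommitFailedException e)` makes the very first failing call throw, which is exactly the regression a multi-threaded test would catch.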
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -170,6 +174,103 @@ public void testCommitFailedExceptionPropagates() {
.hasMessageContaining("Glue detected concurrent update");
}
+ @Test
+ public void testCommitBoundedRetry() {
+ when(config.commitMaxConsecutiveFailures()).thenReturn(3);
+ when(config.commitIntervalMs()).thenReturn(0);
+ when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);
+
+ Table spiedTable = spy(table);
+ AppendFiles spiedAppend = spy(table.newAppend());
+ doThrow(new CommitFailedException("transient error")).when(spiedAppend).commit();
+ when(spiedTable.newAppend()).thenReturn(spiedAppend);
+ when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ Coordinator coordinator =
+ new Coordinator(catalog, config, ImmutableList.of(), clientFactory, context);
+ coordinator.start();
+ initConsumer();
+
+ // first two failures should not throw
+ triggerCommitCycle(coordinator);
+ triggerCommitCycle(coordinator);
+
+ // third consecutive failure should terminate
+ assertThatThrownBy(() -> triggerCommitCycle(coordinator))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("transient error");
+ }
+
+ @Test
+ public void testCommitCounterResetsOnSuccess() {
+ when(config.commitMaxConsecutiveFailures()).thenReturn(3);
+ when(config.commitIntervalMs()).thenReturn(0);
+ when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);
+
+ Table spiedTable = spy(table);
+ AppendFiles failingAppend = spy(table.newAppend());
+ doThrow(new CommitFailedException("transient error")).when(failingAppend).commit();
+
+ // fail, fail, succeed, fail, fail — should NOT terminate
+ when(spiedTable.newAppend())
+ .thenReturn(failingAppend)
+ .thenReturn(failingAppend)
+ .thenCallRealMethod()
Review Comment:
I don't think this test actually exercises the reset.
Cycle 3 calls the real `newAppend()` and writes a real snapshot to the
in-memory table. From cycle 4 onward, `lastCommittedOffsetsForTable` sees the
test's data files as already-committed and filters them out, so `dataFiles` is
empty, `doCommit` produces no Iceberg operation, and `failingAppend.commit()`
is never reached. The counter doesn't increment in cycles 4–5 — not because
reset works, but because there's nothing left to fail on. There's also no
explicit assertion; "didn't throw across 5 cycles" is the only signal.
I'd either replace `thenCallRealMethod()` with a mock append whose `commit()` returns
normally without writing a snapshot, or advance the offset above the committed watermark
each cycle so cycles 4–5 still hit `failingAppend.commit()`. Then an explicit
`assertThat(consecutiveCommitFailures).isZero()` after cycle 3 (or a sixth
cycle that DOES throw to prove the counter actually restarted from zero) would
pin the invariant.
This one matters because once the catch fix above lands, this is the test
that proves it works — it needs to actually fail when the reset is broken.
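To make the "can't fail" point concrete, here is a hypothetical pure-Java model of the counter (not the `Coordinator` code), with `max-consecutive-failures = 3`. It compares a correct reset against a broken one (no reset) on three cycle sequences. A cycle that finds nothing to commit is modeled as a normal return, i.e. a success.

```java
public class ResetInvariantModel {

  // Hypothetical model of the failure counter, not the Coordinator itself.
  // failed[i] == true means cycle i + 1 actually reached a failing commit();
  // false means the cycle returned normally (a real commit or a no-op).
  // Returns the 1-based cycle that terminates the task, or -1 if none does.
  static int terminatingCycle(boolean[] failed, int maxConsecutiveFailures, boolean resetOnSuccess) {
    int consecutive = 0;
    for (int i = 0; i < failed.length; i++) {
      if (failed[i]) {
        consecutive++;
        if (consecutive >= maxConsecutiveFailures) {
          return i + 1;
        }
      } else if (resetOnSuccess) {
        consecutive = 0;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    boolean[] intended = {true, true, false, true, true}; // fail, fail, succeed, fail, fail
    boolean[] actual = {true, true, false, false, false}; // cycles 4-5 find nothing to commit
    boolean[] sixCycles = {true, true, false, true, true, true}; // plus a sixth failing cycle

    for (boolean[] cycles : new boolean[][] {intended, actual, sixCycles}) {
      System.out.println(
          terminatingCycle(cycles, 3, true) + " vs broken reset " + terminatingCycle(cycles, 3, false));
    }
    // prints:
    // -1 vs broken reset 4
    // -1 vs broken reset -1
    // 6 vs broken reset 4
  }
}
```

The middle row is what the test exercises today: both variants survive, so a broken reset goes unnoticed. The intended sequence would already catch it at cycle 4, and the sixth failing cycle additionally shows the counter restarting from zero by terminating at exactly cycle 6.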
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]