DaisyModi opened a new pull request, #4188:
URL: https://github.com/apache/gobblin/pull/4188

   ### Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it cannot be accepted until the ICLA is signed.
   
   - My name is Daisy Modi
   - I work for LinkedIn
   - I have signed the ICLA
   
   ### JIRA
   - A GOBBLIN ticket will be filed as a follow-up; I'm happy to update the PR title and description with the ticket number once it is filed.
   
   ### Description of PR
   
   GaaS (Gobblin-as-a-Service) depends on several long-lived worker threads that are submitted fire-and-forget to executors. When one of them hits an uncaught exception, it exits silently and leaves the service in a "server up, worker down" state: no alert fires, and nothing recovers until someone restarts the service manually. This PR adds a `SupervisedRunnable` utility and wires it into the three vulnerable sites so that worker threads respawn after uncaught exceptions.
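   The following minimal, stdlib-only sketch makes the failure mode concrete. It assumes the worker is started with `ExecutorService.submit(...)` and the returned `Future` is dropped. `SilentWorkerDemo` and its loop are hypothetical stand-ins, not GaaS code. With `submit`, the exception is captured by the `Future` instead of reaching the thread's uncaught-exception handler, so unless someone calls `get()`, the failure is invisible.

   ```java
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   /** Hypothetical demo of a fire-and-forget worker dying silently (not GaaS code). */
   final class SilentWorkerDemo {
     /** Returns the exception that a fire-and-forget caller would never look at. */
     static Throwable runAndInspect() {
       ExecutorService executor = Executors.newSingleThreadExecutor();
       AtomicInteger iterations = new AtomicInteger();
       // Long-lived worker loop that hits an unexpected error on its third pass.
       Future<?> future = executor.submit(() -> {
         while (!Thread.currentThread().isInterrupted()) {
           if (iterations.incrementAndGet() == 3) {
             throw new IllegalStateException("unexpected error in worker loop");
           }
         }
       });
       executor.shutdown();
       try {
         executor.awaitTermination(5, TimeUnit.SECONDS);
         // The loop has stopped and nothing was logged; the only record of the
         // failure is stored inside the Future, which fire-and-forget code drops.
         future.get();
         return null;
       } catch (ExecutionException e) {
         return e.getCause();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return e;
       }
     }
   }
   ```

   With `execute(...)` instead of `submit(...)`, the default uncaught-exception handler would at least print a stack trace, but the loop would still be dead with nothing to restart it.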
   
   **Three sites fixed:**
   1. `HighLevelConsumer`: the Kafka poll loop, factored out into `startConsumerPollLoop()` so that subclasses overriding `startUp()` can reuse it.
   2. `HighLevelConsumer`: each per-partition `QueueProcessor`.
   3. `DagProcessingEngine`: each `DagProcEngineThread`.
   
   `DagActionStoreChangeMonitor.setActive()` now calls `startConsumerPollLoop()` instead of inlining the raw `while(!shutdownRequested) consume()` lambda, so it is covered by the supervisor too.
   
   **Watermark semantics preserved.** GOBBLIN-2177's behavior is unchanged: when auto-commit is off and `processMessage` throws, the `QueueProcessor` still dies, so the failed record's offset is never added to `partitionOffsetsToCommit`. The supervisor replaces the dead processor with a fresh one that drains the same queue. There is no blanket catch-all and no silent offset advancement; records whose offsets were not committed are re-delivered by Kafka after a consumer rebalance or restart.
   
   **Supervisor behavior:**
   - Catches any `Throwable` escaping the wrapped runnable.
   - Exponential backoff between respawns, starting at 1s and capped at 60s.
   - Resets the failure counter once the runnable survives longer than `maxBackoffMillis` (treating it as recovered from a transient error).
   - Circuit-breaks after 10 consecutive immediate failures (none surviving past `maxBackoffMillis`) and fires an `onUnrecoverable` callback. The paired metric should be wired to on-call paging so that the service can be investigated manually.
   - Clean exits and interruption during shutdown terminate the supervisor 
without respawning.
   - Uses caller-supplied callbacks for metric emission so the utility stays 
free of metrics dependencies.
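   The behavior list above can be sketched roughly as follows. This is a hypothetical, stdlib-only illustration, not the code in this PR: the class name `SupervisedRunnableSketch`, its constructor, the `Consumer<Throwable>` callback types, and the doubling backoff factor are all assumptions. Only the listed behaviors (catch-all, capped exponential backoff, reset after surviving `maxBackoffMillis`, circuit-break with `onUnrecoverable`, no respawn on clean exit or interruption) come from the description.

   ```java
   import java.util.concurrent.TimeUnit;
   import java.util.function.Consumer;

   /** Hypothetical sketch of a supervising wrapper; names and signatures are assumptions. */
   final class SupervisedRunnableSketch implements Runnable {
     private final Runnable task;
     private final long initialBackoffMillis;
     private final long maxBackoffMillis;
     private final int maxConsecutiveFailures;
     private final Consumer<Throwable> onRespawn;       // e.g. bump a *Respawns metric
     private final Consumer<Throwable> onUnrecoverable; // e.g. bump a *Unrecoverable metric

     SupervisedRunnableSketch(Runnable task, long initialBackoffMillis, long maxBackoffMillis,
         int maxConsecutiveFailures, Consumer<Throwable> onRespawn,
         Consumer<Throwable> onUnrecoverable) {
       this.task = task;
       this.initialBackoffMillis = initialBackoffMillis;
       this.maxBackoffMillis = maxBackoffMillis;
       this.maxConsecutiveFailures = maxConsecutiveFailures;
       this.onRespawn = onRespawn;
       this.onUnrecoverable = onUnrecoverable;
     }

     @Override
     public void run() {
       int consecutiveFailures = 0;
       long backoffMillis = initialBackoffMillis;
       while (!Thread.currentThread().isInterrupted()) {
         long startNanos = System.nanoTime();
         try {
           task.run();
           return; // Clean exit: the worker stopped on purpose, so do not respawn.
         } catch (Throwable t) {
           if (Thread.currentThread().isInterrupted()) {
             return; // Interrupted (e.g. during shutdown): stop without respawning.
           }
           long aliveNanos = System.nanoTime() - startNanos;
           if (TimeUnit.NANOSECONDS.toMillis(aliveNanos) > maxBackoffMillis) {
             // Survived long enough to count as recovered: reset counter and backoff.
             consecutiveFailures = 0;
             backoffMillis = initialBackoffMillis;
           }
           consecutiveFailures++;
           if (consecutiveFailures >= maxConsecutiveFailures) {
             onUnrecoverable.accept(t); // Circuit-break: give up and leave it to paging.
             return;
           }
           onRespawn.accept(t);
           try {
             Thread.sleep(backoffMillis);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt(); // Preserve the flag for the executor.
             return;
           }
           backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis); // Assumed 2x.
         }
       }
     }
   }
   ```

   In this sketch, a call site would submit the wrapper rather than the raw loop, e.g. `executor.submit(new SupervisedRunnableSketch(pollLoop, 1_000, 60_000, 10, t -> respawns.incrementAndGet(), t -> unrecoverable.incrementAndGet()))`, where `pollLoop`, `respawns`, and `unrecoverable` are hypothetical. Because respawning happens inside the same pool thread, the executor never observes the failure; it is surfaced only through the callbacks, which is why the unrecoverable metric needs to page someone.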
   
   **New metrics in `RuntimeMetrics`:**
   - `gobblin.kafka.highLevelConsumer.pollThreadRespawns`
   - `gobblin.kafka.highLevelConsumer.queueThreadRespawns`
   - `gobblin.kafka.highLevelConsumer.threadUnrecoverable` — wire to paging
   - `gobblin.service.dagProcessingEngine.threadRespawns`
   - `gobblin.service.dagProcessingEngine.threadUnrecoverable` — wire to paging
   
   ### Tests
   
   - `SupervisedRunnableTest`: 5 unit tests covering respawn, circuit-break, clean exit, interrupt-during-backoff, and failure-counter reset.
   - `HighLevelConsumerTest#testQueueProcessorRespawnsAfterExceptionAutoCommitDisabled`: integration test verifying that, with `auto-commit=off`, the supervisor respawns a `QueueProcessor` after a `processMessage` exception and that subsequent records still flow through.
   
   ### Commits
   
   - Squashed into one commit.
   
   ### Documentation and Formatting
   
   - No new user-visible config.
   - All additions follow existing package and style conventions.


-- 
This is an automated message from the Apache Git Service.