DaisyModi opened a new pull request, #4188:
URL: https://github.com/apache/gobblin/pull/4188
### Dear Gobblin maintainers,

Please accept this PR. I understand that until you sign your ICLA, we cannot actually accept your PR.

- My name is Daisy Modi
- I work for LinkedIn
- I have signed the ICLA

### JIRA

- I am filing a GOBBLIN ticket as a follow-up and am happy to update the PR title/description with the ticket number once it is filed.

### Description of PR

GaaS depends on several long-lived worker threads that are submitted fire-and-forget to executors. On any uncaught exception they exit silently, leaving the service in a "server up, worker down" state: no alert, and no recovery until a manual restart.

This PR adds a `SupervisedRunnable` utility and wires it into the three vulnerable sites so that worker threads respawn on uncaught exceptions.

**Three sites fixed:**

1. `HighLevelConsumer`: the Kafka poll loop (factored out into `startConsumerPollLoop()` so subclasses overriding `startUp()` can reuse it).
2. `HighLevelConsumer`: each per-partition `QueueProcessor`.
3. `DagProcessingEngine`: each `DagProcEngineThread`.

`DagActionStoreChangeMonitor.setActive()` now calls `startConsumerPollLoop()` instead of inlining the raw `while(!shutdownRequested) consume()` lambda, so it benefits from the supervisor too.

**Watermark semantics preserved.** GOBBLIN-2177's behavior is unchanged: when auto-commit is off and `processMessage` throws, the `QueueProcessor` still dies, so the failed record's offset is not committed to `partitionOffsetsToCommit`. The supervisor replaces the dead processor with a fresh one that drains the same queue; there is no blanket catch-all and no silent offset advancement. Uncommitted offsets are redelivered by Kafka on consumer rebalance or restart.

**Supervisor behavior:**

- Catches any `Throwable` escaping the wrapped runnable.
- Exponential backoff (1s -> 60s cap) between respawns.
- Resets the failure counter once the runnable survives longer than `maxBackoffMillis` (treating it as recovered from a transient error).
- Circuit-breaks after 10 consecutive immediate failures and fires an `onUnrecoverable` callback. The paired metric should be wired to on-call paging so the service can be investigated manually.
- Clean exits and interruption during shutdown terminate the supervisor without respawning.
- Uses caller-supplied callbacks for metric emission so the utility stays free of metrics dependencies.

**New metrics in `RuntimeMetrics`:**

- `gobblin.kafka.highLevelConsumer.pollThreadRespawns`
- `gobblin.kafka.highLevelConsumer.queueThreadRespawns`
- `gobblin.kafka.highLevelConsumer.threadUnrecoverable` (wire to paging)
- `gobblin.service.dagProcessingEngine.threadRespawns`
- `gobblin.service.dagProcessingEngine.threadUnrecoverable` (wire to paging)

### Tests

- `SupervisedRunnableTest`: 5 unit tests covering respawn, circuit-break, clean exit, interrupt-during-backoff, and failure-counter reset.
- `HighLevelConsumerTest#testQueueProcessorRespawnsAfterExceptionAutoCommitDisabled`: integration test verifying that the supervisor respawns a `QueueProcessor` after a `processMessage` exception with `auto-commit=off`, and that subsequent records flow through.

### Commits

- Squashed into one commit.

### Documentation and Formatting

- No new user-visible config.
- All additions follow existing package and style conventions.
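For reviewers, the supervisor behavior described in this PR can be sketched in plain Java as follows. This is an illustrative sketch, not the code in the PR: the constructor shape, the parameter names, the `IntConsumer`/`Consumer<Throwable>` callback types and the `Supplier<Runnable>` worker factory (used so each respawn gets a fresh worker, mirroring how a dead `QueueProcessor` is replaced by a new one) are all assumptions. Deriving the backoff from the consecutive-failure count is also an assumption.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of a supervisor that re-runs a long-lived worker after uncaught
 * throwables. Names and signatures are assumptions, not Gobblin's SupervisedRunnable API.
 */
final class SupervisedRunnable implements Runnable {
  private final Supplier<Runnable> workerFactory;     // builds a fresh worker per attempt
  private final long initialBackoffMillis;            // PR description: 1s
  private final long maxBackoffMillis;                // PR description: 60s cap
  private final int maxConsecutiveFailures;           // PR description: 10
  private final IntConsumer onRespawn;                // metric hook, gets the failure count
  private final Consumer<Throwable> onUnrecoverable;  // paging hook, gets the last error

  SupervisedRunnable(Supplier<Runnable> workerFactory, long initialBackoffMillis,
      long maxBackoffMillis, int maxConsecutiveFailures,
      IntConsumer onRespawn, Consumer<Throwable> onUnrecoverable) {
    this.workerFactory = workerFactory;
    this.initialBackoffMillis = initialBackoffMillis;
    this.maxBackoffMillis = maxBackoffMillis;
    this.maxConsecutiveFailures = maxConsecutiveFailures;
    this.onRespawn = onRespawn;
    this.onUnrecoverable = onUnrecoverable;
  }

  @Override
  public void run() {
    int consecutiveFailures = 0;
    while (!Thread.currentThread().isInterrupted()) {
      long startedAt = System.nanoTime();
      try {
        workerFactory.get().run();
        return; // Clean exit: the worker finished on its own, so do not respawn.
      } catch (Throwable t) {
        if (Thread.currentThread().isInterrupted()) {
          return; // Failure during shutdown interruption: do not respawn.
        }
        long ranForMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt);
        if (ranForMillis > maxBackoffMillis) {
          consecutiveFailures = 0; // Survived long enough: treat earlier failures as transient.
        }
        consecutiveFailures++;
        if (consecutiveFailures >= maxConsecutiveFailures) {
          onUnrecoverable.accept(t); // Circuit-break and hand off to paging.
          return;
        }
        onRespawn.accept(consecutiveFailures);
        // Exponential backoff: initial, 2x, 4x, ... capped at maxBackoffMillis.
        long backoffMillis = Math.min(maxBackoffMillis,
            initialBackoffMillis << Math.min(consecutiveFailures - 1, 30));
        try {
          Thread.sleep(backoffMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // Interrupted during backoff: stop supervising.
          return;
        }
      }
    }
  }
}
```

With the defaults from the description, a caller would pass `1_000`, `60_000` and `10`, and point the two callbacks at the respawn and unrecoverable meters listed above. Because the backoff in this sketch is computed from the failure count, resetting the counter after a long-lived run also resets the delay to the initial value.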
