agam-99 opened a new pull request, #4190:
URL: https://github.com/apache/gobblin/pull/4190
## Problem
When the `DagProcessingEngine` picks up a LAUNCH `DagAction` and routes it
through `LaunchDagProc` for compilation and submission, downstream
`SpecProducer` implementations have no way to know when the action was
originally inserted into the `dag_action` table. That insert time is the natural
anchor for measuring end-to-end "user request to job submitted" latency in a
multi-instance GaaS deployment, where one instance may insert the action and a
different instance may process it.
The MySQL row already carries this information in its `modified_time` column
(set server-side at INSERT, never UPDATEd in practice — the `ON UPDATE
CURRENT_TIMESTAMP` clause is inert because no UPDATE statement targets this
table), but it is not surfaced anywhere on the in-memory model.
## Change
This PR makes the insert timestamp available to downstream code via three
small, additive, generically useful pieces:
1. `DagActionStore.getDagActionInsertTimeMillis(DagAction)`: a new default
interface method returning `Optional<Long>`. `MysqlDagActionStore` implements
it via `SELECT UNIX_TIMESTAMP(modified_time) * 1000`. Other implementations
(e.g., test mocks) keep working without change because the default returns
`Optional.empty()`. The method is mirrored on `DagManagementStateStore` for
callers that go through the higher-level abstraction.
2. `LaunchDagProc.initialize()` looks up the insert timestamp and stamps it
onto the `FlowSpec` under two new `ConfigurationKeys`:
   - `flow.dagAction.insertTimeMillis` (Long)
   - `flow.dagAction.flowType` ("SCHEDULED" or "ADHOC", driven by
     `FlowSpec.isScheduled()`)

   The compilation pipeline naturally propagates these onto each compiled
   `JobSpec`, so any `SpecProducer` can read them without reaching back to the
   action store.
3. `FlowSpec.addProperty(String, String)`: a String overload of the
existing `addProperty(String, Long)`. It follows the same mutation pattern and
keeps `config` and `configAsProperties` in sync, filling a gap in the existing
API.
The lookup-and-stamp step is best-effort: any failure (lookup error, missing
row, etc.) is logged and swallowed so the LAUNCH path is unaffected.
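
The default-method and best-effort behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `ActionStore`, `stamp`, and the use of `java.util.Properties` in place of `FlowSpec` are hypothetical stand-ins, and the choice to skip both keys when no timestamp is available is an assumption. Only the two key names come from the description.

```java
import java.util.Optional;
import java.util.Properties;

final class DagActionStampSketch {
  // Key names as listed in this PR description.
  static final String INSERT_TIME_KEY = "flow.dagAction.insertTimeMillis";
  static final String FLOW_TYPE_KEY = "flow.dagAction.flowType";

  /** Hypothetical stand-in for the store interface; not the real Gobblin type. */
  interface ActionStore {
    // A default body lets existing implementations (e.g. test mocks) compile unchanged.
    default Optional<Long> getInsertTimeMillis(String actionKey) throws Exception {
      return Optional.empty();
    }
  }

  /** Best-effort stamping: a missing row or a lookup error never propagates to the caller. */
  static void stamp(ActionStore store, String actionKey, boolean scheduled, Properties flowProps) {
    try {
      Optional<Long> insertTime = store.getInsertTimeMillis(actionKey);
      if (!insertTime.isPresent()) {
        // Assumption: nothing is stamped when the store cannot supply a timestamp.
        System.err.println("No insert time for " + actionKey + "; skipping stamp");
        return;
      }
      flowProps.setProperty(INSERT_TIME_KEY, Long.toString(insertTime.get()));
      flowProps.setProperty(FLOW_TYPE_KEY, scheduled ? "SCHEDULED" : "ADHOC");
    } catch (Exception e) {
      // Logged and swallowed so the launch path is unaffected.
      System.err.println("Insert-time lookup failed for " + actionKey + ": " + e);
    }
  }
}
```

A store that does not override the method leaves the properties untouched, which is the "keeps working without change" case.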
## Why this is OSS-appropriate
The exposure is fully generic — anyone running `MysqlDagActionStore` may
want to know when a row was inserted, and anyone writing a `SpecProducer` may
want to compute submit latency. There is no organization-specific code in this
PR. Downstream metric emission lives outside this repo and consumes only
public, framework-level config keys.
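
For illustration, a downstream consumer might derive submit latency from the stamped key along these lines. This is a sketch under stated assumptions: `SubmitLatencySketch` and `submitLatencyMillis` are hypothetical names, `java.util.Properties` stands in for the job configuration, and clamping negative values to zero (to absorb clock skew between the database server and the processing instance) is a design choice of the sketch, not something this PR prescribes.

```java
import java.util.Optional;
import java.util.Properties;

final class SubmitLatencySketch {
  // Key name as listed in this PR description.
  static final String INSERT_TIME_KEY = "flow.dagAction.insertTimeMillis";

  /**
   * Returns submit latency in milliseconds, or empty when the key is absent
   * or not a valid long (the stamp is best-effort, so it may be missing).
   */
  static Optional<Long> submitLatencyMillis(Properties jobProps, long submitTimeMillis) {
    String raw = jobProps.getProperty(INSERT_TIME_KEY);
    if (raw == null) {
      return Optional.empty();
    }
    try {
      long insertTimeMillis = Long.parseLong(raw.trim());
      // Assumption: clamp at zero so clock skew never yields a negative latency.
      return Optional.of(Math.max(0L, submitTimeMillis - insertTimeMillis));
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }
}
```

A caller would typically pass `System.currentTimeMillis()` as `submitTimeMillis` at the point of submission and emit a metric only when the result is present.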
## Tests
- `MysqlDagActionStoreTest`:
  - Added `testGetDagActionInsertTimeMillisReturnsTimestampForExistingRow`:
    inserts a fresh row, asserts the returned timestamp falls within
    `[insertWallClock - 1s, insertWallClock + 1s]` (MySQL `TIMESTAMP` is
    second-precision).
  - Added `testGetDagActionInsertTimeMillisReturnsEmptyForMissingRow`:
    asserts `Optional.empty()` for a key that is not present.
  - All existing tests continue to pass: `./gradlew :gobblin-service:test
    --tests "MysqlDagActionStoreTest"` reports 8/8 passing.
- `FlowSpec.addProperty` String overload covered by the existing
`FlowSpecTest::testAddProperty`.
- Verified that the LAUNCH path still compiles and the new
`stampDagActionContextOnFlowSpec` helper degrades gracefully when the
underlying store cannot supply a timestamp.
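
The one-second window used in the timestamp test can be illustrated with a small helper. The names `TimestampToleranceSketch` and `withinTolerance` are hypothetical and do not appear in the test class. The point is that a second-precision stored value differs from the millisecond wall clock by less than one second, ignoring clock skew between the test JVM and the database.

```java
final class TimestampToleranceSketch {
  /** True when observed lies within [expected - toleranceMillis, expected + toleranceMillis]. */
  static boolean withinTolerance(long observedMillis, long expectedMillis, long toleranceMillis) {
    return Math.abs(observedMillis - expectedMillis) <= toleranceMillis;
  }

  public static void main(String[] args) {
    long wallClock = 1_700_000_000_789L;        // millisecond-precision client clock
    long storedAtSecondPrecision = 1_700_000_000_000L; // same instant, sub-second part dropped
    System.out.println(withinTolerance(storedAtSecondPrecision, wallClock, 1000L));
  }
}
```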
## Compatibility
- No schema change.
- No breaking changes: the new interface methods have `default`
implementations, and `FlowSpec.addProperty(String, String)` is a new overload.
- No behavior change when the new `ConfigurationKeys` are not consumed;
downstream code that does not read them is unaffected.