agam-99 opened a new pull request, #4190:
URL: https://github.com/apache/gobblin/pull/4190

   ## Problem
   
   When the `DagProcessingEngine` picks up a LAUNCH `DagAction` and routes it 
through `LaunchDagProc` for compilation and submission, downstream 
`SpecProducer` implementations have no way to know when the action was 
originally inserted in the `dag_action` table. This is the natural anchor for 
measuring end-to-end "user request to job submitted" latency in a 
multi-instance GaaS deployment, where one instance may insert the action and a 
different instance may process it.
   
   The MySQL row already carries this information in its `modified_time` column 
(set server-side at INSERT, never UPDATEd in practice — the `ON UPDATE 
CURRENT_TIMESTAMP` clause is inert because no UPDATE statement targets this 
table), but it is not surfaced anywhere on the in-memory model.
   
   ## Change
   
   This PR makes the insert timestamp available to downstream code via three 
small, additive, generically-useful pieces:
   
   1. `DagActionStore.getDagActionInsertTimeMillis(DagAction)` — new default 
interface method returning `Optional<Long>`. `MysqlDagActionStore` implements 
it via `SELECT UNIX_TIMESTAMP(modified_time) * 1000`. Other implementations 
(e.g., test mocks) keep working without change because the default returns 
`Optional.empty()`. Mirrored on `DagManagementStateStore` for callers that go 
through the higher-level abstraction.
   
   2. `LaunchDagProc.initialize()` looks up the insert timestamp and stamps it 
onto the `FlowSpec` under two new `ConfigurationKeys`:
      - `flow.dagAction.insertTimeMillis` (Long)
      - `flow.dagAction.flowType` ("SCHEDULED" or "ADHOC", driven by 
`FlowSpec.isScheduled()`)
   
      The compilation pipeline naturally propagates these onto each compiled 
`JobSpec`, so any `SpecProducer` can read them without reaching back to the 
action store.
   
   3. `FlowSpec.addProperty(String, String)` — a String overload of the 
existing `addProperty(String, Long)`. It follows the same mutation pattern, 
keeps `config` and `configAsProperties` in sync, and fills a gap in the existing API.
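
   The additive nature of piece 1 can be sketched as follows. This is a minimal illustration, not the PR's code: `SketchActionStore`, `LegacySketchStore`, and `TimestampedSketchStore` are hypothetical stand-ins that use a plain `String` key instead of `DagAction` and an in-memory map instead of MySQL.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for a store interface; the real DagActionStore has more methods.
interface SketchActionStore {
  void add(String actionKey, long insertTimeMillis);

  // New capability added as a default method: implementations that cannot
  // answer the question keep compiling and simply report "unknown".
  default Optional<Long> getInsertTimeMillis(String actionKey) {
    return Optional.empty();
  }
}

// An implementation written before the new method existed (e.g. a test mock).
class LegacySketchStore implements SketchActionStore {
  @Override
  public void add(String actionKey, long insertTimeMillis) {
    // Intentionally records nothing.
  }
}

// An implementation that tracks insert times and overrides the default.
class TimestampedSketchStore implements SketchActionStore {
  private final Map<String, Long> insertTimes = new HashMap<>();

  @Override
  public void add(String actionKey, long insertTimeMillis) {
    insertTimes.put(actionKey, insertTimeMillis);
  }

  @Override
  public Optional<Long> getInsertTimeMillis(String actionKey) {
    // Missing keys map to Optional.empty(), matching the default's contract.
    return Optional.ofNullable(insertTimes.get(actionKey));
  }
}
```

   Callers treat `Optional.empty()` the same way regardless of whether the row is missing or the implementation never overrode the default.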
   
   The lookup-and-stamp step is best-effort: any failure (lookup error, missing 
row, etc.) is logged and swallowed so the LAUNCH path is unaffected.
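
   The best-effort behavior can be sketched roughly as below. The two key names come from this description; everything else is an assumption for illustration: `DagActionStampSketch` is a hypothetical class, `java.util.Properties` stands in for `FlowSpec`, a `Callable` stands in for the store lookup, and stamping the flow type before attempting the lookup is a choice made by this sketch, not necessarily what `LaunchDagProc` does.

```java
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper illustrating a lookup-and-stamp step that never throws.
final class DagActionStampSketch {
  private static final Logger LOG = Logger.getLogger(DagActionStampSketch.class.getName());

  // Key names as listed in the PR description.
  static final String INSERT_TIME_KEY = "flow.dagAction.insertTimeMillis";
  static final String FLOW_TYPE_KEY = "flow.dagAction.flowType";

  private DagActionStampSketch() {}

  /**
   * Stamps the flow type and, if available, the insert time onto flowProps.
   * Returns true only when the insert time was stamped; never propagates a failure.
   */
  static boolean stamp(Properties flowProps, Callable<Optional<Long>> insertTimeLookup, boolean scheduled) {
    try {
      flowProps.setProperty(FLOW_TYPE_KEY, scheduled ? "SCHEDULED" : "ADHOC");
      Optional<Long> insertTime = insertTimeLookup.call();
      if (!insertTime.isPresent()) {
        LOG.log(Level.WARNING, "No insert time available; continuing without it");
        return false;
      }
      flowProps.setProperty(INSERT_TIME_KEY, Long.toString(insertTime.get()));
      return true;
    } catch (Exception e) {
      // Log and swallow so the caller's launch path is unaffected.
      LOG.log(Level.WARNING, "Insert time lookup failed; continuing without it", e);
      return false;
    }
  }
}
```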
   
   ## Why this is OSS-appropriate
   
   The exposure is fully generic — anyone running `MysqlDagActionStore` may 
want to know when a row was inserted, and anyone writing a `SpecProducer` may 
want to compute submit latency. There is no organization-specific code in this 
PR. Downstream metric emission lives outside this repo and consumes only 
public, framework-level config keys.
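
   As an illustration of the consumer side, a downstream component might derive submit latency from the stamped key along these lines. This is a sketch under assumptions: `SubmitLatencySketch` is a hypothetical class, `java.util.Properties` stands in for the job configuration, and clamping negative values to zero is this sketch's way of tolerating clock skew between the database server and the JVM.

```java
import java.util.OptionalLong;
import java.util.Properties;

// Hypothetical consumer-side helper; not part of this PR.
final class SubmitLatencySketch {
  // Key name as listed in the PR description.
  static final String INSERT_TIME_KEY = "flow.dagAction.insertTimeMillis";

  private SubmitLatencySketch() {}

  /** Returns the elapsed millis since insert, or empty if the key is absent or malformed. */
  static OptionalLong submitLatencyMillis(Properties jobProps, long nowMillis) {
    String raw = jobProps.getProperty(INSERT_TIME_KEY);
    if (raw == null) {
      return OptionalLong.empty();
    }
    try {
      long insertMillis = Long.parseLong(raw.trim());
      // Clamp at zero: the insert time comes from the DB clock, nowMillis from the local clock.
      return OptionalLong.of(Math.max(0L, nowMillis - insertMillis));
    } catch (NumberFormatException e) {
      return OptionalLong.empty();
    }
  }
}
```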
   
   ## Tests
   
   - `MysqlDagActionStoreTest`:
     - Added `testGetDagActionInsertTimeMillisReturnsTimestampForExistingRow` — 
inserts a fresh row, asserts the returned timestamp falls within 
`[insertWallClock - 1s, insertWallClock + 1s]` (a MySQL `TIMESTAMP` column 
declared without fractional seconds has second precision).
     - Added `testGetDagActionInsertTimeMillisReturnsEmptyForMissingRow` — 
asserts `Optional.empty()` for a key that is not present.
     - All existing tests continue to pass: `./gradlew :gobblin-service:test 
--tests "MysqlDagActionStoreTest"` reports 8/8 passing.
   - `FlowSpec.addProperty` String overload covered by the existing 
`FlowSpecTest::testAddProperty`.
   - Verified that the LAUNCH path still compiles and the new 
`stampDagActionContextOnFlowSpec` helper degrades gracefully when the 
underlying store cannot supply a timestamp.
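
   The reasoning behind the one-second window in the first test can be sketched as follows. It assumes the stored value drops the sub-second part of the wall clock; `TimestampToleranceSketch` and its methods are hypothetical names, not the PR's test code.

```java
// Hypothetical helper illustrating why a +/- 1s window suits a second-precision column.
final class TimestampToleranceSketch {
  private TimestampToleranceSketch() {}

  // Models a second-precision timestamp converted back to epoch millis (always a multiple of 1000).
  static long toSecondPrecisionMillis(long epochMillis) {
    return Math.floorDiv(epochMillis, 1000L) * 1000L;
  }

  // True when observed lies in [expected - toleranceMillis, expected + toleranceMillis].
  static boolean withinTolerance(long observedMillis, long expectedMillis, long toleranceMillis) {
    return Math.abs(observedMillis - expectedMillis) <= toleranceMillis;
  }
}
```

   Under that assumption the stored value can trail the wall clock by up to 999 ms, so an exact-equality assertion would be flaky while a one-second tolerance is not.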
   
   ## Compatibility
   
   - No schema change.
   - No breaking changes: the new interface methods have `default` 
implementations, and `FlowSpec.addProperty(String, String)` is an additive overload.
   - No behavior change when the new `ConfigurationKeys` are not consumed; 
downstream code that does not read them is unaffected.
   

