DaisyModi opened a new pull request, #4183:
URL: https://github.com/apache/gobblin/pull/4183

   ## Summary
   
   GOBBLIN-2257 removed `synchronized` from `onAddSpec` and introduced a 
multi-threaded executor for parallel flow compilation, improving 
`flowConfigsV2` GET API P99 latency. However, the listener callbacks and 
FlowCatalog internals that were implicitly protected by that `synchronized` 
block now run concurrently without thread-safe data structures.
   
   ### The bug
   
   **`GobblinServiceJobScheduler.onAddSpec()`** reads and writes 
`scheduledFlowSpecs` and `lastUpdatedTimeForFlowSpec` (both plain `HashMap`s) 
from multiple concurrent callback threads. `NonScheduledJobRunner` also removes 
entries from a separate thread pool. Unsynchronized concurrent modification of 
a `HashMap` can corrupt its internal structure, which shows up as:
   - Lost map entries → flows not tracked, not cleaned up
   - Orphaned DAGs → `LaunchDagProc - error`, `DagNode or its job status not 
found`
   - Corrupted internal HashMap state → potential infinite loops, NPEs
   
   **`FlowCatalog.specSyncObjects`** (also a plain `HashMap`) is similarly 
accessed concurrently from `updateOrAddSpecHelper` (multiple API request 
threads) and `NonScheduledJobRunner`.
   
   ### The fix
   
   - **`GobblinServiceJobScheduler`**: `Maps.newHashMap()` → 
`Maps.newConcurrentMap()` for both `scheduledFlowSpecs` and 
`lastUpdatedTimeForFlowSpec`
   - **`FlowCatalog`**: `HashMap` → `ConcurrentHashMap` for `specSyncObjects`
   - **`FlowCatalog`**: Downgrade the "SpecStore is missing in FlowCatalog" log 
from ERROR to WARN. With concurrent SpecStore writes, `getSpecURIs()` can list 
a URI before `addSpec()` fully commits. This transient condition is already 
handled by exponential backoff retries, so logging it at ERROR creates false 
alarms in monitoring
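
A minimal sketch of the map swap described above, using only the JDK. The class name, method name, and `flow-…` keys are hypothetical and are not taken from the Gobblin code; the map only stands in for something like `scheduledFlowSpecs`. It shows several pool threads writing distinct keys into a `ConcurrentHashMap` without external locking and every entry surviving.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentSpecMapSketch {

    // Hypothetical stand-in for a map like scheduledFlowSpecs: spec key -> timestamp.
    // Each worker writes its own distinct keys, so the expected size is
    // threads * specsPerThread once all workers have finished.
    static int addSpecsConcurrently(int threads, int specsPerThread) {
        ConcurrentMap<String, Long> scheduled = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int worker = t;
            pool.submit(() -> {
                for (int i = 0; i < specsPerThread; i++) {
                    // put() is safe to call from many threads at once on a ConcurrentHashMap.
                    scheduled.put("flow-" + worker + "-" + i, System.nanoTime());
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return scheduled.size();
    }

    public static void main(String[] args) {
        // 8 workers x 1000 distinct keys each.
        System.out.println(addSpecsConcurrently(8, 1000)); // prints 8000
    }
}
```

With a plain `HashMap` in place of the `ConcurrentHashMap`, the same program has no guaranteed result: entries may be lost or the map may end up in an inconsistent state.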
   
   All changes are drop-in replacements with no API changes and no impact on 
the P99 latency improvement from GOBBLIN-2257.
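
Two general properties of `ConcurrentHashMap` are worth keeping in mind when treating it as a drop-in for `HashMap`; whether either matters for the code in this PR is not stated here, so the sketch below is only illustrative. The class, method names, and the idea of per-spec lock objects are assumptions made for the example. First, `ConcurrentHashMap` rejects `null` keys and values. Second, only individual operations are atomic, so a get-then-put sequence is better expressed as a single `computeIfAbsent` call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DropInCaveatsSketch {

    // Returns true if the map refuses a null value.
    // ConcurrentHashMap throws NullPointerException here; HashMap would accept it.
    static boolean rejectsNullValue(ConcurrentMap<String, Object> map) {
        try {
            map.put("flow-a", null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // Atomic get-or-create of a per-spec object (hypothetical analogue of a
    // sync-object map). Concurrent callers for the same key get the same instance.
    static Object syncObjectFor(ConcurrentMap<String, Object> syncObjects, String specUri) {
        return syncObjects.computeIfAbsent(specUri, uri -> new Object());
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Object> syncObjects = new ConcurrentHashMap<>();
        System.out.println(rejectsNullValue(syncObjects)); // prints true
        Object first = syncObjectFor(syncObjects, "flow-a");
        Object second = syncObjectFor(syncObjects, "flow-a");
        System.out.println(first == second); // prints true
    }
}
```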
   
   ## Test plan
   
   - [ ] Existing `FlowCatalogTest` and `GobblinServiceJobSchedulerTest` pass
   - [ ] Verify in production: `LaunchDagProc - error` and `DagNode or its job 
status not found` errors no longer occur
   - [ ] Verify in production: "SpecStore is missing in FlowCatalog" now logs 
at WARN instead of ERROR
   



To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to