JNSimba opened a new pull request, #63079:
URL: https://github.com/apache/doris/pull/63079

   ## Summary
   
   `StreamingInsertJob` (both the CDC FROM-TO path and the `cdc_stream` TVF path) used to call `splitChunks()` synchronously inside `CREATE STREAMING JOB`, asking cdc_client to split every table into chunks before returning. On large tables, or tables whose primary keys are unevenly distributed, this can take 30+ minutes. That is far beyond the 60 s BRPC timeout between BE and cdc_client, and the SQL client stays blocked the whole time.
   
   This PR makes splitting tick-driven by the FE scheduler:
   
   - `CREATE` returns immediately; `splitChunks()` is no longer called synchronously.
   - On each scheduler tick, `advanceSplits()` issues one short `fetchSplits` RPC (default `batchSize=100`) and appends the returned batch to `remainingSplits`. Tasks are dispatched as soon as the first batch lands, so end-to-end first-byte latency stays close to flink-cdc's.
   - cdc_client is **stateless**: every RPC reconstructs a `ChunkSplitter` from the `(currentSplittingTable, nextSplitStart, nextSplitId)` triple supplied by FE. flink-cdc internals are untouched (only the public `ChunkSplitter` API is used).
   - Crash recovery relies on the following state:
     - the editlog persists `committedSplitProgress` (the 3-field `SplitProgress`) alongside the existing `chunkHighWatermarkMap` / `binlogOffsetPersist`
     - the `streaming_job_meta` system table holds the full `chunk_list` JSON per table (UPSERTed on every `advanceSplits`)
     - cdc_client memory holds nothing
   - Both the FROM-TO (multi-table) and TVF (single-table) paths share the same `SourceOffsetProvider#initSplitProgress` / `noMoreSplits` / `advanceSplits` interface. `StreamingJobSchedulerTask.handlePendingState` pre-advances one batch so the first task does not wait a full `max_interval`.
   
   Detailed design lives in the linked plan.
   
   ## Changes
   
   - `fe-common`: `FetchTableSplitsRequest` adds `nextSplitStart` (`Object[]`), `nextSplitId`, and `batchSize`.
   - `fe-core`:
     - `SourceOffsetProvider` adds 3 default methods: `initSplitProgress` / `advanceSplits` / `noMoreSplits`.
     - `JdbcSourceOffsetProvider` implements the async state machine (committed/cdc `SplitProgress`, `advanceSplits`, dedup, system-table UPSERT, replay path).
     - `JdbcTvfSourceOffsetProvider.initOnCreate` no longer pre-splits; it relies on the same scheduler-tick path.
     - `StreamingInsertJob` carries `syncTables` (`@SerializedName("st")`); `initSourceJob` / `initInsertJob` initialize `SplitProgress`; `advanceSplitsIfNeed()` mirrors the `fetchMeta` error handling (PAUSE on failure).
     - `StreamingJobSchedulerTask.handlePendingState` / `handleRunningState` call `advanceSplitsIfNeed()` on each tick; the PENDING handler pre-advances and short-circuits if the job is PAUSED.
     - `StreamingJobUtils.upsertChunkList` handles id allocation via a `MAX(id)+1` lookup.
   - `cdc_client/JdbcIncrementalSourceReader`: `getSourceSplits()` is rebuilt around the public `ChunkSplitter` API (no more in-memory loop or reflection hack).
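
The dedup step in the `JdbcSourceOffsetProvider` state machine can be sketched as follows. This is a hypothetical illustration, not the PR's code: it assumes that after an FE restart the fetch cursor rewinds to the editlog-persisted `committedSplitProgress`, which may lag behind the `chunk_list` already UPSERTed into `streaming_job_meta`, so the first re-fetched batch can repeat splits FE already knows about. The dedup key (table name plus split id) and all class and method names are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of deduplicating re-fetched splits after a restart (not the PR's code).
public class ReplayDedupSketch {

    // One chunk of one table covering PKs in [start, end); field names are illustrative.
    record Split(String table, int id, long start, long end) {}

    // Insertion-ordered, so splits are dispatched in the order they were cut.
    private final Map<String, Split> remainingSplits = new LinkedHashMap<>();

    // Assumed dedup key: table name plus per-table split id.
    private static String key(Split s) {
        return s.table() + "#" + s.id();
    }

    // Restore splits read back from the system table's chunk_list (JSON parsing omitted).
    void restore(List<Split> chunkList) {
        for (Split s : chunkList) {
            remainingSplits.putIfAbsent(key(s), s);
        }
    }

    // Append a freshly fetched batch, skipping splits already present; returns how many were new.
    int addBatch(List<Split> batch) {
        int added = 0;
        for (Split s : batch) {
            if (remainingSplits.putIfAbsent(key(s), s) == null) {
                added++;
            }
        }
        return added;
    }

    List<Split> remaining() {
        return new ArrayList<>(remainingSplits.values());
    }

    public static void main(String[] args) {
        ReplayDedupSketch p = new ReplayDedupSketch();
        // After restart: splits 0..2 of "orders" come back from chunk_list.
        p.restore(List.of(
                new Split("orders", 0, 0, 5),
                new Split("orders", 1, 5, 10),
                new Split("orders", 2, 10, 15)));
        // Committed progress lagged at split 1, so the re-fetch returns splits 1..3.
        int added = p.addBatch(List.of(
                new Split("orders", 1, 5, 10),
                new Split("orders", 2, 10, 15),
                new Split("orders", 3, 15, 20)));
        System.out.println(added + " new, " + p.remaining().size() + " total");
    }
}
```

Here `main` prints `1 new, 4 total`: splits 1 and 2 are dropped as duplicates and only split 3 is appended, so no chunk is read twice even though the cursor rewound.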
   
   ## Tests
   
   - `SplitProgressTest`: copy/null-field semantics.
   - `JdbcSourceOffsetProviderAsyncSplitTest`: covers `advanceSplits` (first call / continuing the same table / cross-table switch / dedup / empty batch), `noMoreSplits`, `updateOffset` committed-progress advancement (mid-chunk vs. last chunk vs. replay missing-split path), and `computeCdcRemainingTables`.
   - Regression case (separate commit, not yet in this PR): `test_streaming_postgres_job_async_split.groovy`. 100 rows × `snapshot_split_size=5` → 20 splits across multiple ticks; asserts that CREATE returns in < 30 s, checks the full snapshot row count and DISTINCT ids, then exercises INSERT/UPDATE/DELETE in the binlog phase.
   
   ## Test plan
   
   - [ ] `mvn test -pl fe/fe-core -Dtest=JdbcSourceOffsetProviderAsyncSplitTest,SplitProgressTest`
   - [ ] Run `test_streaming_postgres_job_async_split` regression locally
   - [ ] PG/MySQL manual test on a large table with non-uniform PKs: confirm `CREATE` returns in seconds, `SHOW STREAMING JOB` immediately reflects the new job, the snapshot completes, and the binlog phase is healthy
   - [ ] FE restart mid-snapshot: confirm the cdc side resumes from the system-table position with no duplicate or lost rows
   - [ ] Kill cdc_client mid-snapshot: confirm FE retries on the next tick with no duplicate or lost rows
   - [ ] `cdc_stream` TVF + StreamingInsertJob path: confirm CREATE no longer blocks

