JNSimba opened a new pull request, #63079:
URL: https://github.com/apache/doris/pull/63079
## Summary
`StreamingInsertJob` (CDC FROM-TO and `cdc_stream` TVF paths) used to call
`splitChunks()` synchronously inside `CREATE STREAMING JOB`, asking cdc_client
to cut every chunk of every table before returning. On large/non-uniform PK
tables this can take 30+ minutes — far beyond the BE→cdc_client BRPC 60s
timeout, and the SQL client blocks the whole time.
This PR makes splitting tick-driven by the FE scheduler:
- `CREATE` returns immediately; no more synchronous `splitChunks()`.
- On each scheduler tick, `advanceSplits()` issues one short `fetchSplits` RPC
(default `batchSize=100`) and pushes that batch into `remainingSplits`. Tasks
are dispatched as soon as the first batch lands, so end-to-end first-byte
latency stays close to flink-cdc's.
- cdc_client is **stateless** — every RPC reconstructs `ChunkSplitter` from
the `(currentSplittingTable, nextSplitStart, nextSplitId)` triple supplied by
FE; flink-cdc internals are untouched (uses the public `ChunkSplitter` API
only).
- Crash recovery uses three sources of truth:
  - editlog persists `committedSplitProgress` (3-field `SplitProgress`) +
    existing `chunkHighWatermarkMap` / `binlogOffsetPersist`
  - `streaming_job_meta` system table holds full `chunk_list` JSON per table
    (UPSERT each `advanceSplits`)
  - cdc_client memory holds nothing
- Both FROM-TO (multi-table) and TVF (single-table) paths share the same
`SourceOffsetProvider#initSplitProgress` / `noMoreSplits` / `advanceSplits`
interface; `StreamingJobSchedulerTask.handlePendingState` pre-advances one
batch so the first task doesn't wait a full `max_interval`.
Detailed design lives in the linked plan.
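The tick-driven flow described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: every class and method name is a hypothetical stand-in, the split key is simplified to a single `long` instead of an `Object[]`, table row counts are assumed to be known up front, and split ids are assumed to restart at 0 for each table.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;

// Illustrative only: all names here are hypothetical stand-ins,
// not the actual Doris or flink-cdc classes.
class AsyncSplitSketch {
    /** One chunk [start, end) of a table. */
    static final class Split {
        final String table; final int id; final long start; final long end;
        Split(String table, int id, long start, long end) {
            this.table = table; this.id = id; this.start = start; this.end = end;
        }
    }

    /** Simplified progress triple: (table being split, next start key, next split id). */
    static final class Progress {
        final String table; final long nextStart; final int nextId;
        Progress(String table, long nextStart, int nextId) {
            this.table = table; this.nextStart = nextStart; this.nextId = nextId;
        }
    }

    /** Stateless splitter side: derives its position purely from the arguments. */
    static List<Split> fetchSplits(Progress p, long tableRows, long chunkSize, int batchSize) {
        List<Split> batch = new ArrayList<>();
        long start = p.nextStart;
        int id = p.nextId;
        while (batch.size() < batchSize && start < tableRows) {
            long end = Math.min(start + chunkSize, tableRows);
            batch.add(new Split(p.table, id++, start, end));
            start = end;
        }
        return batch;
    }

    final LinkedHashMap<String, Long> tableRows; // table -> row count, in sync order
    final List<String> tables;
    final long chunkSize;
    final int batchSize;
    final Deque<Split> remainingSplits = new ArrayDeque<>();
    Progress progress; // null once every table has been fully split

    AsyncSplitSketch(LinkedHashMap<String, Long> tableRows, long chunkSize, int batchSize) {
        this.tableRows = tableRows;
        this.tables = new ArrayList<>(tableRows.keySet());
        this.chunkSize = chunkSize;
        this.batchSize = batchSize;
        this.progress = tables.isEmpty() ? null : new Progress(tables.get(0), 0L, 0);
    }

    boolean noMoreSplits() {
        return progress == null;
    }

    /** One scheduler tick: at most one bounded fetch, then record where to resume. */
    void advanceSplits() {
        if (noMoreSplits()) {
            return;
        }
        long rows = tableRows.get(progress.table);
        List<Split> batch = fetchSplits(progress, rows, chunkSize, batchSize);
        remainingSplits.addAll(batch);
        long nextStart = batch.isEmpty() ? progress.nextStart : batch.get(batch.size() - 1).end;
        if (nextStart >= rows) {
            // Current table is done: switch to the next table, or finish.
            int next = tables.indexOf(progress.table) + 1;
            progress = next < tables.size() ? new Progress(tables.get(next), 0L, 0) : null;
        } else {
            progress = new Progress(progress.table, nextStart, progress.nextId + batch.size());
        }
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Long> t = new LinkedHashMap<>();
        t.put("orders", 12L);
        t.put("users", 3L);
        AsyncSplitSketch job = new AsyncSplitSketch(t, 5L, 2);
        int tick = 0;
        while (!job.noMoreSplits()) {
            job.advanceSplits();
            System.out.println("tick " + (++tick) + ": queued=" + job.remainingSplits.size());
        }
    }
}
```

Because `fetchSplits` takes its whole position as arguments, the caller can retry a failed call on the next tick with the same `Progress` value, which is the property the stateless design relies on.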
## Changes
- `fe-common`: `FetchTableSplitsRequest` adds `nextSplitStart` (`Object[]`)
/ `nextSplitId` / `batchSize`.
- `fe-core`:
  - `SourceOffsetProvider` adds 3 default methods: `initSplitProgress` /
    `advanceSplits` / `noMoreSplits`.
  - `JdbcSourceOffsetProvider` implements the async state machine
    (committed/cdc `SplitProgress`, `advanceSplits`, dedup, system-table UPSERT,
    replay path).
  - `JdbcTvfSourceOffsetProvider.initOnCreate` no longer pre-splits; relies
    on the same scheduler tick path.
  - `StreamingInsertJob` carries `syncTables` (`@SerializedName("st")`);
    `initSourceJob` / `initInsertJob` initialize `SplitProgress`;
    `advanceSplitsIfNeed()` mirrors `fetchMeta` error handling (PAUSE on failure).
  - `StreamingJobSchedulerTask.handlePendingState` / `handleRunningState`
    call `advanceSplitsIfNeed()` each tick; PENDING handler pre-advances and
    short-circuits if PAUSED.
  - `StreamingJobUtils.upsertChunkList` covers id-allocation via `MAX(id)+1`
    lookup.
- `cdc_client/JdbcIncrementalSourceReader`: `getSourceSplits()` rebuilt
around the public `ChunkSplitter` API (no more in-memory loop / reflection
hack).
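As a rough illustration of the scheduler-side behaviour listed above (advance on each tick, pause on failure, short-circuit in the PENDING handler), a minimal sketch might look like this. The `JobStatus` enum, the `Provider` interface, the `pauseReason` field, and the PENDING to RUNNING transition are assumptions made for the example; they are not taken from the Doris code.

```java
// Illustrative only: names and state transitions are hypothetical stand-ins.
class SchedulerTickSketch {
    enum JobStatus { PENDING, RUNNING, PAUSED }

    /** Minimal view of the offset provider used by the scheduler. */
    interface Provider {
        boolean noMoreSplits();
        void advanceSplits() throws Exception;
    }

    final Provider provider;
    JobStatus status = JobStatus.PENDING;
    String pauseReason;
    int tasksDispatched;

    SchedulerTickSketch(Provider provider) {
        this.provider = provider;
    }

    /** Fetch at most one batch; any failure pauses the job instead of propagating. */
    void advanceSplitsIfNeed() {
        if (provider.noMoreSplits()) {
            return;
        }
        try {
            provider.advanceSplits();
        } catch (Exception e) {
            status = JobStatus.PAUSED;
            pauseReason = e.getMessage();
        }
    }

    /** PENDING tick: pre-advance one batch so the first task has work right away. */
    void handlePendingState() {
        advanceSplitsIfNeed();
        if (status == JobStatus.PAUSED) {
            return; // short-circuit: do not dispatch a task for a paused job
        }
        status = JobStatus.RUNNING;
        tasksDispatched++;
    }

    /** RUNNING tick: keep pulling batches until the provider reports completion. */
    void handleRunningState() {
        advanceSplitsIfNeed();
    }

    public static void main(String[] args) {
        SchedulerTickSketch job = new SchedulerTickSketch(new Provider() {
            public boolean noMoreSplits() { return false; }
            public void advanceSplits() { /* pretend one batch was fetched */ }
        });
        job.handlePendingState();
        System.out.println("status after first tick: " + job.status);
    }
}
```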
## Tests
- `SplitProgressTest` — copy/null-field semantics.
- `JdbcSourceOffsetProviderAsyncSplitTest` — covers `advanceSplits` (first
call / continue same table / cross-table switch / dedup / empty batch),
`noMoreSplits`, `updateOffset` committed-progress advancement (mid-chunk vs
last chunk vs replay missing-split path), and `computeCdcRemainingTables`.
- Regression case (separate commit, not in this PR yet):
`test_streaming_postgres_job_async_split.groovy` — 100 rows ×
`snapshot_split_size=5` → 20 splits across multiple ticks; asserts CREATE
returns < 30s, full snapshot count + DISTINCT id, then INSERT/UPDATE/DELETE in
binlog phase.
## Test plan
- [ ] `mvn test -pl fe/fe-core -Dtest=JdbcSourceOffsetProviderAsyncSplitTest,SplitProgressTest`
- [ ] Run `test_streaming_postgres_job_async_split` regression locally
- [ ] PG/MySQL non-uniform PK large-table manual test: confirm `CREATE`
returns in seconds, `SHOW STREAMING JOB` immediately reflects the new job,
snapshot completes, binlog phase healthy
- [ ] FE restart mid-snapshot: confirm cdc-side resumes from system-table
position, no duplicate / lost rows
- [ ] cdc_client kill mid-snapshot: confirm FE retries on next tick, no
duplicate / lost rows
- [ ] `cdc_stream` TVF + StreamingInsertJob path: confirm CREATE no longer
blocks