924060929 commented on PR #62942:
URL: https://github.com/apache/doris/pull/62942#issuecomment-4666633154

   Thanks for the work here; the low-memory load idea is valuable. One note 
for reviewers: the diff actually contains **three** independent mechanisms 
toward the same goal (reducing peak load memory), but the current description 
only documents the first. Since mechanism ② is default-on and a large part of 
the change, it would help a lot to cover all three. Here's a suggested 
description in a **Background → Approach → Implementation** structure; feel 
free to use or adapt:
   
   > ## Overview
   >
   > This PR reduces peak memory usage during load for random-distribution / 
wide / duplicate-without-key tables. It bundles three independent mechanisms:
   >
   > | Mechanism | Switch | Default | Scope |
   > |---|---|---|---|
   > | ① Low-memory load (direct rowset write + early column-page flush) | table property `enable_low_memory_load` + BE `column_writer_page_flush_threshold` (4MB) | off (per-table opt-in, DUP-without-key only) | V1 tablet-writer path |
   > | ② Adaptive random bucket (receiver-side) | FE `enable_adaptive_random_bucket_load` + BE `enable_adaptive_random_bucket_load_bucket_rotation` | on | cloud + random-distribution tables only |
   > | ③ Memtable flush-queue backpressure | BE `enable_memtable_flush_queue_backpressure` | off | all memtable-based loads |
   >
   > ---
   >
   > ### ① `enable_low_memory_load`: direct rowset write + early column-page 
flush
   >
   > **Background.** Segment V2 writes column by column. Each column writer 
first builds data pages in memory and keeps them in its internal page list 
until the column is finalized. For wide, duplicate-without-key tables this 
means that while column N is being written, the already-materialized pages of 
columns 1..N-1 stay resident; peak memory grows with column count × buffered 
pages per column × page size, and is worst for large variable-length 
(`STRING`/`VARCHAR`) columns and large batches. In addition, the normal path 
first buffers rows in a memtable for batching/sort, but a duplicate-without-key 
table needs neither sort nor aggregation, so the memtable layer is pure 
overhead for it.
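
To make the growth term in the Background above concrete, here is a small back-of-the-envelope sketch in Python. The function name and every number in it are hypothetical illustrations, not measurements or values taken from the PR.

```python
# Illustrative arithmetic only: the column count, page count and page size
# below are made-up inputs, not values from the PR or from Doris defaults.
def peak_page_memory_bytes(num_columns: int, pages_per_column: int,
                           page_size_bytes: int) -> int:
    """Upper bound on resident page memory when every column keeps all of
    its built pages in memory until the column is finalized."""
    return num_columns * pages_per_column * page_size_bytes


# A hypothetical wide table: 500 columns, 64 buffered pages each, 64 KiB pages.
estimate = peak_page_memory_bytes(num_columns=500, pages_per_column=64,
                                  page_size_bytes=64 * 1024)
print(estimate // (1024 * 1024), "MiB")
```

The point is only that the three factors multiply, so widening the table or enlarging the batch scales the resident page memory linearly in each of them.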
   >
   > **Approach.** Add an opt-in table property `enable_low_memory_load` for 
duplicate-without-key tables. When enabled: (1) skip the memtable and write 
blocks directly into the rowset writer / segment (safe because there is no 
sort/aggregation — hence the strict DUP-without-key restriction); (2) inside 
segment writing, once a column's buffered pages reach 
`column_writer_page_flush_threshold` (default 4MB) **and** memory has hit the 
hard limit, flush those pages to the file writer immediately instead of holding 
them until finalize. Only the V1 tablet-writer path is covered; the 
memtable-on-sink (V2) path keeps existing behavior.
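
The early-flush condition in step (2) above can be sketched roughly as follows. This is a Python model for reviewers, not the BE code: `ColumnWriterSketch` and its fields are hypothetical, the "reach the threshold" test is assumed to be `>=`, and the hard-limit check is passed in as a plain callable.

```python
from dataclasses import dataclass, field
from typing import Callable, List

PAGE_FLUSH_THRESHOLD = 4 * 1024 * 1024  # mirrors the 4MB default named above


@dataclass
class ColumnWriterSketch:
    """Hypothetical stand-in for a column writer's page list."""
    hard_limit_reached: Callable[[], bool]  # assumed memory-limiter hook
    threshold: int = PAGE_FLUSH_THRESHOLD
    pages: List[bytes] = field(default_factory=list)
    buffered_bytes: int = 0
    flushed_bytes: int = 0  # stands in for bytes handed to the file writer

    def finish_current_page(self, page: bytes) -> None:
        # Buffer the finished page, then check whether to flush early.
        self.pages.append(page)
        self.buffered_bytes += len(page)
        self._flush_pages_if_needed()

    def _flush_pages_if_needed(self) -> None:
        # Both conditions must hold: enough buffered data AND memory pressure.
        if self.buffered_bytes >= self.threshold and self.hard_limit_reached():
            self.flushed_bytes += self.buffered_bytes
            self.pages.clear()
            self.buffered_bytes = 0
```

Requiring both conditions means that loads running below the hard limit keep today's behavior of holding pages until finalize; only loads under memory pressure pay for the extra, smaller writes.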
   >
   > **Implementation.**
   > - FE: `PropertyAnalyzer.analyzeEnableLowMemoryLoad`; validation in 
`InternalCatalog.createOlapTable` / `SchemaChangeHandler` (must be `DUP_KEYS` 
with no key columns); `OlapTable`/`TableProperty` get/set/build; `Env` for SHOW 
CREATE TABLE; `OlapTableSink.init` sets `TOlapTableSink.enable_low_memory_load`.
   > - BE propagation: `VNodeChannel::_open_internal` → 
`PTabletWriterOpenRequest.enable_low_memory_load` → `WriteRequest` → 
`RowsetWriterContext` → `ColumnWriterOptions`.
   > - BE direct write: `BaseDeltaWriter` skips creating `MemTableWriter`; 
`DeltaWriter::write` / `CloudDeltaWriter::write` → 
`_write_directly_to_rowset()` (clone selected columns + `add_rows` + 
`rowset_writer()->add_block()`); `close()` → `rowset_writer()->flush()`.
   > - BE early flush: `ScalarColumnWriter::_flush_pages_if_needed()`, called 
at the end of `finish_current_page()`.
   >
   > ---
   >
   > ### ② `enable_adaptive_random_bucket_load`: receiver-side random bucket
   >
   > **Background.** For random-distribution tables, today the **sender** (the 
post-planning VTabletWriter) round-robins each batch onto a tablet 
(`FIND_TABLET_EVERY_BATCH`), so a partition's data is scattered across all of 
its buckets. As a result each BE concurrently holds active writers/memtables 
for many buckets of that partition (high memory), and each batch fans out to 
many BEs (network amplification). In cloud mode we'd rather let the BE where 
data lands decide which bucket to write.
   >
   > **Approach.** Move the bucket decision from sender to receiver: the sender 
sends only `partition_id` (no `tablet_id`); each receiver BE writes to one 
bucket it owns (holds the primary replica of) and rotates to the next local 
bucket after a memtable flush, so each BE has only one active writer per 
partition at a time. FE computes, for each (sink BE, partition), the starting 
bucket and the ordered local bucket list and ships them down; the receiver 
maintains a small state machine for current-tablet + flush rotation. 
Auto-partition partitions created at runtime are computed on the fly inside the 
create/replace-partition RPC. Default-on, but only for cloud + 
random-distribution tables.
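
The receiver-side state machine described above might look roughly like this in miniature. It is a Python sketch under assumptions: `RandomBucketStateSketch`, `rotate` and `write_batch` are hypothetical names, and wrapping around to the first local bucket after the last one is an assumption, not something the description states.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RandomBucketStateSketch:
    """Hypothetical per-(BE, partition) state: ordered local tablets + cursor."""
    local_tablets: List[int]  # ordered local bucket list shipped by FE
    index: int = 0            # starting position chosen by FE

    def current_tablet(self) -> int:
        return self.local_tablets[self.index]

    def rotate(self) -> None:
        # Assumed wrap-around once the last local bucket has been used.
        self.index = (self.index + 1) % len(self.local_tablets)


def write_batch(state: RandomBucketStateSketch, memtable_flushed: bool,
                rotation_enabled: bool = True) -> int:
    """Return the tablet this batch goes to, rotating only after a flush."""
    tablet = state.current_tablet()
    if memtable_flushed and rotation_enabled:
        state.rotate()
    return tablet
```

Because the cursor only moves after a flush, a BE has a single active writer per partition at any moment, which is the memory saving the approach is after.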
   >
   > **Implementation.**
   > - FE flag: `OlapTableSink.init` sets `enable_adaptive_random_bucket` 
(cloud + `Config.enable_adaptive_random_bucket_load` + 
`RandomDistributionInfo`).
   > - FE bucket-assignment algorithm (new static methods in `OlapTableSink`): 
`buildBeToBucketSeqs` derives each bucket's owner BE from tablet locations; 
`targetBucketNum = min(min(#sinkBE, #bucket), max(planFragmentNum, 1))`; 
balancing via `load_tablet_idx` rotation + `selectLeastUsedBucketSeq`; 
`applyAdaptiveRandomBucketAssignments` writes 
`load_tablet_idx`/`bucket_be_id`/`local_bucket_seqs` onto a per-BE deep-copied 
sink. Entry points: legacy `Coordinator.assignAdaptiveRandomBucketForFragment`, 
Nereids `ThriftPlansBuilder.assignAdaptiveRandomBucketForSinkParams`.
   > - BE sender: `VTabletWriter::_init` adds `FIND_TABLET_RANDOM_BUCKET` 
(highest priority); in that mode `OlapTabletFinder`/`VRowDistribution` skip 
tablet-id computation; `Payload` carries `RowPartTabletIds*` (partition_ids); 
`IndexChannel` builds `_channels_by_partition` and routes payloads to the owner 
channel; `VNodeChannel` sends per-row `partition_ids` plus the per-partition 
ordered tablet list at open.
   > - BE receiver: new proto fields (`is_receiver_side_random_bucket`, 
`random_bucket_partitions`, `PRandomBucketPartitionParam`); 
`_init_receiver_side_random_bucket_state` builds `AdaptiveRandomBucketState`; 
`add_batch` groups rows by partition → 
`_write_block_data_for_receiver_side_random_bucket` takes a per-partition lock, 
writes to `current_tablet()`, and calls `rotate_by_tablet()` on 
`memtable_flushed` (gated by 
`enable_adaptive_random_bucket_load_bucket_rotation`).
   > - Auto-partition: `FrontendServiceImpl.createPartition/replacePartition` 
set `load_tablet_idx` for RANDOM partitions and re-run the assignment via 
`collectAdaptiveBucketSinkContext` (looks up the coordinator by `query_id` to 
get the sink BE set).
   > - **Cross-cutting change (affects every load path, not just the new 
modes):** a `memtable_flushed` out-param threaded through 
`MemTableWriter::write` → `DeltaWriter::write` → `TabletsChannel`, and 
`MemTableWriter` switched to lazy memtable creation (created on first write / 
after flush instead of in `init()`). Called out separately because it's on the 
hot path for all loads.
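
As a quick check of the `targetBucketNum` formula in the FE bullet above, here it is transcribed into Python. The function and parameter names are hypothetical; only the `min`/`max` structure comes from the description, and the example inputs are made up.

```python
def target_bucket_num(num_sink_be: int, num_buckets: int,
                      plan_fragment_num: int) -> int:
    """min(min(#sinkBE, #bucket), max(planFragmentNum, 1)) as written above."""
    return min(min(num_sink_be, num_buckets), max(plan_fragment_num, 1))


# Made-up inputs showing which term ends up as the binding one.
print(target_bucket_num(3, 10, 8))   # fewest sink BEs
print(target_bucket_num(5, 2, 8))    # fewest buckets
print(target_bucket_num(5, 10, 0))   # fragment count clamped up to 1
```

The `max(..., 1)` clamp keeps the result at 1 or more whenever there is at least one sink BE and one bucket, even if the fragment count is reported as zero.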
   >
   > ---
   >
   > ### ③ `enable_memtable_flush_queue_backpressure`: flush-queue backpressure
   >
   > **Background.** `MemTableMemoryLimiter` currently throttles only on memory 
(soft/hard limit). But the flush thread pool can back up — memory hasn't hit 
the limit yet, but the flush queue is already long, and continuing to admit 
writes only grows the backlog. A memory-only view can't see this.
   >
   > **Approach.** Add the workload group's flush-pool queue length as a second 
backpressure signal: when the queue exceeds the threshold, hold incoming writes 
(wait, flushing as needed) even if memory limits aren't reached. Default-off, 
introduced conservatively.
   >
   > **Implementation.** BE config `enable_memtable_flush_queue_backpressure` 
(default false). `MemTableMemoryLimiter::handle_memtable_flush(cancel_check, 
WorkloadGroup* wg)` adds a `check_queue_overloaded` check 
(`wg->get_memtable_flush_pool()->get_queue_size() > kQueueThreshold`) to both 
the entry condition and the wait loop; `hard_limit_reached()` is exposed as 
public (used by ①'s `_flush_pages_if_needed`). Callers in 
`load_channel_mgr.cpp` and `vtablet_writer_v2.cpp` pass the workload group.
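
The combined throttle decision for ③ can be sketched as below. This is a Python illustration for reviewers: `should_throttle` and its parameters are hypothetical, and the queue threshold is passed in because the description does not give the value of `kQueueThreshold`.

```python
def should_throttle(soft_limit_reached: bool, hard_limit_reached: bool,
                    flush_queue_size: int, queue_threshold: int,
                    backpressure_enabled: bool = False) -> bool:
    """True when a writer should be held back before admitting more data."""
    # Second signal: only consulted when the (default-off) switch is on.
    queue_overloaded = backpressure_enabled and flush_queue_size > queue_threshold
    # Existing memory-based signals still apply on their own.
    return soft_limit_reached or hard_limit_reached or queue_overloaded
```

With the switch off the function reduces to the memory-only check, which matches the default-off, conservative rollout described above.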
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

