Alexey Kudinkin updated HUDI-5023:
----------------------------------
    Sprint: 2022/11/15  (was: 2022/11/29)

> Evaluate removing Queueing in the write path
> --------------------------------------------
>
>                 Key: HUDI-5023
>                 URL: https://issues.apache.org/jira/browse/HUDI-5023
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: writer-core
>            Reporter: Alexey Kudinkin
>            Assignee: Alexey Kudinkin
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 0.13.0
>
>
> We should evaluate removing _any queueing_ (BoundedInMemoryQueue, DisruptorQueue) on the write path, for multiple reasons:
>
> *It breaks up the vertical chain of transformations applied to the data*
>
> Spark (like other engines) relies on the notion of _iteration_ to vertically compose all of the transformations applied to a single record, enabling effective _stream_ processing: every transformation is applied to an _Iterator_ yielding records from the source. That way:
> # The chain of transformations* is applied to every record one by one, effectively limiting the amount of memory used to the number of records being read and processed simultaneously (if the reading is not batched, that's just a single record), which in turn
> # Limits the number of memory allocations required to process a single record.
>
> Consider the opposite: if we did this breadth-wise, applying the first transformation to _all_ of the records before the next one, we would have to hold all of the transformed records in memory, which is costly from both a GC-overhead and a pure object-churn perspective (the first sketch below contrasts the two evaluation orders).
>
> Enqueueing essentially violates both of these invariants, breaking up the {_}stream{_}-like processing model and forcing records to be kept in memory for no good reason.
>
> * This chain is broken up at shuffling points (the collections of tasks executed between these shuffling points are called stages in Spark)
>
> *It requires data to be allocated on the heap*
>
> As called out in the previous paragraph, enqueueing raw data read from the source breaks the _stream_-processing paradigm and forces records to be persisted on the heap.
>
> Consider the following example: the plain ParquetReader used by Spark actually reuses a *mutable* `ColumnarBatchRow` providing a row-based view into the batch of data being read from the file.
>
> Now, since it's a mutable object, we can use it to _iterate_ over all of the records (while doing stream processing), ultimately producing some "output" (writing into another file, a shuffle block, etc.), but we +can't keep a reference to it+ (for example, by +enqueueing+ it), precisely because the object is mutable. Instead, we are forced to make a *copy* of it, which obviously requires allocating it on the heap (see the second sketch below).
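>
> First sketch: a minimal, self-contained illustration of the two evaluation orders in plain Java (the `map`/`mapAll` helpers are hypothetical, not Hudi or Spark APIs). The iterator version keeps only the records currently in flight resident, while the breadth-wise version materializes the whole intermediate collection on the heap:
> {code:java}
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
> import java.util.function.Function;
>
> public class EvaluationOrders {
>   // Vertical (stream-like) composition: each record flows through the whole
>   // chain before the next one is pulled from the source, so memory usage is
>   // bounded by the handful of records in flight.
>   static <I, O> Iterator<O> map(Iterator<I> source, Function<I, O> f) {
>     return new Iterator<O>() {
>       @Override public boolean hasNext() { return source.hasNext(); }
>       @Override public O next() { return f.apply(source.next()); }
>     };
>   }
>
>   // Breadth-wise composition: the transformation is applied to ALL records
>   // before the next stage starts, forcing the entire intermediate result to
>   // be materialized on the heap (GC overhead + pure object churn).
>   static <I, O> List<O> mapAll(List<I> source, Function<I, O> f) {
>     List<O> out = new ArrayList<>(source.size());
>     for (I rec : source) {
>       out.add(f.apply(rec));
>     }
>     return out;
>   }
> }
> {code}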
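>
> Second sketch: the aliasing hazard that forces the copy. A hypothetical `MutableRow` stands in for Spark's `ColumnarBatchRow` here; this is illustrative, not actual Spark or Hudi code. Enqueueing the row as-is would leave every queue slot aliasing the same instance, so a defensive copy, and hence a heap allocation, is forced on every record:
> {code:java}
> import java.util.Iterator;
> import java.util.Queue;
>
> public class CopyOnEnqueue {
>   // Hypothetical mutable row standing in for ColumnarBatchRow: the reader
>   // reuses ONE instance and simply repoints it at the next record.
>   static final class MutableRow {
>     int rowId;
>
>     MutableRow copy() {
>       // Materializes a stable, heap-allocated snapshot of the current record.
>       MutableRow c = new MutableRow();
>       c.rowId = this.rowId;
>       return c;
>     }
>   }
>
>   static void enqueueAll(Iterator<MutableRow> reader, Queue<MutableRow> queue) {
>     while (reader.hasNext()) {
>       // queue.add(reader.next()) would be broken: every slot would alias the
>       // single mutable instance and end up "pointing at" the last record read.
>       queue.add(reader.next().copy()); // forced copy => extra allocation per record
>     }
>   }
> }
> {code}
> Removing the queue removes the need for this copy altogether, since the iterator chain consumes each record before the next one is read.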