[ 
https://issues.apache.org/jira/browse/HUDI-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raymond Xu updated HUDI-5023:
-----------------------------
    Sprint: 2022/11/15, 2022/11/29, 0.13.0 Final Sprint, 0.13.0 Final Sprint 2  
(was: 2022/11/15, 2022/11/29, 0.13.0 Final Sprint)

> Add new Executor avoiding Queueing in the write-path
> ----------------------------------------------------
>
>                 Key: HUDI-5023
>                 URL: https://issues.apache.org/jira/browse/HUDI-5023
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: writer-core
>            Reporter: Alexey Kudinkin
>            Assignee: Alexey Kudinkin
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.13.0
>
>
> We should evaluate removing _any queueing_ (BoundedInMemoryQueue, 
> DisruptorQueue) on the write path for multiple reasons:
> *It breaks up the vertical chain of transformations applied to the data*
> Spark (like other engines) relies on the notion of _Iteration_ to vertically 
> compose all transformations applied to a single record, allowing for effective 
> _stream_ processing, where all transformations are applied to an _Iterator_ 
> yielding records from the source. That way:
>  # The chain of transformations* is applied to every record one by one, 
> effectively limiting the amount of memory used to the number of records being 
> read and processed simultaneously (if the reading is not batched, that's just 
> a single record), which in turn allows us
>  # To limit the number of memory allocations required to process a single 
> record. Consider the opposite: if we did it breadth-wise, applying the first 
> transformation to _all_ of the records before moving on to the next one, we 
> would have to store all of the transformed records in memory, which is costly 
> from both GC-overhead and pure object-churn perspectives.
>  
> Enqueueing essentially violates both of these invariants, breaking up the 
> {_}stream{_}-like processing model and forcing records to be kept in memory 
> for no good reason (see the sketch after the footnote below).
>  
> * This chain is broken up at shuffling points (the collection of tasks 
> executed b/w these shuffling points is called a stage in Spark)
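>  
> To make the contrast concrete, here is a minimal, illustrative Java sketch 
> (not Hudi's actual API; all names are hypothetical) of how vertically 
> composed Iterators keep only a single record in flight:
> {code:java}
> import java.util.Iterator;
> import java.util.List;
> import java.util.function.Function;
> 
> public class IteratorChainingSketch {
> 
>   // Lazily applies a transformation to each record as it is pulled
>   static <I, O> Iterator<O> map(Iterator<I> source, Function<I, O> fn) {
>     return new Iterator<O>() {
>       @Override public boolean hasNext() { return source.hasNext(); }
>       @Override public O next() { return fn.apply(source.next()); }
>     };
>   }
> 
>   public static void main(String[] args) {
>     Iterator<String> source = List.of("1", "2", "3").iterator();
>     // Vertical composition: parse, then square, one record at a time;
>     // no intermediate collection of parsed records is ever materialized
>     Iterator<Integer> out = map(map(source, Integer::parseInt), x -> x * x);
>     out.forEachRemaining(System.out::println); // prints 1, 4, 9
>   }
> }
> {code}
> Any queue placed between two of these transformations has to drain the 
> upstream iterator's output into buffered (and therefore heap-allocated) 
> records, which is exactly the breadth-wise materialization described above.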
>  
> *It requires data to be allocated on the heap*
> As called out in the previous paragraph, enqueueing raw data read from the 
> source breaks up the _stream_ processing paradigm and forces records to be 
> persisted on the heap.
> Consider the following example: Spark's plain ParquetReader actually uses a 
> *mutable* `ColumnarBatchRow` that provides a row-based view into the batch of 
> data being read from the file.
> Now, since it's a mutable object, we can use it to _iterate_ over all of the 
> records (while doing stream processing), ultimately producing some "output" 
> (writing into another file, a shuffle block, etc.), but we +can't keep a 
> reference to it+ (for example, by +enqueueing+ it), since the object is 
> mutable. Instead, we are forced to make a *copy* of it, which will obviously 
> require us to allocate it on the heap.
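>  
> The following illustrative Java sketch (with a hypothetical MutableRow 
> standing in for a reader's reused row view such as `ColumnarBatchRow`) shows 
> why enqueueing the raw reference is unsafe and why the copy is unavoidable:
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Queue;
> 
> public class MutableRowPitfallSketch {
> 
>   // Hypothetical stand-in for a reader's mutable row view; the reader
>   // overwrites this single instance once per record
>   static final class MutableRow {
>     int value;
>     MutableRow copy() {
>       MutableRow c = new MutableRow();
>       c.value = value;
>       return c;
>     }
>   }
> 
>   public static void main(String[] args) {
>     MutableRow shared = new MutableRow();
>     Queue<MutableRow> broken = new ArrayDeque<>();
>     Queue<MutableRow> safe = new ArrayDeque<>();
> 
>     for (int v = 1; v <= 3; v++) {
>       shared.value = v;        // reader reuses the same object for each record
>       broken.add(shared);      // WRONG: every queued element aliases 'shared'
>       safe.add(shared.copy()); // correct, but costs a heap allocation per record
>     }
> 
>     broken.forEach(r -> System.out.print(r.value + " ")); // prints: 3 3 3
>     System.out.println();
>     safe.forEach(r -> System.out.print(r.value + " "));   // prints: 1 2 3
>   }
> }
> {code}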



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
