[ https://issues.apache.org/jira/browse/ARROW-11897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311401#comment-17311401 ]

Jorge Leitão commented on ARROW-11897:
--------------------------------------

I see. Just to understand: is there a reason why this should be in [Parquet] 
instead of in [DataFusion]? I.e., why should we push a specific parallelism 
strategy into the library?

I'm asking because, the way I see it, the parquet crate can't tell which use 
case it is being used in, and so can't pick an optimal strategy for it (one 
record per page, per row group, per file, or per set of files?). For example, 
s3 vs hdfs vs a local file system typically require different parallelism 
strategies.

My hypothesis (which may be wrong!) is that the parquet crate should offer 
"units of work" that can be divided/parallelized according to the IO (e.g. s3 
vs local filesystem), memory and CPU constraints that each consumer has, and 
allow consumers of the library (e.g. DataFusion, Polars, Ballista, readers 
backed by s3 vs hdfs vs a local file system) to design the strategies that fit 
their constraints best, by assembling these units according to their compute 
model.
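
To make this concrete, here is a minimal sketch of what I have in mind. Every 
name in it is hypothetical (nothing here is an existing parquet API); the only 
point is that the crate exposes independently executable units and the 
consumer decides how to schedule them:

```rust
use std::thread;

/// Hypothetical "unit of work": decode one column chunk of one row group.
/// Neither the type nor its fields exist in the parquet crate; they only
/// illustrate the shape of the API.
struct WorkUnit {
    row_group: usize,
    column: usize,
}

impl WorkUnit {
    /// Placeholder for the real decoding work (reading pages, building an
    /// Arrow array); returns a fake value count so the example runs.
    fn execute(&self) -> usize {
        self.row_group * 100 + self.column
    }
}

fn main() {
    // The consumer (DataFusion, Polars, Ballista, ...) owns the strategy.
    // Here: one OS thread per unit. An s3-backed reader might instead drive
    // the same units from async tasks with aggressive prefetching.
    let units: Vec<WorkUnit> = (0..4)
        .flat_map(|rg| (0..2).map(move |col| WorkUnit { row_group: rg, column: col }))
        .collect();

    let handles: Vec<_> = units
        .into_iter()
        .map(|u| thread::spawn(move || u.execute()))
        .collect();

    let total: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
    println!("decoded {} values in total", total);
}
```

The point is only that the scheduling decision (threads, async tasks, 
prefetching, ordering) lives with the consumer, while the crate guarantees 
that each unit can be decoded independently.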

> [Rust][Parquet] Use iterators to increase performance of creating Arrow arrays
> ------------------------------------------------------------------------------
>
>                 Key: ARROW-11897
>                 URL: https://issues.apache.org/jira/browse/ARROW-11897
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Rust
>            Reporter: Yordan Pavlov
>            Priority: Major
>
> The overall goal is to create an efficient pipeline from Parquet page data 
> into Arrow arrays, with as little intermediate conversion and memory 
> allocation as possible. It is assumed that, for best performance, we should 
> favor fewer but larger copy operations (rather than more but smaller ones). 
> Such a pipeline would need to be flexible in order to enable high performance 
> implementations in several different cases:
>  (1) In some cases, such as a plain-encoded numeric array, it might even be 
> possible to copy / create the array from a single contiguous section of a 
> page buffer. 
>  (2) In other cases, such as a plain-encoded string array, values are 
> encoded in the page buffer as non-contiguous slices (value bytes separated 
> by length bytes), so individual values will have to be copied separately, 
> and it's not obvious how this can be avoided.
>  (3) Finally, in the case of bit-packed encoding and smaller numeric values, 
> page buffer data has to be decoded / expanded before it is ready to be copied 
> into an Arrow array, so a `Vec<u8>` will have to be returned instead of a 
> slice pointing into a page buffer.
> I propose that the implementation be split into three layers - (1) decoder, 
> (2) column reader and (3) array converter layers (not too dissimilar from the 
> current implementation, except it would be based on Iterators), as follows:
> *(1) Decoder layer:*
> A decoder output abstraction that enables all of the above cases and 
> minimizes intermediate memory allocation is `Iterator<Item = (count, 
> AsRef<[u8]>)>`.
>  Then in case (1) above, where a numeric array could be created from a single 
> contiguous byte slice, such an iterator could return a single item such as 
> `(1024, &[u8])`. 
>  In case (2) above, where each string value is encoded as an individual byte 
> slice, but it is still possible to copy directly from a page buffer, a 
> decoder iterator could return a sequence of items such as `(1, &[u8])`. 
>  And finally in case (3) above, where bit-packed values have to be 
> unpacked/expanded, and it's NOT possible to copy value bytes directly from a 
> page buffer, a decoder iterator could return items representing chunks of 
> values such as `(32, Vec<u8>)` where bit-packed values have been unpacked and 
>  the chunk size is configured for best performance.
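> As a rough, hypothetical sketch of this shape, assuming `Cow<'a, [u8]>` as 
> the `AsRef<[u8]>` type so that a single item type can cover both the 
> borrowed cases (1)/(2) and the owned case (3) (the function names are 
> illustrative only, not the crate's API):
> ```rust
> use std::borrow::Cow;
> 
> /// Illustrative item type: `count` values whose raw bytes live in the slice.
> /// `Cow` lets cases (1)/(2) borrow from the page buffer and case (3) return
> /// freshly unpacked bytes, without changing the iterator's item type.
> type DecodedChunk<'a> = (usize, Cow<'a, [u8]>);
> 
> /// Case (1): a plain-encoded i32 column where the whole page is one
> /// contiguous slice, so the iterator yields a single item.
> fn plain_i32_chunks<'a>(page: &'a [u8]) -> impl Iterator<Item = DecodedChunk<'a>> + 'a {
>     std::iter::once((page.len() / 4, Cow::Borrowed(page)))
> }
> 
> /// Case (3): bit-packed values that had to be expanded first; the iterator
> /// yields owned buffers whose chunk size can be tuned separately.
> fn bitpacked_chunks(unpacked: Vec<Vec<u8>>) -> impl Iterator<Item = DecodedChunk<'static>> {
>     unpacked.into_iter().map(|chunk| (chunk.len() / 4, Cow::Owned(chunk)))
> }
> 
> fn main() {
>     // Pretend this is a plain-encoded page holding 1024 little-endian i32s.
>     let page = vec![0u8; 1024 * 4];
>     for (count, bytes) in plain_i32_chunks(&page) {
>         println!("contiguous: {} values, {} bytes", count, bytes.as_ref().len());
>     }
>     // Pretend these are two already-unpacked chunks of 32 i32s each.
>     for (count, bytes) in bitpacked_chunks(vec![vec![0u8; 32 * 4]; 2]) {
>         println!("unpacked: {} values, {} bytes", count, bytes.as_ref().len());
>     }
> }
> ```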
> Another benefit of an `Iterator`-based abstraction is that it would prepare 
> the parquet crate for a migration to `async` `Stream`s (my understanding is 
> that a `Stream` is effectively an async `Iterator`).
> *(2) Column reader layer:*
> Then a higher-level iterator could combine a value iterator and a 
> (definition) level iterator to produce a sequence of 
> `ValueSequence(count, AsRef<[u8]>)` and `NullSequence(count)` items from 
> which an Arrow array can be created efficiently.
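> A minimal sketch of such a combination, using a hypothetical `combine` 
> helper that walks the definition levels and emits runs (none of these 
> signatures are the crate's actual API):
> ```rust
> use std::borrow::Cow;
> 
> /// Illustrative output of the column reader layer (names from the proposal):
> /// runs of non-null value bytes interleaved with runs of nulls.
> enum ColumnItem<'a> {
>     ValueSequence(usize, Cow<'a, [u8]>),
>     NullSequence(usize),
> }
> 
> /// Hypothetical combinator: emit a `ValueSequence` for each run of levels at
> /// `max_def_level` (taking bytes from the value iterator) and a
> /// `NullSequence` for each run below it.
> fn combine<'a>(
>     def_levels: &[i16],
>     max_def_level: i16,
>     mut values: impl Iterator<Item = (usize, Cow<'a, [u8]>)>,
> ) -> Vec<ColumnItem<'a>> {
>     let mut out = Vec::new();
>     let mut i = 0;
>     while i < def_levels.len() {
>         let is_value = def_levels[i] == max_def_level;
>         let mut run = 1;
>         while i + run < def_levels.len()
>             && (def_levels[i + run] == max_def_level) == is_value
>         {
>             run += 1;
>         }
>         if is_value {
>             // For simplicity, assume the value iterator yields exactly one
>             // chunk per run; a real reader would split or merge chunks.
>             let (count, bytes) = values.next().expect("one value chunk per run");
>             out.push(ColumnItem::ValueSequence(count, bytes));
>         } else {
>             out.push(ColumnItem::NullSequence(run));
>         }
>         i += run;
>     }
>     out
> }
> 
> fn main() {
>     // Two non-null i32s, one null, then one more non-null i32.
>     let run1 = [1u8, 0, 0, 0, 2, 0, 0, 0];
>     let run2 = [3u8, 0, 0, 0];
>     let defs = [1i16, 1, 0, 1];
>     let values = vec![
>         (2usize, Cow::Borrowed(&run1[..])),
>         (1usize, Cow::Borrowed(&run2[..])),
>     ];
>     for item in combine(&defs, 1, values.into_iter()) {
>         match item {
>             ColumnItem::ValueSequence(n, bytes) => {
>                 println!("{} values in {} bytes", n, bytes.len())
>             }
>             ColumnItem::NullSequence(n) => println!("{} nulls", n),
>         }
>     }
> }
> ```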
> In the future, a higher-level iterator (for the keys) could be combined with 
> a dictionary value iterator to create a dictionary array.
> *(3) Array converter layer:*
> Finally, Arrow arrays would be created from a (generic) higher-level 
> iterator, using a layer of array converters that know what the value bytes 
> and nulls mean for each type of array.
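> A sketch of what such a converter could look like; the trait and type names 
> are hypothetical, and `Vec<i32>` plus a `Vec<bool>` validity vector stand in 
> for Arrow's value buffer and null bitmap:
> ```rust
> /// Illustrative item type, re-declared here so the sketch is self-contained
> /// (a simplified, borrowed-only version of the column reader output).
> enum ColumnItem<'a> {
>     ValueSequence(usize, &'a [u8]),
>     NullSequence(usize),
> }
> 
> /// Hypothetical converter trait: each array type knows how to interpret the
> /// generic value-bytes / null-run stream for itself.
> trait ArrayConverter {
>     type Array;
>     fn convert<'a, I: Iterator<Item = ColumnItem<'a>>>(&self, items: I) -> Self::Array;
> }
> 
> struct Int32Converter;
> 
> impl ArrayConverter for Int32Converter {
>     // (values, validity) stands in for Arrow's buffer + null bitmap.
>     type Array = (Vec<i32>, Vec<bool>);
> 
>     fn convert<'a, I: Iterator<Item = ColumnItem<'a>>>(&self, items: I) -> Self::Array {
>         let mut values = Vec::new();
>         let mut validity = Vec::new();
>         for item in items {
>             match item {
>                 // Non-null run: reinterpret raw little-endian bytes as i32s.
>                 ColumnItem::ValueSequence(count, bytes) => {
>                     for c in bytes.chunks_exact(4).take(count) {
>                         values.push(i32::from_le_bytes([c[0], c[1], c[2], c[3]]));
>                         validity.push(true);
>                     }
>                 }
>                 // Null run: placeholder values plus cleared validity bits.
>                 ColumnItem::NullSequence(count) => {
>                     values.extend(std::iter::repeat(0).take(count));
>                     validity.extend(std::iter::repeat(false).take(count));
>                 }
>             }
>         }
>         (values, validity)
>     }
> }
> 
> fn main() {
>     let bytes = 7i32.to_le_bytes();
>     let items = vec![
>         ColumnItem::ValueSequence(1, &bytes[..]),
>         ColumnItem::NullSequence(2),
>     ];
>     let (values, validity) = Int32Converter.convert(items.into_iter());
>     println!("values = {:?}, validity = {:?}", values, validity);
> }
> ```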
>  
> [~nevime], [~Dandandan], [~jorgecarleitao], let me know what you think.
> Next steps:
>  * split work into smaller tasks that could be done over time


