[ https://issues.apache.org/jira/browse/ARROW-11897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17310301#comment-17310301 ]

Yordan Pavlov commented on ARROW-11897:
---------------------------------------

Hi [~Dandandan], apologies, I should have updated you on my progress earlier, but 
I was busy trying things out.

My thinking so far has been along the lines of replacing pretty much the 
entire path from parquet pages all the way into arrow arrays using iterators 
(because I am hoping that an iterator-based implementation would minimize 
unnecessary memory allocation). Something like this: 
Iterator<RowGroup>
  >> Iterator<(ColumnChunkContext, Page)>
  >> Iterator<(ValueSliceIterator, DefLevelIterator, RepLevelIterator)>
  >> (Iterator<ValueSliceIterator>, Iterator<DefLevelIterator>, Iterator<RepLevelIterator>)
So far I have implemented splitting an iterator into multiple (parallel) 
iterators based on 
[https://stackoverflow.com/questions/25586681/splitting-iteratora-b-into-iteratora-and-iteratorb#25588440]

This will be useful, as illustrated above, for splitting an iterator over pages 
into iterators over values, def levels and rep levels, which can be consumed 
independently (but usually in parallel).
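
For reference, here is a minimal sketch of that splitting technique (the names 
are purely illustrative, not parquet crate code): the source iterator of pairs 
sits behind an Rc<RefCell<...>>, with one buffer per output side so that either 
half can run ahead of the other:

use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

// shared state: the underlying iterator of pairs plus one buffer per output side
struct SplitState<A, B, I: Iterator<Item = (A, B)>> {
    source: I,
    left_buf: VecDeque<A>,
    right_buf: VecDeque<B>,
}

struct LeftIter<A, B, I: Iterator<Item = (A, B)>>(Rc<RefCell<SplitState<A, B, I>>>);
struct RightIter<A, B, I: Iterator<Item = (A, B)>>(Rc<RefCell<SplitState<A, B, I>>>);

impl<A, B, I: Iterator<Item = (A, B)>> Iterator for LeftIter<A, B, I> {
    type Item = A;
    fn next(&mut self) -> Option<A> {
        let mut state = self.0.borrow_mut();
        if let Some(a) = state.left_buf.pop_front() {
            return Some(a);
        }
        // pull a new pair and park the right half for the other iterator
        let (a, b) = state.source.next()?;
        state.right_buf.push_back(b);
        Some(a)
    }
}

impl<A, B, I: Iterator<Item = (A, B)>> Iterator for RightIter<A, B, I> {
    type Item = B;
    fn next(&mut self) -> Option<B> {
        let mut state = self.0.borrow_mut();
        if let Some(b) = state.right_buf.pop_front() {
            return Some(b);
        }
        let (a, b) = state.source.next()?;
        state.left_buf.push_back(a);
        Some(b)
    }
}

// split an iterator of pairs into two iterators that can be consumed independently
fn split<A, B, I: Iterator<Item = (A, B)>>(
    source: I,
) -> (LeftIter<A, B, I>, RightIter<A, B, I>) {
    let state = Rc::new(RefCell::new(SplitState {
        source,
        left_buf: VecDeque::new(),
        right_buf: VecDeque::new(),
    }));
    (LeftIter(state.clone()), RightIter(state))
}

For the page case this would generalize to a three-way split (values, def 
levels, rep levels), with one buffer per output; the buffers are what allow the 
three consumers to advance at different rates.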

Also, in the past week I have been working on splitting an iterator of byte 
slices into iterators that return no more than batch_size items. I have almost 
figured out how to do this; I just have to make it a bit more generic and do 
some more benchmarking. I would also like to do some benchmarking with 
[https://docs.rs/hyper/0.14.4/hyper/body/struct.Bytes.html] (which appears to 
be an alternative to the ByteBufferPtr implementation that already exists in 
the parquet crate).
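
As a rough illustration of the batch_size idea (not the exact code I have, and 
the generic version still needs work), the simplest form is an adapter that 
lazily re-chunks any iterator into batches of at most batch_size items:

// lazily split any iterator into consecutive batches of at most `batch_size`
// items, without collecting the whole input first
fn batched<I: Iterator>(mut input: I, batch_size: usize) -> impl Iterator<Item = Vec<I::Item>> {
    std::iter::from_fn(move || {
        let batch: Vec<I::Item> = input.by_ref().take(batch_size).collect();
        if batch.is_empty() { None } else { Some(batch) }
    })
}

fn main() {
    let page = [1u8, 2, 3, 4, 5, 6, 7];
    // pretend these are value slices decoded from a page buffer
    let slices: Vec<&[u8]> = vec![&page[0..1], &page[1..3], &page[3..6], &page[6..7]];
    for batch in batched(slices.into_iter(), 2) {
        println!("batch of {} slices", batch.len());
    }
}

For byte slices the real version would avoid allocating a Vec per batch and 
instead track counts and slice boundaries, which is where the (count, 
AsRef<[u8]>) shape described in the issue comes in.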

Figuring out exactly how the work will be split into different PRs is what I 
will focus on next, but I already have some ideas:

I think it would be best to start small, building on PageIterator::next() -> 
PageReader to produce an iterator of pages, something like:

 
// create an iterator of (contiguous) data slices across all pages from all
// row groups
row_group_iter // iter of PageReader
  // attach a per-row-group / column-chunk context to every page the reader yields
  .flat_map(|page_reader| {
      // the context is used to store dictionaries for dictionary-encoded chunks
      let context = Rc::new(RefCell::new(IterContext::new()));
      page_reader.map(move |page| (context.clone(), page))
  }) // iter of (Rc<RefCell<IterContext>>, Page)
  .map(|(context, page)| {
      // dictionary pages update the context; data pages produce a value decoder
      get_decoder(&mut context.borrow_mut(), page)
  }) // iter of decoders, each itself an iterator of value slices
  .flatten() // iter of AsRef<[u8]>
 

Iterating over pages is currently implemented inconsistently for primitive and 
complex types, and I would ultimately like to merge the two implementations, so 
that there is no longer a separate primitive or complex array reader, just a 
single arrow array reader that uses adapters / converters for different types 
of arrays.

Also, the decoding functionality implemented in each parquet type is only used 
by the plain decoder (and by no other decoder), so I would look to move it away 
from the types and into the plain decoder where it belongs.

Then, I would look into implementing the Iterator<Item = AsRef<[u8]>> idea for 
the different decoders, and also into how exactly the adapters / converters for 
different types of arrays would work.
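
As a rough illustration of the Iterator<Item = AsRef<[u8]>> idea (hypothetical 
types, not the current parquet decoder API), a plain decoder for fixed-width 
values could hand out borrowed slices straight from the page buffer, so nothing 
is copied until the bytes are written into an arrow buffer:

// hypothetical sketch, not the existing parquet crate API
struct PlainFixedWidthDecoder<'a> {
    page_data: &'a [u8], // the (already decompressed) page buffer
    value_width: usize,  // e.g. 4 for INT32, 8 for INT64 / DOUBLE
}

impl<'a> PlainFixedWidthDecoder<'a> {
    // yield runs of at most `batch_size` values as slices borrowed from the
    // page buffer
    fn value_slices(&self, batch_size: usize) -> impl Iterator<Item = &'a [u8]> {
        assert!(self.value_width > 0 && batch_size > 0);
        self.page_data.chunks(batch_size * self.value_width)
    }
}

A byte array (string) decoder would instead yield one slice per value, and the 
adapter / converter on top would know how to assemble the target arrow array 
from those slices and the def levels.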

I am open to suggestions on how we could collaborate better on this. Let me 
know what you think.

> [Rust][Parquet] Use iterators to increase performance of creating Arrow arrays
> ------------------------------------------------------------------------------
>
>                 Key: ARROW-11897
>                 URL: https://issues.apache.org/jira/browse/ARROW-11897
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Rust
>            Reporter: Yordan Pavlov
>            Priority: Major
>
> The overall goal is to create an efficient pipeline from Parquet page data 
> into Arrow arrays, with as little intermediate conversion and memory 
> allocation as possible. It is assumed, that for best performance, we favor 
> doing fewer but larger copy operations (rather than more but smaller). 
> Such a pipeline would need to be flexible in order to enable high performance 
> implementations in several different cases:
>  (1) In some cases, such as a plain-encoded number array, it might even be 
> possible to copy / create the array from a single contiguous section of a 
> page buffer. 
>  (2) In other cases, such as a plain-encoded string array, values are encoded 
> as non-contiguous slices (where value bytes are separated by length bytes) in 
> a page buffer that contains multiple values, so individual values will have 
> to be copied separately and it's not obvious how this can be avoided.
>  (3) Finally, in the case of bit-packed encoding and smaller numeric values, 
> page buffer data has to be decoded / expanded before it is ready to copy into 
> an arrow array, so a `Vec<u8>` will have to be returned instead of a slice 
> pointing to a page buffer.
> I propose that the implementation is split into three layers - (1) decoder, 
> (2) column reader and (3) array converter layers (not too dissimilar from the 
> current implementation, except it would be based on Iterators), as follows:
> *(1) Decoder layer:*
> A decoder output abstraction that enables all of the above cases and 
> minimizes intermediate memory allocation is `Iterator<Item = (count, 
> AsRef<[u8]>)>`.
>  Then in case (1) above, where a numeric array could be created from a single 
> contiguous byte slice, such an iterator could return a single item such as 
> `(1024, &[u8])`. 
>  In case (2) above, where each string value is encoded as an individual byte 
> slice, but it is still possible to copy directly from a page buffer, a 
> decoder iterator could return a sequence of items such as `(1, &[u8])`. 
>  And finally in case (3) above, where bit-packed values have to be 
> unpacked/expanded, and it's NOT possible to copy value bytes directly from a 
> page buffer, a decoder iterator could return items representing chunks of 
> values such as `(32, Vec<u8>)` where bit-packed values have been unpacked and 
>  the chunk size is configured for best performance.
> Another benefit of an `Iterator`-based abstraction is that it would prepare 
> the parquet crate for  migration to `async` `Stream`s (my understanding is 
> that a `Stream` is effectively an async `Iterator`).
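> As a rough sketch (illustrative names only), a decoder item could pair the 
> count with bytes that are either borrowed from the page buffer (cases 1 and 2 
> above) or owned because they had to be unpacked first (case 3):
>
> // each decoder would then be an Iterator<Item = (usize, ValueBytes)>,
> // i.e. (count, AsRef<[u8]>)
> enum ValueBytes<'a> {
>     Borrowed(&'a [u8]), // sliced straight out of the page buffer (cases 1 and 2)
>     Owned(Vec<u8>),     // e.g. bit-packed values expanded into a scratch buffer (case 3)
> }
>
> impl<'a> AsRef<[u8]> for ValueBytes<'a> {
>     fn as_ref(&self) -> &[u8] {
>         match self {
>             ValueBytes::Borrowed(s) => s,
>             ValueBytes::Owned(v) => v,
>         }
>     }
> }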
> *(2) Column reader layer:*
> Then a higher level iterator could combine a value iterator and a (def) level 
> iterator to produce a sequence of `ValueSequence(count, AsRef<[u8]>)` and 
> `NullSequence(count)` items from which an arrow array can be created 
> efficiently.
> In future, a higher level iterator (for the keys) could be combined with a 
> dictionary value iterator to create a dictionary array.
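> Only as a sketch again (building on the ValueBytes type above, and assuming a 
> flat column with max definition level 1), the combined items and the def level 
> folding could look something like:
>
> // what the column reader layer would hand to the array converter
> enum ColumnItem<'a> {
>     ValueSequence(usize, ValueBytes<'a>), // `count` non-null values and their bytes
>     NullSequence(usize),                  // a run of `count` nulls
> }
>
> // collapse definition levels into alternating runs of non-null / null values
> fn def_level_runs(def_levels: &[i16]) -> Vec<(bool, usize)> {
>     let mut runs: Vec<(bool, usize)> = Vec::new();
>     for &level in def_levels {
>         let defined = level == 1;
>         if let Some(last) = runs.last_mut() {
>             if last.0 == defined {
>                 last.1 += 1;
>                 continue;
>             }
>         }
>         runs.push((defined, 1));
>     }
>     runs
> }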
> *(3) Array converter layer:*
> Finally, Arrow arrays would be created from a (generic) higher-level 
> iterator, using a layer of array converters that know what the value bytes 
> and nulls mean for each type of array.
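> For illustration, the converter could be a small trait that the generic reader 
> drives, with one implementation per arrow type (again just a sketch, not an 
> existing API):
>
> trait ArrayConverter {
>     type Array; // the finished arrow array type, e.g. a primitive or string array
>     // append `count` non-null values encoded in `value_bytes`
>     fn push_values(&mut self, count: usize, value_bytes: &[u8]);
>     // append a run of `count` nulls
>     fn push_nulls(&mut self, count: usize);
>     fn finish(self) -> Self::Array;
> }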
>  
> [~nevime], [~Dandandan], [~jorgecarleitao] let me know what you think
> Next steps:
>  * split work into smaller tasks that could be done over time


