XiangpengHao opened a new pull request, #7850:
URL: https://github.com/apache/arrow-rs/pull/7850

   This is my latest attempt to make pushdown faster. Prior art: #6921
   
   cc @alamb @zhuqi-lucas 
   
   ## Problems of #6921
   
   1. It proactively loads the entire row group into memory, rather than only loading the pages that pass the filter predicate.
   2. It only caches decompressed pages, so the decoding cost is still paid twice.
   
   This PR takes a different approach: it does not change the decoding pipeline, which avoids problem 1, and it caches the decoded arrow record batches, which avoids problem 2.
   
   The trade-off is that we need more memory to cache the decoded data.
   
   ## How does it work?
   
   1. It instruments the `array_readers` with a transparent `cached_array_reader` (a simplified sketch follows this list).
   2. The cache layer first consults the `RowGroupCache` for a batch, and only reads from the underlying reader on a cache miss.
   3. There is a cache producer and a cache consumer: while building filters we insert arrow arrays into the cache (producer), and while building outputs we remove them from the cache (consumer). So the memory usage over time should look like this:
   ```
       ▲
       │     ╭─╮
       │    ╱   ╲
       │   ╱     ╲
       │  ╱       ╲
       │ ╱         ╲
       │╱           ╲
       └─────────────╲──────► Time
       │      │      │
       Filter  Peak  Consume
       Phase (Built) (Decrease)
   ```
   In a concurrent setup, not all readers reach their peak at the same time, so the peak system memory usage may be lower than this picture suggests.
   
   4. There is a `max_cache_size` knob, which is a per-row-group setting. Once a row group has used up its budget, the cache stops taking new data, and `cached_array_reader` falls back to reading and decoding from Parquet.
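   
   A minimal, self-contained sketch of this wrapper is below. It deliberately does not use the parquet crate's real internal `ArrayReader` machinery; the trait, the `ArrayRef` alias, the cache key, and the byte accounting are all simplified stand-ins, so read it as an illustration of the idea rather than the actual code in this PR:
   ```rust
   use std::collections::HashMap;
   use std::sync::{Arc, Mutex};
   
   // Simplified stand-ins: the real PR wraps the parquet crate's internal
   // `ArrayReader`; here we invent a tiny trait and use a Vec as the "array".
   type ArrayRef = Arc<Vec<i64>>;
   
   trait ArrayReader {
       /// Decode the next batch of up to `batch_size` rows, or None at end of row group.
       fn next_batch(&mut self, batch_size: usize) -> Option<ArrayRef>;
   }
   
   /// Per-row-group cache keyed by (column index, batch index),
   /// bounded by a `max_cache_size` budget in bytes.
   struct RowGroupCache {
       batches: Mutex<HashMap<(usize, usize), ArrayRef>>,
       used_bytes: Mutex<usize>,
       max_cache_size: usize,
   }
   
   impl RowGroupCache {
       fn new(max_cache_size: usize) -> Self {
           Self {
               batches: Mutex::new(HashMap::new()),
               used_bytes: Mutex::new(0),
               max_cache_size,
           }
       }
   
       /// Consumer side: take the batch out so its memory is released once
       /// the output batch has been built.
       fn take(&self, key: (usize, usize)) -> Option<ArrayRef> {
           self.batches.lock().unwrap().remove(&key)
       }
   
       /// Producer side: best-effort insert, refused once the budget is used up.
       fn insert(&self, key: (usize, usize), batch: ArrayRef) -> bool {
           let bytes = batch.len() * std::mem::size_of::<i64>();
           let mut used = self.used_bytes.lock().unwrap();
           if *used + bytes > self.max_cache_size {
               return false;
           }
           *used += bytes;
           self.batches.lock().unwrap().insert(key, batch);
           true
       }
   }
   
   /// Transparent wrapper: consult the cache first, fall back to the inner
   /// (decoding) reader on a miss, and populate the cache when the budget allows.
   struct CachedArrayReader<R: ArrayReader> {
       inner: R,
       cache: Arc<RowGroupCache>,
       column_idx: usize,
       next_batch_idx: usize,
   }
   
   impl<R: ArrayReader> ArrayReader for CachedArrayReader<R> {
       fn next_batch(&mut self, batch_size: usize) -> Option<ArrayRef> {
           let key = (self.column_idx, self.next_batch_idx);
           self.next_batch_idx += 1;
           if let Some(cached) = self.cache.take(key) {
               return Some(cached); // cache hit: skip decompression and decoding
           }
           let batch = self.inner.next_batch(batch_size)?;
           // Cache miss: decode from Parquet and cache it if the budget allows.
           self.cache.insert(key, batch.clone());
           Some(batch)
       }
   }
   ```
   In this picture, the filter-phase readers act as the producer and the output-phase readers act as the consumer, sharing one `RowGroupCache` per row group.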
   
   ## Other benefits
   
   1. This architecture can support nested columns (not implemented in this PR), i.e., it is future proof.
   2. There are further optimizations that could squeeze out more performance, but even in its current state it shows no regressions.
   
   ## How does it perform?
   
   My criterion setup somehow won't produce a result from `--save-baseline`, so I asked an LLM to generate a table from this benchmark:
   ```
   cargo bench --bench arrow_reader_clickbench --features "arrow async" "async"
   ```
   
   `Baseline` is the implementation on the current main branch.
   `New Unlimited` is the new pushdown with an unlimited memory budget.
   `New 100MB` is the new pushdown with a 100MB per-row-group cache budget.
   
   ```
   Query | Baseline (ms) | New Unlimited (ms) | Diff (ms) | New 100MB (ms) | Diff (ms)
   ------+---------------+--------------------+-----------+----------------+----------
   Q1    | 0.847         | 0.803              | -0.044    | 0.812          | -0.035
   Q10   | 4.060         | 6.273              | +2.213    | 6.216          | +2.156
   Q11   | 5.088         | 7.152              | +2.064    | 7.193          | +2.105
   Q12   | 18.485        | 14.937             | -3.548    | 14.904         | -3.581
   Q13   | 24.859        | 21.908             | -2.951    | 21.705         | -3.154
   Q14   | 23.994        | 20.691             | -3.303    | 20.467         | -3.527
   Q19   | 1.894         | 1.980              | +0.086    | 1.996          | +0.102
   Q20   | 90.325        | 64.689             | -25.636   | 74.478         | -15.847
   Q21   | 106.610       | 74.766             | -31.844   | 99.557         | -7.053
   Q22   | 232.730       | 101.660            | -131.070  | 204.800        | -27.930
   Q23   | 222.800       | 186.320            | -36.480   | 186.590        | -36.210
   Q24   | 24.840        | 19.762             | -5.078    | 19.908         | -4.932
   Q27   | 80.463        | 47.118             | -33.345   | 49.597         | -30.866
   Q28   | 78.999        | 47.583             | -31.416   | 51.432         | -27.567
   Q30   | 28.587        | 28.710             | +0.123    | 28.926         | +0.339
   Q36   | 80.157        | 57.954             | -22.203   | 58.012         | -22.145
   Q37   | 46.962        | 45.901             | -1.061    | 45.386         | -1.576
   Q38   | 16.324        | 16.492             | +0.168    | 16.522         | +0.198
   Q39   | 20.754        | 20.734             | -0.020    | 20.648         | -0.106
   Q40   | 22.554        | 21.707             | -0.847    | 21.995         | -0.559
   Q41   | 16.430        | 16.391             | -0.039    | 16.581         | +0.151
   Q42   | 6.045         | 6.157              | +0.112    | 6.120          | +0.075
   ```
   
   1. If we treat diffs within 5ms as noise, we are never worse than the current implementation.
   2. We see significant improvements on string-heavy queries, because string columns are large and expensive to decompress and decode.
   3. A 100MB cache budget has only a small performance impact compared to the unlimited budget.
   
   
   ## Limitations
   
   1. It only works for async readers, because the sync reader does not follow the same row-group-by-row-group structure.
   2. It is memory hungry compared to #6921. But changing the decoding pipeline without eagerly loading the entire row group would require significant changes to the current decoding infrastructure, e.g., making the page iterator an async function.
   3. It currently doesn't support nested columns; more specifically, it doesn't support nested columns with nullable parents. Supporting them should be straightforward and not require big changes.
   4. The current memory accounting is not accurate: it overestimates memory usage, especially when reading string view arrays, where multiple string views may share the same underlying buffer and that buffer is counted more than once (see the sketch below). In any case, we never exceed the user-configured memory limit.
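   
   To illustrate the over-counting, here is a small self-contained example. It uses slices of a plain `StringArray` instead of string view arrays purely for simplicity; the point is the same: per-array `get_array_memory_size()` counts a shared buffer once per array that references it.
   ```rust
   use arrow::array::{Array, StringArray};
   
   fn main() {
       // One array; its two slices share the same underlying offset/value buffers.
       let full = StringArray::from_iter_values(
           (0..1024).map(|i| format!("some-longish-row-value-{i:04}")),
       );
       let first_half = full.slice(0, 512);
       let second_half = full.slice(512, 512);
   
       // Each slice reports the full capacity of the shared buffers, so a naive
       // per-array sum roughly doubles the real resident memory.
       let naive_sum =
           first_half.get_array_memory_size() + second_half.get_array_memory_size();
       println!(
           "whole array: {} bytes, naive sum of slices: {} bytes",
           full.get_array_memory_size(),
           naive_sum
       );
   }
   ```
   Since the estimate only errs on the high side, the cache may stop caching earlier than strictly necessary, but it will not blow past the configured budget.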
   
   ## Next steps?
   
   This PR is largely a proof of concept; I want to collect some feedback before sending a multi-thousand-line PR :)
   
   Some items I can think of:
   1. Design an interface for users to specify the cache size limit; it is currently hard-coded (one possible shape is sketched after this list).
   2. Don't instrument the nested array reader if the parquet file has a nullable parent; currently it will panic.
   3. More testing, and integration tests/benchmarks with DataFusion.
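   
   A purely hypothetical sketch of what the user-facing knob from item 1 could look like; neither this struct nor the field name exists in arrow-rs today:
   ```rust
   // Hypothetical sketch only: nothing here is an existing arrow-rs API.
   // One option is a small options struct that the async reader builder accepts,
   // with the per-row-group cache budget expressed in bytes.
   #[derive(Clone, Copy, Debug)]
   pub struct CacheOptions {
       /// Maximum bytes of decoded arrow data cached per row group.
       /// `None` disables the cache entirely.
       pub max_row_group_cache_bytes: Option<usize>,
   }
   
   impl Default for CacheOptions {
       fn default() -> Self {
           Self {
               // Matches the 100MB budget used in the benchmarks above.
               max_row_group_cache_bytes: Some(100 * 1024 * 1024),
           }
       }
   }
   ```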
   

