Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3114685204
Update: Here is a proposed PR to add a config option for predicate evaluation:
- https://github.com/XiangpengHao/arrow-rs/pull/7
The only thing left is to figure out how to integrate this caching into the sync reader. Some options I can think of:
1. Don't integrate it at all
2. Rework the IO pattern in the sync reader so it only evaluates predicates a row group at a time rather than for the entire file (which is likely hard due to https://github.com/apache/arrow-rs/issues/7983)
So now I am stumped 🤔
Maybe I'll try and hack out a ParquetDecoder and see what that would look like 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on code in PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#discussion_r2229371852
##
parquet/src/arrow/async_reader/mod.rs:
##
@@ -597,11 +610,16 @@ where
metadata: self.metadata.as_ref(),
};
+let cache_options_builder =
Review Comment:
I started looking into how to integrate this cache into the sync reader, and
I got a bit stuck because the sync reader evaluates the predicates for *all*
RowGroups first. If we put a predicate cache in the sync reader, it would
end up having to cache results from *all* row groups, not just a single row
group the way the async reader does:
https://github.com/apache/arrow-rs/blob/8c75ad988e448f2eb02a2e9d9f4b920a59b7bb2b/parquet/src/arrow/arrow_reader/mod.rs#L793-L818
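To make the concern concrete, here is a minimal sketch (not the arrow-rs code) of how many rows of cached predicate results would be alive at once under the two evaluation orders. `RowGroup`, `peak_cached_rows_per_row_group`, and `peak_cached_rows_whole_file` are hypothetical names used only for illustration, and the model assumes the cache holds every row of each row group it covers.

```rust
/// Hypothetical stand-in for a Parquet row group: only its row count matters here.
struct RowGroup {
    num_rows: usize,
}

/// Async-reader style: predicates are evaluated one row group at a time, so the
/// cache for a row group can be dropped before the next one starts. The peak is
/// the largest single row group.
fn peak_cached_rows_per_row_group(row_groups: &[RowGroup]) -> usize {
    row_groups.iter().map(|rg| rg.num_rows).max().unwrap_or(0)
}

/// Sync-reader style: predicates for *all* row groups are evaluated up front,
/// so cached results for every row group are alive at the same time.
fn peak_cached_rows_whole_file(row_groups: &[RowGroup]) -> usize {
    row_groups.iter().map(|rg| rg.num_rows).sum()
}
```

For row groups of 100, 250, and 50 rows, the per-row-group order peaks at 250 cached rows while the whole-file order peaks at 400, and the gap grows with the number of row groups in the file.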
##
parquet/src/arrow/async_reader/mod.rs:
##
@@ -1832,6 +1882,7 @@ mod tests {
assert_eq!(total_rows, 730);
}
+#[ignore]
Review Comment:
I guess my point is we should either update the test or remove it -- leaving
it ignored is likely not helping anything
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3110230266
Update here: I have hooked up a configuration option and tests in a PR:
- https://github.com/XiangpengHao/arrow-rs/pull/7
The tests are failing because the sync reader does not use the predicate cache yet. I will fix that tomorrow and get that PR up for review
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3097317805
> I am beginning to look into this -- my planned contribution is to
>
> 1. Make a setting for max cache size (which we will need as an escape valve to turn this off)
> 2. Tests for cache memory size
I started writing some tests but it got somewhat more complicated than I expected. Here is the WIP PR
- https://github.com/apache/arrow-rs/pull/7971
Once that is in place then I hope to use the same pattern to verify the cache operations. I will continue tomorrow
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3096390709
I am beginning to look into this -- my planned contribution is to
1. Make a setting for max cache size (which we will need as an escape valve to turn this off)
2. Tests for caching
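As a rough illustration of the "max cache size as an escape valve" idea, the sketch below shows a byte-budgeted cache in which a budget of zero effectively turns caching off. `BoundedCache`, `max_bytes`, and the method names are hypothetical and are not the option or type names used in the PR.

```rust
use std::collections::HashMap;

/// Hypothetical byte-budgeted cache; not the arrow-rs type.
struct BoundedCache {
    max_bytes: usize,
    used_bytes: usize,
    entries: HashMap<usize, Vec<u8>>,
}

impl BoundedCache {
    fn new(max_bytes: usize) -> Self {
        Self { max_bytes, used_bytes: 0, entries: HashMap::new() }
    }

    /// Try to cache `data` under `key`. Returns `false` when the entry would
    /// push the cache over its budget, in which case nothing is stored and the
    /// caller is expected to decode the data again later.
    fn insert(&mut self, key: usize, data: Vec<u8>) -> bool {
        // Replacing an entry first releases the bytes it was holding.
        if let Some(old) = self.entries.remove(&key) {
            self.used_bytes -= old.len();
        }
        if self.used_bytes + data.len() > self.max_bytes {
            return false;
        }
        self.used_bytes += data.len();
        self.entries.insert(key, data);
        true
    }

    /// Look up a previously cached entry.
    fn get(&self, key: usize) -> Option<&[u8]> {
        self.entries.get(&key).map(|v| v.as_slice())
    }
}
```

With `BoundedCache::new(0)` every non-empty insert is rejected, so readers fall back to the uncached path. A memory-size test can then assert on `used_bytes` after a known sequence of inserts.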
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3092286873
Thank you -- I will get back to this tomorrow or Monday
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
XiangpengHao commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3085041352
Summary for new updates:
1. incorporated the changes from @alamb
2. added a test case to reproduce the error from https://github.com/apache/datafusion/pull/16711#issuecomment-3078620431
3. fixed the above bug
4. added slightly more accurate memory accounting for string view arrays
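For context on why string view arrays need their own accounting: in the Arrow view layout each element is a fixed 16-byte view, strings of 12 bytes or fewer are stored inline in that view, and longer strings live in separate data buffers. The sketch below is only a rough lower-bound estimate built on those two facts. It is not the accounting added in the PR, and `estimate_string_view_bytes` is a hypothetical helper that ignores buffer capacity, validity bitmaps, and buffer sharing.

```rust
/// Size of one view (length + prefix + buffer index/offset, or inline bytes).
const VIEW_BYTES: usize = 16;
/// Strings up to this many bytes are stored inline in the view itself.
const INLINE_LIMIT: usize = 12;

/// Rough lower bound on the memory held by a string view array containing
/// `values`: one view per element plus the bytes of every out-of-line string.
fn estimate_string_view_bytes(values: &[&str]) -> usize {
    let views = values.len() * VIEW_BYTES;
    let out_of_line: usize = values
        .iter()
        .map(|s| s.len())
        .filter(|&len| len > INLINE_LIMIT)
        .sum();
    views + out_of_line
}
```

Under this model, counting only `len * 16` undercounts arrays with long strings, while counting `16 + len` for every element overcounts arrays dominated by short, inlined strings.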
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3084508352
> Maybe it's just me, but I can't reproduce some of the regressions reported in datafusion integrations, I'll get a "cloud" machine and try again.
If we can't reproduce them I think we should just ignore them
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
XiangpengHao commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3082355027
> Summary so far (I now need to go work on some other things for the rest of the day):
>
> I made two proposed changes
>
> * [Simplify projection caching XiangpengHao/arrow-rs#5](https://github.com/XiangpengHao/arrow-rs/pull/5)
> * [Move cache options construction to ArrayReaderBuilder, add builders XiangpengHao/arrow-rs#6](https://github.com/XiangpengHao/arrow-rs/pull/6)
> * I found a bug in this code via the DataFusion code (see [POC: Test DataFusion with experimental Parquet Filter Pushdown (try 4) datafusion#16711 (comment)](https://github.com/apache/datafusion/pull/16711#issuecomment-3078620431z))
>
> My plan for tomorrow will be to try and write some tests:
>
> 1. Reproduce the bug / error in an arrow-rs only test
> 2. Write some sort of integration test that shows the cache working (in preparation for wiring in the memory limit)
Thank you for the review @alamb, I plan to take a look at this in the next few days, and also think about further optimizations.
Maybe it's just me, but I can't reproduce some of the regressions reported in datafusion integrations, I'll get a "cloud" machine and try again.
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3079205754
Summary so far (I now need to go work on some other things for the rest of the day):
I made two proposed changes
- https://github.com/XiangpengHao/arrow-rs/pull/5
- https://github.com/XiangpengHao/arrow-rs/pull/6
- I found a bug in this code via the DataFusion code (see https://github.com/apache/datafusion/pull/16711#issuecomment-3078620431z)
My plan for tomorrow will be to try and write some tests:
1. Reproduce the bug / error in an arrow-rs only test
2. Write some sort of integration test that shows the cache working (in preparation for wiring in the memory limit)
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3079170550
Here is another proposed addition
- https://github.com/XiangpengHao/arrow-rs/pull/6
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3079092238
🤖: Benchmark completed
Details
```
group | main | pushdown-v4
----- | ---- | -----------
arrow_reader_clickbench/async/Q1 | 1.02 2.4±0.01ms ? ?/sec | 1.00 2.3±0.01ms ? ?/sec
arrow_reader_clickbench/async/Q10 | 1.00 11.3±0.13ms ? ?/sec | 1.07 12.0±0.24ms ? ?/sec
arrow_reader_clickbench/async/Q11 | 1.00 13.3±0.20ms ? ?/sec | 1.03 13.7±0.20ms ? ?/sec
arrow_reader_clickbench/async/Q12 | 1.39 35.6±0.32ms ? ?/sec | 1.00 25.6±0.31ms ? ?/sec
arrow_reader_clickbench/async/Q13 | 1.26 49.7±0.47ms ? ?/sec | 1.00 39.6±0.26ms ? ?/sec
arrow_reader_clickbench/async/Q14 | 1.27 47.0±0.30ms ? ?/sec | 1.00 36.9±0.27ms ? ?/sec
arrow_reader_clickbench/async/Q19 | 1.00 5.2±0.07ms ? ?/sec | 1.11 5.8±0.15ms ? ?/sec
arrow_reader_clickbench/async/Q20 | 1.33 161.3±1.22ms ? ?/sec | 1.00 121.6±0.61ms ? ?/sec
arrow_reader_clickbench/async/Q21 | 1.29 207.6±1.10ms ? ?/sec | 1.00 160.3±1.21ms ? ?/sec
arrow_reader_clickbench/async/Q22 | 1.00 405.6±2.49ms ? ?/sec | 1.00 406.4±5.58ms ? ?/sec
arrow_reader_clickbench/async/Q23 | 1.18 496.8±5.89ms ? ?/sec | 1.00 422.8±12.00ms ? ?/sec
arrow_reader_clickbench/async/Q24 | 1.24 55.4±0.66ms ? ?/sec | 1.00 44.6±0.61ms ? ?/sec
arrow_reader_clickbench/async/Q27 | 1.53 165.6±1.38ms ? ?/sec | 1.00 108.0±0.55ms ? ?/sec
arrow_reader_clickbench/async/Q28 | 1.45 162.6±1.54ms ? ?/sec | 1.00 112.5±0.85ms ? ?/sec
arrow_reader_clickbench/async/Q30 | 1.00 62.3±0.73ms ? ?/sec | 1.00 62.7±0.50ms ? ?/sec
arrow_reader_clickbench/async/Q36 | 1.32 169.0±1.77ms ? ?/sec | 1.00 128.1±0.56ms ? ?/sec
arrow_reader_clickbench/async/Q37 | 1.00 100.5±0.54ms ? ?/sec | 1.00 100.8±0.75ms ? ?/sec
arrow_reader_clickbench/async/Q38 | 1.00 39.8±0.28ms ? ?/sec | 1.01 40.2±0.23ms ? ?/sec
arrow_reader_clickbench/async/Q39 | 1.01 50.0±0.43ms ? ?/sec | 1.00 49.5±0.34ms ? ?/sec
arrow_reader_clickbench/async/Q40 | 1.03 53.8±0.48ms ? ?/sec | 1.00 52.3±0.44ms ? ?/sec
arrow_reader_clickbench/async/Q41 | 1.00 41.2±0.40ms ? ?/sec | 1.01 41.5±0.33ms ? ?/sec
arrow_reader_clickbench/async/Q42 | 1.00 14.4±0.14ms ? ?/sec | 1.03 14.8±0.17ms ? ?/sec
arrow_reader_clickbench/sync/Q1 | 1.00 2.2±0.02ms ? ?/sec | 1.01 2.2±0.01ms ? ?/sec
arrow_reader_clickbench/sync/Q10 | 1.03 9.8±0.06ms ? ?/sec | 1.00 9.5±0.07ms ? ?/sec
arrow_reader_clickbench/sync/Q11 | 1.00 11.5±0.07ms ? ?/sec | 1.00 11.5±0.11ms ? ?/sec
arrow_reader_clickbench/sync/Q12 | 1.02 37.9±0.32ms ? ?/sec | 1.00 37.2±0.35ms ? ?/sec
arrow_reader_clickbench/sync/Q13 | 1.00 50.8±0.45ms ? ?/sec | 1.01 51.2±0.52ms ? ?/sec
arrow_reader_clickbench/sync/Q14 | 1.00 48.6±0.31ms ? ?/sec | 1.02 49.3±0.39ms ? ?/sec
arrow_reader_clickbench/sync/Q19 | 1.03 4.4±0.03ms ? ?/sec | 1.00 4.3±0.02ms ? ?/sec
arrow_reader_clickbench/sync/Q20 | 1.00 176.7±1.29ms ? ?/sec | 1.01 178.1±1.35ms ? ?/sec
arrow_reader_clickbench/sync/Q21 | 1.00 234.8±1.84ms ? ?/sec | 1.01 236.9±1.80ms ? ?/sec
arrow_reader_clickbench/sync/Q22 | 1.00 479.6±4.95ms ? ?/sec | 1.01 482.4±3.57ms ? ?/sec
arrow_reader_clickbench/sync/Q23 | 1.04 443.1±17.45ms ? ?/sec | 1.00 426.0±5.14ms ? ?/sec
arrow_reader_clickbench/sync/Q24 | 1.00 53.0±0.74ms ? ?/sec | 1.01 53.3±0.96ms ? ?/sec
arrow_reader_clickbench/sync/Q27 | 1.00 154.6±1.23ms ? ?/sec | 1.00 154.2±0.95ms ? ?/sec
arrow_reader_clickbench/sync/Q28 | 1.00 153.2±1.41ms ? ?/sec | 1.01 154.5±1.53ms ? ?/sec
arrow_reader_clickbench/sync/Q30 | 1.00 59.7±0.41ms ? ?/sec | 1.01 60.1±0.50ms ? ?/sec
arrow_reader_clickbench/sync/Q36 | 1.00 158.8±1.01ms ? ?/sec | 1.01 160.1±1.63ms ? ?/sec
arrow_reader_clickbench/sync/Q37 | 1.01 94.3±0.74ms ? ?/sec | 1.00 93.8±0.53ms ? ?/sec
arrow_reader_clickbench/sync/Q38 | 1.00 32.0±0.21ms ? ?/sec | 1.01 32.3±0.22ms ? ?/sec
arrow_
```
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3078962810
🤖 `./gh_compare_arrow.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_arrow.sh) Running
Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing pushdown-v4 (b8351639569fe49d4aa88eab4b4efd2270859ec0) to c40830e390b3c6edc388f17817d633ecd40eb04d [diff](https://github.com/apache/arrow-rs/compare/c40830e390b3c6edc388f17817d633ecd40eb04d..b8351639569fe49d4aa88eab4b4efd2270859ec0)
BENCH_NAME=arrow_reader_clickbench
BENCH_COMMAND=cargo bench --features=arrow,async,test_common,experimental --bench arrow_reader_clickbench
BENCH_FILTER=
BENCH_BRANCH_NAME=pushdown-v4
Results will be posted here when complete
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3078945749
Here is a proposal:
- https://github.com/XiangpengHao/arrow-rs/pull/5
(I think the CI is having issues due to https://www.githubstatus.com/incidents/k20s3qvr28zw)
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3078627131
I am now working on additional review / proposed improvements to this PR -- basically to move more of the caching structure into the PlanBuilder and make it easier to test
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3078622511
I found another bug in DataFusion testing here:
https://github.com/apache/datafusion/pull/16711#issuecomment-3078620431
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
XiangpengHao commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3075979442
> So one thing I didn't understand after reading this PR in detail was how the relative row positions are updated after applying a filter.
Thank you for reviewing this @alamb! This is the place where we merge the filter selection back into the original selection:
https://github.com/XiangpengHao/arrow-rs/blob/5537bcb0870ba21549e72b58b65237ba823eec50/parquet/src/arrow/arrow_reader/read_plan.rs#L119
i.e., this PR does not change how we represent selections. We still use the existing implementation; the only difference is that we added a transparent cache layer, and the rest of the code should all be the same.
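What "transparent cache layer" means can be sketched with a wrapper that exposes the same interface as the reader it wraps, so callers (including the selection logic) do not change. This is a simplified illustration, not the PR's cached array reader: `BatchReader`, `Decoder`, and `CachedReader` are hypothetical names, and real batches are Arrow arrays rather than `Vec<i64>`.

```rust
use std::collections::HashMap;

/// Hypothetical minimal reader interface: decode one batch of a column.
trait BatchReader {
    fn read_batch(&mut self, batch_id: usize) -> Vec<i64>;
}

/// Stand-in for an expensive decoder; counts how often it is asked to decode.
struct Decoder {
    decode_calls: usize,
}

impl BatchReader for Decoder {
    fn read_batch(&mut self, batch_id: usize) -> Vec<i64> {
        self.decode_calls += 1;
        // Fake data: four consecutive values per batch.
        (0..4).map(|i| (batch_id * 4 + i) as i64).collect()
    }
}

/// Wrapper with the same interface that remembers batches it already decoded.
struct CachedReader<R: BatchReader> {
    inner: R,
    cache: HashMap<usize, Vec<i64>>,
}

impl<R: BatchReader> CachedReader<R> {
    fn new(inner: R) -> Self {
        Self { inner, cache: HashMap::new() }
    }
}

impl<R: BatchReader> BatchReader for CachedReader<R> {
    fn read_batch(&mut self, batch_id: usize) -> Vec<i64> {
        // Cache hit: return a copy without touching the underlying decoder.
        if let Some(hit) = self.cache.get(&batch_id) {
            return hit.clone();
        }
        // Cache miss: decode once, remember the result, then return it.
        let batch = self.inner.read_batch(batch_id);
        self.cache.insert(batch_id, batch.clone());
        batch
    }
}
```

Reading the same batch first for the predicate and then for the output projection decodes it only once, while code written against `BatchReader` is unaware of the cache.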
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
XiangpengHao commented on code in PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#discussion_r2208583946
##
parquet/src/arrow/async_reader/mod.rs:
##
@@ -1832,6 +1882,7 @@ mod tests {
assert_eq!(total_rows, 730);
}
+#[ignore]
Review Comment:
This test will fail because the cache will read larger ranges. This is
normally not a big problem, but it does create inconsistencies in tests.
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on code in PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#discussion_r2208476062
##
parquet/src/arrow/async_reader/mod.rs:
##
@@ -1832,6 +1882,7 @@ mod tests {
assert_eq!(total_rows, 730);
}
+#[ignore]
Review Comment:
This test still fails for me locally when I remove the `ignore`
##
parquet/src/arrow/async_reader/mod.rs:
##
@@ -613,8 +623,18 @@ where
.fetch(&mut self.input, predicate.projection(), selection)
.await?;
+let mut cache_projection = predicate.projection().clone();
+cache_projection.intersect(&projection);
Review Comment:
So one thing I didn't understand after reading this PR in detail was how the
relative row positions are updated after applying a filter.
For example, if we are applying multiple filters, the first may reduce the
original RowSelection down to `[100->200]`, and now when the second filter runs
it is only evaluated on the 100->200 rows, not the original selection.
In other words, I think there needs to be some sort of function equivalent to
`RowSelection::and_then` that applies to the cache:
```rust
// Narrow the cache so that it only retains the results of evaluating the
// predicate
let row_group_cache = row_group_cache.and_then(resulting_selection);
```
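To spell out the relative-position issue, here is a small self-contained model of `and_then`-style composition using plain boolean masks instead of the run-length `RowSelection` type. `and_then` and `narrow_cache` below are illustrative free functions, not arrow-rs APIs: the first filter's mask covers the original rows, while the second filter's mask covers only the rows the first filter kept.

```rust
/// Compose two selections modelled as boolean masks.
/// `first` has one entry per original row; `second` has one entry per row
/// that `first` kept. The result is expressed in original row positions.
fn and_then(first: &[bool], second: &[bool]) -> Vec<bool> {
    let mut inner = second.iter();
    first
        .iter()
        .map(|&kept| {
            if kept {
                // Consume the next entry of `second` only for rows `first` kept.
                *inner
                    .next()
                    .expect("`second` must have one entry per row kept by `first`")
            } else {
                false
            }
        })
        .collect()
}

/// Narrow cached values (one per row kept by the first filter) so that only
/// rows that also pass the second filter are retained.
fn narrow_cache<T>(cached: Vec<T>, second: &[bool]) -> Vec<T> {
    cached
        .into_iter()
        .zip(second.iter().copied())
        .filter_map(|(value, keep)| keep.then_some(value))
        .collect()
}
```

For example, if the first filter keeps rows 1, 2, and 4 of five rows and the second filter keeps the first and third of those, the composed mask selects original rows 1 and 4, and a cache holding values for rows 1, 2, and 4 must be narrowed to the values for rows 1 and 4 to stay aligned.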
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on code in PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#discussion_r2208531957
##
parquet/src/arrow/async_reader/mod.rs:
##
@@ -613,8 +623,18 @@ where
.fetch(&mut self.input, predicate.projection(), selection)
.await?;
+let mut cache_projection = predicate.projection().clone();
+cache_projection.intersect(&projection);
Review Comment:
So one thing I didn't understand after reading this PR in detail was how the
relative row positions are updated after applying a filter.
For example, if we are applying multiple filters, the first may reduce the
original RowSelection down to `[100->200]`, and now when the second filter runs
it is only evaluated on the 100->200 rows, not the original selection.
In other words, I think there needs to be some sort of function equivalent to
`RowSelection::and_then` that applies to the cache:
```rust
// Narrow the cache so that it only retains the results of evaluating the
// predicate
let row_group_cache = row_group_cache.and_then(resulting_selection);
```
Maybe this is the root cause of
https://github.com/apache/datafusion/actions/runs/16302299778/job/46039904381?pr=16711
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3074782955
I merged up from main to make it easier to evaluate this PR against DataFusion in https://github.com/apache/datafusion/pull/16711
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3052761087
For anyone following along, we are testing / discussing this PR here:
- https://github.com/apache/datafusion/pull/16711#issuecomment-3052692116
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3048522884
> Hey peeps 👋
>
> Apologies for the approval, I was just checking if this was possible. Maybe we can restrict random people to approve it?
>
> Again sorry and thanks! 🙈
No worries -- I think we purposely have the repo set up so anyone can approve PRs (merging isn't gated on approval anyway). This is a community endeavor, and while not everyone has write access to the repos (and thus can merge PRs), everyone should feel free to review and approve PRs
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
Satya-Pranav commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3047729597
Hey peeps 👋
Apologies for the approval, I was just checking if this was possible. Maybe we can restrict random people to approve it?
Again sorry and thanks! 🙈
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
XiangpengHao commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3046616707
> I kicked off some benchmark runs
>
> * [POC: Test DataFusion with experimental Parquet Filter Pushdown (try 4) datafusion#16711 (comment)](https://github.com/apache/datafusion/pull/16711#issuecomment-3046608364)
Holding my breath
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3046609300
I kicked off some benchmark runs
- https://github.com/apache/datafusion/pull/16711#issuecomment-3046608364
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3044585055
I have begun trying to test this in DataFusion
- https://github.com/apache/datafusion/pull/16690
I hit an issue
- https://github.com/apache/arrow-rs/issues/7874
But that has since been resolved. I will continue trying to test this shortly
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
XiangpengHao commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3032524503
> 🤖: Benchmark completed
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3032511001
🤖: Benchmark completed
Details
```
group | main | pushdown-v4
----- | ---- | -----------
arrow_array_reader/BYTE_ARRAY/Decimal128Array/plain encoded, mandatory, no NULLs | 1.06 1356.3±2.84µs ? ?/sec | 1.00 1277.4±2.92µs ? ?/sec
arrow_array_reader/BYTE_ARRAY/Decimal128Array/plain encoded, optional, half NULLs | 1.02 1352.0±2.48µs ? ?/sec | 1.00 1323.1±3.61µs ? ?/sec
arrow_array_reader/BYTE_ARRAY/Decimal128Array/plain encoded, optional, no NULLs | 1.06 1361.7±3.15µs ? ?/sec | 1.00 1283.6±2.09µs ? ?/sec
arrow_array_reader/BinaryArray/dictionary encoded, mandatory, no NULLs | 1.00 484.4±6.57µs ? ?/sec | 1.06 512.0±4.35µs ? ?/sec
arrow_array_reader/BinaryArray/dictionary encoded, optional, half NULLs | 1.00 662.9±2.03µs ? ?/sec | 1.05 694.0±2.13µs ? ?/sec
arrow_array_reader/BinaryArray/dictionary encoded, optional, no NULLs | 1.00 485.8±3.76µs ? ?/sec | 1.05 509.5±4.37µs ? ?/sec
arrow_array_reader/BinaryArray/plain encoded, mandatory, no NULLs | 1.09 626.7±3.48µs ? ?/sec | 1.00 577.1±3.17µs ? ?/sec
arrow_array_reader/BinaryArray/plain encoded, optional, half NULLs | 1.01 772.8±2.90µs ? ?/sec | 1.00 763.2±2.98µs ? ?/sec
arrow_array_reader/BinaryArray/plain encoded, optional, no NULLs | 1.07 632.7±2.73µs ? ?/sec | 1.00 590.5±4.25µs ? ?/sec
arrow_array_reader/BinaryViewArray/dictionary encoded, mandatory, no NULLs | 1.03 258.8±3.21µs ? ?/sec | 1.00 251.7±2.83µs ? ?/sec
arrow_array_reader/BinaryViewArray/dictionary encoded, optional, half NULLs | 1.17 269.3±0.80µs ? ?/sec | 1.00 230.1±0.60µs ? ?/sec
arrow_array_reader/BinaryViewArray/dictionary encoded, optional, no NULLs | 1.00 257.7±2.56µs ? ?/sec | 1.00 258.5±3.28µs ? ?/sec
arrow_array_reader/BinaryViewArray/plain encoded, mandatory, no NULLs | 1.00 309.6±1.51µs ? ?/sec | 1.00 311.1±2.30µs ? ?/sec
arrow_array_reader/BinaryViewArray/plain encoded, mandatory, no NULLs, short string | 1.00 301.0±0.54µs ? ?/sec | 1.07 321.4±0.61µs ? ?/sec
arrow_array_reader/BinaryViewArray/plain encoded, optional, half NULLs | 1.13 306.2±1.12µs ? ?/sec | 1.00 269.9±1.09µs ? ?/sec
arrow_array_reader/BinaryViewArray/plain encoded, optional, no NULLs | 1.00 317.2±1.37µs ? ?/sec | 1.00 318.4±1.88µs ? ?/sec
arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Decimal128Array/byte_stream_split encoded, mandatory, no NULLs | 1.01 1077.6±2.48µs ? ?/sec | 1.00 1066.7±1.91µs ? ?/sec
arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Decimal128Array/byte_stream_split encoded, optional, half NULLs | 1.05 951.0±2.12µs ? ?/sec | 1.00 902.7±2.82µs ? ?/sec
arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Decimal128Array/byte_stream_split encoded, optional, no NULLs | 1.01 1083.5±2.79µs ? ?/sec | 1.00 1074.1±4.83µs ? ?/sec
arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Decimal128Array/plain encoded, mandatory, no NULLs | 1.04 448.4±3.42µs ? ?/sec | 1.00 432.8±4.39µs ? ?/sec
arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Decimal128Array/plain encoded, optional, half NULLs | 1.11 630.6±1.87µs ? ?/sec | 1.00 567.9±4.22µs ? ?/sec
arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Decimal128Array/plain encoded, optional, no NULLs | 1.04 457.8±4.89µs ? ?/sec | 1.00 438.3±3.40µs ? ?/sec
arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Float16Array/byte_stream_split encoded, mandatory, no NULLs | 1.00 153.1±0.31µs ? ?/sec | 1.05 160.6±0.29µs ? ?/sec
arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Float16Array/byte_stream_split encoded, optional, half NULLs | 1.19 297.8±0.69µs ? ?/sec | 1.00 249.8±0.82µs ? ?/sec
arrow_array_reader
```
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
XiangpengHao commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3032449772
> 🤖 `./gh_compare_arrow.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_arrow.sh) Running
> Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
> Comparing pushdown-v4 ([1851f0b](https://github.com/apache/arrow-rs/commit/1851f0b6345a58d904e4a18e3f9b0b09fbf99b8e)) to [af8564f](https://github.com/apache/arrow-rs/commit/af8564f076109e79329063fc6c8fbb672e35c32e) [diff](https://github.com/apache/arrow-rs/compare/af8564f076109e79329063fc6c8fbb672e35c32e..1851f0b6345a58d904e4a18e3f9b0b09fbf99b8e)
> BENCH_NAME=arrow_reader
> BENCH_COMMAND=cargo bench --features=arrow,async,test_common,experimental --bench arrow_reader
> BENCH_FILTER=
> BENCH_BRANCH_NAME=pushdown-v4
> Results will be posted here when complete

Curious why we haven't gotten a result from this run. Is there a panic?

Another note: this benchmark is run with a 100MB limit; Q22 should get much better with a larger limit.
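To illustrate why a larger limit could help, here is a minimal Rust sketch of a byte-budgeted cache. It assumes the limit mentioned above is the cache's size budget; `ToyCache`, `BatchId`, `insert`, and `take` are hypothetical names for illustration and are not the PR's `RowGroupCache` API. An insert that would exceed the budget is rejected, so that batch would have to be decoded again later.

```rust
use std::collections::HashMap;

/// Hypothetical batch identifier (not the PR's `BatchID`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct BatchId(usize);

/// Toy cache keyed by (column index, batch id) with a byte budget.
struct ToyCache {
    entries: HashMap<(usize, BatchId), Vec<i32>>,
    used_bytes: usize,
    max_bytes: usize,
}

impl ToyCache {
    fn new(max_bytes: usize) -> Self {
        Self { entries: HashMap::new(), used_bytes: 0, max_bytes }
    }

    /// Producer side: store a decoded batch, or return false if it would
    /// push the cache over its budget (the caller then goes uncached).
    fn insert(&mut self, column: usize, id: BatchId, values: Vec<i32>) -> bool {
        let size = values.len() * std::mem::size_of::<i32>();
        if self.used_bytes + size > self.max_bytes {
            return false;
        }
        if let Some(old) = self.entries.insert((column, id), values) {
            // Replacing an entry: release the bytes held by the old value.
            self.used_bytes -= old.len() * std::mem::size_of::<i32>();
        }
        self.used_bytes += size;
        true
    }

    /// Consumer side: remove a batch so its memory is released.
    fn take(&mut self, column: usize, id: BatchId) -> Option<Vec<i32>> {
        let values = self.entries.remove(&(column, id))?;
        self.used_bytes -= values.len() * std::mem::size_of::<i32>();
        Some(values)
    }
}
```

With a 16-byte budget, a 12-byte batch fits and a following 8-byte batch is rejected until the first one has been taken out by the consumer.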
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
XiangpengHao commented on code in PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#discussion_r2182905388
##
parquet/src/arrow/array_reader/cached_array_reader.rs:
##
@@ -0,0 +1,635 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::row_group_cache::BatchID;
+use crate::arrow::array_reader::{row_group_cache::RowGroupCache, ArrayReader};
+use crate::arrow::arrow_reader::RowSelector;
+use crate::errors::Result;
+use arrow_array::{new_empty_array, ArrayRef, BooleanArray};
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
+use arrow_schema::DataType as ArrowType;
+use std::any::Any;
+use std::collections::{HashMap, VecDeque};
+use std::sync::{Arc, Mutex};
+
+/// Role of the cached array reader
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CacheRole {
+    /// Producer role: inserts data into the cache during filter phase
+    Producer,
+    /// Consumer role: removes consumed data from the cache during output building phase
+    Consumer,
+}
+
+/// A cached wrapper around an ArrayReader that avoids duplicate decoding
+/// when the same column appears in both filter predicates and output projection.
+///
+/// This reader acts as a transparent layer over the inner reader, using a cache
+/// to avoid redundant work when the same data is needed multiple times.
+///
+/// The reader can operate in two roles:
+/// - Producer: During filter phase, inserts decoded data into the cache
+/// - Consumer: During output building, consumes and removes data from the cache
+///
+/// This means the memory consumption of the cache has two stages:
+/// 1. During the filter phase, the memory increases as the cache is populated
+/// 2. It peaks when filters are built.
+/// 3. It decreases as the cached data is consumed.
+///        ▲
+///        │     ╭─╮
+///        │    ╱   ╲
+///        │   ╱     ╲
+///        │  ╱       ╲
+///        │ ╱         ╲
+///        │╱           ╲
+///        └─────────────╲──────► Time
+///        │      │      │
+///        Filter  Peak  Consume
+///        Phase (Built) (Decrease)
+pub struct CachedArrayReader {
+    /// The underlying array reader
+    inner: Box<dyn ArrayReader>,
+    /// Shared cache for this row group
+    cache: Arc<Mutex<RowGroupCache>>,
+    /// Column index for cache key generation
+    column_idx: usize,
+    /// Current logical position in the data stream (for cache key generation)
+    outer_position: usize,
+    /// Current position in the inner reader
+    inner_position: usize,
+    /// Batch size for the cache
+    batch_size: usize,
+    /// Selections to be applied to the next consume_batch()
+    selections: VecDeque<RowSelector>,
+    /// Role of this reader (Producer or Consumer)
+    role: CacheRole,
+    /// Local buffer to store batches between read_records and consume_batch calls
+    /// This ensures data is available even if the shared cache evicts items
+    local_buffer: HashMap<BatchID, ArrayRef>,
+}
+
+impl CachedArrayReader {
+    /// Creates a new cached array reader with the specified role
+    pub fn new(
+        inner: Box<dyn ArrayReader>,
+        cache: Arc<Mutex<RowGroupCache>>,
+        column_idx: usize,
+        role: CacheRole,
+    ) -> Self {
+        let batch_size = cache.lock().unwrap().batch_size();
+
+        Self {
+            inner,
+            cache,
+            column_idx,
+            outer_position: 0,
+            inner_position: 0,
+            batch_size,
+            selections: VecDeque::new(),
+            role,
+            local_buffer: HashMap::new(),
+        }
+    }
+
+    fn get_batch_id_from_position(&self, row_id: usize) -> BatchID {
+        BatchID {
+            val: row_id / self.batch_size,
+        }
+    }
+
+    fn fetch_batch(&mut self, batch_id: BatchID) -> Result<usize> {
+        let row_id = batch_id.val * self.batch_size;
+        if self.inner_position < row_id {
+            let to_skip = row_id - self.inner_position;
+            let skipped = self.inner.skip_records(to_skip)?;
+            assert_eq!(skipped, to_skip);
+            self.inner_position += skipped;
+        }
+
+        let read = self.inner.read_records(self.batch_size)?;
+
+        // If there are no remaining records (EOF), return immediately without
+        // attempting to cache an empty batch. This prevents inserting zero-length
+
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
zhuqi-lucas commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3032419621
> > I am curious about the performance compared with the no-filter-pushdown case, because the previous attempt also improved performance for this benchmark. But compared to the no-filter-pushdown case, it had some regressions.
>
> I will try and run this experiment later today

Thank you @alamb. If it has no regression, I believe this PR will also resolve the adaptive selection cases; if it has a regression, we can further combine it with adaptive selection for the final optimization.
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3032406091
> I am curious about the performance compared with the no-filter-pushdown case, because the previous attempt also improved performance for this benchmark. But compared to the no-filter-pushdown case, it had some regressions.

I will try and run this experiment later today
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
zhuqi-lucas commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3032399679
> > 🤖: Benchmark completed
>
> 😎 -- very nice

Great result! I am curious about the performance compared with the no-filter-pushdown case, because the previous attempt also improved performance for this benchmark. But compared to the no-filter-pushdown case, it had some regressions.
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3032385583
> 🤖: Benchmark completed

😎 -- very nice
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3032245611
🤖 `./gh_compare_arrow.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_arrow.sh) Running
Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing pushdown-v4 (1851f0b6345a58d904e4a18e3f9b0b09fbf99b8e) to af8564f076109e79329063fc6c8fbb672e35c32e [diff](https://github.com/apache/arrow-rs/compare/af8564f076109e79329063fc6c8fbb672e35c32e..1851f0b6345a58d904e4a18e3f9b0b09fbf99b8e)
BENCH_NAME=arrow_reader
BENCH_COMMAND=cargo bench --features=arrow,async,test_common,experimental --bench arrow_reader
BENCH_FILTER=
BENCH_BRANCH_NAME=pushdown-v4
Results will be posted here when complete
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3032245444
🤖: Benchmark completed
Details
```
group                              main                            pushdown-v4
-----                              ----                            -----------
arrow_reader_clickbench/async/Q1   1.00     2.4±0.02ms  ? ?/sec    1.00     2.4±0.12ms  ? ?/sec
arrow_reader_clickbench/async/Q10  1.00    10.4±0.11ms  ? ?/sec    1.10    11.5±0.26ms  ? ?/sec
arrow_reader_clickbench/async/Q11  1.00    12.4±0.14ms  ? ?/sec    1.09    13.5±0.18ms  ? ?/sec
arrow_reader_clickbench/async/Q12  1.34    34.4±0.29ms  ? ?/sec    1.00    25.7±0.22ms  ? ?/sec
arrow_reader_clickbench/async/Q13  1.23    48.6±0.32ms  ? ?/sec    1.00    39.5±0.26ms  ? ?/sec
arrow_reader_clickbench/async/Q14  1.24    46.3±0.35ms  ? ?/sec    1.00    37.2±0.27ms  ? ?/sec
arrow_reader_clickbench/async/Q19  1.00     5.2±0.05ms  ? ?/sec    1.08     5.6±0.07ms  ? ?/sec
arrow_reader_clickbench/async/Q20  1.32   161.7±0.73ms  ? ?/sec    1.00   122.3±0.50ms  ? ?/sec
arrow_reader_clickbench/async/Q21  1.30   207.7±0.83ms  ? ?/sec    1.00   159.6±0.65ms  ? ?/sec
arrow_reader_clickbench/async/Q22  1.06   479.2±2.17ms  ? ?/sec    1.00   450.6±8.27ms  ? ?/sec
arrow_reader_clickbench/async/Q23  1.13  492.5±12.42ms  ? ?/sec    1.00  436.3±14.78ms  ? ?/sec
arrow_reader_clickbench/async/Q24  1.21    53.8±0.69ms  ? ?/sec    1.00    44.3±0.41ms  ? ?/sec
arrow_reader_clickbench/async/Q27  1.52   163.9±0.89ms  ? ?/sec    1.00   107.7±0.60ms  ? ?/sec
arrow_reader_clickbench/async/Q28  1.45   160.0±0.86ms  ? ?/sec    1.00   110.3±0.47ms  ? ?/sec
arrow_reader_clickbench/async/Q30  1.00    61.5±0.37ms  ? ?/sec    1.00    61.6±0.37ms  ? ?/sec
arrow_reader_clickbench/async/Q36  1.33   169.1±0.95ms  ? ?/sec    1.00   127.2±0.54ms  ? ?/sec
arrow_reader_clickbench/async/Q37  1.01   100.1±0.47ms  ? ?/sec    1.00    98.7±0.39ms  ? ?/sec
arrow_reader_clickbench/async/Q38  1.00    39.6±0.23ms  ? ?/sec    1.00    39.5±0.25ms  ? ?/sec
arrow_reader_clickbench/async/Q39  1.02    49.6±0.20ms  ? ?/sec    1.00    48.9±0.43ms  ? ?/sec
arrow_reader_clickbench/async/Q40  1.05    54.1±0.36ms  ? ?/sec    1.00    51.7±0.44ms  ? ?/sec
arrow_reader_clickbench/async/Q41  1.00    40.8±0.26ms  ? ?/sec    1.01    41.1±0.34ms  ? ?/sec
arrow_reader_clickbench/async/Q42  1.00    14.5±0.12ms  ? ?/sec    1.00    14.5±0.10ms  ? ?/sec
arrow_reader_clickbench/sync/Q1    1.00     2.2±0.00ms  ? ?/sec    1.00     2.2±0.01ms  ? ?/sec
arrow_reader_clickbench/sync/Q10   1.00     9.2±0.09ms  ? ?/sec    1.01     9.3±0.08ms  ? ?/sec
arrow_reader_clickbench/sync/Q11   1.00    11.1±0.07ms  ? ?/sec    1.01    11.2±0.07ms  ? ?/sec
arrow_reader_clickbench/sync/Q12   1.00    36.4±0.28ms  ? ?/sec    1.00    36.4±0.29ms  ? ?/sec
arrow_reader_clickbench/sync/Q13   1.00    49.9±0.41ms  ? ?/sec    1.00    49.9±0.38ms  ? ?/sec
arrow_reader_clickbench/sync/Q14   1.00    47.9±0.28ms  ? ?/sec    1.01    48.2±0.38ms  ? ?/sec
arrow_reader_clickbench/sync/Q19   1.02     4.3±0.02ms  ? ?/sec    1.00     4.2±0.02ms  ? ?/sec
arrow_reader_clickbench/sync/Q20   1.01   178.1±0.90ms  ? ?/sec    1.00   176.8±0.72ms  ? ?/sec
arrow_reader_clickbench/sync/Q21   1.00   233.1±2.45ms  ? ?/sec    1.00   233.5±0.83ms  ? ?/sec
arrow_reader_clickbench/sync/Q22   1.01   479.4±2.39ms  ? ?/sec    1.00   476.4±2.19ms  ? ?/sec
arrow_reader_clickbench/sync/Q23   1.02  443.9±12.86ms  ? ?/sec    1.00  435.5±16.08ms  ? ?/sec
arrow_reader_clickbench/sync/Q24   1.00    51.0±0.52ms  ? ?/sec    1.01    51.7±0.65ms  ? ?/sec
arrow_reader_clickbench/sync/Q27   1.00   153.9±0.61ms  ? ?/sec    1.00   153.3±0.68ms  ? ?/sec
arrow_reader_clickbench/sync/Q28   1.01   150.4±0.65ms  ? ?/sec    1.00   149.2±0.86ms  ? ?/sec
arrow_reader_clickbench/sync/Q30   1.01    59.3±0.40ms  ? ?/sec    1.00    58.9±0.39ms  ? ?/sec
arrow_reader_clickbench/sync/Q36   1.01   158.6±1.04ms  ? ?/sec    1.00   157.7±0.94ms  ? ?/sec
arrow_reader_clickbench/sync/Q37   1.01    93.2±0.44ms  ? ?/sec    1.00    92.5±0.42ms  ? ?/sec
arrow_reader_clickbench/sync/Q38   1.00    31.9±0.20ms  ? ?/sec    1.01    32.2±0.29ms  ? ?/sec
arrow_
```
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3032163709
🤖 `./gh_compare_arrow.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_arrow.sh) Running
Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing pushdown-v4 (1851f0b6345a58d904e4a18e3f9b0b09fbf99b8e) to af8564f076109e79329063fc6c8fbb672e35c32e [diff](https://github.com/apache/arrow-rs/compare/af8564f076109e79329063fc6c8fbb672e35c32e..1851f0b6345a58d904e4a18e3f9b0b09fbf99b8e)
BENCH_NAME=arrow_reader_clickbench
BENCH_COMMAND=cargo bench --features=arrow,async,test_common,experimental --bench arrow_reader_clickbench
BENCH_FILTER=
BENCH_BRANCH_NAME=pushdown-v4
Results will be posted here when complete
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on code in PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#discussion_r2182645830
##
parquet/src/arrow/async_reader/mod.rs:
##
@@ -661,13 +681,30 @@ where
         let plan = plan_builder.build();
-        let array_reader = ArrayReaderBuilder::new(&row_group)
-            .build_array_reader(self.fields.as_deref(), &projection)?;
+        let array_reader = ArrayReaderBuilder::new(&row_group).build_array_reader_with_cache(
+            self.fields.as_deref(),
+            &projection,
+            CacheOptions {
+                projection_mask: &cache_projection,
+                cache: row_group_cache.clone(),
+                role: crate::arrow::array_reader::CacheRole::Consumer,
+            },
+        )?;
         let reader = ParquetRecordBatchReader::new(array_reader, plan);
         Ok((self, Some(reader)))
     }
+
+    fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
Review Comment:
```suggestion
    /// Compute which columns are used in filters and the final (output) projection
    fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
```
##
parquet/src/arrow/async_reader/mod.rs:
##
@@ -613,8 +623,18 @@ where
                 .fetch(&mut self.input, predicate.projection(), selection)
                 .await?;
+            let mut cache_projection = predicate.projection().clone();
+            cache_projection.intersect(&projection);
             let array_reader = ArrayReaderBuilder::new(&row_group)
-                .build_array_reader(self.fields.as_deref(), predicate.projection())?;
+                .build_array_reader_with_cache(
+                    self.fields.as_deref(),
+                    predicate.projection(),
+                    CacheOptions {
+                        projection_mask: &cache_projection,
+                        cache: row_group_cache.clone(),
+                        role: crate::arrow::array_reader::CacheRole::Producer,
+                    },
Review Comment:
Structurally, both here and below, it might help to move the creation of the
`CacheOptions` into the cache itself so a reader of this code doesn't have to
understand the innards of the cache
```suggestion
row_group_cache.producer_options(projection, predicate.projection())
```
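A rough sketch of what the suggestion above could look like, so that call sites never assemble `CacheOptions` by hand. Everything here is an assumption made for illustration: `CacheHandle`, the owned `cached_columns` field, and the use of plain column-index slices instead of `ProjectionMask` are hypothetical, and `consumer_options` is simply the symmetric counterpart of the suggested `producer_options`.

```rust
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CacheRole {
    Producer,
    Consumer,
}

/// Hypothetical stand-in for the shared per-row-group cache.
#[derive(Debug, Default)]
struct RowGroupCache;

/// Hypothetical owned options; the quoted diff borrows a projection mask instead.
#[derive(Debug)]
struct CacheOptions {
    cached_columns: Vec<usize>,
    cache: Arc<Mutex<RowGroupCache>>,
    role: CacheRole,
}

/// Handle that owns the shared cache and knows how to build options for it.
#[derive(Debug, Clone, Default)]
struct CacheHandle {
    cache: Arc<Mutex<RowGroupCache>>,
}

impl CacheHandle {
    /// Options for the filter phase, which fills the cache.
    fn producer_options(&self, output: &[usize], predicate: &[usize]) -> CacheOptions {
        self.options(output, predicate, CacheRole::Producer)
    }

    /// Options for the output phase, which drains the cache.
    fn consumer_options(&self, output: &[usize], predicate: &[usize]) -> CacheOptions {
        self.options(output, predicate, CacheRole::Consumer)
    }

    /// Shared logic: cache only columns needed by both predicate and output.
    fn options(&self, output: &[usize], predicate: &[usize], role: CacheRole) -> CacheOptions {
        let cached_columns = predicate
            .iter()
            .copied()
            .filter(|c| output.contains(c))
            .collect();
        CacheOptions { cached_columns, cache: Arc::clone(&self.cache), role }
    }
}
```

Both roles share the same underlying cache through the `Arc`, and the intersection logic lives in one place rather than at each call site.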
##
parquet/src/arrow/array_reader/cached_array_reader.rs:
##
@@ -0,0 +1,635 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::row_group_cache::BatchID;
+use crate::arrow::array_reader::{row_group_cache::RowGroupCache, ArrayReader};
+use crate::arrow::arrow_reader::RowSelector;
+use crate::errors::Result;
+use arrow_array::{new_empty_array, ArrayRef, BooleanArray};
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
+use arrow_schema::DataType as ArrowType;
+use std::any::Any;
+use std::collections::{HashMap, VecDeque};
+use std::sync::{Arc, Mutex};
+
+/// Role of the cached array reader
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CacheRole {
+    /// Producer role: inserts data into the cache during filter phase
+    Producer,
+    /// Consumer role: removes consumed data from the cache during output building phase
+    Consumer,
+}
+
+/// A cached wrapper around an ArrayReader that avoids duplicate decoding
+/// when the same column appears in both filter predicates and output projection.
+///
+/// This reader acts as a transparent layer over the inner reader, using a cache
+/// to avoid redundant work when the same data is needed multiple times.
+///
+/// The reader can operate in two roles:
+/// - Producer: During filter phase, inserts decoded data into the cache
+/// - Consumer: During output building, consumes and removes data from the cache
+///
+/// This means the memory consumption of the cache has two stages:
+/// 1. During the filter phase, the memory increases as the cache is populated
+/// 2. It peaks when filters are built.
+/// 3. It decreases as the cached data is consumed.
+///        ▲
+///        │     ╭─╮
+///        │    ╱   ╲
+///        │   ╱     ╲
+///        │
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
zhuqi-lucas commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3031691065
Thank you @XiangpengHao for the amazing work; I will try to review and test this PR!
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
alamb commented on PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#issuecomment-3029302979
😮 -- My brain is likely too fried at the moment to review this properly, but it is on my list for first thing tomorrow
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
XiangpengHao commented on code in PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#discussion_r2180920730
##
parquet/src/arrow/array_reader/cached_array_reader.rs:
##
@@ -0,0 +1,621 @@
+use crate::arrow::array_reader::row_group_cache::BatchID;
+use crate::arrow::array_reader::{row_group_cache::RowGroupCache, ArrayReader};
+use crate::arrow::arrow_reader::RowSelector;
+use crate::errors::Result;
+use arrow_array::{new_empty_array, ArrayRef, BooleanArray};
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
+use arrow_schema::DataType as ArrowType;
+use std::any::Any;
+use std::collections::{HashMap, VecDeque};
+use std::sync::{Arc, Mutex};
+
+/// Role of the cached array reader
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CacheRole {
+    /// Producer role: inserts data into the cache during filter phase
+    Producer,
+    /// Consumer role: removes consumed data from the cache during output building phase
+    Consumer,
+}
+
+/// A cached wrapper around an ArrayReader that avoids duplicate decoding
+/// when the same column appears in both filter predicates and output projection.
+///
+/// This reader acts as a transparent layer over the inner reader, using a cache
+/// to avoid redundant work when the same data is needed multiple times.
+///
+/// The reader can operate in two roles:
+/// - Producer: During filter phase, inserts decoded data into the cache
+/// - Consumer: During output building, consumes and removes data from the cache
+///
+/// This means the memory consumption of the cache has two stages:
+/// 1. During the filter phase, the memory increases as the cache is populated
+/// 2. It peaks when filters are built.
+/// 3. It decreases as the cached data is consumed.
+///        ▲
+///        │     ╭─╮
+///        │    ╱   ╲
+///        │   ╱     ╲
+///        │  ╱       ╲
+///        │ ╱         ╲
+///        │╱           ╲
+///        └─────────────╲──────► Time
+///        │      │      │
+///        Filter  Peak  Consume
+///        Phase (Built) (Decrease)
+pub struct CachedArrayReader {
+    /// The underlying array reader
+    inner: Box<dyn ArrayReader>,
+    /// Shared cache for this row group
+    cache: Arc<Mutex<RowGroupCache>>,
+    /// Column index for cache key generation
+    column_idx: usize,
+    /// Current logical position in the data stream (for cache key generation)
+    outer_position: usize,
+    /// Current position in the inner reader
+    inner_position: usize,
+    /// Batch size for the cache
+    batch_size: usize,
+    /// Selections to be applied to the next consume_batch()
+    selections: VecDeque<RowSelector>,
+    /// Role of this reader (Producer or Consumer)
+    role: CacheRole,
+    /// Local buffer to store batches between read_records and consume_batch calls
+    /// This ensures data is available even if the shared cache evicts items
+    local_buffer: HashMap<BatchID, ArrayRef>,
+}
+
+impl CachedArrayReader {
+    /// Creates a new cached array reader with the specified role
+    pub fn new(
+        inner: Box<dyn ArrayReader>,
+        cache: Arc<Mutex<RowGroupCache>>,
+        column_idx: usize,
+        role: CacheRole,
+    ) -> Self {
+        let batch_size = cache.lock().unwrap().batch_size();
+
+        Self {
+            inner,
+            cache,
+            column_idx,
+            outer_position: 0,
+            inner_position: 0,
+            batch_size,
+            selections: VecDeque::new(),
+            role,
+            local_buffer: HashMap::new(),
+        }
+    }
+
+    fn get_batch_id_from_position(&self, row_id: usize) -> BatchID {
+        BatchID {
+            val: row_id / self.batch_size,
+        }
+    }
+
+    fn fetch_batch(&mut self, batch_id: BatchID) -> Result<usize> {
+        let row_id = batch_id.val * self.batch_size;
+        if self.inner_position < row_id {
+            let to_skip = row_id - self.inner_position;
+            let skipped = self.inner.skip_records(to_skip)?;
+            assert_eq!(skipped, to_skip);
+            self.inner_position += skipped;
+        }
+
+        let read = self.inner.read_records(self.batch_size)?;
+
+        // If there are no remaining records (EOF), return immediately without
+        // attempting to cache an empty batch. This prevents inserting zero-length
+        // arrays into the cache which can later cause panics when slicing.
+        if read == 0 {
+            return Ok(0);
+        }
+
+        let array = self.inner.consume_batch()?;
+
+        // Store in both shared cache and local cache
+        // The shared cache is for coordination between readers
+        // The local cache ensures data is available for our consume_batch call
+        let _cached = self
+            .cache
+            .lock()
+            .unwrap()
+            .insert(self.column_idx, batch_id, array.clone());
+        // Note: if the shared cache is full (_cached == false), we continue without caching
+        // The local cache will still store the data for this reader's use
+
+        self.local_buffer.insert(batch_id, array);
+
+        self.inner_position += read;
+
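For readers following the quoted (truncated) `fetch_batch` above, the position arithmetic can be sketched in isolation. This is a simplified illustration under stated assumptions, not the PR's code: `ToyReader` is a hypothetical in-memory stand-in for the inner `ArrayReader`, batch ids are plain `usize` values, and errors are left out.

```rust
/// Hypothetical in-memory reader standing in for the inner `ArrayReader`.
struct ToyReader {
    data: Vec<i32>,
    pos: usize,
}

impl ToyReader {
    /// Skip up to `n` records and return how many were actually skipped.
    fn skip_records(&mut self, n: usize) -> usize {
        let skipped = n.min(self.data.len() - self.pos);
        self.pos += skipped;
        skipped
    }

    /// Read up to `n` records starting at the current position.
    fn read_records(&mut self, n: usize) -> Vec<i32> {
        let end = (self.pos + n).min(self.data.len());
        let out = self.data[self.pos..end].to_vec();
        self.pos = end;
        out
    }
}

/// Map a row offset to the index of the fixed-size batch that contains it.
fn batch_id_for_row(row: usize, batch_size: usize) -> usize {
    row / batch_size
}

/// Fetch one whole batch, skipping forward first if the reader is behind.
/// Returns `None` at end of data so callers never cache an empty batch.
/// Like the quoted code, this does not handle a reader that is already
/// past the start of the requested batch.
fn fetch_batch(reader: &mut ToyReader, batch_id: usize, batch_size: usize) -> Option<Vec<i32>> {
    let start = batch_id * batch_size;
    if reader.pos < start {
        reader.skip_records(start - reader.pos);
    }
    let values = reader.read_records(batch_size);
    if values.is_empty() {
        None
    } else {
        Some(values)
    }
}
```

Because batches are aligned to multiples of the batch size, any row offset maps to exactly one cache key, which is what lets the producer and consumer readers agree on keys without coordinating.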
Re: [PR] Parquet filter pushdown v4 [arrow-rs]
XiangpengHao commented on code in PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#discussion_r2180919509
##
parquet/src/arrow/async_reader/mod.rs:
##
@@ -613,8 +623,18 @@ where
                 .fetch(&mut self.input, predicate.projection(), selection)
                 .await?;
+            let mut cache_projection = predicate.projection().clone();
+            cache_projection.intersect(&projection);
Review Comment:
A column is cached if and only if it appears in both the output projection and the filter projection
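The "if and only if" rule above is a set intersection. A minimal sketch, assuming a projection can be modeled as one boolean flag per leaf column: `Mask`, `selected`, and `cache_projection` are hypothetical names, and the real `ProjectionMask` type may be implemented quite differently.

```rust
/// Hypothetical stand-in for a projection mask: one flag per leaf column.
#[derive(Debug, Clone, PartialEq)]
struct Mask(Vec<bool>);

impl Mask {
    /// Keep only the columns that are selected in both masks.
    fn intersect(&mut self, other: &Mask) {
        for (a, b) in self.0.iter_mut().zip(other.0.iter()) {
            *a = *a && *b;
        }
    }

    /// Indices of the selected columns.
    fn selected(&self) -> Vec<usize> {
        self.0
            .iter()
            .enumerate()
            .filter(|(_, s)| **s)
            .map(|(i, _)| i)
            .collect()
    }
}

/// Columns worth caching: read by the filter AND needed in the output.
fn cache_projection(filter: &Mask, output: &Mask) -> Mask {
    let mut cached = filter.clone();
    cached.intersect(output);
    cached
}
```

For example, if the predicate reads columns 1 and 2 and the output needs columns 2 and 3, only column 2 is cached: column 1 is never needed again after filtering, and column 3 is never decoded during filtering.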
