Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]

2025-05-30 Thread via GitHub


Dandandan commented on code in PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#discussion_r2115799584


##
datafusion/physical-plan/src/filter.rs:
##
@@ -689,6 +702,46 @@ fn filter_and_project(
 })
 }
 
+impl FilterExecStream {
+    /// Evaluates the predicate filter on the given batch and appends any rows that match
+    /// to the in-progress output batch builder.
+    fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        self.predicate
+            .evaluate(&batch)
+            .and_then(|v| v.into_array(batch.num_rows()))
+            .and_then(|filter| {
+                let Some(filter) = filter.as_boolean_opt() else {
+                    return internal_err!(
+                        "Cannot create filter_array from non-boolean predicates"
+                    );
+                };
+
+                let batch = match self.projection.as_ref() {
+                    Some(projection) => {
+                        let projected_columns = projection
+                            .iter()
+                            .map(|i| Arc::clone(batch.column(*i)))
+                            .collect();
+                        // Safety -- the input was a valid RecordBatch and thus the projection is too
+                        unsafe {
+                            RecordBatch::new_unchecked(
+                                Arc::clone(&self.schema),
+                                projected_columns,
+                                batch.num_rows(),
+                            )
+                        }
+                    }
+                    None => batch,
+                };
+                let output_batch_builder = self
+                    .output_batch_builder
+                    .as_mut()
+                    .expect("output_batch_builder should be Some");
+                Ok(output_batch_builder.append_filtered(batch, filter)?)

Review Comment:
   > space each batch will take up front and can just straight up allocate
   
   Hm yeah, with primitive and view types this might be okay - we might have to test other data sources though (with normal binary types, where it can't be preallocated).
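
The distinction the comment draws can be sketched in plain Rust (standing in for the Arrow builders; this is not the arrow-rs API): for fixed-width values the output size of a filtered copy follows exactly from the selected row count, while for variable-length binary data the byte size depends on the values themselves, so an exact preallocation needs a pre-scan of the data.

```rust
/// Fixed-width case: `selected` rows of an i64 column always need exactly
/// `selected` elements, so `Vec::with_capacity` can be exact up front.
fn filter_fixed(values: &[i64], mask: &[bool]) -> Vec<i64> {
    let selected = mask.iter().filter(|&&keep| keep).count();
    let mut out = Vec::with_capacity(selected); // exact, no reallocation
    out.extend(
        values
            .iter()
            .zip(mask)
            .filter(|&(_, &keep)| keep)
            .map(|(v, _)| *v),
    );
    out
}

/// Variable-length case: the value-buffer size depends on the bytes of the
/// selected strings, so an exact capacity requires scanning the data first.
fn filter_binary(values: &[&str], mask: &[bool]) -> Vec<u8> {
    let byte_len: usize = values
        .iter()
        .zip(mask)
        .filter(|&(_, &keep)| keep)
        .map(|(v, _)| v.len())
        .sum();
    let mut buf = Vec::with_capacity(byte_len); // exact only after the scan
    for (v, &keep) in values.iter().zip(mask) {
        if keep {
            buf.extend_from_slice(v.as_bytes());
        }
    }
    buf
}
```

For selection vectors known up front both cases avoid reallocation; the difference is that the binary case pays an extra pass over the data to learn the capacity.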




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]

2025-05-30 Thread via GitHub


alamb commented on code in PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#discussion_r2115668031


##
datafusion/physical-plan/src/filter.rs:
##
@@ -689,6 +702,46 @@ fn filter_and_project(
 })
 }
 
+impl FilterExecStream {
+    /// Evaluates the predicate filter on the given batch and appends any rows that match
+    /// to the in-progress output batch builder.
+    fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        self.predicate
+            .evaluate(&batch)
+            .and_then(|v| v.into_array(batch.num_rows()))
+            .and_then(|filter| {
+                let Some(filter) = filter.as_boolean_opt() else {
+                    return internal_err!(
+                        "Cannot create filter_array from non-boolean predicates"
+                    );
+                };
+
+                let batch = match self.projection.as_ref() {
+                    Some(projection) => {
+                        let projected_columns = projection
+                            .iter()
+                            .map(|i| Arc::clone(batch.column(*i)))
+                            .collect();
+                        // Safety -- the input was a valid RecordBatch and thus the projection is too
+                        unsafe {
+                            RecordBatch::new_unchecked(
+                                Arc::clone(&self.schema),
+                                projected_columns,
+                                batch.num_rows(),
+                            )
+                        }
+                    }
+                    None => batch,
+                };
+                let output_batch_builder = self
+                    .output_batch_builder
+                    .as_mut()
+                    .expect("output_batch_builder should be Some");
+                Ok(output_batch_builder.append_filtered(batch, filter)?)

Review Comment:
   > This avoids reallocations / copying as the target capacity can be calculated.
   > In order to avoid buffering too many batches we probably have to limit this / create a batch anyway after x batches or having x megabytes in memory.
   
   I was trying to avoid having any reallocations in the [`IncrementalRecordBatchBuilder`](https://github.com/apache/arrow-rs/pull/7513/files#diff-52e0762696101af1d088555b3d8bbe861f53dc4e9e6a6edd37eee22f950ec742R48) -- since we know the target output batch size (`batch_size`), it knows how much space each batch will take up front and can just allocate it (the `instantiate_builder` function creates the builders with `with_capacity`).
   
   However, now that I think about it, after a call to `finish()` the updated builder doesn't have the right allocation πŸ€”
   
   I'll look into that more later today
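
The pitfall alamb describes can be modeled with a plain `Vec` standing in for the Arrow array builders (the type and method names below are illustrative, not the `IncrementalRecordBatchBuilder` API): taking the buffer out on `finish()` leaves behind an empty, zero-capacity buffer, so the next batch starts reallocating again unless the replacement is preallocated too.

```rust
// `Vec<i64>` stands in for a preallocated column builder.
struct IncrementalBuilder {
    batch_size: usize,
    buf: Vec<i64>,
}

impl IncrementalBuilder {
    fn new(batch_size: usize) -> Self {
        // Up-front allocation: the target output batch size is known.
        Self {
            batch_size,
            buf: Vec::with_capacity(batch_size),
        }
    }

    /// The pitfall: `std::mem::take` swaps in `Vec::new()`, whose capacity
    /// is 0, so the builder left behind is no longer preallocated.
    fn finish(&mut self) -> Vec<i64> {
        std::mem::take(&mut self.buf)
    }

    /// One possible fix: swap in a freshly preallocated buffer instead.
    fn finish_preallocated(&mut self) -> Vec<i64> {
        std::mem::replace(&mut self.buf, Vec::with_capacity(self.batch_size))
    }
}
```

The same shape applies to real builders: whatever `finish()` hands the finished arrays out, the builder it leaves behind must be re-created `with_capacity` to keep the no-reallocation property across output batches.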
   
   
   






Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]

2025-05-30 Thread via GitHub


Dandandan commented on code in PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#discussion_r2115388263


##
datafusion/physical-plan/src/filter.rs:
##
@@ -689,6 +702,46 @@ fn filter_and_project(
 })
 }
 
+impl FilterExecStream {
+    /// Evaluates the predicate filter on the given batch and appends any rows that match
+    /// to the in-progress output batch builder.
+    fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        self.predicate
+            .evaluate(&batch)
+            .and_then(|v| v.into_array(batch.num_rows()))
+            .and_then(|filter| {
+                let Some(filter) = filter.as_boolean_opt() else {
+                    return internal_err!(
+                        "Cannot create filter_array from non-boolean predicates"
+                    );
+                };
+
+                let batch = match self.projection.as_ref() {
+                    Some(projection) => {
+                        let projected_columns = projection
+                            .iter()
+                            .map(|i| Arc::clone(batch.column(*i)))
+                            .collect();
+                        // Safety -- the input was a valid RecordBatch and thus the projection is too
+                        unsafe {
+                            RecordBatch::new_unchecked(
+                                Arc::clone(&self.schema),
+                                projected_columns,
+                                batch.num_rows(),
+                            )
+                        }
+                    }
+                    None => batch,
+                };
+                let output_batch_builder = self
+                    .output_batch_builder
+                    .as_mut()
+                    .expect("output_batch_builder should be Some");
+                Ok(output_batch_builder.append_filtered(batch, filter)?)

Review Comment:
   I expect it would be faster, rather than incrementally building a new batch from a number of arrays, to first evaluate a number of filters (`self.predicate.evaluate(&batch)`) until the number of true values reaches the target batch size, and then have a filter API that filters a list of batches. This avoids reallocations / copying, as the target capacity can be calculated.
   In order to avoid buffering too many batches, we probably have to limit this / create a batch anyway after x batches or after x megabytes in memory.
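
The scheme proposed here can be sketched in plain Rust, with `Vec<i64>` columns standing in for `RecordBatch`es (type and method names are illustrative, not a DataFusion API): buffer each batch together with its already-evaluated mask, track the running count of selected rows, and materialize one exactly-sized output once the count reaches the target batch size or a buffering cap is hit.

```rust
struct MultiBatchFilter {
    target_batch_size: usize,
    max_buffered_batches: usize, // cap to avoid buffering too many batches
    buffered: Vec<(Vec<i64>, Vec<bool>)>,
    selected_rows: usize,
}

impl MultiBatchFilter {
    fn new(target_batch_size: usize, max_buffered_batches: usize) -> Self {
        Self {
            target_batch_size,
            max_buffered_batches,
            buffered: Vec::new(),
            selected_rows: 0,
        }
    }

    /// Buffer a batch and its filter mask; emit one output batch when enough
    /// selected rows have accumulated (or the buffer cap is hit). The output
    /// allocation is exact because the total selected count is known.
    fn push(&mut self, values: Vec<i64>, mask: Vec<bool>) -> Option<Vec<i64>> {
        self.selected_rows += mask.iter().filter(|&&m| m).count();
        self.buffered.push((values, mask));
        if self.selected_rows >= self.target_batch_size
            || self.buffered.len() >= self.max_buffered_batches
        {
            let mut out = Vec::with_capacity(self.selected_rows);
            for (vals, m) in self.buffered.drain(..) {
                out.extend(
                    vals.iter()
                        .zip(&m)
                        .filter(|&(_, &k)| k)
                        .map(|(v, _)| *v),
                );
            }
            self.selected_rows = 0;
            Some(out)
        } else {
            None
        }
    }
}
```

A byte-budget cap (the "x megabytes" in the comment) would slot in alongside `max_buffered_batches` in the flush condition.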






Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]

2025-05-30 Thread via GitHub


Dandandan commented on code in PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#discussion_r2115400750


##
datafusion/physical-plan/src/filter.rs:
##
@@ -689,6 +702,46 @@ fn filter_and_project(
 })
 }
 
+impl FilterExecStream {
+    /// Evaluates the predicate filter on the given batch and appends any rows that match
+    /// to the in-progress output batch builder.
+    fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        self.predicate
+            .evaluate(&batch)
+            .and_then(|v| v.into_array(batch.num_rows()))
+            .and_then(|filter| {
+                let Some(filter) = filter.as_boolean_opt() else {
+                    return internal_err!(
+                        "Cannot create filter_array from non-boolean predicates"
+                    );
+                };
+
+                let batch = match self.projection.as_ref() {
+                    Some(projection) => {
+                        let projected_columns = projection
+                            .iter()
+                            .map(|i| Arc::clone(batch.column(*i)))
+                            .collect();
+                        // Safety -- the input was a valid RecordBatch and thus the projection is too
+                        unsafe {
+                            RecordBatch::new_unchecked(
+                                Arc::clone(&self.schema),
+                                projected_columns,
+                                batch.num_rows(),
+                            )
+                        }
+                    }
+                    None => batch,
+                };
+                let output_batch_builder = self
+                    .output_batch_builder
+                    .as_mut()
+                    .expect("output_batch_builder should be Some");
+                Ok(output_batch_builder.append_filtered(batch, filter)?)

Review Comment:
   My feeling is this should do this "multi-batch filter" and then `concat` anyway if this approach generates smaller batches, rather than using a builder approach.






Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]

2025-05-29 Thread via GitHub


zhuqi-lucas commented on PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#issuecomment-2921147300

   Interesting, I am confused why it's different from the arrow-rs ClickBench result.





Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]

2025-05-29 Thread via GitHub


zhuqi-lucas commented on PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#issuecomment-2921150637

   Ok, I misunderstood: this test does not have parquet filter pushdown enabled...





Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]

2025-05-29 Thread via GitHub


alamb commented on PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#issuecomment-2920064834

   > πŸ€–: Benchmark completed
   
   Hmm that is somewhat depressing. I will investigate tomorrow





Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]

2025-05-29 Thread via GitHub


alamb commented on PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#issuecomment-2920053295

   πŸ€–: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and alamb_test_filter_pushdown
   
   Benchmark clickbench_extended.json
   
   ┏━━┳┳┳━━┓
   ┃ Query┃   HEAD ┃ alamb_test_filter_pushdown ┃   Change ┃
   ┑━━╇╇╇━━┩
   β”‚ QQuery 0 β”‚  1822.08ms β”‚  1950.22ms β”‚ 1.07x slower β”‚
   β”‚ QQuery 1 β”‚   686.15ms β”‚   725.22ms β”‚ 1.06x slower β”‚
   β”‚ QQuery 2 β”‚  1421.43ms β”‚  1437.38ms β”‚no change β”‚
   β”‚ QQuery 3 β”‚   684.69ms β”‚   705.23ms β”‚no change β”‚
   β”‚ QQuery 4 β”‚  1456.23ms β”‚  1479.06ms β”‚no change β”‚
   β”‚ QQuery 5 β”‚ 15371.83ms β”‚ 15471.55ms β”‚no change β”‚
   β”‚ QQuery 6 β”‚  2033.56ms β”‚  2057.13ms β”‚no change β”‚
   β”‚ QQuery 7 β”‚  2091.87ms β”‚  2101.95ms β”‚no change β”‚
   β”‚ QQuery 8 β”‚   849.28ms β”‚   854.59ms β”‚no change β”‚
   β””β”€β”€β”΄β”΄β”΄β”€β”€β”˜
   ┏━━━┳┓
   ┃ Benchmark Summary ┃┃
   ┑━━━╇┩
   β”‚ Total Time (HEAD) β”‚ 26417.11ms β”‚
   β”‚ Total Time (alamb_test_filter_pushdown)   β”‚ 26782.33ms β”‚
   β”‚ Average Time (HEAD)   β”‚  2935.23ms β”‚
   β”‚ Average Time (alamb_test_filter_pushdown) β”‚  2975.81ms β”‚
   β”‚ Queries Fasterβ”‚  0 β”‚
   β”‚ Queries Slowerβ”‚  2 β”‚
   β”‚ Queries with No Changeβ”‚  7 β”‚
   β””β”€β”€β”€β”΄β”˜
   
   Benchmark clickbench_partitioned.json
   
   ┏━━┳┳┳━━━┓
   ┃ Query┃   HEAD ┃ alamb_test_filter_pushdown ┃Change ┃
   ┑━━╇╇╇━━━┩
   β”‚ QQuery 0 β”‚15.73ms β”‚14.77ms β”‚ +1.06x faster β”‚
   β”‚ QQuery 1 β”‚31.94ms β”‚32.66ms β”‚ no change β”‚
   β”‚ QQuery 2 β”‚80.09ms β”‚79.35ms β”‚ no change β”‚
   β”‚ QQuery 3 β”‚98.77ms β”‚95.13ms β”‚ no change β”‚
   β”‚ QQuery 4 β”‚   605.02ms β”‚   602.99ms β”‚ no change β”‚
   β”‚ QQuery 5 β”‚   868.32ms β”‚   856.53ms β”‚ no change β”‚
   β”‚ QQuery 6 β”‚23.87ms β”‚23.91ms β”‚ no change β”‚
   β”‚ QQuery 7 β”‚37.19ms β”‚38.29ms β”‚ no change β”‚
   β”‚ QQuery 8 β”‚   933.70ms β”‚   914.39ms β”‚ no change β”‚
   β”‚ QQuery 9 β”‚  1279.67ms β”‚  1209.80ms β”‚ +1.06x faster β”‚
   β”‚ QQuery 10β”‚   272.21ms β”‚   273.11ms β”‚ no change β”‚
   β”‚ QQuery 11β”‚   308.41ms β”‚   298.73ms β”‚ no change β”‚
   β”‚ QQuery 12β”‚   930.67ms β”‚   910.59ms β”‚ no change β”‚
   β”‚ QQuery 13β”‚  1348.50ms β”‚  1369.46ms β”‚ no change β”‚
   β”‚ QQuery 14β”‚   860.32ms β”‚   844.04ms β”‚ no change β”‚
   β”‚ QQuery 15β”‚   843.03ms β”‚   821.28ms β”‚ no change β”‚
   β”‚ QQuery 16β”‚  1746.35ms β”‚  1729.29ms β”‚ no change β”‚
   β”‚ QQuery 17β”‚  1634.90ms β”‚  1603.86ms β”‚ no change β”‚
   β”‚ QQuery 18β”‚  3126.70ms β”‚  3105.76ms β”‚ no change β”‚
   β”‚ QQuery 19β”‚84.55ms β”‚81.44ms β”‚ no change β”‚
   β”‚ QQuery 20β”‚  1152.02ms β”‚  1247.44ms β”‚  1.08x slower β”‚
   β”‚ QQuery 21β”‚  1340.97ms β”‚  1376.02ms β”‚ no change β”‚
   β”‚ QQuery 22β”‚  2221.77ms β”‚  2373.59ms β”‚  1.07x slower β”‚
   β”‚ QQuery 23β”‚  8067.48ms β”‚  8453.86ms β”‚ no change β”‚
   β”‚ QQuery 24β”‚   472.64ms β”‚   465.06ms β”‚ no change β”‚
   β”‚ QQuery 25β”‚   396.53ms β”‚   398.05ms β”‚ no change β”‚
   β”‚ QQuery 26β”‚   535.55ms β”‚   536.82ms β”‚ no change β”‚
   β”‚ QQuery 27β”‚  1576.77ms β”‚  1702.88ms β”‚  1.08x slower β”‚
   β”‚ QQuery 28β”‚ 12485.75ms β”‚ 13253.81ms β”‚  1.06x slower β”‚
   β”‚ QQuery 29β”‚   530.72ms β”‚   528.47ms β”‚ no change β”‚
   β”‚ QQuery 30β”‚   802.59ms β”‚   834.59ms β”‚ no change β”‚
   β”‚ QQuery 31β”‚   841.57ms β”‚   875.02ms β”‚ no change β”‚
   β”‚ QQuery 32β”‚  2642.67ms β”‚  2824.64ms β”‚  1.07x slower β”‚
   β”‚ QQuery 33β”‚  3355.69ms β”‚  3530.76ms β”‚  1.05x slower β”‚
   β”‚ QQuery 34β”‚  3376.55ms β”‚

Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]

2025-05-29 Thread via GitHub


alamb commented on PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#issuecomment-2919955178

   πŸ€– `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) running
   Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr  2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing alamb/test_filter_pushdown (f9dbcc077dbe1f1301f1f03142fe14075146a4f5) to 7002a0027876a17e5bdf275e63d2a25373331943 [diff](https://github.com/apache/datafusion/compare/7002a0027876a17e5bdf275e63d2a25373331943..f9dbcc077dbe1f1301f1f03142fe14075146a4f5)
   Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
   Results will be posted here when complete
   

