Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]
Dandandan commented on code in PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#discussion_r2115799584
##
datafusion/physical-plan/src/filter.rs:
##
@@ -689,6 +702,46 @@ fn filter_and_project(
     })
 }
+impl FilterExecStream {
+    /// Evaluates the predicate filter on the given batch and appends any rows
+    /// that match to the in-progress output batch builder.
+    fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        self.predicate
+            .evaluate(&batch)
+            .and_then(|v| v.into_array(batch.num_rows()))
+            .and_then(|filter| {
+                let Some(filter) = filter.as_boolean_opt() else {
+                    return internal_err!(
+                        "Cannot create filter_array from non-boolean predicates"
+                    );
+                };
+
+                let batch = match self.projection.as_ref() {
+                    Some(projection) => {
+                        let projected_columns = projection
+                            .iter()
+                            .map(|i| Arc::clone(batch.column(*i)))
+                            .collect();
+                        // Safety -- the input was a valid RecordBatch and
+                        // thus the projection is too
+                        unsafe {
+                            RecordBatch::new_unchecked(
+                                Arc::clone(&self.schema),
+                                projected_columns,
+                                batch.num_rows(),
+                            )
+                        }
+                    }
+                    None => batch,
+                };
+                let output_batch_builder = self
+                    .output_batch_builder
+                    .as_mut()
+                    .expect("output_batch_builder should be Some");
+                Ok(output_batch_builder.append_filtered(batch, filter)?)
Review Comment:
> space each batch will take up front and can just straight up allocate

Hm yeah with primitive and views this might be okay - we might have to test other data sources though (with normal binary types where it can't be preallocated).
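For context, the preallocation concern with variable-length types can be sketched in plain Rust. This is a simplified model (the function names are made up for illustration, not arrow-rs APIs): for a fixed-width column the output byte capacity follows directly from the selected row count, but for a binary/string column the value-buffer capacity depends on the total byte length of the selected values, which is only known after walking the filter against the actual data.

```rust
// Simplified model of output-capacity estimation for a filter.
// `primitive_capacity` and `binary_capacity` are hypothetical helpers,
// not part of arrow-rs.

/// For fixed-width types (here i64), the selected row count alone
/// determines the required byte capacity.
fn primitive_capacity(filter: &[bool]) -> usize {
    filter.iter().filter(|&&keep| keep).count() * std::mem::size_of::<i64>()
}

/// For variable-length types we must sum the lengths of the selected
/// values -- this cannot be computed from the row count alone.
fn binary_capacity(values: &[&[u8]], filter: &[bool]) -> usize {
    values
        .iter()
        .zip(filter.iter())
        .filter(|(_, keep)| **keep)
        .map(|(v, _)| v.len())
        .sum()
}
```

With a filter of `[true, false, true]`, the primitive column needs exactly `2 * 8 = 16` bytes regardless of the data, while the binary column's capacity varies with whichever values happen to pass the filter.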
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]
alamb commented on code in PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#discussion_r2115668031
##
datafusion/physical-plan/src/filter.rs:
##
@@ -689,6 +702,46 @@ fn filter_and_project(
     })
 }
+impl FilterExecStream {
+    /// Evaluates the predicate filter on the given batch and appends any rows
+    /// that match to the in-progress output batch builder.
+    fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        self.predicate
+            .evaluate(&batch)
+            .and_then(|v| v.into_array(batch.num_rows()))
+            .and_then(|filter| {
+                let Some(filter) = filter.as_boolean_opt() else {
+                    return internal_err!(
+                        "Cannot create filter_array from non-boolean predicates"
+                    );
+                };
+
+                let batch = match self.projection.as_ref() {
+                    Some(projection) => {
+                        let projected_columns = projection
+                            .iter()
+                            .map(|i| Arc::clone(batch.column(*i)))
+                            .collect();
+                        // Safety -- the input was a valid RecordBatch and
+                        // thus the projection is too
+                        unsafe {
+                            RecordBatch::new_unchecked(
+                                Arc::clone(&self.schema),
+                                projected_columns,
+                                batch.num_rows(),
+                            )
+                        }
+                    }
+                    None => batch,
+                };
+                let output_batch_builder = self
+                    .output_batch_builder
+                    .as_mut()
+                    .expect("output_batch_builder should be Some");
+                Ok(output_batch_builder.append_filtered(batch, filter)?)
Review Comment:
> This avoids reallocations / copying as the target capacity can be calculated.
> In order to avoid buffering too many batches we probably have to limit this / create a batch anyway after x batches or having x megabytes in memory.

I was trying to avoid having any reallocations in the [`IncrementalRecordBatchBuilder`](https://github.com/apache/arrow-rs/pull/7513/files#diff-52e0762696101af1d088555b3d8bbe861f53dc4e9e6a6edd37eee22f950ec742R48) -- since we know the target output batch size (`batch_size`), it knows how much space each batch will take up front and can just straight up allocate it (the `instantiate_builder` function creates the builders with `with_capacity`).

However, now that I think about it, after a call to `finish()` the updated builder doesn't have the right allocation 🤔

I'll look into that more later today
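The preallocation idea, and the `finish()` caveat mentioned above, can be illustrated with a plain-`Vec` sketch. This is a simplified stand-in for an incremental builder (it is not the arrow-rs `IncrementalRecordBatchBuilder` API; the struct and method names here are invented for illustration):

```rust
// Simplified stand-in for an incremental output-batch builder: the target
// batch size is known up front, so exact capacity can be allocated once.
struct IncrementalBuilder {
    batch_size: usize,
    buf: Vec<i64>,
}

impl IncrementalBuilder {
    fn new(batch_size: usize) -> Self {
        // Allocate the full target capacity up front -- appending filtered
        // rows then never reallocates.
        Self { batch_size, buf: Vec::with_capacity(batch_size) }
    }

    /// Append the values selected by `filter` to the in-progress batch.
    fn append_filtered(&mut self, values: &[i64], filter: &[bool]) {
        for (v, keep) in values.iter().zip(filter) {
            if *keep {
                self.buf.push(*v);
            }
        }
    }

    /// Emit the completed batch. This models the caveat from the discussion:
    /// `std::mem::take` leaves an empty Vec with capacity 0 behind, so the
    /// builder must re-reserve capacity after each finish() to preserve the
    /// "no reallocation" property for the next batch.
    fn finish(&mut self) -> Vec<i64> {
        let out = std::mem::take(&mut self.buf);
        self.buf.reserve(self.batch_size); // restore the preallocation
        out
    }
}
```

Without the `reserve` call in `finish()`, the second batch would start from a zero-capacity buffer and grow by repeated reallocation, which is exactly the situation the up-front `with_capacity` was meant to avoid.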
Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]
Dandandan commented on code in PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#discussion_r2115388263
##
datafusion/physical-plan/src/filter.rs:
##
@@ -689,6 +702,46 @@ fn filter_and_project(
     })
 }
+impl FilterExecStream {
+    /// Evaluates the predicate filter on the given batch and appends any rows
+    /// that match to the in-progress output batch builder.
+    fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        self.predicate
+            .evaluate(&batch)
+            .and_then(|v| v.into_array(batch.num_rows()))
+            .and_then(|filter| {
+                let Some(filter) = filter.as_boolean_opt() else {
+                    return internal_err!(
+                        "Cannot create filter_array from non-boolean predicates"
+                    );
+                };
+
+                let batch = match self.projection.as_ref() {
+                    Some(projection) => {
+                        let projected_columns = projection
+                            .iter()
+                            .map(|i| Arc::clone(batch.column(*i)))
+                            .collect();
+                        // Safety -- the input was a valid RecordBatch and
+                        // thus the projection is too
+                        unsafe {
+                            RecordBatch::new_unchecked(
+                                Arc::clone(&self.schema),
+                                projected_columns,
+                                batch.num_rows(),
+                            )
+                        }
+                    }
+                    None => batch,
+                };
+                let output_batch_builder = self
+                    .output_batch_builder
+                    .as_mut()
+                    .expect("output_batch_builder should be Some");
+                Ok(output_batch_builder.append_filtered(batch, filter)?)
Review Comment:
Rather than incrementally building a new batch from a number of arrays, I expect it would be faster to first evaluate a number of filters (`self.predicate.evaluate(&batch)`) until the number of true values reaches the target batch size, and then have a filter API that filters a list of batches. This avoids reallocations / copying as the target capacity can be calculated.

In order to avoid buffering too many batches we probably have to limit this / create a batch anyway after x batches or having x megabytes in memory.
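The "evaluate several filters first, then materialize once the target size is reached" idea could look roughly like the sketch below. `Vec<i64>` stands in for a `RecordBatch` column and the `MultiBatchFilter` type and its `push` method are hypothetical, simplified for illustration; a real version would operate on Arrow arrays and, as noted above, also cap the number of buffered batches or bytes.

```rust
// Sketch: buffer (batch, filter) pairs until the number of selected rows
// reaches the target batch size, then materialize one output batch with a
// single exact-capacity allocation.
struct MultiBatchFilter {
    target_batch_size: usize,
    pending: Vec<(Vec<i64>, Vec<bool>)>,
    pending_true: usize, // running count of selected rows across `pending`
}

impl MultiBatchFilter {
    fn new(target_batch_size: usize) -> Self {
        Self { target_batch_size, pending: Vec::new(), pending_true: 0 }
    }

    /// Buffer a batch with its already-evaluated boolean filter; return a
    /// materialized output batch once enough rows have been selected.
    fn push(&mut self, batch: Vec<i64>, filter: Vec<bool>) -> Option<Vec<i64>> {
        self.pending_true += filter.iter().filter(|&&b| b).count();
        self.pending.push((batch, filter));
        if self.pending_true >= self.target_batch_size {
            // The exact output size is known up front, so one allocation
            // suffices -- no incremental growth while copying rows out.
            let mut out = Vec::with_capacity(self.pending_true);
            for (batch, filter) in self.pending.drain(..) {
                out.extend(
                    batch.iter().zip(&filter).filter(|(_, &k)| k).map(|(v, _)| *v),
                );
            }
            self.pending_true = 0;
            Some(out)
        } else {
            None
        }
    }
}
```

For example, with a target of 3 rows, pushing a batch that selects one row returns nothing; pushing a second batch that selects two more rows crosses the threshold and yields one combined output batch.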
Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]
Dandandan commented on code in PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#discussion_r2115400750
##
datafusion/physical-plan/src/filter.rs:
##
@@ -689,6 +702,46 @@ fn filter_and_project(
     })
 }
+impl FilterExecStream {
+    /// Evaluates the predicate filter on the given batch and appends any rows
+    /// that match to the in-progress output batch builder.
+    fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        self.predicate
+            .evaluate(&batch)
+            .and_then(|v| v.into_array(batch.num_rows()))
+            .and_then(|filter| {
+                let Some(filter) = filter.as_boolean_opt() else {
+                    return internal_err!(
+                        "Cannot create filter_array from non-boolean predicates"
+                    );
+                };
+
+                let batch = match self.projection.as_ref() {
+                    Some(projection) => {
+                        let projected_columns = projection
+                            .iter()
+                            .map(|i| Arc::clone(batch.column(*i)))
+                            .collect();
+                        // Safety -- the input was a valid RecordBatch and
+                        // thus the projection is too
+                        unsafe {
+                            RecordBatch::new_unchecked(
+                                Arc::clone(&self.schema),
+                                projected_columns,
+                                batch.num_rows(),
+                            )
+                        }
+                    }
+                    None => batch,
+                };
+                let output_batch_builder = self
+                    .output_batch_builder
+                    .as_mut()
+                    .expect("output_batch_builder should be Some");
+                Ok(output_batch_builder.append_filtered(batch, filter)?)
Review Comment:
My feeling is this should do the "multi batch filter" and then `concat` anyway if smaller batches are generated by this approach, rather than using a builder approach.
Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]
zhuqi-lucas commented on PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#issuecomment-2921147300

Interesting, I am confused why it's different from the arrow-rs clickbench result.
Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]
zhuqi-lucas commented on PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#issuecomment-2921150637

Ok, I misunderstood: this testing is not with parquet filter pushdown enabled...
Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]
alamb commented on PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#issuecomment-2920064834

> 🤖: Benchmark completed

Hmm that is somewhat depressing. I will investigate tomorrow
Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]
alamb commented on PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#issuecomment-2920053295

🤖: Benchmark completed

Details

```
Comparing HEAD and alamb_test_filter_pushdown

Benchmark clickbench_extended.json
------------------------------------------------------------------------
Query       HEAD          alamb_test_filter_pushdown   Change
------------------------------------------------------------------------
QQuery 0     1822.08ms     1950.22ms                   1.07x slower
QQuery 1      686.15ms      725.22ms                   1.06x slower
QQuery 2     1421.43ms     1437.38ms                   no change
QQuery 3      684.69ms      705.23ms                   no change
QQuery 4     1456.23ms     1479.06ms                   no change
QQuery 5    15371.83ms    15471.55ms                   no change
QQuery 6     2033.56ms     2057.13ms                   no change
QQuery 7     2091.87ms     2101.95ms                   no change
QQuery 8      849.28ms      854.59ms                   no change
------------------------------------------------------------------------

Benchmark Summary
Total Time (HEAD)                            26417.11ms
Total Time (alamb_test_filter_pushdown)      26782.33ms
Average Time (HEAD)                           2935.23ms
Average Time (alamb_test_filter_pushdown)     2975.81ms
Queries Faster                                        0
Queries Slower                                        2
Queries with No Change                                7

Benchmark clickbench_partitioned.json
------------------------------------------------------------------------
Query       HEAD          alamb_test_filter_pushdown   Change
------------------------------------------------------------------------
QQuery 0       15.73ms       14.77ms                   +1.06x faster
QQuery 1       31.94ms       32.66ms                   no change
QQuery 2       80.09ms       79.35ms                   no change
QQuery 3       98.77ms       95.13ms                   no change
QQuery 4      605.02ms      602.99ms                   no change
QQuery 5      868.32ms      856.53ms                   no change
QQuery 6       23.87ms       23.91ms                   no change
QQuery 7       37.19ms       38.29ms                   no change
QQuery 8      933.70ms      914.39ms                   no change
QQuery 9     1279.67ms     1209.80ms                   +1.06x faster
QQuery 10     272.21ms      273.11ms                   no change
QQuery 11     308.41ms      298.73ms                   no change
QQuery 12     930.67ms      910.59ms                   no change
QQuery 13    1348.50ms     1369.46ms                   no change
QQuery 14     860.32ms      844.04ms                   no change
QQuery 15     843.03ms      821.28ms                   no change
QQuery 16    1746.35ms     1729.29ms                   no change
QQuery 17    1634.90ms     1603.86ms                   no change
QQuery 18    3126.70ms     3105.76ms                   no change
QQuery 19      84.55ms       81.44ms                   no change
QQuery 20    1152.02ms     1247.44ms                   1.08x slower
QQuery 21    1340.97ms     1376.02ms                   no change
QQuery 22    2221.77ms     2373.59ms                   1.07x slower
QQuery 23    8067.48ms     8453.86ms                   no change
QQuery 24     472.64ms      465.06ms                   no change
QQuery 25     396.53ms      398.05ms                   no change
QQuery 26     535.55ms      536.82ms                   no change
QQuery 27    1576.77ms     1702.88ms                   1.08x slower
QQuery 28   12485.75ms    13253.81ms                   1.06x slower
QQuery 29     530.72ms      528.47ms                   no change
QQuery 30     802.59ms      834.59ms                   no change
QQuery 31     841.57ms      875.02ms                   no change
QQuery 32    2642.67ms     2824.64ms                   1.07x slower
QQuery 33    3355.69ms     3530.76ms                   1.05x slower
QQuery 34    3376.55ms
```
Re: [PR] WIP: Test DataFusion with experimental parquet pushdown [datafusion]
alamb commented on PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#issuecomment-2919955178

🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh)

Running Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux

Comparing alamb/test_filter_pushdown (f9dbcc077dbe1f1301f1f03142fe14075146a4f5) to 7002a0027876a17e5bdf275e63d2a25373331943 [diff](https://github.com/apache/datafusion/compare/7002a0027876a17e5bdf275e63d2a25373331943..f9dbcc077dbe1f1301f1f03142fe14075146a4f5)

Benchmarks: tpch_mem clickbench_partitioned clickbench_extended

Results will be posted here when complete
