This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6553fafc5c [Minor] Remove redundant member from RepartitionExec (#12638)
6553fafc5c is described below
commit 6553fafc5c320e13fb177977b5a78a91c2ac2fc1
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Sep 27 13:55:56 2024 -0700
[Minor] Remove redundant member from RepartitionExec (#12638)
* Initial commit
* Use util get_array_ref
* Resolve linter errors
---
datafusion/physical-plan/src/repartition/mod.rs | 46 ++++++++++---------------
1 file changed, 19 insertions(+), 27 deletions(-)
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 093803e3c8..10f898b26a 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -38,12 +38,11 @@ use crate::sorts::streaming_merge;
use crate::stream::RecordBatchStreamAdapter;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};
-use arrow::array::ArrayRef;
-use arrow::datatypes::{SchemaRef, UInt64Type};
+use arrow::datatypes::{SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
use arrow_array::{PrimitiveArray, RecordBatchOptions};
-use datafusion_common::utils::transpose;
-use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result};
+use datafusion_common::utils::{get_arrayref_at_indices, transpose};
+use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
@@ -280,7 +279,7 @@ impl BatchPartitioner {
.collect();
for (index, hash) in hash_buffer.iter().enumerate() {
-                    indices[(*hash % *partitions as u64) as usize].push(index as u64);
+                    indices[(*hash % *partitions as u64) as usize].push(index as u32);
}
// Finished building index-arrays for output partitions
@@ -292,7 +291,7 @@ impl BatchPartitioner {
.into_iter()
.enumerate()
.filter_map(|(partition, indices)| {
-                    let indices: PrimitiveArray<UInt64Type> = indices.into();
+                    let indices: PrimitiveArray<UInt32Type> = indices.into();
})
.map(move |(partition, indices)| {
@@ -300,14 +299,8 @@ impl BatchPartitioner {
let _timer = partitioner_timer.timer();
// Produce batches based on indices
- let columns = batch
- .columns()
- .iter()
- .map(|c| {
-                        arrow::compute::take(c.as_ref(), &indices, None)
- .map_err(|e| arrow_datafusion_err!(e))
- })
- .collect::<Result<Vec<ArrayRef>>>()?;
+ let columns =
+                    get_arrayref_at_indices(batch.columns(), &indices)?;
let mut options = RecordBatchOptions::new();
                options = options.with_row_count(Some(indices.len()));
@@ -403,8 +396,6 @@ impl BatchPartitioner {
pub struct RepartitionExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
- /// Partitioning scheme to use
- partitioning: Partitioning,
    /// Inner state that is initialized when the first output stream is created.
state: LazyState,
/// Execution metrics
@@ -469,7 +460,7 @@ impl RepartitionExec {
/// Partitioning scheme to use
pub fn partitioning(&self) -> &Partitioning {
- &self.partitioning
+ &self.cache.partitioning
}
/// Get preserve_order flag of the RepartitionExecutor
@@ -496,7 +487,7 @@ impl DisplayAs for RepartitionExec {
f,
"{}: partitioning={}, input_partitions={}",
self.name(),
- self.partitioning,
+ self.partitioning(),
self.input.output_partitioning().partition_count()
)?;
@@ -539,8 +530,10 @@ impl ExecutionPlan for RepartitionExec {
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- let mut repartition =
-            RepartitionExec::try_new(children.swap_remove(0), self.partitioning.clone())?;
+ let mut repartition = RepartitionExec::try_new(
+ children.swap_remove(0),
+ self.partitioning().clone(),
+ )?;
if self.preserve_order {
repartition = repartition.with_preserve_order();
}
@@ -548,7 +541,7 @@ impl ExecutionPlan for RepartitionExec {
}
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
- vec![matches!(self.partitioning, Partitioning::Hash(_, _))]
+ vec![matches!(self.partitioning(), Partitioning::Hash(_, _))]
}
fn maintains_input_order(&self) -> Vec<bool> {
@@ -568,7 +561,7 @@ impl ExecutionPlan for RepartitionExec {
let lazy_state = Arc::clone(&self.state);
let input = Arc::clone(&self.input);
- let partitioning = self.partitioning.clone();
+ let partitioning = self.partitioning().clone();
let metrics = self.metrics.clone();
let preserve_order = self.preserve_order;
let name = self.name().to_owned();
@@ -687,7 +680,6 @@ impl RepartitionExec {
            Self::compute_properties(&input, partitioning.clone(), preserve_order);
Ok(RepartitionExec {
input,
- partitioning,
state: Default::default(),
metrics: ExecutionPlanMetricsSet::new(),
preserve_order,
@@ -1027,10 +1019,10 @@ mod tests {
{collect, expressions::col, memory::MemoryExec},
};
- use arrow::array::{StringArray, UInt32Array};
+ use arrow::array::{ArrayRef, StringArray, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::cast::as_string_array;
- use datafusion_common::{assert_batches_sorted_eq, exec_err};
+    use datafusion_common::{arrow_datafusion_err, assert_batches_sorted_eq, exec_err};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use tokio::task::JoinSet;
@@ -1134,7 +1126,7 @@ mod tests {
// execute and collect results
let mut output_partitions = vec![];
- for i in 0..exec.partitioning.partition_count() {
+ for i in 0..exec.partitioning().partition_count() {
// execute this *output* partition and collect all batches
let mut stream = exec.execute(i, Arc::clone(&task_ctx))?;
let mut batches = vec![];
@@ -1524,7 +1516,7 @@ mod tests {
let exec = RepartitionExec::try_new(Arc::new(exec), partitioning)?;
// pull partitions
- for i in 0..exec.partitioning.partition_count() {
+ for i in 0..exec.partitioning().partition_count() {
let mut stream = exec.execute(i, Arc::clone(&task_ctx))?;
            let err = arrow_datafusion_err!(stream.next().await.unwrap().unwrap_err().into());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]