This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 6d8b8c5a7 chore: `ScanExec::new` no longer fetches data (#2881)
6d8b8c5a7 is described below
commit 6d8b8c5a7ed2834d4899b8e188b5029520b091ca
Author: Andy Grove <[email protected]>
AuthorDate: Fri Dec 12 07:37:05 2025 -0700
chore: `ScanExec::new` no longer fetches data (#2881)
---
native/core/src/execution/jni_api.rs | 2 +-
native/core/src/execution/operators/scan.rs | 67 ++++++-----------------------
2 files changed, 15 insertions(+), 54 deletions(-)
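
The gist of the change: ScanExec no longer needs to see the first batch to
derive its schema, because get_next now always unpacks dictionary arrays, so
the schema can be built eagerly from the declared data types alone. A minimal,
self-contained sketch of that idea against the arrow_schema crate (the body of
unpack_dictionary_type below is an assumption about what ScanExec's helper
does; only schema_from_data_types mirrors the diff):

    use std::sync::Arc;
    use arrow_schema::{DataType, Field, Schema, SchemaRef};

    // Assumed stand-in for ScanExec::unpack_dictionary_type: report a
    // dictionary-encoded type as its value type, since batches are unpacked.
    fn unpack_dictionary_type(dt: &DataType) -> DataType {
        match dt {
            DataType::Dictionary(_, value_type) => value_type.as_ref().clone(),
            other => other.clone(),
        }
    }

    // Mirrors the new schema_from_data_types in the diff: the schema comes
    // purely from the declared types, with placeholder column names.
    fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
        let fields: Vec<Field> = data_types
            .iter()
            .enumerate()
            .map(|(idx, dt)| Field::new(format!("col_{idx}"), unpack_dictionary_type(dt), true))
            .collect();
        Arc::new(Schema::new(fields))
    }

    fn main() {
        let types = vec![
            DataType::Int32,
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        ];
        // col_0: Int32, col_1: Utf8 (the dictionary layer is dropped)
        println!("{:?}", schema_from_data_types(&types));
    }
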
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index ce991d014..b6c933d0b 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -534,7 +534,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
.indent(true);
info!(
"Comet native query plan with metrics
(Plan #{} Stage {} Partition {}):\
- \n plan creation (including CometScans
fetching first batches) took {:?}:\
+ \n plan creation took {:?}:\
\n{formatted_plan_str:}",
plan.plan_id, stage_id, partition, exec_context.plan_creation_time
);
diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index fef4a5b9e..1fedafbe8 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -98,29 +98,8 @@ impl ScanExec {
let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0);
let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0);
- // Scan's schema is determined by the input batch, so we need to set it before execution.
- // Note that we determine if arrays are dictionary-encoded based on the
- // first batch. The array may be dictionary-encoded in some batches and not others, and
- // ScanExec will cast arrays from all future batches to the type determined here, so we
- // may end up either unpacking dictionary arrays or dictionary-encoding arrays.
- // Dictionary-encoded primitive arrays are always unpacked.
- let first_batch = if let Some(input_source) = input_source.as_ref() {
- let mut timer = baseline_metrics.elapsed_compute().timer();
- let batch = ScanExec::get_next(
- exec_context_id,
- input_source.as_obj(),
- data_types.len(),
- &jvm_fetch_time,
- &arrow_ffi_time,
- arrow_ffi_safe,
- )?;
- timer.stop();
- batch
- } else {
- InputBatch::EOF
- };
-
- let schema = scan_schema(&first_batch, &data_types);
+ // Build schema directly from data types since get_next now always unpacks dictionaries
+ let schema = schema_from_data_types(&data_types);
let cache = PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
@@ -136,7 +115,7 @@ impl ScanExec {
input_source,
input_source_description: input_source_description.to_string(),
data_types,
- batch: Arc::new(Mutex::new(Some(first_batch))),
+ batch: Arc::new(Mutex::new(None)),
cache,
metrics: metrics_set,
baseline_metrics,
@@ -423,26 +402,16 @@ impl ScanExec {
}
}
-fn scan_schema(input_batch: &InputBatch, data_types: &[DataType]) -> SchemaRef {
- let fields = match input_batch {
- // Note that if `columns` is empty, we'll get an empty schema
- InputBatch::Batch(columns, _) => {
- columns
- .iter()
- .enumerate()
- .map(|(idx, c)| {
- let datatype = ScanExec::unpack_dictionary_type(c.data_type());
- // We don't use the field name. Put a placeholder.
- Field::new(format!("col_{idx}"), datatype, true)
- })
- .collect::<Vec<Field>>()
- }
- _ => data_types
- .iter()
- .enumerate()
- .map(|(idx, dt)| Field::new(format!("col_{idx}"), dt.clone(), true))
- .collect(),
- };
+fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
+ let fields = data_types
+ .iter()
+ .enumerate()
+ .map(|(idx, dt)| {
+ let datatype = ScanExec::unpack_dictionary_type(dt);
+ // We don't use the field name. Put a placeholder.
+ Field::new(format!("col_{idx}"), datatype, true)
+ })
+ .collect::<Vec<Field>>();
Arc::new(Schema::new(fields))
}
@@ -453,15 +422,7 @@ impl ExecutionPlan for ScanExec {
}
fn schema(&self) -> SchemaRef {
- if self.exec_context_id == TEST_EXEC_CONTEXT_ID {
- // `unwrap` is safe because `schema` is only called during converting
- // Spark plan to DataFusion plan. At the moment, `batch` is not EOF.
- let binding = self.batch.try_lock().unwrap();
- let input_batch = binding.as_ref().unwrap();
- scan_schema(input_batch, &self.data_types)
- } else {
- Arc::clone(&self.schema)
- }
+ Arc::clone(&self.schema)
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
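
A side effect worth calling out: with the eager fetch removed, `batch` starts
as None and the first trip to the JVM is deferred until the operator is first
polled, which is why the log message above no longer mentions CometScans
fetching first batches during plan creation. A rough sketch of that lazy
pattern with simplified stand-in types (fetch_from_jvm is hypothetical and
stands in for ScanExec::get_next crossing the JNI boundary):

    use std::sync::{Arc, Mutex};

    // Simplified stand-ins for the types in scan.rs.
    enum InputBatch {
        Batch(Vec<i32>),
        Eof,
    }

    struct Scan {
        batch: Arc<Mutex<Option<InputBatch>>>,
    }

    impl Scan {
        // After this change the constructor only sets up state; it no longer
        // blocks on the JVM for a first batch, so plan creation stays cheap.
        fn new() -> Self {
            Scan {
                batch: Arc::new(Mutex::new(None)),
            }
        }

        // Take a buffered batch if one is pending, otherwise fetch lazily.
        fn next_batch(&self) -> InputBatch {
            let mut slot = self.batch.lock().unwrap();
            slot.take().unwrap_or_else(fetch_from_jvm)
        }
    }

    fn fetch_from_jvm() -> InputBatch {
        InputBatch::Eof // placeholder; the real code crosses the JNI boundary
    }

    fn main() {
        let scan = Scan::new(); // returns immediately; nothing fetched yet
        match scan.next_batch() {
            InputBatch::Batch(v) => println!("got {} values", v.len()),
            InputBatch::Eof => println!("end of input"),
        }
    }
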
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]