This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d8b8c5a7 chore: `ScanExec::new` no longer fetches data (#2881)
6d8b8c5a7 is described below

commit 6d8b8c5a7ed2834d4899b8e188b5029520b091ca
Author: Andy Grove <[email protected]>
AuthorDate: Fri Dec 12 07:37:05 2025 -0700

    chore: `ScanExec::new` no longer fetches data (#2881)
---
 native/core/src/execution/jni_api.rs        |  2 +-
 native/core/src/execution/operators/scan.rs | 67 ++++++-----------------------
 2 files changed, 15 insertions(+), 54 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index ce991d014..b6c933d0b 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -534,7 +534,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
                                 .indent(true);
                                 info!(
                                     "Comet native query plan with metrics 
(Plan #{} Stage {} Partition {}):\
-                                \n plan creation (including CometScans 
fetching first batches) took {:?}:\
+                                \n plan creation took {:?}:\
                                 \n{formatted_plan_str:}",
                                     plan.plan_id, stage_id, partition, exec_context.plan_creation_time
                                 );
diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index fef4a5b9e..1fedafbe8 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -98,29 +98,8 @@ impl ScanExec {
         let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0);
         let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0);
 
-        // Scan's schema is determined by the input batch, so we need to set it before execution.
-        // Note that we determine if arrays are dictionary-encoded based on the
-        // first batch. The array may be dictionary-encoded in some batches and not others, and
-        // ScanExec will cast arrays from all future batches to the type determined here, so we
-        // may end up either unpacking dictionary arrays or dictionary-encoding arrays.
-        // Dictionary-encoded primitive arrays are always unpacked.
-        let first_batch = if let Some(input_source) = input_source.as_ref() {
-            let mut timer = baseline_metrics.elapsed_compute().timer();
-            let batch = ScanExec::get_next(
-                exec_context_id,
-                input_source.as_obj(),
-                data_types.len(),
-                &jvm_fetch_time,
-                &arrow_ffi_time,
-                arrow_ffi_safe,
-            )?;
-            timer.stop();
-            batch
-        } else {
-            InputBatch::EOF
-        };
-
-        let schema = scan_schema(&first_batch, &data_types);
+        // Build schema directly from data types since get_next now always unpacks dictionaries
+        let schema = schema_from_data_types(&data_types);
 
         let cache = PlanProperties::new(
             EquivalenceProperties::new(Arc::clone(&schema)),
@@ -136,7 +115,7 @@ impl ScanExec {
             input_source,
             input_source_description: input_source_description.to_string(),
             data_types,
-            batch: Arc::new(Mutex::new(Some(first_batch))),
+            batch: Arc::new(Mutex::new(None)),
             cache,
             metrics: metrics_set,
             baseline_metrics,
@@ -423,26 +402,16 @@ impl ScanExec {
     }
 }
 
-fn scan_schema(input_batch: &InputBatch, data_types: &[DataType]) -> SchemaRef {
-    let fields = match input_batch {
-        // Note that if `columns` is empty, we'll get an empty schema
-        InputBatch::Batch(columns, _) => {
-            columns
-                .iter()
-                .enumerate()
-                .map(|(idx, c)| {
                    let datatype = ScanExec::unpack_dictionary_type(c.data_type());
-                    // We don't use the field name. Put a placeholder.
-                    Field::new(format!("col_{idx}"), datatype, true)
-                })
-                .collect::<Vec<Field>>()
-        }
-        _ => data_types
-            .iter()
-            .enumerate()
-            .map(|(idx, dt)| Field::new(format!("col_{idx}"), dt.clone(), true))
-            .collect(),
-    };
+fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
+    let fields = data_types
+        .iter()
+        .enumerate()
+        .map(|(idx, dt)| {
+            let datatype = ScanExec::unpack_dictionary_type(dt);
+            // We don't use the field name. Put a placeholder.
+            Field::new(format!("col_{idx}"), datatype, true)
+        })
+        .collect::<Vec<Field>>();
 
     Arc::new(Schema::new(fields))
 }
@@ -453,15 +422,7 @@ impl ExecutionPlan for ScanExec {
     }
 
     fn schema(&self) -> SchemaRef {
-        if self.exec_context_id == TEST_EXEC_CONTEXT_ID {
-        // `unwrap` is safe because `schema` is only called during converting
-        // Spark plan to DataFusion plan. At the moment, `batch` is not EOF.
-            let binding = self.batch.try_lock().unwrap();
-            let input_batch = binding.as_ref().unwrap();
-            scan_schema(input_batch, &self.data_types)
-        } else {
-            Arc::clone(&self.schema)
-        }
+        Arc::clone(&self.schema)
     }
 
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {


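As context for the diff above, here is a minimal, self-contained sketch (not the actual Comet code) of the schema construction this commit switches to. It assumes the arrow-schema crate; unpack_dictionary_type below is a hypothetical stand-in for ScanExec::unpack_dictionary_type, whose body is not shown in the patch but which the removed comments describe as unpacking dictionary-encoded types to their value types.

// Sketch only: mirrors the shape of the new `schema_from_data_types`,
// with a hypothetical stand-in for `ScanExec::unpack_dictionary_type`.
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};

// Hypothetical stand-in: strip one level of dictionary encoding,
// leaving all other types untouched.
fn unpack_dictionary_type(dt: &DataType) -> DataType {
    match dt {
        DataType::Dictionary(_, value_type) => value_type.as_ref().clone(),
        other => other.clone(),
    }
}

// Mirrors the new `schema_from_data_types`: placeholder column names,
// all fields nullable, dictionary types unpacked up front.
fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
    let fields = data_types
        .iter()
        .enumerate()
        .map(|(idx, dt)| Field::new(format!("col_{idx}"), unpack_dictionary_type(dt), true))
        .collect::<Vec<Field>>();
    Arc::new(Schema::new(fields))
}

fn main() {
    let schema = schema_from_data_types(&[
        DataType::Int64,
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
    ]);
    // The dictionary column is reported with its value type.
    assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
    println!("{schema:?}");
}

Because get_next now always unpacks dictionaries, the schema depends only on the declared data types rather than on the first batch, which is why ScanExec::new can start with batch: Arc::new(Mutex::new(None)) and defer the first JVM fetch to execution time instead of fetching eagerly during plan creation.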
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
