This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 01ef59be refactor(rust/sedona-spatial-join): Allow SpatialIndexBuilder and EvaluatedGeometryArray to be configurable (#737)
01ef59be is described below

commit 01ef59be5d76721ac658a00b02c30e3d8902bd09
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Apr 1 16:46:10 2026 -0500

    refactor(rust/sedona-spatial-join): Allow SpatialIndexBuilder and EvaluatedGeometryArray to be configurable (#737)
---
 .../external_evaluated_batch_stream.rs             |   9 +-
 .../bench/evaluated_batch/spill.rs                 |  10 +-
 rust/sedona-spatial-join/src/evaluated_batch.rs    |  16 -
 .../evaluated_batch_stream/external.rs             |  12 +-
 .../src/evaluated_batch/spill.rs                   | 126 ++++-
 rust/sedona-spatial-join/src/exec.rs               |   6 +
 .../src/index/build_side_collector.rs              |  44 +-
 .../src/index/default_spatial_index.rs             |  66 +--
 .../src/index/default_spatial_index_builder.rs     |  84 ++-
 rust/sedona-spatial-join/src/index/knn_adapter.rs  |   2 +-
 .../src/index/partitioned_index_provider.rs        |  22 +-
 .../sedona-spatial-join/src/index/spatial_index.rs |  20 -
 .../src/index/spatial_index_builder.rs             |  27 +-
 rust/sedona-spatial-join/src/join_provider.rs      | 108 ++++
 rust/sedona-spatial-join/src/lib.rs                |   1 +
 rust/sedona-spatial-join/src/operand_evaluator.rs  | 561 ++++++++++++++++++---
 .../src/partitioning/stream_repartitioner.rs       | 321 +-----------
 rust/sedona-spatial-join/src/prepare.rs            |  20 +-
 rust/sedona-spatial-join/src/stream.rs             |   4 +-
 rust/sedona-spatial-join/src/utils/once_fut.rs     |   2 +-
 20 files changed, 838 insertions(+), 623 deletions(-)

diff --git 
a/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs
 
b/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs
index 950502be..5353f151 100644
--- 
a/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs
+++ 
b/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs
@@ -57,10 +57,11 @@ fn make_evaluated_batch(num_rows: usize, sedona_type: 
&SedonaType) -> EvaluatedB
     let wkt_values = vec![Some("POINT (0 0)"); num_rows];
     let geom_array = create_array_storage(&wkt_values, sedona_type);
 
-    let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, 
sedona_type)
-        .expect("failed to build geometry array for benchmark");
-
-    geom_array.distance = 
Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+    let geom_array = EvaluatedGeometryArray::try_new(geom_array, sedona_type)
+        .expect("failed to build geometry array for benchmark")
+        .with_distance(Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(
+            10.0,
+        )))));
 
     EvaluatedBatch { batch, geom_array }
 }
diff --git a/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs 
b/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs
index b87a6b9c..cf0331f3 100644
--- a/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs
+++ b/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs
@@ -57,11 +57,11 @@ fn make_evaluated_batch(num_rows: usize, sedona_type: 
&SedonaType) -> EvaluatedB
     let wkt_values = vec![Some("POINT (0 0)"); num_rows];
     let geom_array = create_array_storage(&wkt_values, sedona_type);
 
-    let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, 
sedona_type)
-        .expect("failed to build geometry array for benchmark");
-
-    // Use a scalar distance so the spilled dist column is constant.
-    geom_array.distance = 
Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+    let geom_array = EvaluatedGeometryArray::try_new(geom_array, sedona_type)
+        .expect("failed to build geometry array for benchmark")
+        .with_distance(Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(
+            10.0,
+        )))));
 
     EvaluatedBatch { batch, geom_array }
 }
diff --git a/rust/sedona-spatial-join/src/evaluated_batch.rs 
b/rust/sedona-spatial-join/src/evaluated_batch.rs
index 7fa0cd79..f3b6237e 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch.rs
@@ -18,9 +18,6 @@
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
 use datafusion_common::Result;
-use datafusion_expr::ColumnarValue;
-use geo::Rect;
-use wkb::reader::Wkb;
 
 use crate::{
     operand_evaluator::EvaluatedGeometryArray, 
utils::arrow_utils::get_record_batch_memory_size,
@@ -55,19 +52,6 @@ impl EvaluatedBatch {
     pub fn num_rows(&self) -> usize {
         self.batch.num_rows()
     }
-
-    pub fn wkb(&self, idx: usize) -> Option<&Wkb<'_>> {
-        let wkbs = self.geom_array.wkbs();
-        wkbs[idx].as_ref()
-    }
-
-    pub fn rects(&self) -> &Vec<Option<Rect<f32>>> {
-        &self.geom_array.rects
-    }
-
-    pub fn distance(&self) -> &Option<ColumnarValue> {
-        &self.geom_array.distance
-    }
 }
 
 pub mod evaluated_batch_stream;
diff --git 
a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs
 
b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs
index 67d3538e..9d3fa10e 100644
--- 
a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs
+++ 
b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs
@@ -378,11 +378,9 @@ mod tests {
     fn create_test_evaluated_batch(start_id: i32) -> Result<EvaluatedBatch> {
         let batch = create_test_record_batch(start_id)?;
         let (geom_array, sedona_type) = create_test_geometry_array()?;
-        let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, 
&sedona_type)?;
-
-        // Add distance as a scalar value
-        geom_array.distance = 
Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
-
+        let geom_array = EvaluatedGeometryArray::try_new(geom_array, 
&sedona_type)?.with_distance(
+            Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0)))),
+        );
         Ok(EvaluatedBatch { batch, geom_array })
     }
 
@@ -576,10 +574,10 @@ mod tests {
         assert!(name_array.is_null(2));
 
         // Verify geometry array
-        assert_eq!(batch.geom_array.rects.len(), 3);
+        assert_eq!(batch.geom_array.rects().len(), 3);
 
         // Verify distance
-        match &batch.geom_array.distance {
+        match &batch.geom_array.distance() {
             Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(val)))) => {
                 assert_eq!(*val, 10.0);
             }
diff --git a/rust/sedona-spatial-join/src/evaluated_batch/spill.rs 
b/rust/sedona-spatial-join/src/evaluated_batch/spill.rs
index eddd0574..81e5d805 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch/spill.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch/spill.rs
@@ -18,13 +18,15 @@
 use std::sync::Arc;
 
 use arrow::array::Float64Array;
-use arrow_array::{Array, RecordBatch, StructArray};
+use arrow_array::{Array, ArrayRef, FixedSizeListArray, Float32Array, 
RecordBatch, StructArray};
 use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
 use datafusion::config::SpillCompression;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_execution::{disk_manager::RefCountedTempFile, 
runtime_env::RuntimeEnv};
 use datafusion_expr::ColumnarValue;
 use datafusion_physical_plan::metrics::SpillMetrics;
+use geo::{Coord, Rect};
+use geo_traits::CoordTrait;
 use sedona_common::{sedona_internal_datafusion_err, sedona_internal_err};
 use sedona_schema::datatypes::SedonaType;
 
@@ -52,6 +54,7 @@ pub struct EvaluatedBatchSpillWriter {
 const SPILL_FIELD_DATA_INDEX: usize = 0;
 const SPILL_FIELD_GEOM_INDEX: usize = 1;
 const SPILL_FIELD_DIST_INDEX: usize = 2;
+const SPILL_FIELD_RECT_INDEX: usize = 3;
 
 impl EvaluatedBatchSpillWriter {
     /// Create a new SpillWriter
@@ -70,7 +73,12 @@ impl EvaluatedBatchSpillWriter {
             Field::new("data", DataType::Struct(data_inner_fields.clone()), 
false);
         let geom_field = sedona_type.to_storage_field("geom", true)?;
         let dist_field = Field::new("dist", DataType::Float64, true);
-        let spill_schema = Schema::new(vec![data_struct_field, geom_field, 
dist_field]);
+        let rect_field = Field::new(
+            "rect",
+            DataType::new_fixed_size_list(DataType::Float32, 4, true),
+            true,
+        );
+        let spill_schema = Schema::new(vec![data_struct_field, geom_field, 
dist_field, rect_field]);
 
         // Create spill file
         let inner = RecordBatchSpillWriter::try_new(
@@ -115,7 +123,7 @@ impl EvaluatedBatchSpillWriter {
         // Store dist into a Float64Array
         let mut dist_builder = 
arrow::array::Float64Builder::with_capacity(num_rows);
         let geom_array = &evaluated_batch.geom_array;
-        match &geom_array.distance {
+        match geom_array.distance() {
             Some(ColumnarValue::Scalar(scalar)) => match scalar {
                 ScalarValue::Float64(dist_value) => {
                     for _ in 0..num_rows {
@@ -141,11 +149,31 @@ impl EvaluatedBatchSpillWriter {
         }
         let dist_array = dist_builder.finish();
 
+        // Store rect into a FixedSizeList array
+        let mut rect_builder =
+            
arrow::array::FixedSizeListBuilder::new(arrow::array::Float32Builder::new(), 4);
+        for rect_opt in geom_array.rects() {
+            if let Some(rect) = rect_opt {
+                rect_builder.values().append_slice(&[
+                    rect.min().x(),
+                    rect.min().y(),
+                    rect.max().x(),
+                    rect.max().y(),
+                ]);
+                rect_builder.append(true);
+            } else {
+                rect_builder.values().append_nulls(4);
+                rect_builder.append(false);
+            }
+        }
+        let rect_array = rect_builder.finish();
+
         // Assemble the final spilled RecordBatch
         let columns = vec![
-            Arc::new(data_struct_array) as Arc<dyn arrow::array::Array>,
-            Arc::clone(&geom_array.geometry_array),
-            Arc::new(dist_array) as Arc<dyn arrow::array::Array>,
+            Arc::new(data_struct_array) as ArrayRef,
+            Arc::clone(geom_array.geometry_array()),
+            Arc::new(dist_array) as ArrayRef,
+            Arc::new(rect_array) as ArrayRef,
         ];
         let spilled_record_batch =
             RecordBatch::try_new(Arc::new(self.spill_schema.clone()), 
columns)?;
@@ -170,7 +198,6 @@ impl EvaluatedBatchSpillReader {
     }
 
     /// Read the next EvaluatedBatch from the spill file
-    #[allow(unused)]
     pub fn next_batch(&mut self) -> Option<Result<EvaluatedBatch>> {
         self.next_raw_batch()
             .map(|record_batch| 
record_batch.and_then(spilled_batch_to_evaluated_batch))
@@ -250,9 +277,47 @@ pub(crate) fn spilled_batch_to_evaluated_batch(
         None
     };
 
+    // Extract the rect array
+    let rect_array = record_batch
+        .column(SPILL_FIELD_RECT_INDEX)
+        .as_any()
+        .downcast_ref::<FixedSizeListArray>()
+        .ok_or_else(|| {
+            sedona_internal_datafusion_err!("Expected rect column to be 
FixedSizeListArray")
+        })?;
+    let rect_child_array = rect_array
+        .values()
+        .as_any()
+        .downcast_ref::<Float32Array>()
+        .ok_or_else(|| {
+            sedona_internal_datafusion_err!("Expected rect column child to be 
Float32Array")
+        })?;
+    let rect_vec = (0..rect_array.len())
+        .map(|i| {
+            if rect_array.is_null(i) {
+                None
+            } else {
+                let child_i = i * 4;
+                unsafe {
+                    Some(Rect::new(
+                        Coord {
+                            x: rect_child_array.value_unchecked(child_i),
+                            y: rect_child_array.value_unchecked(child_i + 1),
+                        },
+                        Coord {
+                            x: rect_child_array.value_unchecked(child_i + 2),
+                            y: rect_child_array.value_unchecked(child_i + 3),
+                        },
+                    ))
+                }
+            }
+        })
+        .collect::<Vec<_>>();
+
     // Create EvaluatedGeometryArray
-    let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, 
&sedona_type)?;
-    geom_array.distance = distance;
+    let geom_array =
+        EvaluatedGeometryArray::try_new_with_rects(geom_array, rect_vec, 
&sedona_type)?
+            .with_distance(distance);
 
     Ok(EvaluatedBatch { batch, geom_array })
 }
@@ -328,10 +393,11 @@ mod tests {
     fn create_test_evaluated_batch() -> Result<EvaluatedBatch> {
         let batch = create_test_record_batch()?;
         let (geom_array, sedona_type) = create_test_geometry_array()?;
-        let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, 
&sedona_type)?;
 
-        // Add distance as a scalar value
-        geom_array.distance = 
Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+        // With distance as ScalarValue
+        let geom_array = EvaluatedGeometryArray::try_new(geom_array, 
&sedona_type)?.with_distance(
+            Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0)))),
+        );
 
         Ok(EvaluatedBatch { batch, geom_array })
     }
@@ -339,11 +405,11 @@ mod tests {
     fn create_test_evaluated_batch_with_array_distance() -> 
Result<EvaluatedBatch> {
         let batch = create_test_record_batch()?;
         let (geom_array, sedona_type) = create_test_geometry_array()?;
-        let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, 
&sedona_type)?;
 
-        // Add distance as an array value
+        // With distance as an array value
         let dist_array = Arc::new(Float64Array::from(vec![Some(1.0), 
Some(2.0), Some(3.0)]));
-        geom_array.distance = Some(ColumnarValue::Array(dist_array));
+        let geom_array = EvaluatedGeometryArray::try_new(geom_array, 
&sedona_type)?
+            .with_distance(Some(ColumnarValue::Array(dist_array)));
 
         Ok(EvaluatedBatch { batch, geom_array })
     }
@@ -374,11 +440,10 @@ mod tests {
             Some(point3_wkb.as_slice()),
         ]));
 
-        let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, 
&sedona_type)?;
-
-        // Add distance with nulls
+        // With distance with nulls
         let dist_array = Arc::new(Float64Array::from(vec![Some(1.0), None, 
Some(3.0)]));
-        geom_array.distance = Some(ColumnarValue::Array(dist_array));
+        let geom_array = EvaluatedGeometryArray::try_new(geom_array, 
&sedona_type)?
+            .with_distance(Some(ColumnarValue::Array(dist_array)));
 
         Ok(EvaluatedBatch { batch, geom_array })
     }
@@ -402,7 +467,7 @@ mod tests {
         )?;
 
         // Verify the spill schema has the expected structure
-        assert_eq!(writer.spill_schema.fields().len(), 3);
+        assert_eq!(writer.spill_schema.fields().len(), 4);
         assert_eq!(
             writer.spill_schema.field(SPILL_FIELD_DATA_INDEX).name(),
             "data"
@@ -415,6 +480,10 @@ mod tests {
             writer.spill_schema.field(SPILL_FIELD_DIST_INDEX).name(),
             "dist"
         );
+        assert_eq!(
+            writer.spill_schema.field(SPILL_FIELD_RECT_INDEX).name(),
+            "rect"
+        );
 
         Ok(())
     }
@@ -487,7 +556,7 @@ mod tests {
         let read_batch = reader.next_batch().unwrap()?;
 
         // Verify distance is read back as array
-        match &read_batch.geom_array.distance {
+        match read_batch.geom_array.distance() {
             Some(ColumnarValue::Array(array)) => {
                 let float_array = 
array.as_any().downcast_ref::<Float64Array>().unwrap();
                 assert_eq!(float_array.len(), 3);
@@ -529,10 +598,10 @@ mod tests {
 
         // Verify nulls are preserved
         assert_eq!(read_batch.num_rows(), 3);
-        assert!(read_batch.geom_array.rects[1].is_none()); // Null geometry
+        assert!(read_batch.geom_array.rects()[1].is_none()); // Null geometry
 
         // Verify distance nulls
-        match &read_batch.geom_array.distance {
+        match read_batch.geom_array.distance() {
             Some(ColumnarValue::Array(array)) => {
                 let float_array = 
array.as_any().downcast_ref::<Float64Array>().unwrap();
                 assert!(float_array.is_valid(0));
@@ -642,7 +711,7 @@ mod tests {
         )?;
 
         let evaluated_batch = create_test_evaluated_batch()?;
-        let original_rects = evaluated_batch.rects().clone();
+        let original_rects = evaluated_batch.geom_array.rects();
 
         writer.append(&evaluated_batch)?;
         let temp_file = writer.finish()?;
@@ -651,8 +720,11 @@ mod tests {
         let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
         let read_batch = reader.next_batch().unwrap()?;
 
-        assert_eq!(read_batch.rects().len(), original_rects.len());
-        for (original, read) in 
original_rects.iter().zip(read_batch.rects().iter()) {
+        assert_eq!(read_batch.geom_array.rects().len(), original_rects.len());
+        for (original, read) in original_rects
+            .iter()
+            .zip(read_batch.geom_array.rects().iter())
+        {
             match (original, read) {
                 (Some(orig_rect), Some(read_rect)) => {
                     assert_eq!(orig_rect.min().x, read_rect.min().x);
@@ -694,7 +766,7 @@ mod tests {
         let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
         let read_batch = reader.next_batch().unwrap()?;
 
-        match &read_batch.geom_array.distance {
+        match read_batch.geom_array.distance() {
             Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(val)))) => {
                 assert_eq!(*val, 10.0);
             }
diff --git a/rust/sedona-spatial-join/src/exec.rs 
b/rust/sedona-spatial-join/src/exec.rs
index 071f6538..42ca6ebb 100644
--- a/rust/sedona-spatial-join/src/exec.rs
+++ b/rust/sedona-spatial-join/src/exec.rs
@@ -33,6 +33,7 @@ use parking_lot::Mutex;
 use sedona_common::{sedona_internal_err, SpatialJoinOptions};
 
 use crate::{
+    join_provider::{DefaultSpatialJoinProvider, SpatialJoinProvider},
     prepare::{SpatialJoinComponents, SpatialJoinComponentsBuilder},
     spatial_predicate::{KNNPredicate, SpatialPredicate, SpatialPredicateTrait},
     stream::SpatialJoinStream,
@@ -113,6 +114,8 @@ pub struct SpatialJoinExec {
     once_async_spatial_join_components: 
Arc<Mutex<Option<OnceAsync<SpatialJoinComponents>>>>,
     /// A random seed for making random procedures in spatial join 
deterministic
     seed: u64,
+    /// Factories to create the index builder and evaluated batches
+    join_provider: Arc<dyn SpatialJoinProvider>,
 }
 
 impl SpatialJoinExec {
@@ -172,6 +175,7 @@ impl SpatialJoinExec {
             cache,
             once_async_spatial_join_components: Arc::new(Mutex::new(None)),
             seed,
+            join_provider: Arc::new(DefaultSpatialJoinProvider),
         })
     }
 
@@ -472,6 +476,7 @@ impl ExecutionPlan for SpatialJoinExec {
                         self.join_type,
                         probe_thread_count,
                         self.metrics.clone(),
+                        self.join_provider.clone(),
                         self.seed,
                     );
                     Ok(spatial_join_components_builder.build(build_streams))
@@ -503,6 +508,7 @@ impl ExecutionPlan for SpatialJoinExec {
             session_config,
             context.runtime_env(),
             &self.metrics,
+            self.join_provider.clone(),
             once_fut_spatial_join_components,
             Arc::clone(&self.once_async_spatial_join_components),
         )))
diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs 
b/rust/sedona-spatial-join/src/index/build_side_collector.rs
index 8d9ca579..6c51f681 100644
--- a/rust/sedona-spatial-join/src/index/build_side_collector.rs
+++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs
@@ -32,7 +32,6 @@ use sedona_expr::statistics::GeoStatistics;
 use sedona_functions::st_analyze_agg::AnalyzeAccumulator;
 use sedona_schema::datatypes::WKB_GEOMETRY;
 
-use crate::index::spatial_index_builder::SpatialIndexBuilderRef;
 use crate::{
     evaluated_batch::{
         evaluated_batch_stream::{
@@ -42,7 +41,8 @@ use crate::{
         spill::EvaluatedBatchSpillWriter,
         EvaluatedBatch,
     },
-    operand_evaluator::{create_operand_evaluator, OperandEvaluator},
+    join_provider::SpatialJoinProvider,
+    operand_evaluator::create_operand_evaluator,
     spatial_predicate::SpatialPredicate,
     utils::bbox_sampler::{BoundingBoxSampler, BoundingBoxSamples},
 };
@@ -81,10 +81,9 @@ pub(crate) struct BuildPartition {
 pub(crate) struct BuildSideBatchesCollector {
     spatial_predicate: SpatialPredicate,
     spatial_join_options: SpatialJoinOptions,
-    evaluator: Arc<dyn OperandEvaluator>,
     runtime_env: Arc<RuntimeEnv>,
     spill_compression: SpillCompression,
-    spatial_index_builder: SpatialIndexBuilderRef,
+    join_provider: Arc<dyn SpatialJoinProvider>,
 }
 
 #[derive(Clone)]
@@ -128,16 +127,14 @@ impl BuildSideBatchesCollector {
         spatial_join_options: SpatialJoinOptions,
         runtime_env: Arc<RuntimeEnv>,
         spill_compression: SpillCompression,
-        spatial_index_builder: SpatialIndexBuilderRef,
+        join_provider: Arc<dyn SpatialJoinProvider>,
     ) -> Self {
-        let evaluator = create_operand_evaluator(&spatial_predicate, 
spatial_join_options.clone());
         BuildSideBatchesCollector {
             spatial_predicate,
             spatial_join_options,
-            evaluator,
             runtime_env,
             spill_compression,
-            spatial_index_builder,
+            join_provider,
         }
     }
 
@@ -220,7 +217,7 @@ impl BuildSideBatchesCollector {
         }
 
         let geo_statistics = analyzer.finish();
-        let extra_mem = self.spatial_index_builder.estimate_extra_memory_usage(
+        let extra_mem = self.join_provider.estimate_extra_memory_usage(
             &geo_statistics,
             &self.spatial_predicate,
             &self.spatial_join_options,
@@ -328,7 +325,10 @@ impl BuildSideBatchesCollector {
             .enumerate()
         {
             let collector = self.clone();
-            let evaluator = Arc::clone(&self.evaluator);
+            let evaluator = create_operand_evaluator(
+                &self.spatial_predicate,
+                self.join_provider.evaluated_array_factory(),
+            );
             let bbox_sampler = BoundingBoxSampler::try_new(
                 self.spatial_join_options.min_index_side_bbox_samples,
                 self.spatial_join_options.max_index_side_bbox_samples,
@@ -377,7 +377,10 @@ impl BuildSideBatchesCollector {
             .zip(reservations)
             .enumerate()
         {
-            let evaluator = Arc::clone(&self.evaluator);
+            let evaluator = create_operand_evaluator(
+                &self.spatial_predicate,
+                self.join_provider.evaluated_array_factory(),
+            );
             let bbox_sampler = BoundingBoxSampler::try_new(
                 self.spatial_join_options.min_index_side_bbox_samples,
                 self.spatial_join_options.max_index_side_bbox_samples,
@@ -408,7 +411,7 @@ impl BuildSideBatchesCollector {
         let build_side_batch = &in_mem_batches[0];
 
         let schema = build_side_batch.schema();
-        let sedona_type = &build_side_batch.geom_array.sedona_type;
+        let sedona_type = &build_side_batch.geom_array.sedona_type();
         let mut spill_writer = EvaluatedBatchSpillWriter::try_new(
             Arc::clone(&self.runtime_env),
             schema,
@@ -442,15 +445,14 @@ impl BuildSideBatchesCollector {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::index::spatial_index_builder::SpatialJoinBuildMetrics;
-    use crate::index::DefaultSpatialIndexBuilder;
+    use crate::join_provider::DefaultSpatialJoinProvider;
     use crate::{
         operand_evaluator::EvaluatedGeometryArray,
         spatial_predicate::{RelationPredicate, SpatialRelationType},
     };
     use arrow_array::{ArrayRef, BinaryArray, Int32Array, RecordBatch};
     use arrow_schema::{DataType, Field, Schema};
-    use datafusion_common::{JoinType, ScalarValue};
+    use datafusion_common::ScalarValue;
     use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, 
MemoryPool};
     use datafusion_physical_expr::expressions::Literal;
     use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -488,22 +490,12 @@ mod tests {
             SpatialRelationType::Intersects,
         ));
 
-        let builder = DefaultSpatialIndexBuilder::new(
-            test_schema(),
-            predicate.clone(),
-            SpatialJoinOptions::default(),
-            JoinType::Inner,
-            1,
-            SpatialJoinBuildMetrics::default(),
-        )
-        .unwrap();
-
         BuildSideBatchesCollector::new(
             predicate,
             SpatialJoinOptions::default(),
             Arc::new(RuntimeEnv::default()),
             SpillCompression::Uncompressed,
-            Arc::new(builder),
+            Arc::new(DefaultSpatialJoinProvider),
         )
     }
 
diff --git a/rust/sedona-spatial-join/src/index/default_spatial_index.rs 
b/rust/sedona-spatial-join/src/index/default_spatial_index.rs
index 6215c307..e1c85e77 100644
--- a/rust/sedona-spatial-join/src/index/default_spatial_index.rs
+++ b/rust/sedona-spatial-join/src/index/default_spatial_index.rs
@@ -35,7 +35,6 @@ use geo_index::rtree::{
 };
 use geo_index::rtree::{sort::HilbertSort, RTree, RTreeBuilder, RTreeIndex};
 use geo_index::IndexableNum;
-use geo_types::Rect;
 use parking_lot::Mutex;
 use sedona_expr::statistics::GeoStatistics;
 use sedona_geo::to_geo::item_to_geometry;
@@ -49,7 +48,7 @@ use crate::{
         knn_adapter::{KnnComponents, SedonaKnnAdapter},
         IndexQueryResult, QueryResultMetrics,
     },
-    operand_evaluator::{create_operand_evaluator, distance_value_at, 
OperandEvaluator},
+    operand_evaluator::distance_value_at,
     refine::{create_refiner, IndexQueryResultRefiner},
     spatial_predicate::SpatialPredicate,
 };
@@ -61,9 +60,6 @@ struct DefaultSpatialIndexInner {
     pub(crate) schema: SchemaRef,
     pub(crate) options: SpatialJoinOptions,
 
-    /// The spatial predicate evaluator for the spatial predicate.
-    pub(crate) evaluator: Arc<dyn OperandEvaluator>,
-
     /// The refiner for refining the index query results.
     pub(crate) refiner: Arc<dyn IndexQueryResultRefiner>,
 
@@ -111,7 +107,6 @@ impl DefaultSpatialIndex {
         options: SpatialJoinOptions,
         probe_threads_counter: AtomicUsize,
     ) -> Self {
-        let evaluator = create_operand_evaluator(&spatial_predicate, 
options.clone());
         let refiner = create_refiner(
             options.spatial_library,
             &spatial_predicate,
@@ -126,7 +121,6 @@ impl DefaultSpatialIndex {
             inner: Arc::new(DefaultSpatialIndexInner {
                 schema,
                 options,
-                evaluator,
                 refiner,
                 rtree,
                 data_id_to_batch_pos: Vec::new(),
@@ -142,7 +136,6 @@ impl DefaultSpatialIndex {
     pub fn new(
         schema: SchemaRef,
         options: SpatialJoinOptions,
-        evaluator: Arc<dyn OperandEvaluator>,
         refiner: Arc<dyn IndexQueryResultRefiner>,
         rtree: RTree<f32>,
         indexed_batches: Vec<EvaluatedBatch>,
@@ -156,7 +149,6 @@ impl DefaultSpatialIndex {
             inner: Arc::new(DefaultSpatialIndexInner {
                 schema,
                 options,
-                evaluator,
                 refiner,
                 rtree,
                 data_id_to_batch_pos,
@@ -194,7 +186,7 @@ impl DefaultSpatialIndex {
             let chunk = chunk.to_vec();
             let index_owned = self.clone();
             join_set.spawn(async move {
-                let Some(probe_wkb) = cloned_evaluated_batch.wkb(row_idx) else 
{
+                let Some(probe_wkb) = 
cloned_evaluated_batch.geom_array.wkb(row_idx) else {
                     return (
                         i,
                         sedona_internal_err!(
@@ -248,15 +240,16 @@ impl DefaultSpatialIndex {
             let pos = self.inner.data_id_to_batch_pos[*data_idx as usize];
             let (batch_idx, row_idx) = pos;
             let indexed_batch = &self.inner.indexed_batches[batch_idx as 
usize];
-            let build_wkb = indexed_batch.wkb(row_idx as usize);
+            let build_wkb = indexed_batch.geom_array.wkb(row_idx as usize);
             let Some(build_wkb) = build_wkb else {
                 continue;
             };
-            let distance = self.inner.evaluator.resolve_distance(
-                indexed_batch.distance(),
-                row_idx as usize,
-                distance,
-            )?;
+            let build_distance = indexed_batch.geom_array.distance_at(row_idx 
as usize)?;
+            debug_assert!(
+                build_distance.is_none() || distance.is_none(),
+                "Distance should not be present on both build and probe sides"
+            );
+            let distance = build_distance.map(Some).unwrap_or(*distance);
             let geom_idx = self.inner.geom_idx_vec[*data_idx as usize];
             index_query_results.push(IndexQueryResult {
                 wkb: build_wkb,
@@ -299,37 +292,6 @@ impl SpatialIndex for DefaultSpatialIndex {
         &self.inner.indexed_batches[batch_idx].batch
     }
 
-    /// This method implements [`SpatialIndex::query`], which is a two-phase 
spatial join:
-    /// 1. **Filter phase**: Uses the R-tree index with the probe geometry's 
bounding rectangle
-    ///    to quickly identify candidate geometries that might satisfy the 
spatial predicate
-    /// 2. **Refinement phase**: Evaluates the exact spatial predicate on 
candidates to determine
-    ///    actual matches
-    fn query(
-        &self,
-        probe_wkb: &Wkb,
-        probe_rect: &Rect<f32>,
-        distance: &Option<f64>,
-        build_batch_positions: &mut Vec<(i32, i32)>,
-    ) -> Result<QueryResultMetrics> {
-        let min = probe_rect.min();
-        let max = probe_rect.max();
-        let mut candidates = self.inner.rtree.search(min.x, min.y, max.x, 
max.y);
-        if candidates.is_empty() {
-            return Ok(QueryResultMetrics {
-                count: 0,
-                candidate_count: 0,
-            });
-        }
-
-        // Sort and dedup candidates to avoid duplicate results when we index 
one geometry
-        // using several boxes.
-        candidates.sort_unstable();
-        candidates.dedup();
-
-        // Refine the candidates retrieved from the r-tree index by evaluating 
the actual spatial predicate
-        self.refine(probe_wkb, &candidates, distance, build_batch_positions)
-    }
-
     /// This method implements [`SpatialIndex::query_knn`] by:
     /// 1. R-tree's built-in neighbors() method for efficient KNN search
     /// 2. Distance refinement using actual geometry calculations
@@ -557,8 +519,8 @@ impl SpatialIndex for DefaultSpatialIndex {
             ));
         }
 
-        let rects = evaluated_batch.rects();
-        let dist = evaluated_batch.distance();
+        let rects = evaluated_batch.geom_array.rects();
+        let dist = evaluated_batch.geom_array.distance();
         let mut total_candidates_count = 0;
         let mut total_count = 0;
         let mut current_row_idx = range.start;
@@ -575,7 +537,7 @@ impl SpatialIndex for DefaultSpatialIndex {
                 continue;
             }
 
-            let Some(probe_wkb) = evaluated_batch.wkb(row_idx) else {
+            let Some(probe_wkb) = evaluated_batch.geom_array.wkb(row_idx) else 
{
                 return sedona_internal_err!(
                     "Failed to get WKB for row {} in evaluated batch",
                     row_idx
@@ -752,7 +714,7 @@ mod tests {
             SpatialRelationType::Intersects,
         ));
 
-        let builder = DefaultSpatialIndexBuilder::new(
+        let mut builder = DefaultSpatialIndexBuilder::new(
             schema.clone(),
             spatial_predicate,
             options,
@@ -1226,7 +1188,7 @@ mod tests {
             JoinSide::Left,
         ));
 
-        let builder = DefaultSpatialIndexBuilder::new(
+        let mut builder = DefaultSpatialIndexBuilder::new(
             schema.clone(),
             spatial_predicate,
             options,
diff --git 
a/rust/sedona-spatial-join/src/index/default_spatial_index_builder.rs 
b/rust/sedona-spatial-join/src/index/default_spatial_index_builder.rs
index 307a916f..6dcb20d1 100644
--- a/rust/sedona-spatial-join/src/index/default_spatial_index_builder.rs
+++ b/rust/sedona-spatial-join/src/index/default_spatial_index_builder.rs
@@ -26,7 +26,6 @@ use 
crate::index::spatial_index_builder::{SpatialIndexBuilder, SpatialJoinBuildM
 use crate::{
     evaluated_batch::{evaluated_batch_stream::SendableEvaluatedBatchStream, 
EvaluatedBatch},
     index::{default_spatial_index::DefaultSpatialIndex, 
knn_adapter::KnnComponents},
-    operand_evaluator::create_operand_evaluator,
     refine::create_refiner,
     spatial_predicate::SpatialPredicate,
     utils::join_utils::need_produce_result_in_final,
@@ -95,6 +94,36 @@ impl DefaultSpatialIndexBuilder {
         })
     }
 
+    pub(crate) fn estimate_extra_memory_usage(
+        geo_stats: &GeoStatistics,
+        spatial_predicate: &SpatialPredicate,
+        options: &SpatialJoinOptions,
+    ) -> usize {
+        // Estimate the amount of memory needed by the refiner
+        let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize;
+        let refiner = create_refiner(
+            options.spatial_library,
+            spatial_predicate,
+            options.clone(),
+            num_geoms,
+            geo_stats.clone(),
+        );
+        let refiner_mem_usage = refiner.estimate_max_memory_usage(geo_stats);
+
+        let knn_components_mem_usage =
+            if matches!(spatial_predicate, 
SpatialPredicate::KNearestNeighbors(_)) {
+                KnnComponents::estimate_max_memory_usage(geo_stats)
+            } else {
+                0
+            };
+
+        // Estimate the amount of memory needed for the R-tree
+        let rtree_mem_usage = num_geoms * RTREE_MEMORY_ESTIMATE_PER_RECT;
+
+        // The final estimation is the sum of all above
+        refiner_mem_usage + knn_components_mem_usage + rtree_mem_usage
+    }
+
     /// Build the spatial R-tree index from collected geometry batches.
     fn build_rtree(&mut self) -> Result<RTreeBuildResult> {
         let build_timer = self.metrics.build_time.timer();
@@ -102,14 +131,14 @@ impl DefaultSpatialIndexBuilder {
         let num_rects = self
             .indexed_batches
             .iter()
-            .map(|batch| batch.rects().iter().flatten().count())
+            .map(|batch| batch.geom_array.rects().iter().flatten().count())
             .sum::<usize>();
 
         let mut rtree_builder = RTreeBuilder::<f32>::new(num_rects as u32);
         let mut batch_pos_vec = vec![(-1, -1); num_rects];
 
         for (batch_idx, batch) in self.indexed_batches.iter().enumerate() {
-            let rects = batch.rects();
+            let rects = batch.geom_array.rects();
             for (idx, rect_opt) in rects.iter().enumerate() {
                 let Some(rect) = rect_opt else {
                     continue;
@@ -201,48 +230,16 @@ impl DefaultSpatialIndexBuilder {
 
 #[async_trait]
 impl SpatialIndexBuilder for DefaultSpatialIndexBuilder {
-    fn estimate_extra_memory_usage(
-        &self,
-        geo_stats: &GeoStatistics,
-        spatial_predicate: &SpatialPredicate,
-        options: &SpatialJoinOptions,
-    ) -> usize {
-        // Estimate the amount of memory needed by the refiner
-        let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize;
-        let refiner = create_refiner(
-            options.spatial_library,
-            spatial_predicate,
-            options.clone(),
-            num_geoms,
-            geo_stats.clone(),
-        );
-        let refiner_mem_usage = refiner.estimate_max_memory_usage(geo_stats);
-
-        let knn_components_mem_usage =
-            if matches!(spatial_predicate, 
SpatialPredicate::KNearestNeighbors(_)) {
-                KnnComponents::estimate_max_memory_usage(geo_stats)
-            } else {
-                0
-            };
-
-        // Estimate the amount of memory needed for the R-tree
-        let rtree_mem_usage = num_geoms * RTREE_MEMORY_ESTIMATE_PER_RECT;
-
-        // The final estimation is the sum of all above
-        refiner_mem_usage + knn_components_mem_usage + rtree_mem_usage
-    }
-
-    fn finish(mut self) -> Result<SpatialIndexRef> {
+    fn finish(&mut self) -> Result<SpatialIndexRef> {
         if self.indexed_batches.is_empty() {
             return Ok(Arc::new(DefaultSpatialIndex::empty(
-                self.spatial_predicate,
-                self.schema,
-                self.options,
+                self.spatial_predicate.clone(),
+                self.schema.clone(),
+                self.options.clone(),
                 AtomicUsize::new(self.probe_threads_count),
             )));
         }
 
-        let evaluator = create_operand_evaluator(&self.spatial_predicate, 
self.options.clone());
         let num_geoms = self
             .indexed_batches
             .iter()
@@ -282,12 +279,13 @@ impl SpatialIndexBuilder for DefaultSpatialIndexBuilder {
             self.memory_used
         );
         Ok(Arc::new(DefaultSpatialIndex::new(
-            self.schema,
-            self.options,
-            evaluator,
+            self.schema.clone(),
+            self.options.clone(),
             refiner,
             rtree,
-            self.indexed_batches,
+            self.indexed_batches
+                .drain(0..self.indexed_batches.len())
+                .collect(),
             batch_pos_vec,
             geom_idx_vec,
             visited_build_side,
diff --git a/rust/sedona-spatial-join/src/index/knn_adapter.rs 
b/rust/sedona-spatial-join/src/index/knn_adapter.rs
index ec67fe15..81deb160 100644
--- a/rust/sedona-spatial-join/src/index/knn_adapter.rs
+++ b/rust/sedona-spatial-join/src/index/knn_adapter.rs
@@ -111,7 +111,7 @@ impl<'a> GeometryAccessor for SedonaKnnAdapter<'a> {
         let (batch_idx, row_idx) = self.data_id_to_batch_pos[item_index];
         let indexed_batch = &self.indexed_batches[batch_idx as usize];
 
-        if let Some(wkb) = indexed_batch.wkb(row_idx as usize) {
+        if let Some(wkb) = indexed_batch.geom_array.wkb(row_idx as usize) {
             if let Ok(geom) = item_to_geometry(wkb) {
                 // Try to store in cache - if another thread got there first, 
we just use theirs
                 let _ = geometry_cache[item_index].set(geom);
diff --git a/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs 
b/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs
index c56e9930..bd8e08c2 100644
--- a/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs
+++ b/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs
@@ -21,8 +21,9 @@ use crate::evaluated_batch::evaluated_batch_stream::{
 };
 use crate::evaluated_batch::EvaluatedBatch;
 use crate::index::spatial_index::SpatialIndexRef;
-use crate::index::spatial_index_builder::{SpatialIndexBuilder, 
SpatialJoinBuildMetrics};
-use crate::index::{BuildPartition, DefaultSpatialIndexBuilder};
+use crate::index::spatial_index_builder::SpatialJoinBuildMetrics;
+use crate::index::BuildPartition;
+use crate::join_provider::SpatialJoinProvider;
 use crate::partitioning::stream_repartitioner::{SpilledPartition, 
SpilledPartitions};
 use crate::utils::disposable_async_cell::DisposableAsyncCell;
 use crate::{partitioning::SpatialPartition, 
spatial_predicate::SpatialPredicate};
@@ -55,6 +56,8 @@ pub(crate) struct PartitionedIndexProvider {
     /// Async cells for indexes, one per regular partition
     index_cells: Vec<DisposableAsyncCell<SharedResult<SpatialIndexRef>>>,
 
+    join_provider: Arc<dyn SpatialJoinProvider>,
+
     /// The memory reserved in the build side collection phase. We'll hold 
them until
     /// we don't need to build spatial indexes.
     _reservations: Vec<MemoryReservation>,
@@ -75,6 +78,7 @@ impl PartitionedIndexProvider {
         probe_threads_count: usize,
         partitioned_spill_files: SpilledPartitions,
         metrics: SpatialJoinBuildMetrics,
+        join_provider: Arc<dyn SpatialJoinProvider>,
         reservations: Vec<MemoryReservation>,
     ) -> Self {
         let num_partitions = partitioned_spill_files.num_regular_partitions();
@@ -91,6 +95,7 @@ impl PartitionedIndexProvider {
             metrics,
             data: 
BuildSideData::MultiPartition(Mutex::new(partitioned_spill_files)),
             index_cells,
+            join_provider,
             _reservations: reservations,
         }
     }
@@ -104,6 +109,7 @@ impl PartitionedIndexProvider {
         probe_threads_count: usize,
         mut build_partitions: Vec<BuildPartition>,
         metrics: SpatialJoinBuildMetrics,
+        join_provider: Arc<dyn SpatialJoinProvider>,
     ) -> Self {
         let reservations = build_partitions
             .iter_mut()
@@ -119,6 +125,7 @@ impl PartitionedIndexProvider {
             metrics,
             data: 
BuildSideData::SinglePartition(Mutex::new(Some(build_partitions))),
             index_cells,
+            join_provider,
             _reservations: reservations,
         }
     }
@@ -130,6 +137,7 @@ impl PartitionedIndexProvider {
         join_type: JoinType,
         probe_threads_count: usize,
         metrics: SpatialJoinBuildMetrics,
+        join_provider: Arc<dyn SpatialJoinProvider>,
     ) -> Self {
         let build_partitions = Vec::new();
         Self::new_single_partition(
@@ -140,6 +148,7 @@ impl PartitionedIndexProvider {
             probe_threads_count,
             build_partitions,
             metrics,
+            join_provider,
         )
     }
 
@@ -274,7 +283,7 @@ impl PartitionedIndexProvider {
         &self,
         build_partitions: Vec<BuildPartition>,
     ) -> Result<SpatialIndexRef> {
-        let mut builder = DefaultSpatialIndexBuilder::new(
+        let mut builder = self.join_provider.try_new_spatial_index_builder(
             Arc::clone(&self.schema),
             self.spatial_predicate.clone(),
             self.options.clone(),
@@ -296,7 +305,7 @@ impl PartitionedIndexProvider {
         &self,
         spilled_partition: SpilledPartition,
     ) -> Result<SpatialIndexRef> {
-        let mut builder = DefaultSpatialIndexBuilder::new(
+        let mut builder = self.join_provider.try_new_spatial_index_builder(
             Arc::clone(&self.schema),
             self.spatial_predicate.clone(),
             self.options.clone(),
@@ -395,6 +404,7 @@ impl EvaluatedBatchStream for ReceiverBatchStream {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::join_provider::DefaultSpatialJoinProvider;
     use crate::operand_evaluator::EvaluatedGeometryArray;
     use crate::partitioning::partition_slots::PartitionSlots;
     use crate::utils::bbox_sampler::BoundingBoxSamples;
@@ -511,7 +521,7 @@ mod tests {
             return Ok(SpilledPartition::empty());
         }
         let schema = batches[0].schema();
-        let sedona_type = batches[0].geom_array.sedona_type.clone();
+        let sedona_type = batches[0].geom_array.sedona_type().clone();
         let mut writer = EvaluatedBatchSpillWriter::try_new(
             runtime_env,
             schema,
@@ -572,6 +582,7 @@ mod tests {
             1,
             vec![build_partition],
             SpatialJoinBuildMetrics::new(0, &metrics),
+            Arc::new(DefaultSpatialJoinProvider),
         );
 
         let first_index = provider
@@ -613,6 +624,7 @@ mod tests {
             1,
             spilled_partitions,
             SpatialJoinBuildMetrics::new(0, &metrics),
+            Arc::new(DefaultSpatialJoinProvider),
             vec![new_reservation(Arc::clone(&memory_pool))],
         ));
 
diff --git a/rust/sedona-spatial-join/src/index/spatial_index.rs 
b/rust/sedona-spatial-join/src/index/spatial_index.rs
index 8c9d4827..ccad19bd 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index.rs
@@ -22,7 +22,6 @@ use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use datafusion_common::Result;
-use geo_types::Rect;
 use parking_lot::Mutex;
 use sedona_common::ExecutionMode;
 use sedona_expr::statistics::GeoStatistics;
@@ -45,25 +44,6 @@ pub(crate) trait SpatialIndex {
     fn num_indexed_batches(&self) -> usize;
     /// Get the batch at the given index.
     fn get_indexed_batch(&self, batch_idx: usize) -> &RecordBatch;
-    /// Query the spatial index with a probe geometry to find matching 
build-side geometries.
-    /// # Arguments
-    /// * `probe_wkb` - The probe geometry in WKB format
-    /// * `probe_rect` - The minimum bounding rectangle of the probe geometry
-    /// * `distance` - Optional distance parameter for distance-based spatial 
predicates
-    /// * `build_batch_positions` - Output vector that will be populated with 
(batch_idx, row_idx)
-    ///   pairs for each matching build-side geometry
-    ///
-    /// # Returns
-    /// * `JoinResultMetrics` containing the number of actual matches 
(`count`) and the number
-    ///   of candidates from the filter phase (`candidate_count`)
-    #[allow(unused)]
-    fn query(
-        &self,
-        probe_wkb: &Wkb,
-        probe_rect: &Rect<f32>,
-        distance: &Option<f64>,
-        build_batch_positions: &mut Vec<(i32, i32)>,
-    ) -> Result<QueryResultMetrics>;
     /// Query the spatial index for k nearest neighbors of a given geometry.
     /// # Arguments
     ///
diff --git a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs 
b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
index 0db1e073..c51217ba 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
@@ -16,38 +16,25 @@
 // under the License.
 
 use datafusion_physical_plan::metrics::{self, ExecutionPlanMetricsSet, 
MetricBuilder};
-use sedona_common::SpatialJoinOptions;
 use sedona_expr::statistics::GeoStatistics;
-use std::sync::Arc;
 
+use 
crate::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream;
 use crate::index::spatial_index::SpatialIndexRef;
-use crate::{
-    evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream,
-    spatial_predicate::SpatialPredicate,
-};
 use async_trait::async_trait;
 use datafusion_common::Result;
 
 /// Builder for constructing a SpatialIndex from geometry batches.
 #[async_trait]
-pub(crate) trait SpatialIndexBuilder {
-    /// Estimate the amount of memory required by the R-tree index and 
evaluating spatial predicates.
-    /// The estimated memory usage does not include the memory required for 
holding the build side
-    /// batches.
-    fn estimate_extra_memory_usage(
-        &self,
-        geo_stats: &GeoStatistics,
-        spatial_predicate: &SpatialPredicate,
-        options: &SpatialJoinOptions,
-    ) -> usize;
-
-    /// Finish building and return the completed SpatialIndex.
-    fn finish(self) -> Result<SpatialIndexRef>;
+pub(crate) trait SpatialIndexBuilder: Send + Sync {
+    /// Add a stream to this builder
     async fn add_stream(
         &mut self,
         stream: SendableEvaluatedBatchStream,
         geo_statistics: GeoStatistics,
     ) -> Result<()>;
+
+    /// Finish building and return the completed SpatialIndex.
+    fn finish(&mut self) -> Result<SpatialIndexRef>;
 }
 
 /// Metrics for the build phase of the spatial join.
@@ -67,5 +54,3 @@ impl SpatialJoinBuildMetrics {
         }
     }
 }
-
-pub(crate) type SpatialIndexBuilderRef = Arc<dyn SpatialIndexBuilder + Send + 
Sync>;
diff --git a/rust/sedona-spatial-join/src/join_provider.rs 
b/rust/sedona-spatial-join/src/join_provider.rs
new file mode 100644
index 00000000..595a6d12
--- /dev/null
+++ b/rust/sedona-spatial-join/src/join_provider.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_expr::JoinType;
+use sedona_common::SpatialJoinOptions;
+use sedona_expr::statistics::GeoStatistics;
+
+use crate::{
+    index::{
+        spatial_index_builder::{SpatialIndexBuilder, SpatialJoinBuildMetrics},
+        DefaultSpatialIndexBuilder,
+    },
+    operand_evaluator::{DefaultGeometryArrayFactory, 
EvaluatedGeometryArrayFactory},
+    SpatialPredicate,
+};
+
+/// Provider for join internals
+///
+/// This trait provides an extension point for overriding the evaluation
+/// details of a spatial join. In particular it allows plugging in a custom
+/// index for accelerated joins on specific hardware (e.g., GPU) and a custom
+/// bounder for specific data types (e.g., geography).
+pub(crate) trait SpatialJoinProvider: std::fmt::Debug + Send + Sync {
+    /// Create a new [SpatialIndexBuilder]
+    fn try_new_spatial_index_builder(
+        &self,
+        schema: SchemaRef,
+        spatial_predicate: SpatialPredicate,
+        options: SpatialJoinOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Result<Box<dyn SpatialIndexBuilder>>;
+
+    /// Estimate the amount of memory required by the index and refiner 
evaluating spatial predicates
+    ///
+    /// The estimated memory usage should not include the memory required for 
holding the build side
+    /// batches.
+    fn estimate_extra_memory_usage(
+        &self,
+        geo_stats: &GeoStatistics,
+        spatial_predicate: &SpatialPredicate,
+        options: &SpatialJoinOptions,
+    ) -> usize;
+
+    fn evaluated_array_factory(&self) -> Arc<dyn 
EvaluatedGeometryArrayFactory>;
+}
+
+/// Default implementation of the [SpatialJoinProvider]
+#[derive(Debug)]
+pub(crate) struct DefaultSpatialJoinProvider;
+
+impl SpatialJoinProvider for DefaultSpatialJoinProvider {
+    fn try_new_spatial_index_builder(
+        &self,
+        schema: SchemaRef,
+        spatial_predicate: SpatialPredicate,
+        options: SpatialJoinOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Result<Box<dyn SpatialIndexBuilder>> {
+        let builder = DefaultSpatialIndexBuilder::new(
+            schema,
+            spatial_predicate,
+            options,
+            join_type,
+            probe_threads_count,
+            metrics,
+        )?;
+        Ok(Box::new(builder))
+    }
+
+    fn estimate_extra_memory_usage(
+        &self,
+        geo_stats: &GeoStatistics,
+        spatial_predicate: &SpatialPredicate,
+        options: &SpatialJoinOptions,
+    ) -> usize {
+        DefaultSpatialIndexBuilder::estimate_extra_memory_usage(
+            geo_stats,
+            spatial_predicate,
+            options,
+        )
+    }
+
+    fn evaluated_array_factory(&self) -> Arc<dyn 
EvaluatedGeometryArrayFactory> {
+        Arc::new(DefaultGeometryArrayFactory)
+    }
+}
diff --git a/rust/sedona-spatial-join/src/lib.rs 
b/rust/sedona-spatial-join/src/lib.rs
index d3e7a293..372efb7f 100644
--- a/rust/sedona-spatial-join/src/lib.rs
+++ b/rust/sedona-spatial-join/src/lib.rs
@@ -18,6 +18,7 @@
 pub mod evaluated_batch;
 pub mod exec;
 mod index;
+mod join_provider;
 pub mod operand_evaluator;
 pub mod partitioning;
 pub mod planner;
diff --git a/rust/sedona-spatial-join/src/operand_evaluator.rs 
b/rust/sedona-spatial-join/src/operand_evaluator.rs
index efcf2a43..3c316b4c 100644
--- a/rust/sedona-spatial-join/src/operand_evaluator.rs
+++ b/rust/sedona-spatial-join/src/operand_evaluator.rs
@@ -17,6 +17,7 @@
 use core::fmt;
 use std::{mem::transmute, sync::Arc};
 
+use arrow::compute::interleave as arrow_interleave;
 use arrow_array::{Array, ArrayRef, Float64Array, RecordBatch};
 use arrow_schema::DataType;
 use datafusion_common::{utils::proxy::VecAllocExt, JoinSide, Result, 
ScalarValue};
@@ -30,7 +31,6 @@ use sedona_geo_generic_alg::BoundingRect;
 use sedona_schema::datatypes::SedonaType;
 use wkb::reader::Wkb;
 
-use sedona_common::option::SpatialJoinOptions;
 use sedona_common::sedona_internal_err;
 
 use crate::{
@@ -38,30 +38,46 @@ use crate::{
     utils::arrow_utils::get_array_memory_size,
 };
 
+/// Factory for [EvaluatedGeometryArray] instances given an evaluated geometry 
column
+///
+/// Allows join providers to customize the eagerly evaluated representation of 
geometry.
+/// This is currently limited to a concrete internal representation but may 
expand to
+/// support non-Cartesian bounding or non-WKB backed geometry arrays. This 
also may
+/// expand to support more compact or efficient serialization/deserialization 
of the
+/// evaluated array when spilling.
+pub(crate) trait EvaluatedGeometryArrayFactory: fmt::Debug + Send + Sync {
+    /// Create a new [EvaluatedGeometryArray]
+    fn try_new_evaluated_array(
+        &self,
+        geometry_array: ArrayRef,
+        sedona_type: &SedonaType,
+    ) -> Result<EvaluatedGeometryArray>;
+}
+
+/// Default EvaluatedGeometryArray factory
+///
+/// Constructs arrays via [EvaluatedGeometryArray::try_new].
+#[derive(Debug)]
+pub(crate) struct DefaultGeometryArrayFactory;
+
+impl EvaluatedGeometryArrayFactory for DefaultGeometryArrayFactory {
+    fn try_new_evaluated_array(
+        &self,
+        geometry_array: ArrayRef,
+        sedona_type: &SedonaType,
+    ) -> Result<EvaluatedGeometryArray> {
+        EvaluatedGeometryArray::try_new(geometry_array, sedona_type)
+    }
+}
+
 /// Operand evaluator is for evaluating the operands of a spatial predicate. 
It can be a distance
 /// operand evaluator or a relation operand evaluator.
 pub(crate) trait OperandEvaluator: fmt::Debug + Send + Sync {
     /// Evaluate the spatial predicate operand on the build side.
-    fn evaluate_build(&self, batch: &RecordBatch) -> 
Result<EvaluatedGeometryArray> {
-        let geom_expr = self.build_side_expr()?;
-        evaluate_with_rects(batch, &geom_expr)
-    }
+    fn evaluate_build(&self, batch: &RecordBatch) -> 
Result<EvaluatedGeometryArray>;
 
     /// Evaluate the spatial predicate operand on the probe side.
-    fn evaluate_probe(&self, batch: &RecordBatch) -> 
Result<EvaluatedGeometryArray> {
-        let geom_expr = self.probe_side_expr()?;
-        evaluate_with_rects(batch, &geom_expr)
-    }
-
-    /// Resolve the distance operand for a given row.
-    fn resolve_distance(
-        &self,
-        _build_distance: &Option<ColumnarValue>,
-        _build_row_idx: usize,
-        _probe_distance: &Option<f64>,
-    ) -> Result<Option<f64>> {
-        Ok(None)
-    }
+    fn evaluate_probe(&self, batch: &RecordBatch) -> 
Result<EvaluatedGeometryArray>;
 
     /// Get the expression for the build side.
     fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>>;
@@ -73,32 +89,34 @@ pub(crate) trait OperandEvaluator: fmt::Debug + Send + Sync 
{
 /// Create a spatial predicate evaluator for the spatial predicate.
 pub(crate) fn create_operand_evaluator(
     predicate: &SpatialPredicate,
-    options: SpatialJoinOptions,
+    evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
 ) -> Arc<dyn OperandEvaluator> {
     match predicate {
-        SpatialPredicate::Distance(predicate) => {
-            Arc::new(DistanceOperandEvaluator::new(predicate.clone(), options))
-        }
-        SpatialPredicate::Relation(predicate) => {
-            Arc::new(RelationOperandEvaluator::new(predicate.clone(), options))
-        }
-        SpatialPredicate::KNearestNeighbors(predicate) => {
-            Arc::new(KNNOperandEvaluator::new(predicate.clone()))
-        }
+        SpatialPredicate::Distance(predicate) => 
Arc::new(DistanceOperandEvaluator::new(
+            predicate.clone(),
+            evaluated_array_factory,
+        )),
+        SpatialPredicate::Relation(predicate) => 
Arc::new(RelationOperandEvaluator::new(
+            predicate.clone(),
+            evaluated_array_factory,
+        )),
+        SpatialPredicate::KNearestNeighbors(predicate) => 
Arc::new(KNNOperandEvaluator::new(
+            predicate.clone(),
+            evaluated_array_factory,
+        )),
     }
 }
 
 /// Result of evaluating a geometry batch.
 pub struct EvaluatedGeometryArray {
-    /// Type of geometry_array
-    pub sedona_type: SedonaType,
+    sedona_type: SedonaType,
     /// The array of geometries produced by evaluating the geometry expression.
-    pub geometry_array: ArrayRef,
+    geometry_array: ArrayRef,
     /// The rects of the geometries in the geometry array. The length of this 
array is equal to the number of geometries.
     /// The rects will be None for empty or null geometries.
-    pub rects: Vec<Option<Rect<f32>>>,
+    rects: Vec<Option<Rect<f32>>>,
     /// The distance value produced by evaluating the distance expression.
-    pub distance: Option<ColumnarValue>,
+    distance: Option<ColumnarValue>,
     /// WKBs of the geometries in `geometry_array`. The wkb values reference 
buffers inside the geometry array,
     /// but we'll only allow accessing Wkb<'a> where 'a is the lifetime of the 
GeometryBatchResult to make
     /// the interfaces safe. The buffers in `geometry_array` are allocated on 
the heap and won't be moved when
@@ -107,6 +125,7 @@ pub struct EvaluatedGeometryArray {
 }
 
 impl EvaluatedGeometryArray {
+    /// Create a new EvaluatedGeometryArray and compute item bounds
     pub fn try_new(geometry_array: ArrayRef, sedona_type: &SedonaType) -> 
Result<Self> {
         let num_rows = geometry_array.len();
         let mut rect_vec = Vec::with_capacity(num_rows);
@@ -126,19 +145,47 @@ impl EvaluatedGeometryArray {
             } else {
                 None
             };
+
             rect_vec.push(rect_opt);
-            wkbs.push(wkb_opt);
+
+            // Safety: The wkbs must reference buffers inside the 
`geometry_array`. Since the `geometry_array` and
+            // `wkbs` are both owned by the `EvaluatedGeometryArray`, they 
have the same lifetime. We'll never
+            // have a situation where the `EvaluatedGeometryArray` is dropped 
while the `wkbs` are still in use
+            // (guaranteed by the scope of the `wkbs` field and lifetime 
signature of the `wkbs` method).
+            wkbs.push(wkb_opt.map(|wkb| unsafe { transmute(wkb) }));
             Ok(())
         })?;
 
+        Ok(Self {
+            sedona_type: sedona_type.clone(),
+            geometry_array,
+            rects: rect_vec,
+            distance: None,
+            wkbs,
+        })
+    }
+
+    /// Create a new EvaluatedGeometryArray with precomputed item bounds
+    pub fn try_new_with_rects(
+        geometry_array: ArrayRef,
+        rect_vec: Vec<Option<Rect<f32>>>,
+        sedona_type: &SedonaType,
+    ) -> Result<Self> {
         // Safety: The wkbs must reference buffers inside the 
`geometry_array`. Since the `geometry_array` and
         // `wkbs` are both owned by the `EvaluatedGeometryArray`, so they have 
the same lifetime. We'll never
         // have a situation where the `EvaluatedGeometryArray` is dropped 
while the `wkbs` are still in use
         // (guaranteed by the scope of the `wkbs` field and lifetime signature 
of the `wkbs` method).
-        let wkbs = wkbs
-            .into_iter()
-            .map(|wkb| wkb.map(|wkb| unsafe { transmute(wkb) }))
-            .collect();
+        let num_rows = geometry_array.len();
+        let mut wkbs = Vec::with_capacity(num_rows);
+        geometry_array.iter_as_wkb(sedona_type, num_rows, |wkb_opt| {
+            wkbs.push(wkb_opt.map(|wkb| unsafe { transmute(wkb) }));
+            Ok(())
+        })?;
+
+        if num_rows != rect_vec.len() {
+            return sedona_internal_err!("Expected rect_vec to have same length 
as geometry array");
+        }
+
         Ok(Self {
             sedona_type: sedona_type.clone(),
             geometry_array,
@@ -148,6 +195,157 @@ impl EvaluatedGeometryArray {
         })
     }
 
+    /// Set evaluated distance
+    pub fn with_distance(mut self, distance: Option<ColumnarValue>) -> Self {
+        self.distance = distance;
+        self
+    }
+
+    /// Build a new `EvaluatedGeometryArray` by interleaving rows from the 
provided
+    /// source arrays according to `indices`. Each `(batch_idx, row_idx)` pair
+    /// identifies a source array and row.
+    ///
+    /// The rectangles are gathered directly from the source arrays (no
+    /// recomputation), while the WKB references are recomputed from the
+    /// interleaved Arrow array since their lifetimes are tied to it.
+    pub fn interleave(
+        geom_arrays: &[&EvaluatedGeometryArray],
+        indices: &[(usize, usize)],
+    ) -> Result<Self> {
+        if geom_arrays.is_empty() {
+            return sedona_internal_err!("interleave requires at least one 
geometry array");
+        }
+
+        let sedona_type = &geom_arrays[0].sedona_type;
+
+        // Interleave the Arrow geometry arrays.
+        let value_refs: Vec<&dyn Array> = geom_arrays
+            .iter()
+            .map(|g| g.geometry_array.as_ref())
+            .collect();
+        let geometry_array = arrow_interleave(&value_refs, indices)?;
+
+        // Gather rects by index — no recomputation.
+        let rects: Vec<Option<Rect<f32>>> = indices
+            .iter()
+            .map(|&(batch_idx, row_idx)| geom_arrays[batch_idx].rects[row_idx])
+            .collect();
+
+        let mut out = Self::try_new_with_rects(geometry_array, rects, 
sedona_type)?;
+
+        // Interleave distance columns.
+        out.distance = Self::interleave_distance(geom_arrays, indices)?;
+
+        Ok(out)
+    }
+
+    /// Interleave the optional distance metadata across source geometry 
arrays.
+    pub(crate) fn interleave_distance(
+        geom_arrays: &[&EvaluatedGeometryArray],
+        indices: &[(usize, usize)],
+    ) -> Result<Option<ColumnarValue>> {
+        let mut first_value: Option<&ColumnarValue> = None;
+        let mut needs_array = false;
+        let mut all_null = true;
+        let mut first_scalar: Option<&ScalarValue> = None;
+
+        for geom in geom_arrays {
+            match &geom.distance {
+                Some(value) => {
+                    if first_value.is_none() {
+                        first_value = Some(value);
+                    }
+                    match value {
+                        ColumnarValue::Array(array) => {
+                            needs_array = true;
+                            if all_null && array.logical_null_count() != 
array.len() {
+                                all_null = false;
+                            }
+                        }
+                        ColumnarValue::Scalar(scalar) => {
+                            if let Some(first) = first_scalar {
+                                if first != scalar {
+                                    needs_array = true;
+                                }
+                            } else {
+                                first_scalar = Some(scalar);
+                            }
+                            if !scalar.is_null() {
+                                all_null = false;
+                            }
+                        }
+                    }
+                }
+                None => {
+                    if first_value.is_some() && !all_null {
+                        return sedona_internal_err!(
+                            "Inconsistent distance metadata across batches"
+                        );
+                    }
+                }
+            }
+        }
+
+        if all_null {
+            return Ok(None);
+        }
+
+        let Some(distance_value) = first_value else {
+            return Ok(None);
+        };
+
+        if !needs_array {
+            if let ColumnarValue::Scalar(value) = distance_value {
+                return Ok(Some(ColumnarValue::Scalar(value.clone())));
+            }
+        }
+
+        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(geom_arrays.len());
+        for geom in geom_arrays {
+            match &geom.distance {
+                Some(ColumnarValue::Array(array)) => 
arrays.push(array.clone()),
+                Some(ColumnarValue::Scalar(value)) => {
+                    
arrays.push(value.to_array_of_size(geom.geometry_array.len())?);
+                }
+                None => {
+                    return sedona_internal_err!("Inconsistent distance 
metadata across batches");
+                }
+            }
+        }
+
+        let array_refs: Vec<&dyn Array> = arrays.iter().map(|a| 
a.as_ref()).collect();
+        let array = arrow_interleave(&array_refs, indices)?;
+        Ok(Some(ColumnarValue::Array(array)))
+    }
+
+    /// Type of geometry_array
+    pub fn sedona_type(&self) -> &SedonaType {
+        &self.sedona_type
+    }
+
+    /// Evaluated array of geometries
+    pub fn geometry_array(&self) -> &ArrayRef {
+        &self.geometry_array
+    }
+
+    /// Bounding rectangles of each element in geometry_array
+    pub fn rects(&self) -> &[Option<Rect<f32>>] {
+        &self.rects
+    }
+
+    /// Evaluated array of distances
+    pub fn distance(&self) -> &Option<ColumnarValue> {
+        &self.distance
+    }
+
+    /// Get the distance value for a specific row, if distances are present.
+    pub fn distance_at(&self, row_idx: usize) -> Result<Option<f64>> {
+        match &self.distance {
+            Some(cv) => distance_value_at(cv, row_idx),
+            None => Ok(None),
+        }
+    }
+
     /// Get the WKBs of the geometries in the geometry array.
     pub fn wkbs(&self) -> &Vec<Option<Wkb<'_>>> {
         // The returned WKBs are guaranteed to be valid for the lifetime of 
the GeometryBatchResult,
@@ -157,6 +355,11 @@ impl EvaluatedGeometryArray {
         &self.wkbs
     }
 
+    /// Get the WKB at `idx`, if any; panics if `idx` is out of bounds
+    pub fn wkb(&self, idx: usize) -> Option<&Wkb<'_>> {
+        self.wkbs[idx].as_ref()
+    }
+
     pub fn in_mem_size(&self) -> Result<usize> {
         let geom_array_size = get_array_memory_size(&self.geometry_array)?;
 
@@ -177,14 +380,17 @@ impl EvaluatedGeometryArray {
 #[derive(Debug)]
 struct RelationOperandEvaluator {
     inner: RelationPredicate,
-    _options: SpatialJoinOptions,
+    evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
 }
 
 impl RelationOperandEvaluator {
-    pub fn new(inner: RelationPredicate, options: SpatialJoinOptions) -> Self {
+    pub fn new(
+        inner: RelationPredicate,
+        evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
+    ) -> Self {
         Self {
             inner,
-            _options: options,
+            evaluated_array_factory,
         }
     }
 }
@@ -193,14 +399,17 @@ impl RelationOperandEvaluator {
 #[derive(Debug)]
 struct DistanceOperandEvaluator {
     inner: DistancePredicate,
-    _options: SpatialJoinOptions,
+    evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
 }
 
 impl DistanceOperandEvaluator {
-    pub fn new(inner: DistancePredicate, options: SpatialJoinOptions) -> Self {
+    pub fn new(
+        inner: DistancePredicate,
+        evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
+    ) -> Self {
         Self {
             inner,
-            _options: options,
+            evaluated_array_factory,
         }
     }
 }
@@ -208,13 +417,14 @@ impl DistanceOperandEvaluator {
 fn evaluate_with_rects(
     batch: &RecordBatch,
     geom_expr: &Arc<dyn PhysicalExpr>,
+    evaluated_array_factory: &dyn EvaluatedGeometryArrayFactory,
 ) -> Result<EvaluatedGeometryArray> {
     let geometry_columnar_value = geom_expr.evaluate(batch)?;
     let num_rows = batch.num_rows();
     let geometry_array = geometry_columnar_value.to_array(num_rows)?;
     let sedona_type =
         
SedonaType::from_storage_field(geom_expr.return_field(&batch.schema())?.as_ref())?;
-    EvaluatedGeometryArray::try_new(geometry_array, &sedona_type)
+    evaluated_array_factory.try_new_evaluated_array(geometry_array, 
&sedona_type)
 }
 
 impl DistanceOperandEvaluator {
@@ -224,7 +434,8 @@ impl DistanceOperandEvaluator {
         geom_expr: &Arc<dyn PhysicalExpr>,
         side: JoinSide,
     ) -> Result<EvaluatedGeometryArray> {
-        let mut result = evaluate_with_rects(batch, geom_expr)?;
+        let mut result =
+            evaluate_with_rects(batch, geom_expr, 
self.evaluated_array_factory.as_ref())?;
 
         let should_expand = match side {
             JoinSide::Left => self.inner.distance_side == JoinSide::Left,
@@ -333,26 +544,19 @@ impl OperandEvaluator for DistanceOperandEvaluator {
     fn probe_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
         Ok(Arc::clone(&self.inner.right))
     }
-
-    fn resolve_distance(
-        &self,
-        build_distance: &Option<ColumnarValue>,
-        build_row_idx: usize,
-        probe_distance: &Option<f64>,
-    ) -> Result<Option<f64>> {
-        match self.inner.distance_side {
-            JoinSide::Left => {
-                let Some(distance) = build_distance else {
-                    return Ok(None);
-                };
-                distance_value_at(distance, build_row_idx)
-            }
-            JoinSide::Right | JoinSide::None => Ok(*probe_distance),
-        }
-    }
 }
 
 impl OperandEvaluator for RelationOperandEvaluator {
+    fn evaluate_build(&self, batch: &RecordBatch) -> 
Result<EvaluatedGeometryArray> {
+        let geom_expr = self.build_side_expr()?;
+        evaluate_with_rects(batch, &geom_expr, 
self.evaluated_array_factory.as_ref())
+    }
+
+    fn evaluate_probe(&self, batch: &RecordBatch) -> 
Result<EvaluatedGeometryArray> {
+        let geom_expr = self.probe_side_expr()?;
+        evaluate_with_rects(batch, &geom_expr, 
self.evaluated_array_factory.as_ref())
+    }
+
     fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
         Ok(Arc::clone(&self.inner.left))
     }
@@ -366,15 +570,32 @@ impl OperandEvaluator for RelationOperandEvaluator {
 #[derive(Debug)]
 struct KNNOperandEvaluator {
     inner: KNNPredicate,
+    evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
 }
 
 impl KNNOperandEvaluator {
-    fn new(inner: KNNPredicate) -> Self {
-        Self { inner }
+    fn new(
+        inner: KNNPredicate,
+        evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
+    ) -> Self {
+        Self {
+            inner,
+            evaluated_array_factory,
+        }
     }
 }
 
 impl OperandEvaluator for KNNOperandEvaluator {
+    fn evaluate_build(&self, batch: &RecordBatch) -> 
Result<EvaluatedGeometryArray> {
+        let geom_expr = self.build_side_expr()?;
+        evaluate_with_rects(batch, &geom_expr, 
self.evaluated_array_factory.as_ref())
+    }
+
+    fn evaluate_probe(&self, batch: &RecordBatch) -> 
Result<EvaluatedGeometryArray> {
+        let geom_expr = self.probe_side_expr()?;
+        evaluate_with_rects(batch, &geom_expr, 
self.evaluated_array_factory.as_ref())
+    }
+
     fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
         // For KNN, the right side (objects/candidates) is the build side
         Ok(Arc::clone(&self.inner.right))
@@ -384,15 +605,205 @@ impl OperandEvaluator for KNNOperandEvaluator {
         // For KNN, the left side (queries) is the probe side
         Ok(Arc::clone(&self.inner.left))
     }
+}
 
-    /// Resolve the k value for KNN operation
-    fn resolve_distance(
-        &self,
-        _build_distance: &Option<ColumnarValue>,
-        _build_row_idx: usize,
-        _probe_distance: &Option<f64>,
-    ) -> Result<Option<f64>> {
-        // NOTE: We do not support distance-based refinement for KNN 
predicates in the refiner phase.
-        Ok(None)
+#[cfg(test)]
+mod test {
+    use arrow_array::BinaryArray;
+    use sedona_geometry::wkb_factory::wkb_point;
+    use sedona_schema::datatypes::WKB_GEOMETRY;
+
+    use super::*;
+
+    fn make_geom_array_with_distance(
+        wkbs: Vec<Vec<u8>>,
+        distance: Option<ColumnarValue>,
+    ) -> Result<EvaluatedGeometryArray> {
+        let geom_array: ArrayRef = Arc::new(BinaryArray::from(
+            wkbs.iter()
+                .map(|wkb| Some(wkb.as_slice()))
+                .collect::<Vec<_>>(),
+        ));
+        let mut geom = EvaluatedGeometryArray::try_new(geom_array, 
&WKB_GEOMETRY)?;
+        geom.distance = distance;
+        Ok(geom)
+    }
+
+    #[test]
+    fn interleave_distance_none() -> Result<()> {
+        let wkbs1 = vec![
+            wkb_point((10.0, 10.0)).unwrap(),
+            wkb_point((20.0, 20.0)).unwrap(),
+        ];
+        let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
+
+        let geom1 = make_geom_array_with_distance(wkbs1, None)?;
+        let geom2 = make_geom_array_with_distance(wkbs2, None)?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+        let result = EvaluatedGeometryArray::interleave_distance(&geom_arrays, 
&assignments)?;
+        assert!(result.is_none());
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_distance_uniform_scalar() -> Result<()> {
+        let wkbs1 = vec![
+            wkb_point((10.0, 10.0)).unwrap(),
+            wkb_point((20.0, 20.0)).unwrap(),
+        ];
+        let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
+
+        let scalar = ScalarValue::Float64(Some(5.0));
+        let geom1 =
+            make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Scalar(scalar.clone())))?;
+        let geom2 =
+            make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Scalar(scalar.clone())))?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+        let result = EvaluatedGeometryArray::interleave_distance(&geom_arrays, 
&assignments)?;
+        assert!(matches!(result, Some(ColumnarValue::Scalar(_))));
+        if let Some(ColumnarValue::Scalar(value)) = result {
+            assert_eq!(value, ScalarValue::Float64(Some(5.0)));
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_distance_different_scalars() -> Result<()> {
+        use arrow_array::Float64Array;
+
+        let wkbs1 = vec![
+            wkb_point((10.0, 10.0)).unwrap(),
+            wkb_point((20.0, 20.0)).unwrap(),
+        ];
+        let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
+
+        let scalar1 = ScalarValue::Float64(Some(5.0));
+        let scalar2 = ScalarValue::Float64(Some(10.0));
+        let geom1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Scalar(scalar1)))?;
+        let geom2 = make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Scalar(scalar2)))?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+        let result = EvaluatedGeometryArray::interleave_distance(&geom_arrays, 
&assignments)?;
+        assert!(matches!(result, Some(ColumnarValue::Array(_))));
+        if let Some(ColumnarValue::Array(array)) = result {
+            let float_array = 
array.as_any().downcast_ref::<Float64Array>().unwrap();
+            assert_eq!(float_array.len(), 3);
+            assert_eq!(float_array.value(0), 5.0);
+            assert_eq!(float_array.value(1), 10.0);
+            assert_eq!(float_array.value(2), 5.0);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_distance_arrays() -> Result<()> {
+        use arrow_array::Float64Array;
+
+        let wkbs1 = vec![
+            wkb_point((10.0, 10.0)).unwrap(),
+            wkb_point((20.0, 20.0)).unwrap(),
+        ];
+        let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
+
+        let array1: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0]));
+        let array2: ArrayRef = Arc::new(Float64Array::from(vec![3.0]));
+        let geom1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Array(array1)))?;
+        let geom2 = make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Array(array2)))?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+        let result = EvaluatedGeometryArray::interleave_distance(&geom_arrays, 
&assignments)?;
+        assert!(matches!(result, Some(ColumnarValue::Array(_))));
+        if let Some(ColumnarValue::Array(array)) = result {
+            let float_array = 
array.as_any().downcast_ref::<Float64Array>().unwrap();
+            assert_eq!(float_array.len(), 3);
+            assert_eq!(float_array.value(0), 1.0);
+            assert_eq!(float_array.value(1), 3.0);
+            assert_eq!(float_array.value(2), 2.0);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_distance_mixed_scalar_and_array() -> Result<()> {
+        use arrow_array::Float64Array;
+
+        let wkbs1 = vec![
+            wkb_point((10.0, 10.0)).unwrap(),
+            wkb_point((20.0, 20.0)).unwrap(),
+        ];
+        let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
+
+        let scalar = ScalarValue::Float64(Some(5.0));
+        let array: ArrayRef = Arc::new(Float64Array::from(vec![10.0]));
+        let geom1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Scalar(scalar)))?;
+        let geom2 = make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Array(array)))?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+        let result = EvaluatedGeometryArray::interleave_distance(&geom_arrays, 
&assignments)?;
+        assert!(matches!(result, Some(ColumnarValue::Array(_))));
+        if let Some(ColumnarValue::Array(array)) = result {
+            let float_array = 
array.as_any().downcast_ref::<Float64Array>().unwrap();
+            assert_eq!(float_array.len(), 3);
+            assert_eq!(float_array.value(0), 5.0);
+            assert_eq!(float_array.value(1), 10.0);
+            assert_eq!(float_array.value(2), 5.0);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_distance_inconsistent_metadata() -> Result<()> {
+        let wkbs1 = vec![wkb_point((10.0, 10.0)).unwrap()];
+        let wkbs2 = vec![wkb_point((20.0, 20.0)).unwrap()];
+
+        let scalar = ScalarValue::Float64(Some(5.0));
+        let geom1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Scalar(scalar)))?;
+        let geom2 = make_geom_array_with_distance(wkbs2, None)?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0)];
+
+        let result = EvaluatedGeometryArray::interleave_distance(&geom_arrays, 
&assignments);
+        assert!(result.is_err());
+        if let Err(e) = result {
+            assert!(e.to_string().contains("Inconsistent distance metadata"));
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_distance_mixed_none_and_null() -> Result<()> {
+        use arrow_array::Float64Array;
+
+        let wkbs1 = vec![wkb_point((10.0, 10.0)).unwrap()];
+        let wkbs2 = vec![wkb_point((20.0, 20.0)).unwrap()];
+        let wkbs3 = vec![wkb_point((30.0, 30.0)).unwrap()];
+
+        let null_array = Arc::new(Float64Array::new_null(1));
+        let ega1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Array(null_array)))?;
+
+        let null_scalar = ScalarValue::Float64(None);
+        let ega2 = make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Scalar(null_scalar)))?;
+
+        let ega3 = make_geom_array_with_distance(wkbs3, None)?;
+
+        let vec_ega = vec![&ega1, &ega2, &ega3];
+        let assignments = vec![(0, 0), (1, 0), (2, 0)];
+
+        let result = EvaluatedGeometryArray::interleave_distance(&vec_ega, 
&assignments)?;
+        assert!(result.is_none());
+        Ok(())
     }
 }
diff --git a/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs 
b/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs
index cc8e6d7e..8d86d40b 100644
--- a/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs
+++ b/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs
@@ -35,13 +35,11 @@ use crate::{
         SpatialPartitioner,
     },
 };
-use arrow::compute::interleave as arrow_interleave;
 use arrow::compute::interleave_record_batch;
-use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_array::RecordBatch;
 use datafusion::config::SpillCompression;
-use datafusion_common::{Result, ScalarValue};
+use datafusion_common::Result;
 use datafusion_execution::{disk_manager::RefCountedTempFile, 
runtime_env::RuntimeEnv};
-use datafusion_expr::ColumnarValue;
 use datafusion_physical_plan::metrics::SpillMetrics;
 use futures::StreamExt;
 use sedona_common::sedona_internal_err;
@@ -479,7 +477,7 @@ impl StreamRepartitioner {
                     self.slots.num_regular_partitions()
                 );
             };
-            if let Some(wkb) = batch_ref.wkb(row_idx) {
+            if let Some(wkb) = batch_ref.geom_array.wkb(row_idx) {
                 self.geo_stats_accumulators[slot_idx].update_statistics(wkb)?;
             }
             self.slot_assignments[slot_idx].push((batch_idx, row_idx));
@@ -573,7 +571,7 @@ impl StreamRepartitioner {
             self.spill_registry[slot_idx] = 
Some(EvaluatedBatchSpillWriter::try_new(
                 Arc::clone(&self.runtime_env),
                 batch.schema(),
-                &batch.geom_array.sedona_type,
+                batch.geom_array.sedona_type(),
                 "streaming repartitioner",
                 self.spill_compression,
                 self.spill_metrics.clone(),
@@ -597,13 +595,13 @@ pub(crate) fn assign_rows(
     assignments: &mut Vec<SpatialPartition>,
 ) -> Result<()> {
     assignments.clear();
-    assignments.reserve(batch.rects().len());
+    assignments.reserve(batch.geom_array.rects().len());
 
     match partitioned_side {
         PartitionedSide::BuildSide => {
             let mut cnt = 0;
             let num_regular_partitions = partitioner.num_regular_partitions() 
as u32;
-            for rect_opt in batch.rects() {
+            for rect_opt in batch.geom_array.rects() {
                 let partition = match rect_opt {
                     Some(rect) => 
partitioner.partition_no_multi(&geo_rect_to_bbox(rect))?,
                     None => {
@@ -618,7 +616,7 @@ pub(crate) fn assign_rows(
             }
         }
         PartitionedSide::ProbeSide => {
-            for rect_opt in batch.rects() {
+            for rect_opt in batch.geom_array.rects() {
                 let partition = match rect_opt {
                     Some(rect) => 
partitioner.partition(&geo_rect_to_bbox(rect))?,
                     None => SpatialPartition::None,
@@ -645,115 +643,14 @@ pub(crate) fn interleave_evaluated_batch(
         return sedona_internal_err!("interleave_evaluated_batch requires at 
least one batch");
     }
     let batch = interleave_record_batch(record_batches, indices)?;
-    let geom_array = interleave_geometry_array(geom_arrays, indices)?;
+    let geom_array = EvaluatedGeometryArray::interleave(geom_arrays, indices)?;
     Ok(EvaluatedBatch { batch, geom_array })
 }
 
-fn interleave_geometry_array(
-    geom_arrays: &[&EvaluatedGeometryArray],
-    indices: &[(usize, usize)],
-) -> Result<EvaluatedGeometryArray> {
-    if geom_arrays.is_empty() {
-        return sedona_internal_err!("interleave_geometry_array requires at 
least one batch");
-    }
-    let sedona_type = &geom_arrays[0].sedona_type;
-    let value_refs: Vec<&dyn Array> = geom_arrays
-        .iter()
-        .map(|geom| geom.geometry_array.as_ref())
-        .collect();
-    let geometry_array = arrow_interleave(&value_refs, indices)?;
-
-    let distance = interleave_distance_columns(geom_arrays, indices)?;
-
-    let mut result = EvaluatedGeometryArray::try_new(geometry_array, 
sedona_type)?;
-    result.distance = distance;
-    Ok(result)
-}
-
-fn interleave_distance_columns(
-    geom_arrays: &[&EvaluatedGeometryArray],
-    assignments: &[(usize, usize)],
-) -> Result<Option<ColumnarValue>> {
-    // Check consistency and determine if we need array conversion
-    let mut first_value: Option<&ColumnarValue> = None;
-    let mut needs_array = false;
-    let mut all_null = true;
-    let mut first_scalar: Option<&ScalarValue> = None;
-
-    for geom in geom_arrays {
-        match &geom.distance {
-            Some(value) => {
-                if first_value.is_none() {
-                    first_value = Some(value);
-                }
-
-                match value {
-                    ColumnarValue::Array(array) => {
-                        needs_array = true;
-                        if all_null && array.logical_null_count() != 
array.len() {
-                            all_null = false;
-                        }
-                    }
-                    ColumnarValue::Scalar(scalar) => {
-                        if let Some(first) = first_scalar {
-                            if first != scalar {
-                                needs_array = true;
-                            }
-                        } else {
-                            first_scalar = Some(scalar);
-                        }
-                        if !scalar.is_null() {
-                            all_null = false;
-                        }
-                    }
-                }
-            }
-            None => {
-                if first_value.is_some() && !all_null {
-                    return sedona_internal_err!("Inconsistent distance 
metadata across batches");
-                }
-            }
-        }
-    }
-
-    if all_null {
-        return Ok(None);
-    }
-
-    let Some(distance_value) = first_value else {
-        return Ok(None);
-    };
-
-    // If all scalars match, return scalar
-    if !needs_array {
-        if let ColumnarValue::Scalar(value) = distance_value {
-            return Ok(Some(ColumnarValue::Scalar(value.clone())));
-        }
-    }
-
-    // Convert to arrays and interleave
-    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(geom_arrays.len());
-    for geom in geom_arrays {
-        match &geom.distance {
-            Some(ColumnarValue::Array(array)) => arrays.push(array.clone()),
-            Some(ColumnarValue::Scalar(value)) => {
-                
arrays.push(value.to_array_of_size(geom.geometry_array.len())?);
-            }
-            None => {
-                return sedona_internal_err!("Inconsistent distance metadata 
across batches");
-            }
-        }
-    }
-
-    let array_refs: Vec<&dyn Array> = arrays.iter().map(|array| 
array.as_ref()).collect();
-    let array = arrow_interleave(&array_refs, assignments)?;
-    Ok(Some(ColumnarValue::Array(array)))
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow_array::{ArrayRef, BinaryArray, Int32Array};
+    use arrow_array::{Array, ArrayRef, BinaryArray, Int32Array};
     use arrow_schema::{DataType, Field, Schema};
     use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
     use sedona_geometry::bounding_box::BoundingBox;
@@ -1096,154 +993,6 @@ mod tests {
         Ok(())
     }
 
-    fn make_geom_array_with_distance(
-        wkbs: Vec<Vec<u8>>,
-        distance: Option<ColumnarValue>,
-    ) -> Result<EvaluatedGeometryArray> {
-        let geom_array: ArrayRef = Arc::new(BinaryArray::from(
-            wkbs.iter()
-                .map(|wkb| Some(wkb.as_slice()))
-                .collect::<Vec<_>>(),
-        ));
-        let mut geom = EvaluatedGeometryArray::try_new(geom_array, 
&WKB_GEOMETRY)?;
-        geom.distance = distance;
-        Ok(geom)
-    }
-
-    #[test]
-    fn interleave_distance_none() -> Result<()> {
-        let wkbs1 = vec![
-            wkb_point((10.0, 10.0)).unwrap(),
-            wkb_point((20.0, 20.0)).unwrap(),
-        ];
-        let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
-
-        let geom1 = make_geom_array_with_distance(wkbs1, None)?;
-        let geom2 = make_geom_array_with_distance(wkbs2, None)?;
-
-        let geom_arrays = vec![&geom1, &geom2];
-        let assignments = vec![(0, 0), (1, 0), (0, 1)];
-
-        let result = interleave_distance_columns(&geom_arrays, &assignments)?;
-        assert!(result.is_none());
-        Ok(())
-    }
-
-    #[test]
-    fn interleave_distance_uniform_scalar() -> Result<()> {
-        let wkbs1 = vec![
-            wkb_point((10.0, 10.0)).unwrap(),
-            wkb_point((20.0, 20.0)).unwrap(),
-        ];
-        let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
-
-        let scalar = ScalarValue::Float64(Some(5.0));
-        let geom1 =
-            make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Scalar(scalar.clone())))?;
-        let geom2 =
-            make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Scalar(scalar.clone())))?;
-
-        let geom_arrays = vec![&geom1, &geom2];
-        let assignments = vec![(0, 0), (1, 0), (0, 1)];
-
-        let result = interleave_distance_columns(&geom_arrays, &assignments)?;
-        assert!(matches!(result, Some(ColumnarValue::Scalar(_))));
-        if let Some(ColumnarValue::Scalar(value)) = result {
-            assert_eq!(value, ScalarValue::Float64(Some(5.0)));
-        }
-        Ok(())
-    }
-
-    #[test]
-    fn interleave_distance_different_scalars() -> Result<()> {
-        use arrow_array::Float64Array;
-
-        let wkbs1 = vec![
-            wkb_point((10.0, 10.0)).unwrap(),
-            wkb_point((20.0, 20.0)).unwrap(),
-        ];
-        let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
-
-        let scalar1 = ScalarValue::Float64(Some(5.0));
-        let scalar2 = ScalarValue::Float64(Some(10.0));
-        let geom1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Scalar(scalar1)))?;
-        let geom2 = make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Scalar(scalar2)))?;
-
-        let geom_arrays = vec![&geom1, &geom2];
-        let assignments = vec![(0, 0), (1, 0), (0, 1)];
-
-        let result = interleave_distance_columns(&geom_arrays, &assignments)?;
-        assert!(matches!(result, Some(ColumnarValue::Array(_))));
-        if let Some(ColumnarValue::Array(array)) = result {
-            let float_array = 
array.as_any().downcast_ref::<Float64Array>().unwrap();
-            assert_eq!(float_array.len(), 3);
-            assert_eq!(float_array.value(0), 5.0);
-            assert_eq!(float_array.value(1), 10.0);
-            assert_eq!(float_array.value(2), 5.0);
-        }
-        Ok(())
-    }
-
-    #[test]
-    fn interleave_distance_arrays() -> Result<()> {
-        use arrow_array::Float64Array;
-
-        let wkbs1 = vec![
-            wkb_point((10.0, 10.0)).unwrap(),
-            wkb_point((20.0, 20.0)).unwrap(),
-        ];
-        let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
-
-        let array1: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0]));
-        let array2: ArrayRef = Arc::new(Float64Array::from(vec![3.0]));
-        let geom1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Array(array1)))?;
-        let geom2 = make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Array(array2)))?;
-
-        let geom_arrays = vec![&geom1, &geom2];
-        let assignments = vec![(0, 0), (1, 0), (0, 1)];
-
-        let result = interleave_distance_columns(&geom_arrays, &assignments)?;
-        assert!(matches!(result, Some(ColumnarValue::Array(_))));
-        if let Some(ColumnarValue::Array(array)) = result {
-            let float_array = 
array.as_any().downcast_ref::<Float64Array>().unwrap();
-            assert_eq!(float_array.len(), 3);
-            assert_eq!(float_array.value(0), 1.0);
-            assert_eq!(float_array.value(1), 3.0);
-            assert_eq!(float_array.value(2), 2.0);
-        }
-        Ok(())
-    }
-
-    #[test]
-    fn interleave_distance_mixed_scalar_and_array() -> Result<()> {
-        use arrow_array::Float64Array;
-
-        let wkbs1 = vec![
-            wkb_point((10.0, 10.0)).unwrap(),
-            wkb_point((20.0, 20.0)).unwrap(),
-        ];
-        let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
-
-        let scalar = ScalarValue::Float64(Some(5.0));
-        let array: ArrayRef = Arc::new(Float64Array::from(vec![10.0]));
-        let geom1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Scalar(scalar)))?;
-        let geom2 = make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Array(array)))?;
-
-        let geom_arrays = vec![&geom1, &geom2];
-        let assignments = vec![(0, 0), (1, 0), (0, 1)];
-
-        let result = interleave_distance_columns(&geom_arrays, &assignments)?;
-        assert!(matches!(result, Some(ColumnarValue::Array(_))));
-        if let Some(ColumnarValue::Array(array)) = result {
-            let float_array = 
array.as_any().downcast_ref::<Float64Array>().unwrap();
-            assert_eq!(float_array.len(), 3);
-            assert_eq!(float_array.value(0), 5.0);
-            assert_eq!(float_array.value(1), 10.0);
-            assert_eq!(float_array.value(2), 5.0);
-        }
-        Ok(())
-    }
-
     #[test]
     fn interleave_evaluated_batch_empty_assignments() -> Result<()> {
         let batch_a = sample_batch(&[0], vec![Some(wkb_point((10.0, 
10.0)).unwrap())])?;
@@ -1253,29 +1002,9 @@ mod tests {
 
         let result = interleave_evaluated_batch(&record_batches, &geom_arrays, 
&[])?;
         assert_eq!(result.batch.num_rows(), 0);
-        assert_eq!(result.geom_array.geometry_array.len(), 0);
-        assert!(result.geom_array.rects.is_empty());
-        assert!(result.geom_array.distance.is_none());
-        Ok(())
-    }
-
-    #[test]
-    fn interleave_distance_inconsistent_metadata() -> Result<()> {
-        let wkbs1 = vec![wkb_point((10.0, 10.0)).unwrap()];
-        let wkbs2 = vec![wkb_point((20.0, 20.0)).unwrap()];
-
-        let scalar = ScalarValue::Float64(Some(5.0));
-        let geom1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Scalar(scalar)))?;
-        let geom2 = make_geom_array_with_distance(wkbs2, None)?;
-
-        let geom_arrays = vec![&geom1, &geom2];
-        let assignments = vec![(0, 0), (1, 0)];
-
-        let result = interleave_distance_columns(&geom_arrays, &assignments);
-        assert!(result.is_err());
-        if let Err(e) = result {
-            assert!(e.to_string().contains("Inconsistent distance metadata"));
-        }
+        assert_eq!(result.geom_array.geometry_array().len(), 0);
+        assert!(result.geom_array.rects().is_empty());
+        assert!(result.geom_array.distance().is_none());
         Ok(())
     }
 
@@ -1314,7 +1043,7 @@ mod tests {
         let result = interleave_evaluated_batch(&record_batches, &geom_arrays, 
&assignments)?;
 
         // Check if the result geometry array is BinaryViewArray
-        let geom_array = result.geom_array.geometry_array;
+        let geom_array = result.geom_array.geometry_array();
         assert!(geom_array
             .as_any()
             .downcast_ref::<BinaryViewArray>()
@@ -1332,28 +1061,4 @@ mod tests {
 
         Ok(())
     }
-
-    #[test]
-    fn interleave_distance_mixed_none_and_null() -> Result<()> {
-        use arrow_array::Float64Array;
-
-        let wkbs1 = vec![wkb_point((10.0, 10.0)).unwrap()];
-        let wkbs2 = vec![wkb_point((20.0, 20.0)).unwrap()];
-        let wkbs3 = vec![wkb_point((30.0, 30.0)).unwrap()];
-
-        let null_array = Arc::new(Float64Array::new_null(1));
-        let ega1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Array(null_array)))?;
-
-        let null_scalar = ScalarValue::Float64(None);
-        let ega2 = make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Scalar(null_scalar)))?;
-
-        let ega3 = make_geom_array_with_distance(wkbs3, None)?;
-
-        let vec_ega = vec![&ega1, &ega2, &ega3];
-        let assignments = vec![(0, 0), (1, 0), (2, 0)];
-
-        let result = interleave_distance_columns(&vec_ega, &assignments)?;
-        assert!(result.is_none());
-        Ok(())
-    }
 }
diff --git a/rust/sedona-spatial-join/src/prepare.rs 
b/rust/sedona-spatial-join/src/prepare.rs
index b0ee38db..fae20615 100644
--- a/rust/sedona-spatial-join/src/prepare.rs
+++ b/rust/sedona-spatial-join/src/prepare.rs
@@ -33,7 +33,7 @@ use sedona_expr::statistics::GeoStatistics;
 use sedona_geometry::bounding_box::BoundingBox;
 
 use crate::index::spatial_index_builder::SpatialJoinBuildMetrics;
-use crate::index::DefaultSpatialIndexBuilder;
+use crate::join_provider::SpatialJoinProvider;
 use crate::{
     index::{
         memory_plan::{compute_memory_plan, MemoryPlan, PartitionMemorySummary},
@@ -73,6 +73,7 @@ pub(crate) struct SpatialJoinComponentsBuilder {
     join_type: JoinType,
     probe_threads_count: usize,
     metrics: ExecutionPlanMetricsSet,
+    join_provider: Arc<dyn SpatialJoinProvider>,
     seed: u64,
     sedona_options: SedonaOptions,
 }
@@ -80,6 +81,7 @@ pub(crate) struct SpatialJoinComponentsBuilder {
 impl SpatialJoinComponentsBuilder {
     /// Create a new builder capturing the execution context and configuration
     /// required to produce `SpatialJoinComponents` from build-side streams.
+    #[expect(clippy::too_many_arguments)]
     pub fn new(
         context: Arc<TaskContext>,
         build_schema: SchemaRef,
@@ -87,6 +89,7 @@ impl SpatialJoinComponentsBuilder {
         join_type: JoinType,
         probe_threads_count: usize,
         metrics: ExecutionPlanMetricsSet,
+        join_provider: Arc<dyn SpatialJoinProvider>,
         seed: u64,
     ) -> Self {
         let session_config = context.session_config();
@@ -103,6 +106,7 @@ impl SpatialJoinComponentsBuilder {
             join_type,
             probe_threads_count,
             metrics,
+            join_provider,
             seed,
             sedona_options,
         }
@@ -216,21 +220,12 @@ impl SpatialJoinComponentsBuilder {
             reservations.push(reservation);
             collect_metrics_vec.push(CollectBuildSideMetrics::new(k, 
&self.metrics));
         }
-        let join_metrics = SpatialJoinBuildMetrics::new(0, &self.metrics);
-        let builder = Arc::new(DefaultSpatialIndexBuilder::new(
-            self.build_schema.clone(),
-            self.spatial_predicate.clone(),
-            self.sedona_options.spatial_join.clone(),
-            self.join_type,
-            self.probe_threads_count,
-            join_metrics.clone(),
-        )?);
         let collector = BuildSideBatchesCollector::new(
             self.spatial_predicate.clone(),
             self.sedona_options.spatial_join.clone(),
             Arc::clone(&runtime_env),
             spill_compression,
-            builder,
+            self.join_provider.clone(),
         );
         let build_partitions = collector
             .collect_all(
@@ -407,6 +402,7 @@ impl SpatialJoinComponentsBuilder {
             self.join_type,
             self.probe_threads_count,
             SpatialJoinBuildMetrics::new(0, &self.metrics),
+            self.join_provider.clone(),
         );
 
         let probe_stream_options = ProbeStreamOptions {
@@ -440,6 +436,7 @@ impl SpatialJoinComponentsBuilder {
             self.probe_threads_count,
             build_partitions,
             SpatialJoinBuildMetrics::new(0, &self.metrics),
+            self.join_provider.clone(),
         );
 
         let probe_stream_options = ProbeStreamOptions {
@@ -477,6 +474,7 @@ impl SpatialJoinComponentsBuilder {
             self.probe_threads_count,
             merged_spilled_partitions,
             SpatialJoinBuildMetrics::new(0, &self.metrics),
+            self.join_provider.clone(),
             reservations,
         );
 
diff --git a/rust/sedona-spatial-join/src/stream.rs 
b/rust/sedona-spatial-join/src/stream.rs
index b40be025..288c59d7 100644
--- a/rust/sedona-spatial-join/src/stream.rs
+++ b/rust/sedona-spatial-join/src/stream.rs
@@ -44,6 +44,7 @@ use 
crate::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream
 use crate::evaluated_batch::EvaluatedBatch;
 use crate::index::partitioned_index_provider::PartitionedIndexProvider;
 use crate::index::spatial_index::SpatialIndexRef;
+use crate::join_provider::SpatialJoinProvider;
 use crate::operand_evaluator::create_operand_evaluator;
 use crate::partitioning::SpatialPartition;
 use crate::prepare::SpatialJoinComponents;
@@ -144,6 +145,7 @@ impl SpatialJoinStream {
         session_config: &SessionConfig,
         runtime_env: Arc<RuntimeEnv>,
         metrics: &ExecutionPlanMetricsSet,
+        join_provider: Arc<dyn SpatialJoinProvider>,
         once_fut_spatial_join_components: OnceFut<SpatialJoinComponents>,
         once_async_spatial_join_components: 
Arc<Mutex<Option<OnceAsync<SpatialJoinComponents>>>>,
     ) -> Self {
@@ -156,7 +158,7 @@ impl SpatialJoinStream {
             .cloned()
             .unwrap_or_default();
 
-        let evaluator = create_operand_evaluator(on, 
sedona_options.spatial_join.clone());
+        let evaluator = create_operand_evaluator(on, 
join_provider.evaluated_array_factory());
         let join_metrics = SpatialJoinProbeMetrics::new(probe_partition_id, 
metrics);
         let probe_stream = create_evaluated_probe_stream(
             probe_stream,
diff --git a/rust/sedona-spatial-join/src/utils/once_fut.rs 
b/rust/sedona-spatial-join/src/utils/once_fut.rs
index 8e7f4d49..0e3043d6 100644
--- a/rust/sedona-spatial-join/src/utils/once_fut.rs
+++ b/rust/sedona-spatial-join/src/utils/once_fut.rs
@@ -131,7 +131,7 @@ impl<T: 'static> OnceFut<T> {
     }
 
     /// Get the result of the computation if it is ready, without consuming it
-    #[allow(unused)]
+    #[cfg(test)]
     pub(crate) fn get(&mut self, cx: &mut Context<'_>) -> Poll<Result<&T>> {
         if let OnceFutState::Pending(fut) = &mut self.state {
             let r = ready!(fut.poll_unpin(cx));

Reply via email to