This is an automated email from the ASF dual-hosted git repository.

kontinuation pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ac184d8 feat(rust/sedona-raster-functions): add RS_Contains, 
RS_Intersects, RS_Within UDFs (#615)
1ac184d8 is described below

commit 1ac184d8a93c62f544e548d5f2a5df0162e6b5f5
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Tue Mar 10 18:15:48 2026 +0800

    feat(rust/sedona-raster-functions): add RS_Contains, RS_Intersects, 
RS_Within UDFs (#615)
    
    ## Summary
    
    - Add `RS_Contains`, `RS_Intersects`, and `RS_Within` spatial predicate 
UDFs to `sedona-raster-functions`
    - Supports raster-geometry and raster-raster spatial relationship tests 
with CRS-aware coordinate transformation
    - Adds `crs_utils` module for CRS resolution and coordinate transformation 
helpers
    - Extends `executor.rs` with `execute_raster_wkb_crs_void` (raster+geometry 
iteration) and `execute_raster_raster_void` (two-raster iteration) patterns
    - Makes `sedona-proj::st_transform` module and `with_global_proj_engine` 
function public for cross-crate CRS transformation
    - Includes unit tests and benchmark entries in `native-raster-functions.rs`
    
    Co-authored-by: Dewey Dunnington <[email protected]>
---
 Cargo.lock                                         |   4 +
 c/sedona-proj/src/transform.rs                     |   5 +-
 docs/reference/sql/rs_contains.qmd                 |  58 ++
 docs/reference/sql/rs_intersects.qmd               |  57 ++
 docs/reference/sql/rs_within.qmd                   |  63 ++
 rust/sedona-functions/src/lib.rs                   |   2 +-
 rust/sedona-raster-functions/Cargo.toml            |   5 +
 .../benches/native-raster-functions.rs             | 113 ++++
 rust/sedona-raster-functions/src/crs_utils.rs      | 462 +++++++++++++
 rust/sedona-raster-functions/src/executor.rs       | 723 +++++++++++++++++++-
 rust/sedona-raster-functions/src/lib.rs            |   2 +
 rust/sedona-raster-functions/src/register.rs       |   3 +
 .../src/rs_spatial_predicates.rs                   | 745 +++++++++++++++++++++
 rust/sedona-schema/src/crs.rs                      |   6 +
 14 files changed, 2241 insertions(+), 7 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 67c17af7..5055fe96 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5568,14 +5568,18 @@ dependencies = [
  "criterion",
  "datafusion-common",
  "datafusion-expr",
+ "geo-traits",
  "rstest",
  "sedona-common",
  "sedona-expr",
  "sedona-geometry",
+ "sedona-proj",
  "sedona-raster",
  "sedona-schema",
  "sedona-testing",
+ "sedona-tg",
  "serde_json",
+ "wkb",
 ]
 
 [[package]]
diff --git a/c/sedona-proj/src/transform.rs b/c/sedona-proj/src/transform.rs
index 1790a51a..28e231fb 100644
--- a/c/sedona-proj/src/transform.rs
+++ b/c/sedona-proj/src/transform.rs
@@ -162,10 +162,7 @@ pub fn configure_global_proj_engine(builder: 
ProjCrsEngineBuilder) -> Result<()>
 
 /// Do something with the global thread-local PROJ engine, creating it if it 
has not
 /// already been created.
-pub(crate) fn with_global_proj_engine<
-    R,
-    F: FnMut(&CachingCrsEngine<ProjCrsEngine>) -> Result<R>,
->(
+pub fn with_global_proj_engine<R, F: FnMut(&CachingCrsEngine<ProjCrsEngine>) 
-> Result<R>>(
     mut func: F,
 ) -> Result<R> {
     PROJ_ENGINE.with(|engine_cell| {
diff --git a/docs/reference/sql/rs_contains.qmd 
b/docs/reference/sql/rs_contains.qmd
new file mode 100644
index 00000000..dbc4eaba
--- /dev/null
+++ b/docs/reference/sql/rs_contains.qmd
@@ -0,0 +1,58 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+title: RS_Contains
+description: Returns true if the first argument's extent contains the second.
+kernels:
+  - returns: boolean
+    args: [raster, geometry]
+  - returns: boolean
+    args: [geometry, raster]
+  - returns: boolean
+    args:
+    - name: rastA
+      type: raster
+      description: Input raster
+    - name: rastB
+      type: raster
+      description: Input raster
+---
+
+## Description
+
+Returns `true` if the convex hull of the first argument completely contains
+the second argument. Both rasters and geometries are accepted in either
+argument position. When two rasters are provided, their convex hulls are
+compared.
+
+If the arguments have different CRSes, the geometry is transformed into the
+raster's CRS before evaluating the predicate. For two rasters, the second
+raster's extent is transformed into the first raster's CRS. If the preferred
+transformation fails, the extent of both sides are transformed to WGS 84 as a 
fallback.
+If either argument has no CRS set, the comparison is performed directly without
+CRS transformation.
+
+## Examples
+
+```sql
+SELECT RS_Contains(RS_Example(), ST_Point(0.5, 0.5, 4326));
+```
+
+```sql
+SELECT RS_Contains(RS_Example(), RS_Example());
+```
diff --git a/docs/reference/sql/rs_intersects.qmd 
b/docs/reference/sql/rs_intersects.qmd
new file mode 100644
index 00000000..bf631a24
--- /dev/null
+++ b/docs/reference/sql/rs_intersects.qmd
@@ -0,0 +1,57 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+title: RS_Intersects
+description: Returns true if the extents of the two arguments intersect.
+kernels:
+  - returns: boolean
+    args: [raster, geometry]
+  - returns: boolean
+    args: [geometry, raster]
+  - returns: boolean
+    args:
+    - name: rastA
+      type: raster
+      description: Input raster
+    - name: rastB
+      type: raster
+      description: Input raster
+---
+
+## Description
+
+Returns `true` if the convex hull of the first argument intersects the second
+argument. Both rasters and geometries are accepted in either argument position.
+When two rasters are provided, their convex hulls are compared.
+
+If the arguments have different CRSes, the geometry is transformed into the
+raster's CRS before evaluating the predicate. For two rasters, the second
+raster's extent is transformed into the first raster's CRS. If the preferred
+transformation fails, the extent of both sides are transformed to WGS 84 as a 
fallback.
+If either argument has no CRS set, the comparison is performed directly without
+CRS transformation.
+
+## Examples
+
+```sql
+SELECT RS_Intersects(RS_Example(), ST_Point(0.5, 0.5, 4326));
+```
+
+```sql
+SELECT RS_Intersects(RS_Example(), RS_Example());
+```
diff --git a/docs/reference/sql/rs_within.qmd b/docs/reference/sql/rs_within.qmd
new file mode 100644
index 00000000..73077b6b
--- /dev/null
+++ b/docs/reference/sql/rs_within.qmd
@@ -0,0 +1,63 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+title: RS_Within
+description: Returns true if the first argument's extent is within the second.
+kernels:
+  - returns: boolean
+    args: [raster, geometry]
+  - returns: boolean
+    args: [geometry, raster]
+  - returns: boolean
+    args:
+    - name: rastA
+      type: raster
+      description: Input raster
+    - name: rastB
+      type: raster
+      description: Input raster
+---
+
+## Description
+
+Returns `true` if the convex hull of the first argument is completely within
+the second argument. This is the inverse of `RS_Contains`:
+`RS_Within(A, B)` is equivalent to `RS_Contains(B, A)`.
+
+Both rasters and geometries are accepted in either argument position.
+When two rasters are provided, their convex hulls are compared.
+
+If the arguments have different CRSes, the geometry is transformed into the
+raster's CRS before evaluating the predicate. For two rasters, the second
+raster's extent is transformed into the first raster's CRS. If the preferred
+transformation fails, the extent of both sides are transformed to WGS 84 as a 
fallback.
+If either argument has no CRS set, the comparison is performed directly without
+CRS transformation.
+
+## Examples
+
+```sql
+SELECT RS_Within(
+    ST_Point(0.5, 0.5, 4326),
+    RS_Example()
+);
+```
+
+```sql
+SELECT RS_Within(RS_Example(), RS_Example());
+```
diff --git a/rust/sedona-functions/src/lib.rs b/rust/sedona-functions/src/lib.rs
index 43c77045..52e428a3 100644
--- a/rust/sedona-functions/src/lib.rs
+++ b/rust/sedona-functions/src/lib.rs
@@ -54,7 +54,7 @@ mod st_pointzm;
 mod st_reverse;
 mod st_rotate;
 mod st_scale;
-mod st_setsrid;
+pub mod st_setsrid;
 mod st_srid;
 mod st_start_point;
 mod st_translate;
diff --git a/rust/sedona-raster-functions/Cargo.toml 
b/rust/sedona-raster-functions/Cargo.toml
index 3eedf160..36ab816f 100644
--- a/rust/sedona-raster-functions/Cargo.toml
+++ b/rust/sedona-raster-functions/Cargo.toml
@@ -41,11 +41,16 @@ sedona-expr = { workspace = true }
 sedona-geometry = { workspace = true }
 sedona-raster = { workspace = true }
 sedona-schema = { workspace = true }
+sedona-tg = { workspace = true }
+sedona-proj = { workspace = true }
 serde_json = { workspace = true }
+wkb = { workspace = true }
 
 [dev-dependencies]
 criterion = { workspace = true }
+geo-traits = { workspace = true }
 sedona-testing = { workspace = true, features = ["criterion"] }
+sedona-proj = { workspace = true, features = ["proj-sys"] }
 rstest = { workspace = true }
 
 [[bench]]
diff --git a/rust/sedona-raster-functions/benches/native-raster-functions.rs 
b/rust/sedona-raster-functions/benches/native-raster-functions.rs
index e9e5146c..2b6ffeef 100644
--- a/rust/sedona-raster-functions/benches/native-raster-functions.rs
+++ b/rust/sedona-raster-functions/benches/native-raster-functions.rs
@@ -14,9 +14,58 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+use std::sync::Arc;
+
 use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion_common::error::Result;
+use datafusion_expr::{ColumnarValue, Volatility};
+use sedona_expr::scalar_udf::{SedonaScalarKernel, SedonaScalarUDF};
+use sedona_schema::{
+    crs::{lnglat, Crs},
+    datatypes::SedonaType,
+};
 use sedona_testing::benchmark_util::{benchmark, BenchmarkArgSpec::*, 
BenchmarkArgs};
 
+fn sd_apply_default_crs_udf() -> SedonaScalarUDF {
+    SedonaScalarUDF::new(
+        "sd_applydefaultcrs",
+        vec![Arc::new(SDApplyDefaultCRS { crs: lnglat() })],
+        Volatility::Immutable,
+    )
+}
+
+#[derive(Debug)]
+struct SDApplyDefaultCRS {
+    crs: Crs,
+}
+
+impl SedonaScalarKernel for SDApplyDefaultCRS {
+    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        if args.len() != 1 {
+            return Ok(None);
+        }
+
+        match &args[0] {
+            SedonaType::Wkb(edges, crs) if crs.is_none() => {
+                Ok(Some(SedonaType::Wkb(*edges, self.crs.clone())))
+            }
+            SedonaType::WkbView(edges, crs) if crs.is_none() => {
+                Ok(Some(SedonaType::WkbView(*edges, self.crs.clone())))
+            }
+            SedonaType::Wkb(..) | SedonaType::WkbView(..) => 
Ok(Some(args[0].clone())),
+            _ => Ok(None),
+        }
+    }
+
+    fn invoke_batch(
+        &self,
+        _arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        Ok(args[0].clone())
+    }
+}
+
 fn criterion_benchmark(c: &mut Criterion) {
     let f = sedona_raster_functions::register::default_function_set();
 
@@ -164,6 +213,70 @@ fn criterion_benchmark(c: &mut Criterion) {
             Float64(-45.0, 45.0),
         ),
     );
+
+    // RS_Intersects(raster, geometry) - point
+    benchmark::scalar(
+        c,
+        &f,
+        "native-raster",
+        "rs_intersects",
+        BenchmarkArgs::ArrayArray(
+            Raster(64, 64),
+            Transformed(Box::new(Point), sd_apply_default_crs_udf().into()),
+        ),
+    );
+    // RS_Intersects(raster, geometry) - polygon
+    benchmark::scalar(
+        c,
+        &f,
+        "native-raster",
+        "rs_intersects",
+        BenchmarkArgs::ArrayArray(
+            Raster(64, 64),
+            Transformed(Box::new(Polygon(4)), 
sd_apply_default_crs_udf().into()),
+        ),
+    );
+    // RS_Intersects(raster, raster)
+    benchmark::scalar(
+        c,
+        &f,
+        "native-raster",
+        "rs_intersects",
+        BenchmarkArgs::ArrayArray(Raster(64, 64), Raster(64, 64)),
+    );
+    // RS_Contains(raster, geometry) - point
+    benchmark::scalar(
+        c,
+        &f,
+        "native-raster",
+        "rs_contains",
+        BenchmarkArgs::ArrayArray(
+            Raster(64, 64),
+            Transformed(Box::new(Point), sd_apply_default_crs_udf().into()),
+        ),
+    );
+    // RS_Contains(raster, geometry) - polygon
+    benchmark::scalar(
+        c,
+        &f,
+        "native-raster",
+        "rs_contains",
+        BenchmarkArgs::ArrayArray(
+            Raster(64, 64),
+            Transformed(Box::new(Polygon(4)), 
sd_apply_default_crs_udf().into()),
+        ),
+    );
+    // RS_Within(raster, geometry) - polygon
+    benchmark::scalar(
+        c,
+        &f,
+        "native-raster",
+        "rs_within",
+        BenchmarkArgs::ArrayArray(
+            Raster(64, 64),
+            Transformed(Box::new(Polygon(4)), 
sd_apply_default_crs_udf().into()),
+        ),
+    );
 }
 
 criterion_group!(benches, criterion_benchmark);
diff --git a/rust/sedona-raster-functions/src/crs_utils.rs 
b/rust/sedona-raster-functions/src/crs_utils.rs
new file mode 100644
index 00000000..b65c08b2
--- /dev/null
+++ b/rust/sedona-raster-functions/src/crs_utils.rs
@@ -0,0 +1,462 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::{exec_datafusion_err, DataFusionError, Result};
+use sedona_geometry::transform::{transform, CrsEngine};
+use sedona_schema::crs::{deserialize_crs, CoordinateReferenceSystem, Crs};
+use wkb::reader::read_wkb;
+
+/// Resolve an optional CRS string to a concrete CRS object.
+///
+/// - If `crs_str` is `Some` and deserializes to a known CRS, that CRS is 
returned.
+/// - Otherwise (None, empty, "0", etc.), `None` is returned.
+pub fn resolve_crs(crs_str: Option<&str>) -> Result<Crs> {
+    if let Some(crs_str) = crs_str {
+        deserialize_crs(crs_str)
+    } else {
+        Ok(None)
+    }
+}
+
+/// Transform a geometry encoded as WKB from one CRS to another.
+///
+/// This is a utility used by raster/spatial functions to reproject a geometry
+/// without leaking PROJ engine details into call sites.
+///
+/// **Behavior**
+/// - If `from_crs` and `to_crs` are equal, returns the original WKB (clone) 
without decoding.
+/// - Otherwise, builds a PROJ pipeline and transforms all coordinates.
+///
+/// **Errors**
+/// - Returns an error if WKB parsing fails, PROJ cannot build the CRS-to-CRS 
transform,
+///   or if the coordinate transformation itself fails.
+///
+/// **Note**
+/// - For best performance, verify that `from_crs` and `to_crs` are different 
before calling this function
+///   to avoid unnecessary WKB parsing and reprojection when CRSes are 
equivalent.
+pub fn crs_transform_wkb(
+    wkb: &[u8],
+    from_crs: &dyn CoordinateReferenceSystem,
+    to_crs: &dyn CoordinateReferenceSystem,
+    engine: &dyn CrsEngine,
+) -> Result<Vec<u8>> {
+    let crs_transform = engine
+        .get_transform_crs_to_crs(&from_crs.to_crs_string(), 
&to_crs.to_crs_string(), None, "")
+        .map_err(|e| exec_datafusion_err!("CRS transform error: {}", e))?;
+    let geom = read_wkb(wkb).map_err(|e| 
DataFusionError::External(Box::new(e)))?;
+    let mut out = Vec::with_capacity(wkb.len());
+    transform(geom, crs_transform.as_ref(), &mut out)
+        .map_err(|e| exec_datafusion_err!("Transform error: {}", e))?;
+    Ok(out)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use geo_traits::{CoordTrait, GeometryTrait, GeometryType, PointTrait};
+    use sedona_proj::transform::with_global_proj_engine;
+    use sedona_testing::create::make_wkb;
+
+    /// A simple WKB point at (1.0, 2.0) used across all tests.
+    fn sample_wkb() -> Vec<u8> {
+        make_wkb("POINT (1.0 2.0)")
+    }
+
+    fn transform_wkb(
+        wkb: &[u8],
+        from: Option<&str>,
+        to: Option<&str>,
+        engine: &dyn CrsEngine,
+    ) -> Result<Vec<u8>> {
+        let from_crs = resolve_crs(from)?;
+        let to_crs = resolve_crs(to)?;
+        match (from_crs, to_crs) {
+            (Some(from_crs), Some(to_crs)) => {
+                crs_transform_wkb(wkb, from_crs.as_ref(), to_crs.as_ref(), 
engine)
+            }
+            _ => Ok(wkb.to_vec()),
+        }
+    }
+
+    // -----------------------------------------------------------------------
+    // Case 1: Both CRSes are empty / None / "0" (all combinations)
+    //
+    // All of these resolve to None. Since both are None, no transformation
+    // is performed and the original WKB is returned.
+    // -----------------------------------------------------------------------
+
+    #[test]
+    fn both_none() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, None, None, engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn both_empty_string() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some(""), Some(""), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn both_zero() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("0"), Some("0"), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_empty_to_zero() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some(""), Some("0"), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_zero_to_empty() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("0"), Some(""), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_none_to_empty() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, None, Some(""), engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_none_to_zero() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, None, Some("0"), engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_empty_to_none() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some(""), None, engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_zero_to_none() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("0"), None, engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    // -----------------------------------------------------------------------
+    // Case 2: One is empty/None/"0", the other is a real (non-WGS84) CRS
+    //
+    // Empty/"0"/None resolve to None. When one side has no CRS, no
+    // transformation can be performed, so the original WKB is returned.
+    // -----------------------------------------------------------------------
+
+    #[test]
+    fn from_empty_to_real_crs() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some(""), Some("EPSG:3857"), 
engine).unwrap();
+            // One side is None, so no transformation — WKB returned unchanged.
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_zero_to_real_crs() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("0"), Some("EPSG:3857"), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_real_crs_to_empty() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("EPSG:3857"), Some(""), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_real_crs_to_zero() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("EPSG:3857"), Some("0"), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    // -----------------------------------------------------------------------
+    // Case 3: Both are real CRSes that are equivalent
+    //
+    // The fast-path (crs_equals) should detect equality and return the
+    // original WKB without invoking the PROJ pipeline.
+    // -----------------------------------------------------------------------
+
+    #[test]
+    fn both_epsg_4326() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("EPSG:4326"), 
Some("EPSG:4326"), engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn both_ogc_crs84() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("OGC:CRS84"), 
Some("OGC:CRS84"), engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn epsg_4326_vs_ogc_crs84() {
+        // EPSG:4326 and OGC:CRS84 are treated as equivalent lnglat CRSes.
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("EPSG:4326"), 
Some("OGC:CRS84"), engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn ogc_crs84_vs_epsg_4326() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("OGC:CRS84"), 
Some("EPSG:4326"), engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn both_epsg_3857() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("EPSG:3857"), 
Some("EPSG:3857"), engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    // -----------------------------------------------------------------------
+    // Case 3.5: One is empty/None/"0", the other is WGS84-equivalent
+    //
+    // Empty/"0"/None resolve to None. When one side is None, no
+    // transformation is performed; the original WKB is returned.
+    // -----------------------------------------------------------------------
+
+    #[test]
+    fn from_none_to_epsg_4326() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, None, Some("EPSG:4326"), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_epsg_4326_to_none() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("EPSG:4326"), None, 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_none_to_ogc_crs84() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, None, Some("OGC:CRS84"), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_ogc_crs84_to_none() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("OGC:CRS84"), None, 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_empty_to_epsg_4326() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some(""), Some("EPSG:4326"), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_epsg_4326_to_empty() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("EPSG:4326"), Some(""), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_zero_to_epsg_4326() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("0"), Some("EPSG:4326"), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn from_epsg_4326_to_zero() {
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("EPSG:4326"), Some("0"), 
engine).unwrap();
+            assert_eq!(result, wkb);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    // -----------------------------------------------------------------------
+    // Case 4: Both are real CRSes that are NOT equivalent
+    //
+    // An actual coordinate transformation should occur. The output WKB must
+    // differ from the input.
+    // -----------------------------------------------------------------------
+
+    #[test]
+    fn transform_4326_to_3857() {
+        // EPSG:4326 (WGS84) -> EPSG:3857 (Web Mercator) should change the 
coordinates.
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("EPSG:4326"), 
Some("EPSG:3857"), engine).unwrap();
+            assert_ne!(result, wkb, "Coordinates should change after 
reprojection");
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn transform_3857_to_4326() {
+        // The reverse direction should also produce a different WKB.
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let result = transform_wkb(&wkb, Some("EPSG:3857"), 
Some("EPSG:4326"), engine).unwrap();
+            assert_ne!(result, wkb, "Coordinates should change after 
reprojection");
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn roundtrip_4326_3857_4326() {
+        // Transform 4326 -> 3857 -> 4326 should approximately recover the 
original point.
+        let wkb = sample_wkb();
+        with_global_proj_engine(|engine| {
+            let projected =
+                transform_wkb(&wkb, Some("EPSG:4326"), Some("EPSG:3857"), 
engine).unwrap();
+            let roundtripped =
+                transform_wkb(&projected, Some("EPSG:3857"), 
Some("EPSG:4326"), engine).unwrap();
+            let wkb = wkb::reader::read_wkb(&roundtripped).unwrap();
+            let GeometryType::Point(p) = wkb.as_type() else {
+                panic!("Expected a Point geometry");
+            };
+            let coord = p.coord().unwrap();
+            assert_eq!(coord.x().round(), 1.0);
+            assert_eq!(coord.y().round(), 2.0);
+            Ok(())
+        })
+        .unwrap();
+    }
+}
diff --git a/rust/sedona-raster-functions/src/executor.rs 
b/rust/sedona-raster-functions/src/executor.rs
index 4f995c48..aa71f711 100644
--- a/rust/sedona-raster-functions/src/executor.rs
+++ b/rust/sedona-raster-functions/src/executor.rs
@@ -15,12 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow_array::{Array, ArrayRef, StructArray};
+use arrow_array::{Array, ArrayRef, BinaryArray, BinaryViewArray, 
StringViewArray, StructArray};
+use arrow_schema::DataType;
+use datafusion_common::cast::{
+    as_binary_array, as_binary_view_array, as_string_view_array, 
as_struct_array,
+};
 use datafusion_common::error::Result;
-use datafusion_common::ScalarValue;
+use datafusion_common::{exec_err, ScalarValue};
 use datafusion_expr::ColumnarValue;
 use sedona_common::{sedona_internal_datafusion_err, sedona_internal_err};
 use sedona_raster::array::{RasterRefImpl, RasterStructArray};
+use sedona_schema::crs::{deserialize_crs, Crs, CrsRef};
 use sedona_schema::datatypes::SedonaType;
 use sedona_schema::datatypes::RASTER;
 
@@ -35,6 +40,176 @@ pub struct RasterExecutor<'a, 'b> {
     num_iterations: usize,
 }
 
+// The accessor types below use enum-based dispatch to handle different Arrow
+// array representations (Binary vs BinaryView, etc.) rather than trait objects
+// like `Box<dyn Iterator>`. Both approaches involve dynamic dispatch, but the
+// enum variant is simpler and avoids an extra heap allocation. Since raster
+// operations are expensive relative to per-element dispatch overhead, the cost
+// of matching on each access is negligible in practice.
+#[derive(Clone)]
+enum ItemWkbAccessor {
+    Binary(BinaryArray),
+    BinaryView(BinaryViewArray),
+}
+
+impl ItemWkbAccessor {
+    #[inline]
+    fn get(&self, i: usize) -> Option<&[u8]> {
+        match self {
+            Self::Binary(arr) => {
+                if arr.is_null(i) {
+                    None
+                } else {
+                    Some(arr.value(i))
+                }
+            }
+            Self::BinaryView(arr) => {
+                if arr.is_null(i) {
+                    None
+                } else {
+                    Some(arr.value(i))
+                }
+            }
+        }
+    }
+}
+
+// Same enum-dispatch rationale as `ItemWkbAccessor` above: the per-element
+// match cost is dwarfed by the raster and CRS operations performed on each 
row.
+enum GeomWkbCrsAccessor {
+    WkbArray {
+        wkb: ItemWkbAccessor,
+        static_crs: Crs,
+    },
+    WkbScalar {
+        wkb: Option<Vec<u8>>,
+        static_crs: Crs,
+    },
+    ItemCrsArray {
+        struct_array: StructArray,
+        item: ItemWkbAccessor,
+        crs: StringViewArray,
+        item_static_crs: Crs,
+        resolved_crs: Crs,
+    },
+    ItemCrsScalar {
+        struct_array: StructArray,
+        item: ItemWkbAccessor,
+        crs: StringViewArray,
+        item_static_crs: Crs,
+        resolved_crs: Crs,
+    },
+    Null,
+}
+
+impl GeomWkbCrsAccessor {
+    #[inline]
+    fn get(&mut self, i: usize) -> Result<(Option<&[u8]>, CrsRef<'_>)> {
+        match self {
+            Self::Null => Ok((None, None)),
+            Self::WkbArray { wkb, static_crs } => {
+                let maybe_wkb = wkb.get(i);
+                if maybe_wkb.is_none() {
+                    return Ok((None, None));
+                }
+                Ok((maybe_wkb, static_crs.as_deref()))
+            }
+            Self::WkbScalar { wkb, static_crs } => {
+                if wkb.is_none() {
+                    return Ok((None, None));
+                }
+                let _ = i;
+                Ok((wkb.as_deref(), static_crs.as_deref()))
+            }
+            Self::ItemCrsArray {
+                struct_array,
+                item,
+                crs,
+                item_static_crs,
+                resolved_crs,
+            } => {
+                if struct_array.is_null(i) {
+                    return Ok((None, None));
+                }
+
+                let maybe_wkb = item.get(i);
+                if maybe_wkb.is_none() {
+                    return Ok((None, None));
+                }
+
+                let item_crs_str = if crs.is_null(i) {
+                    None
+                } else {
+                    Some(crs.value(i))
+                };
+                *resolved_crs = resolve_item_crs(item_crs_str, 
item_static_crs)?;
+                Ok((maybe_wkb, resolved_crs.as_deref()))
+            }
+            Self::ItemCrsScalar {
+                struct_array,
+                item,
+                crs,
+                item_static_crs,
+                resolved_crs,
+            } => {
+                if struct_array.is_null(0) {
+                    return Ok((None, None));
+                }
+
+                let maybe_wkb = item.get(0);
+                if maybe_wkb.is_none() {
+                    return Ok((None, None));
+                }
+
+                let item_crs_str = if crs.is_null(0) {
+                    None
+                } else {
+                    Some(crs.value(0))
+                };
+                *resolved_crs = resolve_item_crs(item_crs_str, 
item_static_crs)?;
+                let _ = i;
+                Ok((maybe_wkb, resolved_crs.as_deref()))
+            }
+        }
+    }
+}
+
+fn resolve_item_crs(item_crs_str: Option<&str>, static_crs: &Crs) -> 
Result<Crs> {
+    let item_crs = if let Some(s) = item_crs_str {
+        deserialize_crs(s)?
+    } else {
+        None
+    };
+
+    match (&item_crs, static_crs) {
+        (None, None) => Ok(None),
+        (Some(_), None) => Ok(item_crs),
+        (None, Some(_)) => Ok(static_crs.clone()),
+        (Some(_), Some(_)) => {
+            if item_crs == *static_crs {
+                Ok(item_crs)
+            } else {
+                exec_err!("CRS values not equal: {item_crs:?} vs 
{static_crs:?}")
+            }
+        }
+    }
+}
+
+fn crs_from_sedona_type(sedona_type: &SedonaType) -> Crs {
+    match sedona_type {
+        SedonaType::Wkb(_, crs) | SedonaType::WkbView(_, crs) => crs.clone(),
+        _ => None,
+    }
+}
+
+fn is_item_crs_type(sedona_type: &SedonaType) -> bool {
+    matches!(
+        sedona_type,
+        SedonaType::Arrow(DataType::Struct(fields))
+            if fields.len() == 2 && fields[0].name() == "item" && 
fields[1].name() == "crs"
+    )
+}
+
 impl<'a, 'b> RasterExecutor<'a, 'b> {
     /// Create a new [RasterExecutor]
     pub fn new(arg_types: &'a [SedonaType], args: &'b [ColumnarValue]) -> Self 
{
@@ -45,6 +220,29 @@ impl<'a, 'b> RasterExecutor<'a, 'b> {
         }
     }
 
+    /// Create a new [RasterExecutor] with an explicit number of iterations.
+    ///
+    /// This is useful when the executor is built from a subset of the original
+    /// arguments (e.g. only raster + geometry) but the overall UDF should 
still
+    /// iterate according to other array arguments.
+    #[cfg(test)]
+    pub fn new_with_num_iterations(
+        arg_types: &'a [SedonaType],
+        args: &'b [ColumnarValue],
+        num_iterations: usize,
+    ) -> Self {
+        let has_any_array = args.iter().any(|a| matches!(a, 
ColumnarValue::Array(_)));
+        Self {
+            arg_types,
+            args,
+            num_iterations: if has_any_array {
+                Self::calc_num_iterations(args)
+            } else {
+                num_iterations
+            },
+        }
+    }
+
     /// Return the number of iterations that will be performed
     pub fn num_iterations(&self) -> usize {
         self.num_iterations
@@ -114,6 +312,367 @@ impl<'a, 'b> RasterExecutor<'a, 'b> {
         }
     }
 
+    /// Execute a function by iterating over two raster arguments in sync.
+    ///
+    /// Supports Array/Array, Array/Scalar, Scalar/Array, and Scalar/Scalar.
+    /// If both inputs are arrays, their lengths must match `num_iterations`.
+    pub fn execute_raster_raster_void<F>(&self, mut func: F) -> Result<()>
+    where
+        F: FnMut(usize, Option<&RasterRefImpl<'_>>, 
Option<&RasterRefImpl<'_>>) -> Result<()>,
+    {
+        if self.arg_types.first() != Some(&RASTER) || self.arg_types.get(1) != 
Some(&RASTER) {
+            return sedona_internal_err!("Expected (raster, raster) argument 
types");
+        }
+        if self.args.len() < 2 {
+            return sedona_internal_err!("Expected at least 2 arguments 
(raster, raster)");
+        }
+
+        match (&self.args[0], &self.args[1]) {
+            (ColumnarValue::Array(a0), ColumnarValue::Array(a1)) => {
+                let s0 = 
a0.as_any().downcast_ref::<StructArray>().ok_or_else(|| {
+                    sedona_internal_datafusion_err!("Expected StructArray for 
raster data")
+                })?;
+                let s1 = 
a1.as_any().downcast_ref::<StructArray>().ok_or_else(|| {
+                    sedona_internal_datafusion_err!("Expected StructArray for 
raster data")
+                })?;
+
+                let arr0 = RasterStructArray::new(s0);
+                let arr1 = RasterStructArray::new(s1);
+                if arr0.len() != self.num_iterations || arr1.len() != 
self.num_iterations {
+                    return sedona_internal_err!(
+                        "Expected arrays of length {} but got ({}, {})",
+                        self.num_iterations,
+                        arr0.len(),
+                        arr1.len()
+                    );
+                }
+
+                for i in 0..self.num_iterations {
+                    let r0 = if arr0.is_null(i) {
+                        None
+                    } else {
+                        Some(arr0.get(i)?)
+                    };
+                    let r1 = if arr1.is_null(i) {
+                        None
+                    } else {
+                        Some(arr1.get(i)?)
+                    };
+                    func(i, r0.as_ref(), r1.as_ref())?;
+                }
+                Ok(())
+            }
+            (ColumnarValue::Array(a0), ColumnarValue::Scalar(sv1)) => {
+                let s0 = 
a0.as_any().downcast_ref::<StructArray>().ok_or_else(|| {
+                    sedona_internal_datafusion_err!("Expected StructArray for 
raster data")
+                })?;
+                let arr0 = RasterStructArray::new(s0);
+                if arr0.len() != self.num_iterations {
+                    return sedona_internal_err!(
+                        "Expected array of length {} but got {}",
+                        self.num_iterations,
+                        arr0.len()
+                    );
+                }
+                let r1 = match sv1 {
+                    ScalarValue::Struct(arc_struct) => {
+                        let arr1 = RasterStructArray::new(arc_struct.as_ref());
+                        if arr1.is_null(0) {
+                            None
+                        } else {
+                            Some(arr1.get(0)?)
+                        }
+                    }
+                    ScalarValue::Null => None,
+                    _ => {
+                        return sedona_internal_err!("Expected Struct scalar 
for raster");
+                    }
+                };
+
+                for i in 0..self.num_iterations {
+                    let r0 = if arr0.is_null(i) {
+                        None
+                    } else {
+                        Some(arr0.get(i)?)
+                    };
+                    func(i, r0.as_ref(), r1.as_ref())?;
+                }
+                Ok(())
+            }
+            (ColumnarValue::Scalar(sv0), ColumnarValue::Array(a1)) => {
+                let s1 = 
a1.as_any().downcast_ref::<StructArray>().ok_or_else(|| {
+                    sedona_internal_datafusion_err!("Expected StructArray for 
raster data")
+                })?;
+                let arr1 = RasterStructArray::new(s1);
+                if arr1.len() != self.num_iterations {
+                    return sedona_internal_err!(
+                        "Expected array of length {} but got {}",
+                        self.num_iterations,
+                        arr1.len()
+                    );
+                }
+                let r0 = match sv0 {
+                    ScalarValue::Struct(arc_struct) => {
+                        let arr0 = RasterStructArray::new(arc_struct.as_ref());
+                        if arr0.is_null(0) {
+                            None
+                        } else {
+                            Some(arr0.get(0)?)
+                        }
+                    }
+                    ScalarValue::Null => None,
+                    _ => {
+                        return sedona_internal_err!("Expected Struct scalar 
for raster");
+                    }
+                };
+
+                for i in 0..self.num_iterations {
+                    let r1 = if arr1.is_null(i) {
+                        None
+                    } else {
+                        Some(arr1.get(i)?)
+                    };
+                    func(i, r0.as_ref(), r1.as_ref())?;
+                }
+                Ok(())
+            }
+            (ColumnarValue::Scalar(sv0), ColumnarValue::Scalar(sv1)) => {
+                let r0 = match sv0 {
+                    ScalarValue::Struct(arc_struct) => {
+                        let arr0 = RasterStructArray::new(arc_struct.as_ref());
+                        if arr0.is_null(0) {
+                            None
+                        } else {
+                            Some(arr0.get(0)?)
+                        }
+                    }
+                    ScalarValue::Null => None,
+                    _ => {
+                        return sedona_internal_err!("Expected Struct scalar 
for raster");
+                    }
+                };
+                let r1 = match sv1 {
+                    ScalarValue::Struct(arc_struct) => {
+                        let arr1 = RasterStructArray::new(arc_struct.as_ref());
+                        if arr1.is_null(0) {
+                            None
+                        } else {
+                            Some(arr1.get(0)?)
+                        }
+                    }
+                    ScalarValue::Null => None,
+                    _ => {
+                        return sedona_internal_err!("Expected Struct scalar 
for raster");
+                    }
+                };
+
+                for i in 0..self.num_iterations {
+                    func(i, r0.as_ref(), r1.as_ref())?;
+                }
+                Ok(())
+            }
+        }
+    }
+
+    /// Execute a function by iterating over rasters and a geometry in sync.
+    ///
+    /// The geometry argument may be:
+    /// - A WKB Binary or BinaryView array/scalar (with type-level CRS via 
[SedonaType])
+    /// - An item-level CRS struct column: struct(item: wkb, crs: utf8view)
+    ///
+    /// The closure is invoked for each row with `(raster, wkb_bytes, 
crs_str)`.
+    pub fn execute_raster_wkb_crs_void<F>(&self, mut func: F) -> Result<()>
+    where
+        F: FnMut(Option<&RasterRefImpl<'_>>, Option<&[u8]>, CrsRef<'_>) -> 
Result<()>,
+    {
+        if self.arg_types.first() != Some(&RASTER) {
+            return sedona_internal_err!("First argument must be a raster 
type");
+        }
+        if self.args.len() < 2 {
+            return sedona_internal_err!("Expected at least 2 arguments 
(raster, geom)");
+        }
+
+        let mut geom_accessor = self.make_geom_wkb_crs_accessor(1)?;
+
+        match &self.args[0] {
+            ColumnarValue::Array(array) => {
+                let raster_struct =
+                    array
+                        .as_any()
+                        .downcast_ref::<StructArray>()
+                        .ok_or_else(|| {
+                            sedona_internal_datafusion_err!("Expected 
StructArray for raster data")
+                        })?;
+                let raster_array = RasterStructArray::new(raster_struct);
+
+                for i in 0..self.num_iterations {
+                    let (maybe_wkb, maybe_crs) = geom_accessor.get(i)?;
+                    if raster_array.is_null(i) {
+                        func(None, maybe_wkb, maybe_crs)?;
+                        continue;
+                    }
+                    let raster = raster_array.get(i)?;
+                    func(Some(&raster), maybe_wkb, maybe_crs)?;
+                }
+
+                Ok(())
+            }
+            ColumnarValue::Scalar(scalar_value) => match scalar_value {
+                ScalarValue::Struct(arc_struct) => {
+                    let raster_array = 
RasterStructArray::new(arc_struct.as_ref());
+                    let raster_opt = if raster_array.is_null(0) {
+                        None
+                    } else {
+                        Some(raster_array.get(0)?)
+                    };
+                    for i in 0..self.num_iterations {
+                        let (maybe_wkb, maybe_crs) = geom_accessor.get(i)?;
+                        func(raster_opt.as_ref(), maybe_wkb, maybe_crs)?;
+                    }
+                    Ok(())
+                }
+                ScalarValue::Null => {
+                    for i in 0..self.num_iterations {
+                        let (maybe_wkb, maybe_crs) = geom_accessor.get(i)?;
+                        func(None, maybe_wkb, maybe_crs)?;
+                    }
+                    Ok(())
+                }
+                _ => sedona_internal_err!("Expected Struct scalar for raster"),
+            },
+        }
+    }
+
+    fn make_geom_wkb_crs_accessor(&self, arg_index: usize) -> 
Result<GeomWkbCrsAccessor> {
+        let sedona_type = self
+            .arg_types
+            .get(arg_index)
+            .ok_or_else(|| sedona_internal_datafusion_err!("Missing argument 
type"))?;
+        let arg = self
+            .args
+            .get(arg_index)
+            .ok_or_else(|| sedona_internal_datafusion_err!("Missing 
argument"))?;
+
+        if is_item_crs_type(sedona_type) {
+            let item_type = match sedona_type {
+                SedonaType::Arrow(DataType::Struct(fields)) => {
+                    SedonaType::from_storage_field(&fields[0])?
+                }
+                _ => return sedona_internal_err!("Unexpected item_crs type"),
+            };
+            let item_static_crs = crs_from_sedona_type(&item_type);
+
+            match arg {
+                ColumnarValue::Array(array) => {
+                    let struct_array_ref = as_struct_array(array)?;
+                    let item_col = struct_array_ref.column(0);
+                    let crs_col = struct_array_ref.column(1);
+                    let crs_array = as_string_view_array(crs_col)?.clone();
+                    let item_accessor = match &item_type {
+                        SedonaType::Wkb(_, _) => {
+                            
ItemWkbAccessor::Binary(as_binary_array(item_col)?.clone())
+                        }
+                        SedonaType::WkbView(_, _) => {
+                            
ItemWkbAccessor::BinaryView(as_binary_view_array(item_col)?.clone())
+                        }
+                        SedonaType::Arrow(DataType::Binary) => {
+                            
ItemWkbAccessor::Binary(as_binary_array(item_col)?.clone())
+                        }
+                        SedonaType::Arrow(DataType::BinaryView) => {
+                            
ItemWkbAccessor::BinaryView(as_binary_view_array(item_col)?.clone())
+                        }
+                        other => {
+                            return sedona_internal_err!(
+                                "Unsupported item_crs item field type: 
{other:?}"
+                            );
+                        }
+                    };
+
+                    Ok(GeomWkbCrsAccessor::ItemCrsArray {
+                        struct_array: struct_array_ref.clone(),
+                        item: item_accessor,
+                        crs: crs_array,
+                        item_static_crs,
+                        resolved_crs: None,
+                    })
+                }
+                ColumnarValue::Scalar(ScalarValue::Struct(struct_scalar)) => {
+                    let struct_array_ref = struct_scalar.as_ref();
+                    let item_col = struct_array_ref.column(0);
+                    let crs_col = struct_array_ref.column(1);
+                    let crs_array = as_string_view_array(crs_col)?.clone();
+                    let item_accessor = match &item_type {
+                        SedonaType::Wkb(_, _) => {
+                            
ItemWkbAccessor::Binary(as_binary_array(item_col)?.clone())
+                        }
+                        SedonaType::WkbView(_, _) => {
+                            
ItemWkbAccessor::BinaryView(as_binary_view_array(item_col)?.clone())
+                        }
+                        SedonaType::Arrow(DataType::Binary) => {
+                            
ItemWkbAccessor::Binary(as_binary_array(item_col)?.clone())
+                        }
+                        SedonaType::Arrow(DataType::BinaryView) => {
+                            
ItemWkbAccessor::BinaryView(as_binary_view_array(item_col)?.clone())
+                        }
+                        other => {
+                            return sedona_internal_err!(
+                                "Unsupported item_crs item field type: 
{other:?}"
+                            );
+                        }
+                    };
+
+                    Ok(GeomWkbCrsAccessor::ItemCrsScalar {
+                        struct_array: struct_array_ref.clone(),
+                        item: item_accessor,
+                        crs: crs_array,
+                        item_static_crs,
+                        resolved_crs: None,
+                    })
+                }
+                ColumnarValue::Scalar(ScalarValue::Null) => 
Ok(GeomWkbCrsAccessor::Null),
+                other => {
+                    sedona_internal_err!("Expected item_crs Struct 
scalar/array, got {other:?}")
+                }
+            }
+        } else {
+            let static_crs = crs_from_sedona_type(sedona_type);
+            match arg {
+                ColumnarValue::Array(array) => match sedona_type {
+                    SedonaType::Wkb(_, _) | 
SedonaType::Arrow(DataType::Binary) => {
+                        Ok(GeomWkbCrsAccessor::WkbArray {
+                            wkb: 
ItemWkbAccessor::Binary(as_binary_array(array)?.clone()),
+                            static_crs,
+                        })
+                    }
+                    SedonaType::WkbView(_, _) | 
SedonaType::Arrow(DataType::BinaryView) => {
+                        Ok(GeomWkbCrsAccessor::WkbArray {
+                            wkb: 
ItemWkbAccessor::BinaryView(as_binary_view_array(array)?.clone()),
+                            static_crs,
+                        })
+                    }
+                    other => sedona_internal_err!("Unsupported geometry type: 
{other:?}"),
+                },
+                ColumnarValue::Scalar(scalar_value) => {
+                    let maybe_bytes: Option<Vec<u8>> = match scalar_value {
+                        ScalarValue::Binary(v)
+                        | ScalarValue::BinaryView(v)
+                        | ScalarValue::LargeBinary(v) => v.clone(),
+                        ScalarValue::Null => None,
+                        other => {
+                            return sedona_internal_err!(
+                                "Unsupported geometry scalar type: {other:?}"
+                            )
+                        }
+                    };
+                    Ok(GeomWkbCrsAccessor::WkbScalar {
+                        wkb: maybe_bytes,
+                        static_crs,
+                    })
+                }
+            }
+        }
+    }
+
     /// Finish an [ArrayRef] output as the appropriate [ColumnarValue]
     ///
     /// Converts the output into a [ColumnarValue::Scalar] if all arguments 
were scalars,
@@ -156,8 +715,12 @@ mod tests {
     use super::*;
     use arrow_array::builder::UInt64Builder;
     use arrow_array::UInt64Array;
+    use arrow_schema::Field;
     use sedona_raster::traits::RasterRef;
+    use sedona_schema::crs::{deserialize_crs, lnglat};
     use sedona_schema::datatypes::RASTER;
+    use sedona_schema::datatypes::{Edges, WKB_GEOMETRY, WKB_VIEW_GEOMETRY};
+    use sedona_testing::create::{create_array, create_array_item_crs};
     use sedona_testing::rasters::generate_test_rasters;
     use std::sync::Arc;
 
@@ -274,4 +837,160 @@ mod tests {
 
         assert_eq!(width_scalar, &ScalarValue::UInt64(None));
     }
+
+    #[test]
+    fn test_raster_executor_execute_raster_wkb_crs_void_static_crs() {
+        let rasters = generate_test_rasters(2, None).unwrap();
+        let raster_args = ColumnarValue::Array(Arc::new(rasters));
+
+        let geom_type = SedonaType::Wkb(Edges::Planar, lnglat());
+        let geom_array = create_array(&[Some("POINT (0 0)"), None], 
&geom_type);
+        let geom_args = ColumnarValue::Array(geom_array);
+
+        let args = [raster_args, geom_args];
+        let arg_types = vec![RASTER, geom_type];
+        let executor = RasterExecutor::new(&arg_types, &args);
+
+        let expected_crs = deserialize_crs("EPSG:4326").unwrap().unwrap();
+        let mut out_crs_matches: Vec<Option<bool>> = 
Vec::with_capacity(executor.num_iterations());
+        let mut out_has_wkb: Vec<bool> = 
Vec::with_capacity(executor.num_iterations());
+        executor
+            .execute_raster_wkb_crs_void(|_raster, wkb, crs| {
+                out_has_wkb.push(wkb.is_some());
+                out_crs_matches.push(crs.map(|c| 
c.crs_equals(expected_crs.as_ref())));
+                Ok(())
+            })
+            .unwrap();
+
+        assert_eq!(out_has_wkb, vec![true, false]);
+        assert_eq!(out_crs_matches, vec![Some(true), None]);
+    }
+
+    #[test]
+    fn test_raster_executor_execute_raster_wkb_crs_void_item_crs_row_level() {
+        let rasters = generate_test_rasters(2, None).unwrap();
+        let raster_args = ColumnarValue::Array(Arc::new(rasters));
+
+        // Build a custom item_crs type (standard item type without static CRS)
+        let item_field = WKB_GEOMETRY.to_storage_field("item", true).unwrap();
+        let crs_field = Field::new("crs", DataType::Utf8View, true);
+        let geom_type = SedonaType::Arrow(DataType::Struct(vec![item_field, 
crs_field].into()));
+
+        let geom_array = create_array_item_crs(
+            &[Some("POINT (0 0)"), Some("POINT (1 1)")],
+            [Some("EPSG:4326"), None],
+            &WKB_GEOMETRY,
+        );
+        let geom_args = ColumnarValue::Array(geom_array);
+
+        let args = [raster_args, geom_args];
+        let arg_types = vec![RASTER, geom_type];
+        let executor = RasterExecutor::new(&arg_types, &args);
+
+        let expected_crs = deserialize_crs("EPSG:4326").unwrap().unwrap();
+        let mut out_crs_matches: Vec<Option<bool>> = 
Vec::with_capacity(executor.num_iterations());
+        executor
+            .execute_raster_wkb_crs_void(|_raster, _wkb, crs| {
+                out_crs_matches.push(crs.map(|c| 
c.crs_equals(expected_crs.as_ref())));
+                Ok(())
+            })
+            .unwrap();
+
+        assert_eq!(out_crs_matches, vec![Some(true), None]);
+    }
+
+    #[test]
+    fn 
test_raster_executor_execute_raster_wkb_crs_void_item_crs_static_fallback() {
+        let rasters = generate_test_rasters(1, None).unwrap();
+        let raster_struct = 
rasters.as_any().downcast_ref::<StructArray>().unwrap();
+        let scalar_raster = 
ScalarValue::Struct(Arc::new(raster_struct.clone()));
+
+        let item_type_with_crs = SedonaType::Wkb(Edges::Planar, lnglat());
+        let item_field = item_type_with_crs.to_storage_field("item", 
true).unwrap();
+        let crs_field = Field::new("crs", DataType::Utf8View, true);
+        let geom_type = SedonaType::Arrow(DataType::Struct(vec![item_field, 
crs_field].into()));
+
+        let geom_array = create_array_item_crs(
+            &[Some("POINT (0 0)"), Some("POINT (1 1)")],
+            [None, None],
+            &item_type_with_crs,
+        );
+        let geom_args = ColumnarValue::Array(geom_array);
+
+        let args = [ColumnarValue::Scalar(scalar_raster), geom_args];
+        let arg_types = vec![RASTER, geom_type];
+        let executor = RasterExecutor::new(&arg_types, &args);
+        assert_eq!(executor.num_iterations(), 2);
+
+        let expected_crs = deserialize_crs("EPSG:4326").unwrap().unwrap();
+        let mut out_crs_matches: Vec<Option<bool>> = 
Vec::with_capacity(executor.num_iterations());
+        executor
+            .execute_raster_wkb_crs_void(|_raster, _wkb, crs| {
+                out_crs_matches.push(crs.map(|c| 
c.crs_equals(expected_crs.as_ref())));
+                Ok(())
+            })
+            .unwrap();
+
+        assert_eq!(out_crs_matches, vec![Some(true), Some(true)]);
+    }
+
+    #[test]
+    fn 
test_raster_executor_execute_raster_wkb_crs_void_item_crs_conflict_errors() {
+        let rasters = generate_test_rasters(1, None).unwrap();
+        let raster_args = ColumnarValue::Array(Arc::new(rasters));
+
+        let item_type_with_crs = SedonaType::Wkb(Edges::Planar, lnglat());
+        let item_field = item_type_with_crs.to_storage_field("item", 
true).unwrap();
+        let crs_field = Field::new("crs", DataType::Utf8View, true);
+        let geom_type = SedonaType::Arrow(DataType::Struct(vec![item_field, 
crs_field].into()));
+
+        let geom_array = create_array_item_crs(
+            &[Some("POINT (0 0)")],
+            [Some("EPSG:3857")],
+            &item_type_with_crs,
+        );
+        let geom_args = ColumnarValue::Array(geom_array);
+
+        let args = [raster_args, geom_args];
+        let arg_types = vec![RASTER, geom_type];
+        let executor = RasterExecutor::new(&arg_types, &args);
+
+        let err = executor
+            .execute_raster_wkb_crs_void(|_raster, _wkb, _crs| Ok(()))
+            .unwrap_err();
+        assert!(err.message().contains("CRS values not equal"));
+    }
+
+    #[test]
+    fn test_raster_executor_execute_raster_wkb_crs_void_wkb_view() {
+        let rasters = generate_test_rasters(1, None).unwrap();
+        let raster_args = ColumnarValue::Array(Arc::new(rasters));
+
+        let geom_array = create_array(&[Some("POINT (0 0)")], 
&WKB_VIEW_GEOMETRY);
+        let geom_args = ColumnarValue::Array(geom_array);
+
+        let args = [raster_args, geom_args];
+        let arg_types = vec![RASTER, WKB_VIEW_GEOMETRY];
+        let executor = RasterExecutor::new(&arg_types, &args);
+
+        executor
+            .execute_raster_wkb_crs_void(|_raster, wkb, crs| {
+                assert!(wkb.is_some());
+                assert!(crs.is_none());
+                Ok(())
+            })
+            .unwrap();
+    }
+
+    #[test]
+    fn test_raster_executor_new_with_num_iterations_scalar_args() {
+        let rasters = generate_test_rasters(1, None).unwrap();
+        let raster_struct = 
rasters.as_any().downcast_ref::<StructArray>().unwrap();
+        let scalar_raster = 
ScalarValue::Struct(Arc::new(raster_struct.clone()));
+        let args = [ColumnarValue::Scalar(scalar_raster)];
+        let arg_types = vec![RASTER];
+
+        let executor = RasterExecutor::new_with_num_iterations(&arg_types, 
&args, 10);
+        assert_eq!(executor.num_iterations(), 10);
+    }
 }
diff --git a/rust/sedona-raster-functions/src/lib.rs 
b/rust/sedona-raster-functions/src/lib.rs
index 6a3a9ff3..592a0b4c 100644
--- a/rust/sedona-raster-functions/src/lib.rs
+++ b/rust/sedona-raster-functions/src/lib.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod crs_utils;
 mod executor;
 pub mod register;
 pub mod rs_band_accessors;
@@ -29,5 +30,6 @@ pub mod rs_pixel_functions;
 pub mod rs_rastercoordinate;
 pub mod rs_setsrid;
 pub mod rs_size;
+pub mod rs_spatial_predicates;
 pub mod rs_srid;
 pub mod rs_worldcoordinate;
diff --git a/rust/sedona-raster-functions/src/register.rs 
b/rust/sedona-raster-functions/src/register.rs
index 1e88945c..e86fe8e9 100644
--- a/rust/sedona-raster-functions/src/register.rs
+++ b/rust/sedona-raster-functions/src/register.rs
@@ -65,6 +65,9 @@ pub fn default_function_set() -> FunctionSet {
         crate::rs_setsrid::rs_set_srid_udf,
         crate::rs_srid::rs_crs_udf,
         crate::rs_srid::rs_srid_udf,
+        crate::rs_spatial_predicates::rs_contains_udf,
+        crate::rs_spatial_predicates::rs_intersects_udf,
+        crate::rs_spatial_predicates::rs_within_udf,
         crate::rs_worldcoordinate::rs_rastertoworldcoord_udf,
         crate::rs_worldcoordinate::rs_rastertoworldcoordx_udf,
         crate::rs_worldcoordinate::rs_rastertoworldcoordy_udf,
diff --git a/rust/sedona-raster-functions/src/rs_spatial_predicates.rs 
b/rust/sedona-raster-functions/src/rs_spatial_predicates.rs
new file mode 100644
index 00000000..b0eaa057
--- /dev/null
+++ b/rust/sedona-raster-functions/src/rs_spatial_predicates.rs
@@ -0,0 +1,745 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! RS_Intersects, RS_Contains, RS_Within functions
+//!
+//! These functions test spatial relationships between rasters and geometries.
+//! Each function supports three overloads: (raster, geometry), (geometry, 
raster),
+//! and (raster, raster). Rasters are compared via their convex hulls.
+//!
+//! CRS transformation rules:
+//! - If neither side has a CRS, the comparison is performed directly without 
CRS transformation
+//! - If one side has a CRS but the other does not, an error is returned
+//! - If both sides are in the same CRS, perform the relationship test directly
+//! - For raster/geometry pairs, the geometry is transformed into the raster's 
CRS
+//! - For raster/raster pairs, the second raster is transformed into the 
first's CRS
+//! - If the preferred transformation fails, both sides are transformed to 
WGS84 as a fallback
+
+use std::sync::Arc;
+
+use crate::crs_utils::crs_transform_wkb;
+use crate::crs_utils::resolve_crs;
+use crate::executor::RasterExecutor;
+use arrow_array::builder::BooleanBuilder;
+use arrow_schema::DataType;
+use datafusion_common::exec_datafusion_err;
+use datafusion_common::exec_err;
+use datafusion_common::DataFusionError;
+use datafusion_common::Result;
+use datafusion_expr::{ColumnarValue, Volatility};
+use sedona_expr::scalar_udf::{SedonaScalarKernel, SedonaScalarUDF};
+use sedona_geometry::transform::CrsEngine;
+use sedona_geometry::wkb_factory::write_wkb_polygon;
+use sedona_proj::transform::with_global_proj_engine;
+use sedona_raster::affine_transformation::to_world_coordinate;
+use sedona_raster::traits::RasterRef;
+use sedona_schema::crs::{lnglat, CrsRef};
+use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
+use sedona_tg::tg;
+
+/// RS_Intersects() scalar UDF
+///
+/// Returns true if the extents of the two arguments intersect. Supports
+/// (raster, geometry), (geometry, raster), and (raster, raster) overloads.
+/// Rasters are compared via their convex hulls.
+pub fn rs_intersects_udf() -> SedonaScalarUDF {
+    SedonaScalarUDF::new(
+        "rs_intersects",
+        vec![
+            Arc::new(RsSpatialPredicate::<tg::Intersects>::raster_geom()),
+            Arc::new(RsSpatialPredicate::<tg::Intersects>::geom_raster()),
+            Arc::new(RsSpatialPredicate::<tg::Intersects>::raster_raster()),
+        ],
+        Volatility::Immutable,
+    )
+}
+
+/// RS_Contains() scalar UDF
+///
+/// Returns true if the first argument's extent completely contains the second.
+/// Supports (raster, geometry), (geometry, raster), and (raster, raster) 
overloads.
+/// Rasters are compared via their convex hulls.
+pub fn rs_contains_udf() -> SedonaScalarUDF {
+    SedonaScalarUDF::new(
+        "rs_contains",
+        vec![
+            Arc::new(RsSpatialPredicate::<tg::Contains>::raster_geom()),
+            Arc::new(RsSpatialPredicate::<tg::Contains>::geom_raster()),
+            Arc::new(RsSpatialPredicate::<tg::Contains>::raster_raster()),
+        ],
+        Volatility::Immutable,
+    )
+}
+
+/// RS_Within() scalar UDF
+///
+/// Returns true if the first argument's extent is completely within the 
second.
+/// Supports (raster, geometry), (geometry, raster), and (raster, raster) 
overloads.
+/// Rasters are compared via their convex hulls.
+pub fn rs_within_udf() -> SedonaScalarUDF {
+    SedonaScalarUDF::new(
+        "rs_within",
+        vec![
+            Arc::new(RsSpatialPredicate::<tg::Within>::raster_geom()),
+            Arc::new(RsSpatialPredicate::<tg::Within>::geom_raster()),
+            Arc::new(RsSpatialPredicate::<tg::Within>::raster_raster()),
+        ],
+        Volatility::Immutable,
+    )
+}
+
+/// Argument order for the spatial predicate
+#[derive(Debug, Clone, Copy)]
+enum ArgOrder {
+    /// First arg is raster, second is geometry
+    RasterGeom,
+    /// First arg is geometry, second is raster
+    GeomRaster,
+    /// Both args are rasters
+    RasterRaster,
+}
+
+#[derive(Debug)]
+struct RsSpatialPredicate<Op: tg::BinaryPredicate> {
+    arg_order: ArgOrder,
+    _op: std::marker::PhantomData<Op>,
+}
+
+impl<Op: tg::BinaryPredicate> RsSpatialPredicate<Op> {
+    fn raster_geom() -> Self {
+        Self {
+            arg_order: ArgOrder::RasterGeom,
+            _op: std::marker::PhantomData,
+        }
+    }
+
+    fn geom_raster() -> Self {
+        Self {
+            arg_order: ArgOrder::GeomRaster,
+            _op: std::marker::PhantomData,
+        }
+    }
+
+    fn raster_raster() -> Self {
+        Self {
+            arg_order: ArgOrder::RasterRaster,
+            _op: std::marker::PhantomData,
+        }
+    }
+}
+
+impl<Op: tg::BinaryPredicate + Send + Sync> SedonaScalarKernel for 
RsSpatialPredicate<Op> {
+    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        let matcher = match self.arg_order {
+            ArgOrder::RasterGeom => ArgMatcher::new(
+                vec![ArgMatcher::is_raster(), ArgMatcher::is_geometry()],
+                SedonaType::Arrow(DataType::Boolean),
+            ),
+            ArgOrder::GeomRaster => ArgMatcher::new(
+                vec![ArgMatcher::is_geometry(), ArgMatcher::is_raster()],
+                SedonaType::Arrow(DataType::Boolean),
+            ),
+            ArgOrder::RasterRaster => ArgMatcher::new(
+                vec![ArgMatcher::is_raster(), ArgMatcher::is_raster()],
+                SedonaType::Arrow(DataType::Boolean),
+            ),
+        };
+
+        matcher.match_args(args)
+    }
+
+    fn invoke_batch(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        match self.arg_order {
+            ArgOrder::RasterGeom => self.invoke_raster_geom(arg_types, args),
+            ArgOrder::GeomRaster => self.invoke_geom_raster(arg_types, args),
+            ArgOrder::RasterRaster => self.invoke_raster_raster(arg_types, 
args),
+        }
+    }
+}
+
+impl<Op: tg::BinaryPredicate + Send + Sync> RsSpatialPredicate<Op> {
+    /// Invoke RS_<Predicate>(raster, geometry)
+    fn invoke_raster_geom(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        // Ensure executor always sees (raster, geom)
+        let exec_arg_types = vec![arg_types[0].clone(), arg_types[1].clone()];
+        let exec_args = vec![args[0].clone(), args[1].clone()];
+        let executor = RasterExecutor::new(&exec_arg_types, &exec_args);
+        let mut builder = 
BooleanBuilder::with_capacity(executor.num_iterations());
+        let mut raster_wkb = Vec::with_capacity(CONVEXHULL_WKB_SIZE);
+
+        with_global_proj_engine(|engine| {
+            executor.execute_raster_wkb_crs_void(|raster_opt, maybe_wkb, 
geom_crs| {
+                match (raster_opt, maybe_wkb) {
+                    (Some(raster), Some(geom_wkb)) => {
+                        raster_wkb.clear();
+                        write_convexhull_wkb(raster, &mut raster_wkb)?;
+
+                        let raster_crs = resolve_crs(raster.crs())?;
+                        let result = evaluate_predicate_with_crs::<Op>(
+                            &raster_wkb,
+                            raster_crs.as_deref(),
+                            geom_wkb,
+                            geom_crs,
+                            false,
+                            engine,
+                        )?;
+                        builder.append_value(result);
+                    }
+                    _ => builder.append_null(),
+                }
+                Ok(())
+            })
+        })?;
+
+        executor.finish(Arc::new(builder.finish()))
+    }
+
+    /// Invoke RS_<Predicate>(geometry, raster)
+    fn invoke_geom_raster(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        // Reorder so executor always sees (raster, geom)
+        let exec_arg_types = vec![arg_types[1].clone(), arg_types[0].clone()];
+        let exec_args = vec![args[1].clone(), args[0].clone()];
+        let executor = RasterExecutor::new(&exec_arg_types, &exec_args);
+        let mut builder = 
BooleanBuilder::with_capacity(executor.num_iterations());
+        let mut raster_wkb = Vec::with_capacity(CONVEXHULL_WKB_SIZE);
+
+        with_global_proj_engine(|engine| {
+            executor.execute_raster_wkb_crs_void(|raster_opt, maybe_wkb, 
geom_crs| {
+                match (raster_opt, maybe_wkb) {
+                    (Some(raster), Some(geom_wkb)) => {
+                        raster_wkb.clear();
+                        write_convexhull_wkb(raster, &mut raster_wkb)?;
+
+                        let raster_crs = resolve_crs(raster.crs())?;
+                        // Note: order is geometry, raster for the predicate
+                        let result = evaluate_predicate_with_crs::<Op>(
+                            geom_wkb,
+                            geom_crs,
+                            &raster_wkb,
+                            raster_crs.as_deref(),
+                            true,
+                            engine,
+                        )?;
+                        builder.append_value(result);
+                    }
+                    _ => builder.append_null(),
+                }
+                Ok(())
+            })
+        })?;
+
+        executor.finish(Arc::new(builder.finish()))
+    }
+
+    /// Invoke RS_<Predicate>(raster1, raster2)
+    fn invoke_raster_raster(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        // Ensure executor always sees (raster, raster)
+        let exec_arg_types = vec![arg_types[0].clone(), arg_types[1].clone()];
+        let exec_args = vec![args[0].clone(), args[1].clone()];
+        let executor = RasterExecutor::new(&exec_arg_types, &exec_args);
+        let mut builder = 
BooleanBuilder::with_capacity(executor.num_iterations());
+        let mut wkb0 = Vec::with_capacity(CONVEXHULL_WKB_SIZE);
+        let mut wkb1 = Vec::with_capacity(CONVEXHULL_WKB_SIZE);
+
+        with_global_proj_engine(|engine| {
+            executor.execute_raster_raster_void(|_i, r0_opt, r1_opt| {
+                match (r0_opt, r1_opt) {
+                    (Some(r0), Some(r1)) => {
+                        wkb0.clear();
+                        wkb1.clear();
+                        write_convexhull_wkb(r0, &mut wkb0)?;
+                        write_convexhull_wkb(r1, &mut wkb1)?;
+
+                        let crs0 = resolve_crs(r0.crs())?;
+                        let crs1 = resolve_crs(r1.crs())?;
+                        let result = evaluate_predicate_with_crs::<Op>(
+                            &wkb0,
+                            crs0.as_deref(),
+                            &wkb1,
+                            crs1.as_deref(),
+                            false,
+                            engine,
+                        )?;
+                        builder.append_value(result);
+                    }
+                    _ => builder.append_null(),
+                }
+                Ok(())
+            })
+        })?;
+
+        executor.finish(Arc::new(builder.finish()))
+    }
+}
+
+/// Evaluate a spatial predicate with CRS handling
+///
+/// Rules:
+/// - If neither side has a CRS, compare directly without transformation
+/// - If one side has a CRS but the other does not, return an error
+/// - If both same CRS, compare directly
+/// - Otherwise, try transforming one side to the other's CRS for comparison.
+///   If that fails, transform both to WGS84 and compare.
+fn evaluate_predicate_with_crs<Op: tg::BinaryPredicate>(
+    wkb_a: &[u8],
+    crs_a: CrsRef<'_>,
+    wkb_b: &[u8],
+    crs_b: CrsRef<'_>,
+    from_a_to_b: bool,
+    engine: &dyn CrsEngine,
+) -> Result<bool> {
+    // If either side has no CRS, compare directly without transformation.
+    let (crs_a, crs_b) = match (crs_a, crs_b) {
+        (Some(a), Some(b)) => (a, b),
+        (None, None) => return evaluate_predicate::<Op>(wkb_a, wkb_b),
+        (Some(_), None) => {
+            return exec_err!(
+                "Cannot evaluate spatial predicate: \
+                left geometry has CRS but right geometry does not"
+            )
+        }
+        (None, Some(_)) => {
+            return exec_err!(
+                "Cannot evaluate spatial predicate: \
+                right geometry has CRS but left geometry does not"
+            )
+        }
+    };
+
+    // If both CRSes are equal, compare directly.
+    if crs_a.crs_equals(crs_b) {
+        return evaluate_predicate::<Op>(wkb_a, wkb_b);
+    }
+
+    // Try preferred transformation direction.
+    if from_a_to_b {
+        if let Ok(wkb_a) = crs_transform_wkb(wkb_a, crs_a, crs_b, engine) {
+            return evaluate_predicate::<Op>(&wkb_a, wkb_b);
+        }
+    } else if let Ok(wkb_b) = crs_transform_wkb(wkb_b, crs_b, crs_a, engine) {
+        return evaluate_predicate::<Op>(wkb_a, &wkb_b);
+    }
+
+    // Fallback: transform both sides to WGS84 for comparison.
+    let lnglat_crs = lnglat().expect("lnglat() should always return Some");
+    let wkb_a = crs_transform_wkb(wkb_a, crs_a, lnglat_crs.as_ref(), engine)?;
+    let wkb_b = crs_transform_wkb(wkb_b, crs_b, lnglat_crs.as_ref(), engine)?;
+    evaluate_predicate::<Op>(&wkb_a, &wkb_b)
+}
+
+/// Evaluate a spatial predicate between two WKB geometries
+fn evaluate_predicate<Op: tg::BinaryPredicate>(wkb_a: &[u8], wkb_b: &[u8]) -> 
Result<bool> {
+    let geom_a = tg::Geom::parse_wkb(wkb_a, tg::IndexType::Default)
+        .map_err(|e| exec_datafusion_err!("Failed to parse WKB A: {e}"))?;
+    let geom_b = tg::Geom::parse_wkb(wkb_b, tg::IndexType::Default)
+        .map_err(|e| exec_datafusion_err!("Failed to parse WKB B: {e}"))?;
+
+    Ok(Op::evaluate(&geom_a, &geom_b))
+}
+
+/// Exact WKB byte size for a 2D polygon with 1 ring of 5 points (the raster 
convex hull).
+///
+/// Layout: polygon header (1 byte order + 4 type + 4 ring count)
+///       + ring header (4 point count)
+///       + 5 points × 2 coordinates × 8 bytes
+///       = 9 + 4 + 80 = 93
+const CONVEXHULL_WKB_SIZE: usize = 93;
+
+/// Create WKB for a convex hull polygon for the raster
+fn write_convexhull_wkb(raster: &dyn RasterRef, out: &mut impl std::io::Write) 
-> Result<()> {
+    let width = raster.metadata().width() as i64;
+    let height = raster.metadata().height() as i64;
+
+    let (ulx, uly) = to_world_coordinate(raster, 0, 0);
+    let (urx, ury) = to_world_coordinate(raster, width, 0);
+    let (lrx, lry) = to_world_coordinate(raster, width, height);
+    let (llx, lly) = to_world_coordinate(raster, 0, height);
+
+    write_wkb_polygon(
+        out,
+        [(ulx, uly), (urx, ury), (lrx, lry), (llx, lly), (ulx, 
uly)].into_iter(),
+    )
+    .map_err(|e| DataFusionError::External(e.into()))?;
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_array::{create_array, ArrayRef};
+    use datafusion_expr::ScalarUDF;
+    use rstest::rstest;
+    use sedona_raster::builder::RasterBuilder;
+    use sedona_raster::traits::{BandMetadata, RasterMetadata};
+    use sedona_schema::crs::deserialize_crs;
+    use sedona_schema::crs::OGC_CRS84_PROJJSON;
+    use sedona_schema::datatypes::Edges;
+    use sedona_schema::datatypes::RASTER;
+    use sedona_schema::datatypes::WKB_GEOMETRY;
+    use sedona_schema::raster::{BandDataType, StorageType};
+    use sedona_testing::compare::assert_array_equal;
+    use sedona_testing::create::create_array as create_geom_array;
+    use sedona_testing::rasters::generate_test_rasters;
+    use sedona_testing::testers::ScalarUdfTester;
+
+    /// Transform a coordinate from one CRS to another, using the provided CRS 
engine.
+    fn crs_transform_coord(
+        coord: (f64, f64),
+        from_crs: &str,
+        to_crs: &str,
+        engine: &dyn CrsEngine,
+    ) -> Result<(f64, f64)> {
+        let trans = engine
+            .get_transform_crs_to_crs(from_crs, to_crs, None, "")
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        let mut coord = coord;
+        trans
+            .transform_coord(&mut coord)
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        Ok(coord)
+    }
+
+    /// Build a 1×1 raster whose convex hull covers (0,0) to (1,1).
+    ///
+    /// If `crs` is `None`, the raster has no CRS.
+    fn build_unit_raster(crs: Option<&str>) -> arrow_array::StructArray {
+        let mut builder = RasterBuilder::new(1);
+        let metadata = RasterMetadata {
+            width: 1,
+            height: 1,
+            upperleft_x: 0.0,
+            upperleft_y: 1.0,
+            scale_x: 1.0,
+            scale_y: -1.0,
+            skew_x: 0.0,
+            skew_y: 0.0,
+        };
+        builder.start_raster(&metadata, crs).unwrap();
+        builder
+            .start_band(BandMetadata {
+                datatype: BandDataType::UInt8,
+                nodata_value: None,
+                storage_type: StorageType::InDb,
+                outdb_url: None,
+                outdb_band_id: None,
+            })
+            .unwrap();
+        builder.band_data_writer().append_value([0u8]);
+        builder.finish_band().unwrap();
+        builder.finish_raster().unwrap();
+        builder.finish().unwrap()
+    }
+
+    #[test]
+    fn rs_intersects_udf_docs() {
+        let udf: ScalarUDF = rs_intersects_udf().into();
+        assert_eq!(udf.name(), "rs_intersects");
+    }
+
+    #[test]
+    fn rs_contains_udf_docs() {
+        let udf: ScalarUDF = rs_contains_udf().into();
+        assert_eq!(udf.name(), "rs_contains");
+    }
+
+    #[test]
+    fn rs_within_udf_docs() {
+        let udf: ScalarUDF = rs_within_udf().into();
+        assert_eq!(udf.name(), "rs_within");
+    }
+
+    #[rstest]
+    fn rs_intersects_raster_geom() {
+        let udf = rs_intersects_udf();
+        let geom_type = SedonaType::Wkb(Edges::Planar, lnglat());
+        let tester = ScalarUdfTester::new(udf.into(), vec![RASTER, 
geom_type.clone()]);
+
+        let rasters = generate_test_rasters(3, Some(0)).unwrap();
+
+        // Test rasters:
+        // Raster 1: corners at approximately (2.0, 3.0), (2.2, 3.08), (2.29, 
2.48), (2.09, 2.4)
+        // Raster 2: corners at approximately (3.0, 4.0), (3.6, 4.24), (3.84, 
2.64), (3.24, 2.4)
+
+        // Points that should intersect with raster 1 (approximately)
+        // Point inside raster 1
+        let geoms = create_geom_array(
+            &[
+                None,
+                Some("POINT (2.15 2.75)"), // Inside raster 1
+                Some("POINT (0.0 0.0)"),   // Outside all rasters
+            ],
+            &geom_type,
+        );
+
+        let expected: ArrayRef = create_array!(Boolean, [None, Some(true), 
Some(false)]);
+
+        let result = tester
+            .invoke_arrays(vec![Arc::new(rasters), geoms])
+            .unwrap();
+
+        assert_array_equal(&result, &expected);
+    }
+
+    #[rstest]
+    fn rs_intersects_raster_geom_crs_mismatch() {
+        let udf = rs_intersects_udf();
+        let geom_type = SedonaType::Wkb(Edges::Planar, 
deserialize_crs("EPSG:3857").unwrap());
+        let tester = ScalarUdfTester::new(udf.into(), vec![RASTER, 
geom_type.clone()]);
+
+        let rasters = generate_test_rasters(3, Some(0)).unwrap();
+        let (x, y) = with_global_proj_engine(|engine| {
+            crs_transform_coord((2.15, 2.75), "OGC:CRS84", "EPSG:3857", engine)
+        })
+        .unwrap();
+        let point_3857 = format!("POINT ({} {})", x, y);
+        let wkt_values: [Option<&str>; 3] = [None, Some(point_3857.as_str()), 
Some("POINT (0 0)")];
+
+        let geoms = create_geom_array(&wkt_values, &geom_type);
+
+        let expected: ArrayRef = create_array!(Boolean, [None, Some(true), 
Some(false)]);
+
+        let result = tester
+            .invoke_arrays(vec![Arc::new(rasters), geoms])
+            .unwrap();
+
+        assert_array_equal(&result, &expected);
+    }
+
+    #[test]
+    fn rs_intersects_raster_geom_projjson_crs() {
+        // Use an authority code at the geometry type-level so we don't need 
PROJ for this test.
+        // The raster side exercises PROJJSON CRS deserialization.
+        let geom_type = SedonaType::Wkb(Edges::Planar, 
deserialize_crs("EPSG:4326").unwrap());
+
+        let udf = rs_intersects_udf();
+        let tester = ScalarUdfTester::new(udf.into(), vec![RASTER, 
geom_type.clone()]);
+
+        let rasters = build_unit_raster(Some(OGC_CRS84_PROJJSON));
+
+        let geoms = create_geom_array(&[Some("POINT (0.5 0.5)")], &geom_type);
+        let expected: ArrayRef = create_array!(Boolean, [Some(true)]);
+        let result = tester
+            .invoke_arrays(vec![Arc::new(rasters), geoms])
+            .unwrap();
+        assert_array_equal(&result, &expected);
+    }
+
+    #[rstest]
+    fn rs_contains_raster_geom() {
+        let udf = rs_contains_udf();
+        let geom_type = SedonaType::Wkb(Edges::Planar, lnglat());
+        let tester = ScalarUdfTester::new(udf.into(), vec![RASTER, 
geom_type.clone()]);
+
+        let rasters = generate_test_rasters(3, Some(0)).unwrap();
+
+        // Point inside raster 1 should be contained
+        let geoms = create_geom_array(
+            &[
+                None,
+                Some("POINT (2.15 2.75)"), // Inside raster 1
+                Some("POINT (0.0 0.0)"),   // Outside all rasters
+            ],
+            &geom_type,
+        );
+
+        let expected: ArrayRef = create_array!(Boolean, [None, Some(true), 
Some(false)]);
+
+        let result = tester
+            .invoke_arrays(vec![Arc::new(rasters), geoms])
+            .unwrap();
+
+        assert_array_equal(&result, &expected);
+    }
+
+    #[rstest]
+    fn rs_within_raster_geom() {
+        let udf = rs_within_udf();
+        let geom_type = SedonaType::Wkb(Edges::Planar, lnglat());
+        let tester = ScalarUdfTester::new(udf.into(), vec![RASTER, 
geom_type.clone()]);
+
+        let rasters = generate_test_rasters(3, Some(0)).unwrap();
+
+        // Test rasters:
+        // Raster 1: corners at approximately (2.0, 3.0), (2.2, 3.08), (2.29, 
2.48), (2.09, 2.4)
+
+        // Large polygon that contains raster 1
+        let geoms = create_geom_array(
+            &[
+                None,
+                Some("POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0))"), // Contains 
raster 1
+                Some("POLYGON ((0 0, 0.1 0, 0.1 0.1, 0 0.1, 0 0))"), // Does 
not contain raster 2
+            ],
+            &geom_type,
+        );
+
+        let expected: ArrayRef = create_array!(Boolean, [None, Some(true), 
Some(false)]);
+
+        let result = tester
+            .invoke_arrays(vec![Arc::new(rasters), geoms])
+            .unwrap();
+
+        assert_array_equal(&result, &expected);
+    }
+
+    #[rstest]
+    fn rs_intersects_geom_raster() {
+        let udf = rs_intersects_udf();
+        let geom_type = SedonaType::Wkb(Edges::Planar, lnglat());
+        let tester = ScalarUdfTester::new(udf.into(), vec![geom_type.clone(), 
RASTER]);
+
+        let rasters = generate_test_rasters(3, Some(0)).unwrap();
+
+        // Test with geometry as first argument
+        let geoms = create_geom_array(
+            &[
+                None,
+                Some("POINT (2.15 2.75)"), // Inside raster 1
+                Some("POINT (0.0 0.0)"),   // Outside all rasters
+            ],
+            &geom_type,
+        );
+
+        let expected: ArrayRef = create_array!(Boolean, [None, Some(true), 
Some(false)]);
+
+        let result = tester
+            .invoke_arrays(vec![geoms, Arc::new(rasters)])
+            .unwrap();
+
+        assert_array_equal(&result, &expected);
+    }
+
+    #[rstest]
+    fn rs_intersects_raster_raster() {
+        let udf = rs_intersects_udf();
+        let tester = ScalarUdfTester::new(udf.into(), vec![RASTER, RASTER]);
+
+        let rasters1 = generate_test_rasters(3, Some(0)).unwrap();
+        let rasters2 = generate_test_rasters(3, Some(0)).unwrap();
+
+        // Same rasters should intersect with themselves
+        let expected: ArrayRef = create_array!(Boolean, [None, Some(true), 
Some(true)]);
+
+        let result = tester
+            .invoke_arrays(vec![Arc::new(rasters1), Arc::new(rasters2)])
+            .unwrap();
+
+        assert_array_equal(&result, &expected);
+    }
+
+    #[rstest]
+    fn rs_intersects_null_handling() {
+        let udf = rs_intersects_udf();
+        let tester = ScalarUdfTester::new(udf.into(), vec![RASTER, 
WKB_GEOMETRY]);
+
+        let rasters = generate_test_rasters(3, Some(0)).unwrap();
+
+        // Test with null geometry
+        let geoms = create_geom_array(&[None::<&str>, None::<&str>, 
None::<&str>], &WKB_GEOMETRY);
+
+        let expected: ArrayRef = create_array!(Boolean, [None, None, None]);
+
+        let result = tester
+            .invoke_arrays(vec![Arc::new(rasters), geoms])
+            .unwrap();
+
+        assert_array_equal(&result, &expected);
+    }
+
+    /// When neither the raster nor the geometry has a CRS, the comparison
+    /// should succeed (both sides are in an unknown/assumed-same CRS).
+    #[test]
+    fn rs_intersects_both_no_crs_succeeds() {
+        let udf = rs_intersects_udf();
+        let tester = ScalarUdfTester::new(udf.into(), vec![RASTER, 
WKB_GEOMETRY]);
+
+        // Raster covers (0,0)–(1,1) with no CRS
+        let rasters = build_unit_raster(None);
+        let geoms = create_geom_array(&[Some("POINT (0.5 0.5)")], 
&WKB_GEOMETRY);
+
+        let expected: ArrayRef = create_array!(Boolean, [Some(true)]);
+        let result = tester
+            .invoke_arrays(vec![Arc::new(rasters), geoms])
+            .unwrap();
+        assert_array_equal(&result, &expected);
+    }
+
+    /// When one side has a CRS but the other does not, an error must be 
returned.
+    ///
+    /// Covers three overloads:
+    /// - (raster with CRS, geometry without CRS) — raster_geom
+    /// - (geometry with CRS, raster without CRS) — geom_raster
+    /// - (raster with CRS, raster without CRS) — raster_raster
+    #[test]
+    fn rs_intersects_crs_mismatch_one_missing_errors() {
+        let rasters_with_crs = build_unit_raster(Some("OGC:CRS84"));
+        let rasters_no_crs = build_unit_raster(None);
+
+        // raster (has CRS) + geometry (no CRS)
+        let udf = rs_intersects_udf();
+        let tester = ScalarUdfTester::new(udf.clone().into(), vec![RASTER, 
WKB_GEOMETRY]);
+        let geoms = create_geom_array(&[Some("POINT (0.5 0.5)")], 
&WKB_GEOMETRY);
+        let err = tester
+            .invoke_arrays(vec![Arc::new(rasters_with_crs.clone()), geoms])
+            .unwrap_err();
+        assert!(
+            err.message().contains("has CRS but"),
+            "unexpected error: {err}"
+        );
+
+        // geometry (has CRS) + raster (no CRS)
+        let geom_type = SedonaType::Wkb(Edges::Planar, lnglat());
+        let tester = ScalarUdfTester::new(udf.clone().into(), 
vec![geom_type.clone(), RASTER]);
+        let geoms = create_geom_array(&[Some("POINT (0.5 0.5)")], &geom_type);
+        let err = tester
+            .invoke_arrays(vec![geoms, Arc::new(rasters_no_crs.clone())])
+            .unwrap_err();
+        assert!(
+            err.message().contains("has CRS but"),
+            "unexpected error: {err}"
+        );
+
+        // raster (has CRS) + raster (no CRS)
+        let tester = ScalarUdfTester::new(udf.into(), vec![RASTER, RASTER]);
+        let err = tester
+            .invoke_arrays(vec![
+                Arc::new(rasters_with_crs.clone()),
+                Arc::new(rasters_no_crs.clone()),
+            ])
+            .unwrap_err();
+        assert!(
+            err.message().contains("has CRS but"),
+            "unexpected error: {err}"
+        );
+    }
+}
diff --git a/rust/sedona-schema/src/crs.rs b/rust/sedona-schema/src/crs.rs
index e6ca93d8..fa24602c 100644
--- a/rust/sedona-schema/src/crs.rs
+++ b/rust/sedona-schema/src/crs.rs
@@ -269,6 +269,12 @@ pub fn lnglat() -> Crs {
 /// equality (for binary operators).
 pub type Crs = Option<Arc<dyn CoordinateReferenceSystem + Send + Sync>>;
 
+/// Borrowed form of [`Crs`] — a reference to a CRS trait object.
+///
+/// Returned by `Crs::as_deref()` and used in function signatures that only
+/// need to inspect a CRS without taking ownership.
+pub type CrsRef<'a> = Option<&'a (dyn CoordinateReferenceSystem + Send + 
Sync)>;
+
 impl Display for dyn CoordinateReferenceSystem + Send + Sync {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         if let Ok(Some(auth_code)) = self.to_authority_code() {


Reply via email to