Kontinuation commented on code in PR #645:
URL: https://github.com/apache/sedona-db/pull/645#discussion_r2831569992
##########
rust/sedona-spatial-join/src/index/spatial_index.rs:
##########
@@ -15,1989 +15,84 @@
// specific language governing permissions and limitations
// under the License.
-use std::{
- ops::Range,
- sync::{
- atomic::{AtomicUsize, Ordering},
- Arc,
- },
-};
-
+use crate::evaluated_batch::EvaluatedBatch;
+use crate::index::QueryResultMetrics;
+use arrow::array::BooleanBufferBuilder;
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
-use datafusion_common::{DataFusionError, Result};
-use datafusion_common_runtime::JoinSet;
-use float_next_after::NextAfter;
-use geo::BoundingRect;
-use geo_index::rtree::{
- distance::{DistanceMetric, GeometryAccessor},
- util::f64_box_to_f32,
-};
-use geo_index::rtree::{sort::HilbertSort, RTree, RTreeBuilder, RTreeIndex};
-use geo_index::IndexableNum;
+use async_trait::async_trait;
+use datafusion_common::Result;
use geo_types::Rect;
use parking_lot::Mutex;
+use sedona_common::ExecutionMode;
use sedona_expr::statistics::GeoStatistics;
-use sedona_geo::to_geo::item_to_geometry;
+use std::ops::Range;
+use std::sync::Arc;
use wkb::reader::Wkb;
-use crate::{
- evaluated_batch::EvaluatedBatch,
- index::{
- knn_adapter::{KnnComponents, SedonaKnnAdapter},
- IndexQueryResult, QueryResultMetrics,
- },
- operand_evaluator::{create_operand_evaluator, distance_value_at,
OperandEvaluator},
- refine::{create_refiner, IndexQueryResultRefiner},
- spatial_predicate::SpatialPredicate,
-};
-use arrow::array::BooleanBufferBuilder;
-use sedona_common::{option::SpatialJoinOptions, sedona_internal_err,
ExecutionMode};
-
pub const DISTANCE_TOLERANCE: f64 = 1e-9;
-pub struct SpatialIndex {
- pub(crate) schema: SchemaRef,
- pub(crate) options: SpatialJoinOptions,
-
- /// The spatial predicate evaluator for the spatial predicate.
- pub(crate) evaluator: Arc<dyn OperandEvaluator>,
-
- /// The refiner for refining the index query results.
- pub(crate) refiner: Arc<dyn IndexQueryResultRefiner>,
-
- /// R-tree index for the geometry batches. It takes MBRs as query windows
and returns
- /// data indexes. These data indexes should be translated using
`data_id_to_batch_pos` to get
- /// the original geometry batch index and row index, or translated using
`prepared_geom_idx_vec`
- /// to get the prepared geometries array index.
- pub(crate) rtree: RTree<f32>,
-
- /// Indexed batches containing evaluated geometry arrays. It contains the
original record
- /// batches and geometry arrays obtained by evaluating the geometry
expression on the build side.
- pub(crate) indexed_batches: Vec<EvaluatedBatch>,
- /// An array for translating rtree data index to geometry batch index and
row index
- pub(crate) data_id_to_batch_pos: Vec<(i32, i32)>,
-
- /// An array for translating rtree data index to consecutive index. Each
geometry may be indexed by
- /// multiple boxes, so there could be multiple data indexes for the same
geometry. A mapping for
- /// squashing the index makes it easier for persisting per-geometry
auxiliary data for evaluating
- /// the spatial predicate. This is extensively used by the spatial
predicate evaluators for storing
- /// prepared geometries.
- pub(crate) geom_idx_vec: Vec<usize>,
-
- /// Shared bitmap builders for visited build side indices, one per batch
- pub(crate) visited_build_side: Option<Mutex<Vec<BooleanBufferBuilder>>>,
-
- /// Counter of running probe-threads, potentially able to update `bitmap`.
- /// Each time a probe thread finished probing the index, it will decrement
the counter.
- /// The last finished probe thread will produce the extra output batches
for unmatched
- /// build side when running left-outer joins. See also
[`report_probe_completed`].
- pub(crate) probe_threads_counter: AtomicUsize,
-
- /// Shared KNN components (distance metrics and geometry cache) for
efficient KNN queries
- pub(crate) knn_components: Option<KnnComponents>,
-}
-
-impl SpatialIndex {
- pub fn empty(
- spatial_predicate: SpatialPredicate,
- schema: SchemaRef,
- options: SpatialJoinOptions,
- probe_threads_counter: AtomicUsize,
- ) -> Self {
- let evaluator = create_operand_evaluator(&spatial_predicate,
options.clone());
- let refiner = create_refiner(
- options.spatial_library,
- &spatial_predicate,
- options.clone(),
- 0,
- GeoStatistics::empty(),
- );
- let rtree = RTreeBuilder::<f32>::new(0).finish::<HilbertSort>();
- let knn_components = matches!(spatial_predicate,
SpatialPredicate::KNearestNeighbors(_))
- .then(|| KnnComponents::new(0, &[]).unwrap());
- Self {
- schema,
- options,
- evaluator,
- refiner,
- rtree,
- data_id_to_batch_pos: Vec::new(),
- indexed_batches: Vec::new(),
- geom_idx_vec: Vec::new(),
- visited_build_side: None,
- probe_threads_counter,
- knn_components,
- }
- }
-
- pub(crate) fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-
- /// Create a KNN geometry accessor for accessing geometries with caching
- fn create_knn_accessor(&self) -> Result<SedonaKnnAdapter<'_>> {
- let Some(knn_components) = self.knn_components.as_ref() else {
- return sedona_internal_err!("knn_components is not initialized
when running KNN join");
- };
- Ok(SedonaKnnAdapter::new(
- &self.indexed_batches,
- &self.data_id_to_batch_pos,
- knn_components,
- ))
- }
-
+/// The `SpatialIndex` trait defines the interface for spatial indexes used in
spatial join operations.
+/// It provides methods for querying the index with spatial predicates,
+/// as well as methods for managing probe statistics and tracking visited
build side batches.
+/// The trait is designed to be implemented by various spatial index structures
+#[async_trait]
+pub(crate) trait SpatialIndex {
+ /// Returns the schema of the indexed data.
+ fn schema(&self) -> SchemaRef;
+ /// Returns the number of batches that have been indexed.
+ #[allow(unused)] // This is used for tests
+ fn num_indexed_batches(&self) -> usize;
/// Get the batch at the given index.
- pub(crate) fn get_indexed_batch(&self, batch_idx: usize) -> &RecordBatch {
- &self.indexed_batches[batch_idx].batch
- }
-
+ fn get_indexed_batch(&self, batch_idx: usize) -> &RecordBatch;
/// Query the spatial index with a probe geometry to find matching
build-side geometries.
- ///
- /// This method implements a two-phase spatial join query:
- /// 1. **Filter phase**: Uses the R-tree index with the probe geometry's
bounding rectangle
- /// to quickly identify candidate geometries that might satisfy the
spatial predicate
- /// 2. **Refinement phase**: Evaluates the exact spatial predicate on
candidates to determine
- /// actual matches
- ///
- /// # Arguments
- /// * `probe_wkb` - The probe geometry in WKB format
- /// * `probe_rect` - The minimum bounding rectangle of the probe geometry
- /// * `distance` - Optional distance parameter for distance-based spatial
predicates
- /// * `build_batch_positions` - Output vector that will be populated with
(batch_idx, row_idx)
- /// pairs for each matching build-side geometry
- ///
- /// # Returns
- /// * `JoinResultMetrics` containing the number of actual matches
(`count`) and the number
- /// of candidates from the filter phase (`candidate_count`)
#[allow(unused)]
- pub(crate) fn query(
+ fn query(
&self,
probe_wkb: &Wkb,
probe_rect: &Rect<f32>,
distance: &Option<f64>,
build_batch_positions: &mut Vec<(i32, i32)>,
- ) -> Result<QueryResultMetrics> {
- let min = probe_rect.min();
- let max = probe_rect.max();
- let mut candidates = self.rtree.search(min.x, min.y, max.x, max.y);
- if candidates.is_empty() {
- return Ok(QueryResultMetrics {
- count: 0,
- candidate_count: 0,
- });
- }
-
- // Sort and dedup candidates to avoid duplicate results when we index
one geometry
- // using several boxes.
- candidates.sort_unstable();
- candidates.dedup();
-
- // Refine the candidates retrieved from the r-tree index by evaluating
the actual spatial predicate
- self.refine(probe_wkb, &candidates, distance, build_batch_positions)
- }
-
+ ) -> Result<QueryResultMetrics>;
/// Query the spatial index for k nearest neighbors of a given geometry.
- ///
- /// This method finds the k nearest neighbors to the probe geometry using:
- /// 1. R-tree's built-in neighbors() method for efficient KNN search
- /// 2. Distance refinement using actual geometry calculations
- /// 3. Tie-breaker handling when enabled
- ///
- /// # Arguments
- ///
- /// * `probe_wkb` - WKB representation of the probe geometry
- /// * `k` - Number of nearest neighbors to find
- /// * `use_spheroid` - Whether to use spheroid distance calculation
- /// * `include_tie_breakers` - Whether to include additional results with
same distance as kth neighbor
- /// * `build_batch_positions` - Output vector for matched positions
- /// * `distances` - Optional output vector for distances to matched
neighbors, aligned with `build_batch_positions`
- ///
- /// # Returns
- ///
- /// * `JoinResultMetrics` containing the number of actual matches and
candidates processed
- pub(crate) fn query_knn(
+ fn query_knn(
&self,
probe_wkb: &Wkb,
k: u32,
use_spheroid: bool,
include_tie_breakers: bool,
build_batch_positions: &mut Vec<(i32, i32)>,
- mut distances: Option<&mut Vec<f64>>,
- ) -> Result<QueryResultMetrics> {
- if k == 0 {
- return Ok(QueryResultMetrics {
- count: 0,
- candidate_count: 0,
- });
- }
-
- // Check if index is empty
- if self.indexed_batches.is_empty() ||
self.data_id_to_batch_pos.is_empty() {
- return Ok(QueryResultMetrics {
- count: 0,
- candidate_count: 0,
- });
- }
-
- // Convert probe WKB to geo::Geometry
- let probe_geom = match item_to_geometry(probe_wkb) {
- Ok(geom) => geom,
- Err(_) => {
- // Empty or unsupported geometries (e.g., POINT EMPTY) return
empty results
- return Ok(QueryResultMetrics {
- count: 0,
- candidate_count: 0,
- });
- }
- };
-
- // Select the appropriate distance metric
- let distance_metric: &dyn DistanceMetric<f32> = {
- let Some(knn_components) = self.knn_components.as_ref() else {
- return sedona_internal_err!(
- "knn_components is not initialized when running KNN join"
- );
- };
- if use_spheroid {
- &knn_components.haversine_metric
- } else {
- &knn_components.euclidean_metric
- }
- };
-
- // Create geometry accessor for on-demand WKB decoding and caching
- let geometry_accessor = self.create_knn_accessor()?;
-
- // Use neighbors_geometry to find k nearest neighbors
- let initial_results = self.rtree.neighbors_geometry(
- &probe_geom,
- Some(k as usize),
- None, // no max_distance filter
- distance_metric,
- &geometry_accessor,
- );
-
- if initial_results.is_empty() {
- return Ok(QueryResultMetrics {
- count: 0,
- candidate_count: 0,
- });
- }
-
- let mut final_results = initial_results;
- let mut candidate_count = final_results.len();
-
- // Handle tie-breakers if enabled
- if include_tie_breakers && !final_results.is_empty() && k > 0 {
- // Calculate distances for the initial k results to find the k-th
distance
- let mut distances_with_indices: Vec<(f64, u32)> = Vec::new();
-
- for &result_idx in &final_results {
- if (result_idx as usize) < self.data_id_to_batch_pos.len() {
- if let Some(item_geom) =
geometry_accessor.get_geometry(result_idx as usize) {
- let distance =
distance_metric.distance_to_geometry(&probe_geom, item_geom);
- if let Some(distance_f64) = distance.to_f64() {
- distances_with_indices.push((distance_f64,
result_idx));
- }
- }
- }
- }
-
- // Sort by distance
- distances_with_indices
- .sort_by(|a, b|
a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
-
- // Find the k-th distance (if we have at least k results)
- if distances_with_indices.len() >= k as usize {
- let k_idx = (k as usize)
- .min(distances_with_indices.len())
- .saturating_sub(1);
- let max_distance = distances_with_indices[k_idx].0;
-
- // For tie-breakers, create spatial envelope around probe
centroid and use rtree.search()
-
- // Create envelope bounds by expanding the probe bounding box
by max_distance
- let Some(rect) = probe_geom.bounding_rect() else {
- // If bounding rectangle cannot be computed, return empty
results
- return Ok(QueryResultMetrics {
- count: 0,
- candidate_count: 0,
- });
- };
-
- let min = rect.min();
- let max = rect.max();
- let (min_x, min_y, max_x, max_y) = f64_box_to_f32(min.x,
min.y, max.x, max.y);
- let mut distance_f32 = max_distance as f32;
- if (distance_f32 as f64) < max_distance {
- distance_f32 = distance_f32.next_after(f32::INFINITY);
- }
- let (min_x, min_y, max_x, max_y) = (
- min_x - distance_f32,
- min_y - distance_f32,
- max_x + distance_f32,
- max_y + distance_f32,
- );
-
- // Use rtree.search() with envelope bounds
- let expanded_results = self.rtree.search(min_x, min_y, max_x,
max_y);
-
- candidate_count = expanded_results.len();
-
- // Calculate distances for all results and find ties
- let mut all_distances_with_indices: Vec<(f64, u32)> =
Vec::new();
-
- for &result_idx in &expanded_results {
- if (result_idx as usize) < self.data_id_to_batch_pos.len()
{
- if let Some(item_geom) =
geometry_accessor.get_geometry(result_idx as usize)
- {
- let distance =
-
distance_metric.distance_to_geometry(&probe_geom, item_geom);
- if let Some(distance_f64) = distance.to_f64() {
- all_distances_with_indices.push((distance_f64,
result_idx));
- }
- }
- }
- }
-
- // Sort by distance
- all_distances_with_indices
- .sort_by(|a, b|
a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
-
- // Include all results up to and including those with the same
distance as the k-th result
- let mut tie_breaker_results: Vec<u32> = Vec::new();
-
- for (i, &(distance, result_idx)) in
all_distances_with_indices.iter().enumerate() {
- if i < k as usize {
- // Include the first k results
- tie_breaker_results.push(result_idx);
- } else if (distance - max_distance).abs() <=
DISTANCE_TOLERANCE {
- // Include tie-breakers (same distance as k-th result)
- tie_breaker_results.push(result_idx);
- } else {
- // No more ties, stop
- break;
- }
- }
-
- final_results = tie_breaker_results;
- }
- } else {
- // When tie-breakers are disabled, limit results to exactly k
- if final_results.len() > k as usize {
- final_results.truncate(k as usize);
- }
- }
-
- // Convert results to build_batch_positions using existing
data_id_to_batch_pos mapping
- for &result_idx in &final_results {
- if (result_idx as usize) < self.data_id_to_batch_pos.len() {
-
build_batch_positions.push(self.data_id_to_batch_pos[result_idx as usize]);
-
- if let Some(dists) = distances.as_mut() {
- let mut dist = f64::NAN;
- if let Some(item_geom) =
geometry_accessor.get_geometry(result_idx as usize) {
- dist = distance_metric
- .distance_to_geometry(&probe_geom, item_geom)
- .to_f64()
- .unwrap_or(f64::NAN);
- }
- dists.push(dist);
- }
- }
- }
-
- Ok(QueryResultMetrics {
- count: final_results.len(),
- candidate_count,
- })
- }
-
+ distances: Option<&mut Vec<f64>>,
+ ) -> Result<QueryResultMetrics>;
/// Query the spatial index with a batch of probe geometries to find
matching build-side geometries.
- ///
- /// This method iterates over the probe geometries in the given range of
the evaluated batch.
- /// For each probe geometry, it performs the two-phase spatial join query:
- /// 1. **Filter phase**: Uses the R-tree index with the probe geometry's
bounding rectangle
- /// to quickly identify candidate geometries.
- /// 2. **Refinement phase**: Evaluates the exact spatial predicate on
candidates to determine
- /// actual matches.
- ///
- /// # Arguments
- /// * `evaluated_batch` - The batch containing probe geometries and their
bounding rectangles
- /// * `range` - The range of rows in the evaluated batch to process.
- /// * `max_result_size` - The maximum number of results to collect before
stopping. If the
- /// number of results exceeds this limit, the method returns early.
- /// * `build_batch_positions` - Output vector that will be populated with
(batch_idx, row_idx)
- /// pairs for each matching build-side geometry.
- /// * `probe_indices` - Output vector that will be populated with the
probe row index (in
- /// `evaluated_batch`) for each match appended to
`build_batch_positions`.
- /// This means the probe index is repeated `N` times when a probe
geometry produces `N` matches,
- /// keeping `probe_indices.len()` in sync with
`build_batch_positions.len()`.
- ///
- /// # Returns
- /// * A tuple containing:
- /// - `QueryResultMetrics`: Aggregated metrics (total matches and
candidates) for the processed rows
- /// - `usize`: The index of the next row to process (exclusive end of
the processed range)
- pub(crate) async fn query_batch(
- self: &Arc<Self>,
+ async fn query_batch(
+ &self,
evaluated_batch: &Arc<EvaluatedBatch>,
range: Range<usize>,
max_result_size: usize,
build_batch_positions: &mut Vec<(i32, i32)>,
probe_indices: &mut Vec<u32>,
- ) -> Result<(QueryResultMetrics, usize)> {
- if range.is_empty() {
- return Ok((
- QueryResultMetrics {
- count: 0,
- candidate_count: 0,
- },
- range.start,
- ));
- }
-
- let rects = evaluated_batch.rects();
- let dist = evaluated_batch.distance();
- let mut total_candidates_count = 0;
- let mut total_count = 0;
- let mut current_row_idx = range.start;
- for row_idx in range {
- current_row_idx = row_idx;
- let Some(probe_rect) = rects[row_idx] else {
- continue;
- };
-
- let min = probe_rect.min();
- let max = probe_rect.max();
- let mut candidates = self.rtree.search(min.x, min.y, max.x, max.y);
- if candidates.is_empty() {
- continue;
- }
-
- let Some(probe_wkb) = evaluated_batch.wkb(row_idx) else {
- return sedona_internal_err!(
- "Failed to get WKB for row {} in evaluated batch",
- row_idx
- );
- };
-
- // Sort and dedup candidates to avoid duplicate results when we
index one geometry
- // using several boxes.
- candidates.sort_unstable();
- candidates.dedup();
-
- let distance = match dist {
- Some(dist_array) => distance_value_at(dist_array, row_idx)?,
- None => None,
- };
-
- // Refine the candidates retrieved from the r-tree index by
evaluating the actual spatial predicate
- let refine_chunk_size =
self.options.parallel_refinement_chunk_size;
- if refine_chunk_size == 0 || candidates.len() < refine_chunk_size
* 2 {
- // For small candidate sets, use refine synchronously
- let metrics =
- self.refine(probe_wkb, &candidates, &distance,
build_batch_positions)?;
- probe_indices.extend(std::iter::repeat_n(row_idx as u32,
metrics.count));
- total_count += metrics.count;
- total_candidates_count += metrics.candidate_count;
- } else {
- // For large candidate sets, spawn several tasks to
parallelize refinement
- let (metrics, positions) = self
- .refine_concurrently(
- evaluated_batch,
- row_idx,
- &candidates,
- distance,
- refine_chunk_size,
- )
- .await?;
- build_batch_positions.extend(positions);
- probe_indices.extend(std::iter::repeat_n(row_idx as u32,
metrics.count));
- total_count += metrics.count;
- total_candidates_count += metrics.candidate_count;
- }
-
- if total_count >= max_result_size {
- break;
- }
- }
-
- let end_idx = current_row_idx + 1;
- Ok((
- QueryResultMetrics {
- count: total_count,
- candidate_count: total_candidates_count,
- },
- end_idx,
- ))
- }
-
- async fn refine_concurrently(
- self: &Arc<Self>,
- evaluated_batch: &Arc<EvaluatedBatch>,
- row_idx: usize,
- candidates: &[u32],
- distance: Option<f64>,
- refine_chunk_size: usize,
- ) -> Result<(QueryResultMetrics, Vec<(i32, i32)>)> {
- let mut join_set = JoinSet::new();
- for (i, chunk) in candidates.chunks(refine_chunk_size).enumerate() {
- let cloned_evaluated_batch = Arc::clone(evaluated_batch);
- let chunk = chunk.to_vec();
- let index_ref = Arc::clone(self);
- join_set.spawn(async move {
- let Some(probe_wkb) = cloned_evaluated_batch.wkb(row_idx) else
{
- return (
- i,
- sedona_internal_err!(
- "Failed to get WKB for row {} in evaluated batch",
- row_idx
- ),
- );
- };
- let mut local_positions: Vec<(i32, i32)> =
Vec::with_capacity(chunk.len());
- let res = index_ref.refine(probe_wkb, &chunk, &distance, &mut
local_positions);
- (i, res.map(|r| (r, local_positions)))
- });
- }
-
- // Collect the results in order
- let mut refine_results = Vec::with_capacity(join_set.len());
- refine_results.resize_with(join_set.len(), || None);
- while let Some(res) = join_set.join_next().await {
- let (chunk_idx, refine_res) =
- res.map_err(|e| DataFusionError::External(Box::new(e)))?;
- let (metrics, positions) = refine_res?;
- refine_results[chunk_idx] = Some((metrics, positions));
- }
-
- let mut total_metrics = QueryResultMetrics {
- count: 0,
- candidate_count: 0,
- };
- let mut all_positions = Vec::with_capacity(candidates.len());
- for res in refine_results {
- let (metrics, positions) = res.expect("All chunks should be
processed");
- total_metrics.count += metrics.count;
- total_metrics.candidate_count += metrics.candidate_count;
- all_positions.extend(positions);
- }
-
- Ok((total_metrics, all_positions))
- }
-
- fn refine(
- &self,
- probe_wkb: &Wkb,
- candidates: &[u32],
- distance: &Option<f64>,
- build_batch_positions: &mut Vec<(i32, i32)>,
- ) -> Result<QueryResultMetrics> {
- let candidate_count = candidates.len();
-
- let mut index_query_results = Vec::with_capacity(candidate_count);
- for data_idx in candidates {
- let pos = self.data_id_to_batch_pos[*data_idx as usize];
- let (batch_idx, row_idx) = pos;
- let indexed_batch = &self.indexed_batches[batch_idx as usize];
- let build_wkb = indexed_batch.wkb(row_idx as usize);
- let Some(build_wkb) = build_wkb else {
- continue;
- };
- let distance = self.evaluator.resolve_distance(
- indexed_batch.distance(),
- row_idx as usize,
- distance,
- )?;
- let geom_idx = self.geom_idx_vec[*data_idx as usize];
- index_query_results.push(IndexQueryResult {
- wkb: build_wkb,
- distance,
- geom_idx,
- position: pos,
- });
- }
-
- if index_query_results.is_empty() {
- return Ok(QueryResultMetrics {
- count: 0,
- candidate_count,
- });
- }
-
- let results = self.refiner.refine(probe_wkb, &index_query_results)?;
- let num_results = results.len();
- build_batch_positions.extend(results);
-
- Ok(QueryResultMetrics {
- count: num_results,
- candidate_count,
- })
- }
-
+ ) -> Result<(QueryResultMetrics, usize)>;
/// Check if the index needs more probe statistics to determine the
optimal execution mode.
///
/// # Returns
/// * `bool` - `true` if the index needs more probe statistics, `false`
otherwise.
- pub(crate) fn need_more_probe_stats(&self) -> bool {
- self.refiner.need_more_probe_stats()
- }
-
+ fn need_more_probe_stats(&self) -> bool;
/// Merge the probe statistics into the index.
///
/// # Arguments
/// * `stats` - The probe statistics to merge.
- pub(crate) fn merge_probe_stats(&self, stats: GeoStatistics) {
- self.refiner.merge_probe_stats(stats);
- }
-
+ fn merge_probe_stats(&self, stats: GeoStatistics);
/// Get the bitmaps for tracking visited left-side indices. The bitmaps
will be updated
/// by the spatial join stream when producing output batches during index
probing phase.
- pub(crate) fn visited_build_side(&self) ->
Option<&Mutex<Vec<BooleanBufferBuilder>>> {
- self.visited_build_side.as_ref()
- }
-
+ fn visited_build_side(&self) -> Option<&Mutex<Vec<BooleanBufferBuilder>>>;
/// Decrements counter of running threads, and returns `true`
/// if caller is the last running thread
- pub(crate) fn report_probe_completed(&self) -> bool {
- self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
- }
-
+ fn report_probe_completed(&self) -> bool;
Review Comment:
We can do another pass of refactoring to move the probe completion markers out
of the spatial index; this will simplify the interface of `SpatialIndex`. This
does not need to be done in this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]