This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 673d7c93a0 Refactor TopKHashTable to use HashTable API (#19464)
673d7c93a0 is described below

commit 673d7c93a0528bcd0629f457f726b549db0aa1c6
Author: Daniël Heres <[email protected]>
AuthorDate: Mon Dec 29 19:38:01 2025 +0100

    Refactor TopKHashTable to use HashTable API (#19464)
    
    ## Which issue does this PR close?
    
    Part of #13433
    
    - [x] Add some docs
    - [x] Report performance changes
    
    ## Rationale for this change
    
    Getting rid of `RawTable` so we can freely upgrade `hasbrown`, also
    removing a lot of unsafe code at the same time (helping with later
    refactoring / optimization as well).
    
    Note to reviewers: this change was in large part done by Google Jules
    (Gemini), I mainly did some review / cleanup.
    
    The (micro) topk aggregation benchmarks show no difference on my machine
    (as desired)
    
    ## What changes are included in this PR?
    
    Changes storage to use a separate `Vec` to store index-based info
    instead of directly in hashmap.
    ## Are these changes tested?
    
    ## Are there any user-facing changes?
    
    ---------
    
    Co-authored-by: google-labs-jules[bot] 
<161369871+google-labs-jules[bot]@users.noreply.github.com>
---
 .../src/aggregates/topk/hash_table.rs              | 392 +++++++++------------
 .../physical-plan/src/aggregates/topk/heap.rs      |  38 --
 .../src/aggregates/topk/priority_map.rs            |  26 +-
 3 files changed, 178 insertions(+), 278 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs 
b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs
index 1fae507d90..4a3f3ac258 100644
--- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs
+++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! A wrapper around `hashbrown::RawTable` that allows entries to be tracked 
by index
+//! A wrapper around `hashbrown::HashTable` that allows entries to be tracked 
by index
 
 use crate::aggregates::group_values::HashValue;
 use crate::aggregates::topk::heap::Comparable;
@@ -29,7 +29,7 @@ use arrow::datatypes::{DataType, i256};
 use datafusion_common::Result;
 use datafusion_common::exec_datafusion_err;
 use half::f16;
-use hashbrown::raw::RawTable;
+use hashbrown::hash_table::HashTable;
 use std::fmt::Debug;
 use std::sync::Arc;
 
@@ -48,13 +48,17 @@ pub struct HashTableItem<ID: KeyType> {
     pub heap_idx: usize,
 }
 
-/// A custom wrapper around `hashbrown::RawTable` that:
+/// A custom wrapper around `hashbrown::HashTable` that:
 /// 1. limits the number of entries to the top K
 /// 2. Allocates a capacity greater than top K to maintain a low-fill factor 
and prevent resizing
 /// 3. Tracks indexes to allow corresponding heap to refer to entries by index 
vs hash
-/// 4. Catches resize events to allow the corresponding heap to update it's 
indexes
 struct TopKHashTable<ID: KeyType> {
-    map: RawTable<HashTableItem<ID>>,
+    map: HashTable<usize>,
+    // Store the actual items separately to allow for index-based access
+    store: Vec<Option<HashTableItem<ID>>>,
+    // Free index in the store for reuse
+    free_index: Option<usize>,
+    // The maximum number of entries allowed
     limit: usize,
 }
 
@@ -62,25 +66,10 @@ struct TopKHashTable<ID: KeyType> {
 pub trait ArrowHashTable {
     fn set_batch(&mut self, ids: ArrayRef);
     fn len(&self) -> usize;
-    // JUSTIFICATION
-    //  Benefit:  ~15% speedup + required to index into RawTable from binary 
heap
-    //  Soundness: the caller must provide valid indexes
-    unsafe fn update_heap_idx(&mut self, mapper: &[(usize, usize)]);
-    // JUSTIFICATION
-    //  Benefit:  ~15% speedup + required to index into RawTable from binary 
heap
-    //  Soundness: the caller must provide a valid index
-    unsafe fn heap_idx_at(&self, map_idx: usize) -> usize;
-    unsafe fn take_all(&mut self, indexes: Vec<usize>) -> ArrayRef;
-
-    // JUSTIFICATION
-    //  Benefit:  ~15% speedup + required to index into RawTable from binary 
heap
-    //  Soundness: the caller must provide valid indexes
-    unsafe fn find_or_insert(
-        &mut self,
-        row_idx: usize,
-        replace_idx: usize,
-        map: &mut Vec<(usize, usize)>,
-    ) -> (usize, bool);
+    fn update_heap_idx(&mut self, mapper: &[(usize, usize)]);
+    fn heap_idx_at(&self, map_idx: usize) -> usize;
+    fn take_all(&mut self, indexes: Vec<usize>) -> ArrayRef;
+    fn find_or_insert(&mut self, row_idx: usize, replace_idx: usize) -> 
(usize, bool);
 }
 
 // An implementation of ArrowHashTable for String keys
@@ -130,91 +119,82 @@ impl ArrowHashTable for StringHashTable {
         self.map.len()
     }
 
-    unsafe fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) {
-        unsafe {
-            self.map.update_heap_idx(mapper);
-        }
+    fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) {
+        self.map.update_heap_idx(mapper);
     }
 
-    unsafe fn heap_idx_at(&self, map_idx: usize) -> usize {
-        unsafe { self.map.heap_idx_at(map_idx) }
+    fn heap_idx_at(&self, map_idx: usize) -> usize {
+        self.map.heap_idx_at(map_idx)
     }
 
-    unsafe fn take_all(&mut self, indexes: Vec<usize>) -> ArrayRef {
-        unsafe {
-            let ids = self.map.take_all(indexes);
-            match self.data_type {
-                DataType::Utf8 => Arc::new(StringArray::from(ids)),
-                DataType::LargeUtf8 => Arc::new(LargeStringArray::from(ids)),
-                DataType::Utf8View => Arc::new(StringViewArray::from(ids)),
-                _ => unreachable!(),
-            }
+    fn take_all(&mut self, indexes: Vec<usize>) -> ArrayRef {
+        let ids = self.map.take_all(indexes);
+        match self.data_type {
+            DataType::Utf8 => Arc::new(StringArray::from(ids)),
+            DataType::LargeUtf8 => Arc::new(LargeStringArray::from(ids)),
+            DataType::Utf8View => Arc::new(StringViewArray::from(ids)),
+            _ => unreachable!(),
         }
     }
 
-    unsafe fn find_or_insert(
-        &mut self,
-        row_idx: usize,
-        replace_idx: usize,
-        mapper: &mut Vec<(usize, usize)>,
-    ) -> (usize, bool) {
-        unsafe {
-            let id = match self.data_type {
-                DataType::Utf8 => {
-                    let ids = self
-                        .owned
-                        .as_any()
-                        .downcast_ref::<StringArray>()
-                        .expect("Expected StringArray for DataType::Utf8");
-                    if ids.is_null(row_idx) {
-                        None
-                    } else {
-                        Some(ids.value(row_idx))
-                    }
+    fn find_or_insert(&mut self, row_idx: usize, replace_idx: usize) -> 
(usize, bool) {
+        let id = match self.data_type {
+            DataType::Utf8 => {
+                let ids = self
+                    .owned
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .expect("Expected StringArray for DataType::Utf8");
+                if ids.is_null(row_idx) {
+                    None
+                } else {
+                    Some(ids.value(row_idx))
                 }
-                DataType::LargeUtf8 => {
-                    let ids = self
-                        .owned
-                        .as_any()
-                        .downcast_ref::<LargeStringArray>()
-                        .expect("Expected LargeStringArray for 
DataType::LargeUtf8");
-                    if ids.is_null(row_idx) {
-                        None
-                    } else {
-                        Some(ids.value(row_idx))
-                    }
+            }
+            DataType::LargeUtf8 => {
+                let ids = self
+                    .owned
+                    .as_any()
+                    .downcast_ref::<LargeStringArray>()
+                    .expect("Expected LargeStringArray for 
DataType::LargeUtf8");
+                if ids.is_null(row_idx) {
+                    None
+                } else {
+                    Some(ids.value(row_idx))
                 }
-                DataType::Utf8View => {
-                    let ids = self
-                        .owned
-                        .as_any()
-                        .downcast_ref::<StringViewArray>()
-                        .expect("Expected StringViewArray for 
DataType::Utf8View");
-                    if ids.is_null(row_idx) {
-                        None
-                    } else {
-                        Some(ids.value(row_idx))
-                    }
+            }
+            DataType::Utf8View => {
+                let ids = self
+                    .owned
+                    .as_any()
+                    .downcast_ref::<StringViewArray>()
+                    .expect("Expected StringViewArray for DataType::Utf8View");
+                if ids.is_null(row_idx) {
+                    None
+                } else {
+                    Some(ids.value(row_idx))
                 }
-                _ => panic!("Unsupported data type"),
-            };
-
-            let hash = self.rnd.hash_one(id);
-            if let Some(map_idx) = self
-                .map
-                .find(hash, |mi| id == mi.as_ref().map(|id| id.as_str()))
-            {
-                return (map_idx, false);
             }
+            _ => panic!("Unsupported data type"),
+        };
 
-            // we're full and this is a better value, so remove the worst
-            let heap_idx = self.map.remove_if_full(replace_idx);
+        // TODO: avoid double lookup by using entry API
 
-            // add the new group
-            let id = id.map(|id| id.to_string());
-            let map_idx = self.map.insert(hash, id, heap_idx, mapper);
-            (map_idx, true)
+        let hash = self.rnd.hash_one(id);
+        if let Some(map_idx) = self
+            .map
+            .find(hash, |mi| id == mi.as_ref().map(|id| id.as_str()))
+        {
+            return (map_idx, false);
         }
+
+        // we're full and this is a better value, so remove the worst
+        let heap_idx = self.map.remove_if_full(replace_idx);
+
+        // add the new group
+        let id = id.map(|id| id.to_string());
+        let map_idx = self.map.insert(hash, &id, heap_idx);
+        (map_idx, true)
     }
 }
 
@@ -251,149 +231,137 @@ where
         self.map.len()
     }
 
-    unsafe fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) {
-        unsafe {
-            self.map.update_heap_idx(mapper);
-        }
+    fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) {
+        self.map.update_heap_idx(mapper);
     }
 
-    unsafe fn heap_idx_at(&self, map_idx: usize) -> usize {
-        unsafe { self.map.heap_idx_at(map_idx) }
+    fn heap_idx_at(&self, map_idx: usize) -> usize {
+        self.map.heap_idx_at(map_idx)
     }
 
-    unsafe fn take_all(&mut self, indexes: Vec<usize>) -> ArrayRef {
-        unsafe {
-            let ids = self.map.take_all(indexes);
-            let mut builder: PrimitiveBuilder<VAL> =
-                
PrimitiveArray::builder(ids.len()).with_data_type(self.kt.clone());
-            for id in ids.into_iter() {
-                match id {
-                    None => builder.append_null(),
-                    Some(id) => builder.append_value(id),
-                }
+    fn take_all(&mut self, indexes: Vec<usize>) -> ArrayRef {
+        let ids = self.map.take_all(indexes);
+        let mut builder: PrimitiveBuilder<VAL> =
+            PrimitiveArray::builder(ids.len()).with_data_type(self.kt.clone());
+        for id in ids.into_iter() {
+            match id {
+                None => builder.append_null(),
+                Some(id) => builder.append_value(id),
             }
-            let ids = builder.finish();
-            Arc::new(ids)
         }
+        let ids = builder.finish();
+        Arc::new(ids)
     }
 
-    unsafe fn find_or_insert(
-        &mut self,
-        row_idx: usize,
-        replace_idx: usize,
-        mapper: &mut Vec<(usize, usize)>,
-    ) -> (usize, bool) {
-        unsafe {
-            let ids = self.owned.as_primitive::<VAL>();
-            let id: Option<VAL::Native> = if ids.is_null(row_idx) {
-                None
-            } else {
-                Some(ids.value(row_idx))
-            };
-
-            let hash: u64 = id.hash(&self.rnd);
-            if let Some(map_idx) = self.map.find(hash, |mi| id == *mi) {
-                return (map_idx, false);
-            }
-
-            // we're full and this is a better value, so remove the worst
-            let heap_idx = self.map.remove_if_full(replace_idx);
+    fn find_or_insert(&mut self, row_idx: usize, replace_idx: usize) -> 
(usize, bool) {
+        let ids = self.owned.as_primitive::<VAL>();
+        let id: Option<VAL::Native> = if ids.is_null(row_idx) {
+            None
+        } else {
+            Some(ids.value(row_idx))
+        };
 
-            // add the new group
-            let map_idx = self.map.insert(hash, id, heap_idx, mapper);
-            (map_idx, true)
+        let hash: u64 = id.hash(&self.rnd);
+        // TODO: avoid double lookup by using entry API
+        if let Some(map_idx) = self.map.find(hash, |mi| id == *mi) {
+            return (map_idx, false);
         }
+
+        // we're full and this is a better value, so remove the worst
+        let heap_idx = self.map.remove_if_full(replace_idx);
+
+        // add the new group
+        let map_idx = self.map.insert(hash, &id, heap_idx);
+        (map_idx, true)
     }
 }
 
-impl<ID: KeyType> TopKHashTable<ID> {
+use hashbrown::hash_table::Entry;
+impl<ID: KeyType + PartialEq> TopKHashTable<ID> {
     pub fn new(limit: usize, capacity: usize) -> Self {
         Self {
-            map: RawTable::with_capacity(capacity),
+            map: HashTable::with_capacity(capacity),
+            store: Vec::with_capacity(capacity),
+            free_index: None,
             limit,
         }
     }
 
     pub fn find(&self, hash: u64, mut eq: impl FnMut(&ID) -> bool) -> 
Option<usize> {
-        let bucket = self.map.find(hash, |mi| eq(&mi.id))?;
-        // JUSTIFICATION
-        //  Benefit:  ~15% speedup + required to index into RawTable from 
binary heap
-        //  Soundness: getting the index of a bucket we just found
-        let idx = unsafe { self.map.bucket_index(&bucket) };
-        Some(idx)
+        let eq = |&idx: &usize| eq(&self.store[idx].as_ref().unwrap().id);
+        self.map.find(hash, eq).copied()
     }
 
-    pub unsafe fn heap_idx_at(&self, map_idx: usize) -> usize {
-        unsafe {
-            let bucket = self.map.bucket(map_idx);
-            bucket.as_ref().heap_idx
-        }
+    pub fn heap_idx_at(&self, map_idx: usize) -> usize {
+        self.store[map_idx].as_ref().unwrap().heap_idx
     }
 
-    pub unsafe fn remove_if_full(&mut self, replace_idx: usize) -> usize {
-        unsafe {
-            if self.map.len() >= self.limit {
-                self.map.erase(self.map.bucket(replace_idx));
-                0 // if full, always replace top node
-            } else {
-                self.map.len() // if we're not full, always append to end
+    pub fn remove_if_full(&mut self, replace_idx: usize) -> usize {
+        if self.map.len() >= self.limit {
+            let item_to_remove = self.store[replace_idx].as_ref().unwrap();
+            let hash = item_to_remove.hash;
+            let id_to_remove = &item_to_remove.id;
+
+            let eq = |&idx: &usize| self.store[idx].as_ref().unwrap().id == 
*id_to_remove;
+            let hasher = |idx: &usize| self.store[*idx].as_ref().unwrap().hash;
+            match self.map.entry(hash, eq, hasher) {
+                Entry::Occupied(entry) => {
+                    let (removed_idx, _) = entry.remove();
+                    self.store[removed_idx] = None;
+                    self.free_index = Some(removed_idx);
+                }
+                Entry::Vacant(_) => unreachable!(),
             }
+            0 // if full, always replace top node
+        } else {
+            self.map.len() // if we're not full, always append to end
         }
     }
 
-    unsafe fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) {
-        unsafe {
-            for (m, h) in mapper {
-                self.map.bucket(*m).as_mut().heap_idx = *h
-            }
+    fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) {
+        for (m, h) in mapper {
+            self.store[*m].as_mut().unwrap().heap_idx = *h;
         }
     }
 
-    pub fn insert(
-        &mut self,
-        hash: u64,
-        id: ID,
-        heap_idx: usize,
-        mapper: &mut Vec<(usize, usize)>,
-    ) -> usize {
-        let mi = HashTableItem::new(hash, id, heap_idx);
-        let bucket = self.map.try_insert_no_grow(hash, mi);
-        let bucket = match bucket {
-            Ok(bucket) => bucket,
-            Err(new_item) => {
-                let bucket = self.map.insert(hash, new_item, |mi| mi.hash);
-                // JUSTIFICATION
-                //  Benefit:  ~15% speedup + required to index into RawTable 
from binary heap
-                //  Soundness: we're getting indexes of buckets, not 
dereferencing them
-                unsafe {
-                    for bucket in self.map.iter() {
-                        let heap_idx = bucket.as_ref().heap_idx;
-                        let map_idx = self.map.bucket_index(&bucket);
-                        mapper.push((heap_idx, map_idx));
-                    }
-                }
-                bucket
-            }
+    pub fn insert(&mut self, hash: u64, id: &ID, heap_idx: usize) -> usize {
+        let mi = HashTableItem::new(hash, id.clone(), heap_idx);
+        let store_idx = if let Some(idx) = self.free_index.take() {
+            self.store[idx] = Some(mi);
+            idx
+        } else {
+            self.store.push(Some(mi));
+            self.store.len() - 1
         };
-        // JUSTIFICATION
-        //  Benefit:  ~15% speedup + required to index into RawTable from 
binary heap
-        //  Soundness: we're getting indexes of buckets, not dereferencing them
-        unsafe { self.map.bucket_index(&bucket) }
+
+        let hasher = |idx: &usize| self.store[*idx].as_ref().unwrap().hash;
+        if self.map.len() == self.map.capacity() {
+            self.map.reserve(self.limit, hasher);
+        }
+
+        let eq_fn = |idx: &usize| self.store[*idx].as_ref().unwrap().id == *id;
+        match self.map.entry(hash, eq_fn, hasher) {
+            Entry::Occupied(_) => unreachable!("Item should not exist"),
+            Entry::Vacant(vacant) => {
+                vacant.insert(store_idx);
+            }
+        }
+        store_idx
     }
 
     pub fn len(&self) -> usize {
         self.map.len()
     }
 
-    pub unsafe fn take_all(&mut self, idxs: Vec<usize>) -> Vec<ID> {
-        unsafe {
-            let ids = idxs
-                .into_iter()
-                .map(|idx| self.map.bucket(idx).as_ref().id.clone())
-                .collect();
-            self.map.clear();
-            ids
-        }
+    pub fn take_all(&mut self, idxs: Vec<usize>) -> Vec<ID> {
+        let ids = idxs
+            .into_iter()
+            .map(|idx| self.store[idx].take().unwrap().id)
+            .collect();
+        self.map.clear();
+        self.store.clear();
+        self.free_index = None;
+        ids
     }
 }
 
@@ -471,11 +439,8 @@ mod tests {
         let dt = DataType::Timestamp(TimeUnit::Millisecond, 
Some("UTC".into()));
         let mut ht = new_hash_table(1, dt.clone())?;
         ht.set_batch(Arc::new(ids));
-        let mut mapper = vec![];
-        let ids = unsafe {
-            ht.find_or_insert(0, 0, &mut mapper);
-            ht.take_all(vec![0])
-        };
+        ht.find_or_insert(0, 0);
+        let ids = ht.take_all(vec![0]);
         assert_eq!(ids.data_type(), &dt);
 
         Ok(())
@@ -486,26 +451,13 @@ mod tests {
         let mut heap_to_map = BTreeMap::<usize, usize>::new();
         let mut map = TopKHashTable::<Option<String>>::new(5, 3);
         for (heap_idx, id) in vec!["1", "2", "3", "4", 
"5"].into_iter().enumerate() {
-            let mut mapper = vec![];
             let hash = heap_idx as u64;
-            let map_idx = map.insert(hash, Some(id.to_string()), heap_idx, 
&mut mapper);
+            let map_idx = map.insert(hash, &Some(id.to_string()), heap_idx);
             let _ = heap_to_map.insert(heap_idx, map_idx);
-            if heap_idx == 3 {
-                assert_eq!(
-                    mapper,
-                    vec![(0, 0), (1, 1), (2, 2), (3, 3)],
-                    "Pass {heap_idx} resized incorrectly!"
-                );
-                for (heap_idx, map_idx) in mapper {
-                    let _ = heap_to_map.insert(heap_idx, map_idx);
-                }
-            } else {
-                assert_eq!(mapper, vec![], "Pass {heap_idx} should not have 
resized!");
-            }
         }
 
         let (_heap_idxs, map_idxs): (Vec<_>, Vec<_>) = 
heap_to_map.into_iter().unzip();
-        let ids = unsafe { map.take_all(map_idxs) };
+        let ids = map.take_all(map_idxs);
         assert_eq!(
             format!("{ids:?}"),
             r#"[Some("1"), Some("2"), Some("3"), Some("4"), Some("5")]"#
diff --git a/datafusion/physical-plan/src/aggregates/topk/heap.rs 
b/datafusion/physical-plan/src/aggregates/topk/heap.rs
index abdf320ea3..b4569c3d08 100644
--- a/datafusion/physical-plan/src/aggregates/topk/heap.rs
+++ b/datafusion/physical-plan/src/aggregates/topk/heap.rs
@@ -72,7 +72,6 @@ pub trait ArrowHeap {
     fn set_batch(&mut self, vals: ArrayRef);
     fn is_worse(&self, idx: usize) -> bool;
     fn worst_map_idx(&self) -> usize;
-    fn renumber(&mut self, heap_to_map: &[(usize, usize)]);
     fn insert(&mut self, row_idx: usize, map_idx: usize, map: &mut Vec<(usize, 
usize)>);
     fn replace_if_better(
         &mut self,
@@ -131,10 +130,6 @@ where
         self.heap.worst_map_idx()
     }
 
-    fn renumber(&mut self, heap_to_map: &[(usize, usize)]) {
-        self.heap.renumber(heap_to_map);
-    }
-
     fn insert(&mut self, row_idx: usize, map_idx: usize, map: &mut Vec<(usize, 
usize)>) {
         let vals = self.batch.as_primitive::<VAL>();
         let new_val = vals.value(row_idx);
@@ -268,14 +263,6 @@ impl<VAL: ValueType> TopKHeap<VAL> {
         self.heapify_down(heap_idx, mapper);
     }
 
-    pub fn renumber(&mut self, heap_to_map: &[(usize, usize)]) {
-        for (heap_idx, map_idx) in heap_to_map.iter() {
-            if let Some(Some(hi)) = self.heap.get_mut(*heap_idx) {
-                hi.map_idx = *map_idx;
-            }
-        }
-    }
-
     fn heapify_up(&mut self, mut idx: usize, mapper: &mut Vec<(usize, usize)>) 
{
         let desc = self.desc;
         while idx != 0 {
@@ -608,29 +595,4 @@ mod tests {
 
         Ok(())
     }
-
-    #[test]
-    fn should_renumber() -> Result<()> {
-        let mut map = vec![];
-        let mut heap = TopKHeap::new(10, false);
-
-        heap.append_or_replace(1, 1, &mut map);
-        heap.append_or_replace(2, 2, &mut map);
-
-        let actual = heap.to_string();
-        assert_snapshot!(actual, @r"
-        val=2 idx=0, bucket=2
-        └── val=1 idx=1, bucket=1
-        ");
-
-        let numbers = vec![(0, 1), (1, 2)];
-        heap.renumber(numbers.as_slice());
-        let actual = heap.to_string();
-        assert_snapshot!(actual, @r"
-        val=2 idx=0, bucket=1
-        └── val=1 idx=1, bucket=2
-        ");
-
-        Ok(())
-    }
 }
diff --git a/datafusion/physical-plan/src/aggregates/topk/priority_map.rs 
b/datafusion/physical-plan/src/aggregates/topk/priority_map.rs
index fdff6b3a1a..8e093d213e 100644
--- a/datafusion/physical-plan/src/aggregates/topk/priority_map.rs
+++ b/datafusion/physical-plan/src/aggregates/topk/priority_map.rs
@@ -63,40 +63,26 @@ impl PriorityMap {
         // handle new groups we haven't seen yet
         map.clear();
         let replace_idx = self.heap.worst_map_idx();
-        // JUSTIFICATION
-        //  Benefit:  ~15% speedup + required to index into RawTable from 
binary heap
-        //  Soundness: replace_idx kept valid during resizes
-        let (map_idx, did_insert) =
-            unsafe { self.map.find_or_insert(row_idx, replace_idx, map) };
+
+        let (map_idx, did_insert) = self.map.find_or_insert(row_idx, 
replace_idx);
         if did_insert {
-            self.heap.renumber(map);
-            map.clear();
             self.heap.insert(row_idx, map_idx, map);
-            // JUSTIFICATION
-            //  Benefit:  ~15% speedup + required to index into RawTable from 
binary heap
-            //  Soundness: the map was created on the line above, so all the 
indexes should be valid
-            unsafe { self.map.update_heap_idx(map) };
+            self.map.update_heap_idx(map);
             return Ok(());
         };
 
         // this is a value for an existing group
         map.clear();
-        // JUSTIFICATION
-        //  Benefit:  ~15% speedup + required to index into RawTable from 
binary heap
-        //  Soundness: map_idx was just found, so it is valid
-        let heap_idx = unsafe { self.map.heap_idx_at(map_idx) };
+        let heap_idx = self.map.heap_idx_at(map_idx);
         self.heap.replace_if_better(heap_idx, row_idx, map);
-        // JUSTIFICATION
-        //  Benefit:  ~15% speedup + required to index into RawTable from 
binary heap
-        //  Soundness: the index map was just built, so it will be valid
-        unsafe { self.map.update_heap_idx(map) };
+        self.map.update_heap_idx(map);
 
         Ok(())
     }
 
     pub fn emit(&mut self) -> Result<Vec<ArrayRef>> {
         let (vals, map_idxs) = self.heap.drain();
-        let ids = unsafe { self.map.take_all(map_idxs) };
+        let ids = self.map.take_all(map_idxs);
         Ok(vec![ids, vals])
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to