adriangb commented on code in PR #18449:
URL: https://github.com/apache/datafusion/pull/18449#discussion_r2508641119


##########
datafusion/common/src/hash_utils.rs:
##########
@@ -41,6 +41,94 @@ pub fn combine_hashes(l: u64, r: u64) -> u64 {
     hash.wrapping_mul(37).wrapping_add(r)
 }
 
+/// Maximum size for the thread-local hash buffer before truncation (4MB = 524,288 u64 elements).
+/// The goal of this is to avoid unbounded memory growth that would appear as a memory leak.
+/// We allow temporary allocations beyond this size, but after use the buffer is truncated
+/// to this size.
+const MAX_BUFFER_SIZE: usize = 524_288;
+
+thread_local! {
+    /// Thread-local buffer for hash computations to avoid repeated allocations.
+    /// The buffer is reused across calls and truncated if it exceeds MAX_BUFFER_SIZE.
+    /// Defaults to a capacity of 8192 u64 elements which is the default batch size.
+    /// This corresponds to 64KB of memory.
+    static HASH_BUFFER: RefCell<Vec<u64>> = 
RefCell::new(Vec::with_capacity(8192));
+}
+
+/// Creates hashes for the given arrays using a thread-local buffer, then calls the provided callback
+/// with an immutable reference to the computed hashes.
+///
+/// This function manages a thread-local buffer to avoid repeated allocations. The buffer is automatically
+/// truncated if it exceeds [`MAX_BUFFER_SIZE`] after use.
+///
+/// # Arguments
+/// * `arrays` - The arrays to hash (must contain at least one array)
+/// * `random_state` - The random state for hashing
+/// * `callback` - A function that receives an immutable reference to the hash slice and returns a result
+///
+/// # Errors
+/// Returns an error if:
+/// - No arrays are provided
+/// - The function is called reentrantly (i.e., the callback invokes `with_hashes` again on the same thread)
+/// - The function is called during or after thread destruction
+///
+/// # Example
+/// ```ignore
+/// use datafusion_common::hash_utils::{with_hashes, RandomState};
+/// use arrow::array::{Int32Array, ArrayRef};
+/// use std::sync::Arc;
+///
+/// let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+/// let random_state = RandomState::new();
+///
+/// let result = with_hashes([&array], &random_state, |hashes| {
+///     // Use the hashes here
+///     Ok(hashes.len())
+/// })?;
+/// ```
+pub fn with_hashes<I, T, F, R>(
+    arrays: I,
+    random_state: &RandomState,
+    callback: F,
+) -> Result<R>
+where
+    I: IntoIterator<Item = T>,
+    T: AsDynArray,
+    F: FnOnce(&[u64]) -> Result<R>,
+{
+    // Peek at the first array to determine buffer size without fully collecting
+    let mut iter = arrays.into_iter().peekable();
+
+    // Get the required size from the first array
+    let required_size = match iter.peek() {
+        Some(arr) => arr.as_dyn_array().len(),
+        None => return _internal_err!("with_hashes requires at least one 
array"),
+    };
+
+    HASH_BUFFER.try_with(|cell| {
+        let mut buffer = cell.try_borrow_mut()
+            .map_err(|_| _internal_datafusion_err!("with_hashes cannot be 
called reentrantly on the same thread"))?;
+
+        // Ensure buffer has sufficient length, clearing old values
+        buffer.clear();
+        buffer.resize(required_size, 0);
+
+        // Create hashes in the buffer - this consumes the iterator
+        create_hashes(iter, random_state, &mut buffer[..required_size])?;
+
+        // Execute the callback with an immutable slice
+        let result = callback(&buffer[..required_size])?;
+
+        // Cleanup: truncate if buffer grew too large
+        if buffer.capacity() > MAX_BUFFER_SIZE {
+            buffer.truncate(MAX_BUFFER_SIZE);
+            buffer.shrink_to_fit();
+        }
+
+        Ok(result)
+    }).map_err(|_| _internal_datafusion_err!("with_hashes cannot access 
thread-local storage during or after thread destruction"))?
+}

Review Comment:
   The `with_hashes` helper could be its own PR if we really want to reduce the
   diff. We could then also use it at all of the other call sites that have
   similar patterns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to