tustvold commented on code in PR #3603: URL: https://github.com/apache/arrow-rs/pull/3603#discussion_r1092258209
########## arrow-array/src/array/run_array.rs: ########## @@ -504,4 +702,30 @@ mod tests { let a = RunArray::<Int32Type>::from_iter(["32"]); let _ = RunArray::<Int64Type>::from(a.into_data()); } + + #[test] + fn test_ree_array_accessor() { + let input_array = build_input_array(256); + + // Encode the input_array to ree_array + let mut builder = + PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len()); + builder.extend(input_array.clone().into_iter()); Review Comment: ```suggestion builder.extend(input_array.iter().copied()); ``` Perhaps ########## arrow-array/src/array/run_array.rs: ########## @@ -121,6 +122,27 @@ impl<R: RunEndIndexType> RunArray<R> { pub fn values(&self) -> &ArrayRef { &self.values } + + /// Downcast this dictionary to a [`TypedRunArray`] + /// + /// ``` + /// use arrow_array::{Array, ArrayAccessor, RunArray, StringArray, types::Int32Type}; + /// + /// let orig = [Some("a"), Some("b"), None]; + /// let run_array = RunArray::<Int32Type>::from_iter(orig); + /// let typed = run_array.downcast_ref::<StringArray>().unwrap(); + /// assert_eq!(typed.value(0), "a"); + /// assert_eq!(typed.value(1), "b"); + /// assert!(typed.values().is_null(2)); + /// ``` + /// + pub fn downcast_ref<V: 'static>(&self) -> Option<TypedRunArray<'_, R, V>> { Review Comment: ```suggestion pub fn downcast<V: 'static>(&self) -> Option<TypedRunArray<'_, R, V>> { ``` Or something so as not to overload `dyn Any` ########## arrow-array/src/array/run_array.rs: ########## @@ -274,15 +296,191 @@ pub type Int32RunArray = RunArray<Int32Type>; /// ``` pub type Int64RunArray = RunArray<Int64Type>; +/// A strongly-typed wrapper around a [`RunArray`] that implements [`ArrayAccessor`] +/// and [`IntoIterator`] allowing fast access to its elements +/// +/// ``` +/// use arrow_array::{RunArray, StringArray, types::Int32Type}; +/// +/// let orig = ["a", "b", "a", "b"]; +/// let ree_array = RunArray::<Int32Type>::from_iter(orig); +/// +/// // `TypedRunArray` allows you to access the values directly +/// let typed = ree_array.downcast_ref::<StringArray>().unwrap(); +/// +/// for (maybe_val, orig) in typed.into_iter().zip(orig) { +/// assert_eq!(maybe_val.unwrap(), orig) +/// } +/// ``` +pub struct TypedRunArray<'a, R: RunEndIndexType, V> { + /// The run array + run_array: &'a RunArray<R>, + + /// The values of the run_array + values: &'a V, +} + +// Manually implement `Clone` to avoid `V: Clone` type constraint +impl<'a, R: RunEndIndexType, V> Clone for TypedRunArray<'a, R, V> { + fn clone(&self) -> Self { + Self { + run_array: self.run_array, + values: self.values, + } + } +} + +impl<'a, R: RunEndIndexType, V> Copy for TypedRunArray<'a, R, V> {} + +impl<'a, R: RunEndIndexType, V> std::fmt::Debug for TypedRunArray<'a, R, V> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!(f, "TypedRunArray({:?})", self.run_array) + } +} + +impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> { + /// Returns the run_ends of this [`TypedRunArray`] + pub fn run_ends(&self) -> &'a PrimitiveArray<R> { + self.run_array.run_ends() + } + + /// Returns the values of this [`TypedRunArray`] + pub fn values(&self) -> &'a V { + self.values + } + + /// Returns index to the physcial array for the given index to the logical array. + /// Performs a binary search on the run_ends array for the input index. + #[inline] + pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> { + if logical_index >= self.run_array.len() { + return None; + } + let mut st: usize = 0; Review Comment: I wonder if we could use [`binary_search`](https://doc.rust-lang.org/std/primitive.slice.html#method.binary_search) on `self.run_array.values()` ########## arrow-array/src/run_iterator.rs: ########## @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Idiomatic iterator for [`RunArray`](crate::Array) + +use arrow_buffer::ArrowNativeType; + +use crate::{array::ArrayAccessor, types::RunEndIndexType, Array, TypedRunArray}; + +/// The [`RunArrayIter`] provides an idiomatic way to iterate over the run array. +/// It returns Some(T) if there is a value or None if the value is null. +/// +/// The iterator comes with a cost as it has to iterate over three arrays to determine +/// the value to be returned. The run_ends array is used to determine the index of the value. +/// The nulls array is used to determine if the value is null and the values array is used to +/// get the value. +/// +/// Unlike other iterators in this crate, [`RunArrayIter`] does not use [`ArrayAccessor`] +/// because the run array accessor does binary search to access each value which is too slow. +/// The run array iterator can determine the next value in constant time. +/// +#[derive(Debug)] +pub struct RunArrayIter<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + array: TypedRunArray<'a, R, V>, + current_logical: usize, + current_physical: usize, + current_end_logical: usize, + current_end_physical: usize, +} + +impl<'a, R, V> RunArrayIter<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + /// create a new iterator + pub fn new(array: TypedRunArray<'a, R, V>) -> Self { + let logical_len = array.len(); + let physical_len: usize = array.values().len(); + RunArrayIter { + array, + current_logical: 0, + current_physical: 0, + current_end_logical: logical_len, + current_end_physical: physical_len, + } + } +} + +impl<'a, R, V> Iterator for RunArrayIter<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + type Item = Option<<&'a V as ArrayAccessor>::Item>; + + #[inline] + fn next(&mut self) -> Option<Self::Item> { + if self.current_logical == self.current_end_logical { + return None; + } + // If current logical index is greater than current run end index then increment + // the physical index. + if self.current_logical + >= self + .array + .run_ends() + .value(self.current_physical) + .as_usize() + { + // As the run_ends is expected to be strictly increasing, there + // should be at least one logical entry in one physical entry. Because of this + // reason the next value can be accessed by incrementing physical index once. + self.current_physical += 1; + } + if self.array.values().is_null(self.current_physical) { + self.current_logical += 1; + Some(None) + } else { + self.current_logical += 1; + // Safety: + // The self.current_physical is kept within bounds of self.current_logical. + // The self.current_logical will not go out of bounds because of the check + // `self.current_logical = self.current_end_logical` above. + unsafe { + Some(Some( + self.array.values().value_unchecked(self.current_physical), + )) + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + ( + self.current_end_logical - self.current_logical, + Some(self.current_end_logical - self.current_logical), + ) + } +} + +impl<'a, R, V> DoubleEndedIterator for RunArrayIter<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + fn next_back(&mut self) -> Option<Self::Item> { + if self.current_end_logical == self.current_logical { Review Comment: FWIW it might be clear to reduce the nesting with some early `return`, don't feel strongly ########## arrow-array/src/array/run_array.rs: ########## @@ -274,15 +296,191 @@ pub type Int32RunArray = RunArray<Int32Type>; /// ``` pub type Int64RunArray = RunArray<Int64Type>; +/// A strongly-typed wrapper around a [`RunArray`] that implements [`ArrayAccessor`] +/// and [`IntoIterator`] allowing fast access to its elements +/// +/// ``` +/// use arrow_array::{RunArray, StringArray, types::Int32Type}; +/// +/// let orig = ["a", "b", "a", "b"]; +/// let ree_array = RunArray::<Int32Type>::from_iter(orig); +/// +/// // `TypedRunArray` allows you to access the values directly +/// let typed = ree_array.downcast_ref::<StringArray>().unwrap(); +/// +/// for (maybe_val, orig) in typed.into_iter().zip(orig) { +/// assert_eq!(maybe_val.unwrap(), orig) +/// } +/// ``` +pub struct TypedRunArray<'a, R: RunEndIndexType, V> { + /// The run array + run_array: &'a RunArray<R>, + + /// The values of the run_array + values: &'a V, +} + +// Manually implement `Clone` to avoid `V: Clone` type constraint +impl<'a, R: RunEndIndexType, V> Clone for TypedRunArray<'a, R, V> { + fn clone(&self) -> Self { + Self { + run_array: self.run_array, + values: self.values, + } + } +} + +impl<'a, R: RunEndIndexType, V> Copy for TypedRunArray<'a, R, V> {} + +impl<'a, R: RunEndIndexType, V> std::fmt::Debug for TypedRunArray<'a, R, V> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!(f, "TypedRunArray({:?})", self.run_array) + } +} + +impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> { + /// Returns the run_ends of this [`TypedRunArray`] + pub fn run_ends(&self) -> &'a PrimitiveArray<R> { + self.run_array.run_ends() + } + + /// Returns the values of this [`TypedRunArray`] + pub fn values(&self) -> &'a V { + self.values + } + + /// Returns index to the physcial array for the given index to the logical array. + /// Performs a binary search on the run_ends array for the input index. + #[inline] + pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> { + if logical_index >= self.run_array.len() { + return None; + } + let mut st: usize = 0; + let mut en: usize = self.run_ends().len(); + while st + 1 < en { + let mid: usize = (st + en) / 2; + if logical_index + < unsafe { + // Safety: + // The value of mid will always be between 1 and len - 1, + // where len is length of run ends array. + // This is based on the fact that `st` starts with 0 and + // `en` starts with len. The condition `st + 1 < en` ensures + // `st` and `en` differs atleast by two. So the value of `mid` + // will never be either `st` or `en` + self.run_ends().value_unchecked(mid - 1).as_usize() + } + { + en = mid + } else { + st = mid + } + } + Some(st) + } +} + +impl<'a, R: RunEndIndexType, V: Sync> Array for TypedRunArray<'a, R, V> { + fn as_any(&self) -> &dyn Any { + self.run_array + } + + fn data(&self) -> &ArrayData { + &self.run_array.data + } + + fn into_data(self) -> ArrayData { + self.run_array.into_data() + } +} + +// Array accessor converts the index of logical array to the index of the physical array +// using binary search. The time complexity is O(log N) where N is number of runs. +impl<'a, R, V> ArrayAccessor for TypedRunArray<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + type Item = <&'a V as ArrayAccessor>::Item; + + fn value(&self, logical_index: usize) -> Self::Item { + assert!( + logical_index < self.len(), + "Trying to access an element at index {} from a TypedRunArray of length {}", + logical_index, + self.len() + ); + unsafe { self.value_unchecked(logical_index) } + } + + unsafe fn value_unchecked(&self, logical_index: usize) -> Self::Item { + let physical_index = self.get_physical_index(logical_index).unwrap(); + self.values().value_unchecked(physical_index) + } +} + +impl<'a, R, V> IntoIterator for TypedRunArray<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + type Item = Option<<&'a V as ArrayAccessor>::Item>; + type IntoIter = RunArrayIter<'a, R, V>; + + fn into_iter(self) -> Self::IntoIter { + RunArrayIter::new(self) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; + use rand::seq::SliceRandom; + use rand::thread_rng; + use rand::Rng; + use super::*; use crate::builder::PrimitiveRunBuilder; use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type}; use crate::{Array, Int16Array, Int32Array, StringArray}; + fn build_input_array(approx_size: usize) -> Vec<Option<i32>> { + let mut seed: Vec<Option<i32>> = vec![ Review Comment: Some comments might help explain what this is doing, i.e. creating runs of the same value sourced from `seed` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org