Have you looked at the pyarrow compute functions [1][2]? The unique and filter functions seem like they would help.
[1] https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
[2] https://arrow.apache.org/docs/cpp/compute.html#compute-function-list

On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn wrote:

> Thanks Weston,
>
> Performance is paramount here, I'm streaming through 7TB of data.
>
> I actually need to separate the data based on the value of the `name`
> column. For every unique value of `name`, I need a batch of those rows. I
> tried using gandiva's filter function but can't get gandiva installed on
> Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable on
> Ubuntu?" on this mailing list).
>
> Aside from that, I'm not sure of a way to separate the data faster than
> iterating through every row and placing the values into a map keyed on
> `name`:
> ```
> cdef struct myUpdateStruct:
>     double value
>     int64_t checksum
>
> cdef iterate_dataset():
>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>     cdef shared_ptr[CRecordBatch] batch # This is populated by a scanner of .parquet files
>     cdef int64_t batch_row_index = 0
>     while batch_row_index < batch.get().num_rows():
>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>             GetScalar(batch_row_index)).get()).value
>         name = <char *>name_buffer.get().data()
>         value = (<CDoubleScalar*>GetResultValue(values.get().\
>             GetScalar(batch_row_index)).get()).value
>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
>             GetScalar(batch_row_index)).get()).value
>         newUpdate = myUpdateStruct(value, checksum)
>         if myUpdates.count(name) <= 0:
>             myUpdates[name] = deque[myUpdateStruct]()
>         myUpdates[name].push_front(newUpdate)
>         if myUpdates[name].size() > 1024:
>             myUpdates[name].pop_back()
>         batch_row_index += 1
> ```
> This takes 107 minutes to iterate through the first 290GB of data. Without
> accessing or filtering the data in any way it takes only 12 minutes to read all
> the .parquet files into RecordBatches and place them into Plasma.
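For checking a faster version against, here is a plain-Python reference model of what the loop above computes. It uses a hypothetical `latest_updates` helper, with `Update` standing in for `myUpdateStruct`. The `(name, value, checksum)` rows could come from something like `zip(names.to_pylist(), values.to_pylist(), checksums.to_pylist())`. This is far too slow for 7TB and is only meant as an oracle for tests. A `deque` with `maxlen` discards from the opposite end when it is full, so `appendleft` does the same job as `push_front` followed by `pop_back`.

```python
from collections import defaultdict, deque
from typing import Iterable, NamedTuple, Tuple


class Update(NamedTuple):
    """Plain-Python stand-in for the Cython myUpdateStruct."""
    value: float
    checksum: int


def latest_updates(rows: Iterable[Tuple[str, float, int]], max_len: int = 1024) -> dict:
    """Keep the newest `max_len` updates per name, newest first.

    `rows` yields (name, value, checksum) in file order. When a deque is
    full, appendleft() drops the oldest entry from the right-hand end.
    """
    updates = defaultdict(lambda: deque(maxlen=max_len))
    for name, value, checksum in rows:
        updates[name].appendleft(Update(value, checksum))
    return updates
```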
>
>
> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace wrote:
>
>> If you don't need the performance, you could stay in Python (use
>> to_pylist() for the array or as_py() for scalars).
>>
>> If you do need the performance then you're probably better served getting
>> the buffers and operating on them directly. Or, even better, making use of
>> the compute kernels:
>>
>> import pyarrow as pa
>> import pyarrow.compute as pc
>>
>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>> desired = pa.array(['Xander'], pa.string())
>> pc.any(pc.is_in(arr, value_set=desired)).as_py() # True
>>
>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn wrote:
>>
>>> This works for getting a C string out of the CScalar:
>>> ```
>>> name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>     GetScalar(batch_row_index)).get()).value
>>> name = <char *>name_buffer.get().data()
>>> ```
>>>
>>>
>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn wrote:
>>>
>>>> Here is an example code snippet from a .pyx file that successfully
>>>> iterates through a CRecordBatch and ensures that the timestamps are
>>>> ascending:
>>>> ```
>>>> while batch_row_index < batch.get().num_rows():
>>>>     timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>     new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>     current_timestamp = timestamps[name]
>>>>     if current_timestamp > new_timestamp.value:
>>>>         abort()
>>>>     batch_row_index += 1
>>>> ```
>>>>
>>>> However, I'm having difficulty operating on the values in a column of
>>>> string type. Unlike CTimestampScalar, there is no CStringScalar. Although
>>>> there is a StringScalar type in C++, it isn't defined in the Cython
>>>> interface. There is a `CStringType` and a `c_string` type.
>>>> ``` >>>> while batch_row_index < batch.get().num_rows(): >>>> name = GetResultValue(names.get().GetScalar(batch_row_index)) >>>> name_string = <CStringType*>name.get() # This is wrong >>>> printf("%s\n", name_string) # This prints garbage >>>> if name_string == b"Xander": # Doesn't work >>>> print("found it") >>>> batch_row_index += 1 >>>> ``` >>>> How do I get the string value as a C type and compare it to other >>>> strings? >>>> >>>> Thanks, >>>> Xander >>>> >>> >
