Thanks Weston. Performance is paramount here; I'm streaming through 7TB of data.
I actually need to separate the data based on the value of the `name` column. For every unique value of `name`, I need a batch of those rows. I tried using Gandiva's filter function but can't get Gandiva installed on Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable on Ubuntu?" on this mailing list). Aside from that, I'm not sure of a way to separate the data faster than iterating through every row and placing the values into a map keyed on `name`:

```
cdef struct myUpdateStruct:
    double value
    int64_t checksum

cdef iterate_dataset():
    cdef map[c_string, deque[myUpdateStruct]] myUpdates
    cdef shared_ptr[CRecordBatch] batch  # This is populated by a scanner of .parquet files
    cdef int64_t batch_row_index = 0
    # names, values, checksums are the corresponding columns of `batch` (setup not shown)
    while batch_row_index < batch.get().num_rows():
        name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
            GetScalar(batch_row_index)).get()).value
        name = <char *>name_buffer.get().data()
        value = (<CDoubleScalar*>GetResultValue(values.get().\
            GetScalar(batch_row_index)).get()).value
        checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
            GetScalar(batch_row_index)).get()).value
        newUpdate = myUpdateStruct(value, checksum)
        if myUpdates.count(name) <= 0:
            myUpdates[name] = deque[myUpdateStruct]()
        myUpdates[name].push_front(newUpdate)
        if myUpdates[name].size() > 1024:
            myUpdates[name].pop_back()
        batch_row_index += 1
```

This takes 107 minutes to iterate through the first 290GB of data. Without accessing or filtering the data in any way, it takes only 12 minutes to read all the .parquet files into RecordBatches and place them into Plasma.
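For reference, here is a minimal Python-level sketch of what that per-`name` split might look like with the compute kernels suggested below (a toy table stands in for the scanner output; the column names match the code above, everything else is illustrative):

```
import pyarrow as pa
import pyarrow.compute as pc

# Toy stand-in for the data coming off the parquet scanner.
table = pa.table({
    "name": ["a", "b", "a", "c"],
    "value": [1.0, 2.0, 3.0, 4.0],
    "checksum": [10, 20, 30, 40],
})

# One filtered table per distinct name, built from vectorized kernels
# instead of per-row GetScalar calls.
per_name = {}
for name in pc.unique(table["name"]).to_pylist():
    mask = pc.equal(table["name"], name)
    per_name[name] = table.filter(mask)
```

Whether this beats the row loop at this scale would need measuring; the point is only that the per-row work stays inside Arrow's vectorized kernels.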
On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <[email protected]> wrote:

> If you don't need the performance, you could stay in python (use
> to_pylist() for the array or as_py() for scalars).
>
> If you do need the performance then you're probably better served getting
> the buffers and operating on them directly. Or, even better, making use
> of the compute kernels:
>
> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
> desired = pa.array(['Xander'], pa.string())
> pc.any(pc.is_in(arr, value_set=desired)).as_py() # True
>
> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <xander@xander.ai> wrote:
>
>> This works for getting a c string out of the CScalar:
>>
>> ```
>> name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>     GetScalar(batch_row_index)).get()).value
>> name = <char *>name_buffer.get().data()
>> ```
>>
>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <xander@xander.ai> wrote:
>>
>>> Here is an example code snippet from a .pyx file that successfully
>>> iterates through a CRecordBatch and ensures that the timestamps are
>>> ascending:
>>>
>>> ```
>>> while batch_row_index < batch.get().num_rows():
>>>     timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>     new_timestamp = <CTimestampScalar*>timestamp.get()
>>>     current_timestamp = timestamps[name]
>>>     if current_timestamp > new_timestamp.value:
>>>         abort()
>>>     batch_row_index += 1
>>> ```
>>>
>>> However, I'm having difficulty operating on the values in a column of
>>> string type. Unlike CTimestampScalar, there is no CStringScalar. Although
>>> there is a StringScalar type in C++, it isn't defined in the Cython
>>> interface. There is a `CStringType` and a `c_string` type.
>>>
>>> ```
>>> while batch_row_index < batch.get().num_rows():
>>>     name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>     name_string = <CStringType*>name.get()  # This is wrong
>>>     printf("%s\n", name_string)  # This prints garbage
>>>     if name_string == b"Xander":  # Doesn't work
>>>         print("found it")
>>>     batch_row_index += 1
>>> ```
>>>
>>> How do I get the string value as a C type and compare it to other strings?
>>>
>>> Thanks,
>>> Xander
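One note on the string question at the end of the quoted thread: even with the working CBaseBinaryScalar approach shown upthread, the scalar's value buffer is not guaranteed to be null-terminated, so treating `data()` as a C string for printf or a raw pointer comparison can misbehave. A minimal Cython sketch of a length-aware helper, assuming the same declarations used in the snippets above (`GetResultValue`, `CBaseBinaryScalar`, `CBuffer`, `CArray` from pyarrow's internal pxd files); the helper name is made up for illustration:

```
from libcpp.memory cimport shared_ptr
from libcpp.string cimport string as c_string
from libc.stdint cimport int64_t
# CBuffer, CBaseBinaryScalar, CArray, GetResultValue come from pyarrow's
# internal pxd files (pyarrow.includes.libarrow / .common), as in the
# snippets above.

cdef c_string scalar_to_string(shared_ptr[CArray] names, int64_t i):
    # Pull the i-th string value out as a C++ string with an explicit
    # length -- the scalar's buffer may not be null-terminated.
    cdef shared_ptr[CBuffer] buf = (<CBaseBinaryScalar*> GetResultValue(
        names.get().GetScalar(i)).get()).value
    return c_string(<char*> buf.get().data(), buf.get().size())
```

Inside the row loop the result can then be compared by value, e.g. `if scalar_to_string(names, batch_row_index) == b"Xander": ...`, since Cython converts between `c_string` and Python bytes automatically.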
