Big thanks to both of you, this was very helpful. I moved my loop down to pure C++ and replaced my GetScalar calls with array type casts as Weston suggested. Time on the first 290GB of data went from 107min to 21min, with the same functionality! You were right, I was introducing a lot of overhead with the allocations.
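For reference, the hot loop now looks roughly like this (a simplified sketch rather than my exact code; the column order, the struct, and the names are stand-ins):

```
// Sketch only: assumes columns 0/1/2 are name (utf8), value (double),
// and checksum (int64); my real schema and types differ slightly.
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <string>

#include <arrow/api.h>

struct MyUpdate {
  double value;
  int64_t checksum;
};

void ProcessBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
                  std::map<std::string, std::deque<MyUpdate>>& updates) {
  // Cast each column to its concrete array type once, outside the row loop,
  // instead of calling GetScalar per row (which allocates a Scalar each time).
  auto names = std::static_pointer_cast<arrow::StringArray>(batch->column(0));
  auto values = std::static_pointer_cast<arrow::DoubleArray>(batch->column(1));
  auto checksums = std::static_pointer_cast<arrow::Int64Array>(batch->column(2));

  for (int64_t i = 0; i < batch->num_rows(); ++i) {
    std::string name = names->GetString(i);  // copies the bytes into a std::string
    std::deque<MyUpdate>& queue = updates[name];
    queue.push_front(MyUpdate{values->Value(i), checksums->Value(i)});
    if (queue.size() > 1024) {
      queue.pop_back();
    }
  }
}
```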
As you predicted, the string_view as map keys didn't quite work out of the box. This may be related ( https://stackoverflow.com/questions/35525777/use-of-string-view-for-map-lookup ) but I haven't been able to try it yet because of some Cython silliness. Currently I'm just using the std::string from GetString().

My data does not start out with any grouping by `name`; it's sorted by timestamp, and it's important that I process it in order of timestamp, so the `name` column is close to uniformly distributed. Although the grouping by `name` is not itself a stateful process, I am computing some stateful functions in parallel. You're right that at some point this dataset is going to become large enough that I need to learn a distributed compute framework and break these tasks up. For now it's convenient that the processing of stored data is the same code as the processing of real-time production data streams.

I haven't used Cython in ~6 years and it's been quite a pain. I'm rusty on my C++ as well, but I found the same functionality much more straightforward to implement purely in C++. Cython is 75% Python, 25% C/C++, and 100% neither. Differences in imports and syntax can make it very challenging to reproduce some perfectly functioning C++ code in Cython. I'll try to keep the Cython layer very thin and keep everything in either Python or C++.

On Wed, Apr 14, 2021 at 8:05 PM, Micah Kornfield <[email protected]> wrote:

> One more thought: at 7TB your data starts entering the realm of "big". You might consider doing the grouping with a distributed compute framework and then outputting to Plasma.
>
> On Wed, Apr 14, 2021 at 7:39 PM Micah Kornfield <[email protected]> wrote:
>
>> +1 to everything Weston said.
>>
>> From your comments about Gandiva, it sounded like you were OK with the filter-based approach, but maybe you had a different idea of using Gandiva?
>>
>> I believe "filter" also has optimizations if your data is already mostly grouped by name.
>>
>> I agree that algorithmically the map approach is probably optimal, but as Weston alluded to, there are hidden constant overheads that might even things out between the different approaches.
>>
>> Also, if your data is already grouped, using the dataset expression might be fairly efficient since it does row group pruning based on predicates on the underlying parquet data.
>>
>> -Micah
>>
>> On Wed, Apr 14, 2021 at 7:13 PM Weston Pace <[email protected]> wrote:
>>
>>> Correct, the "group by" operation you're looking for doesn't quite exist (externally) yet (others can correct me if I'm wrong here). ARROW-3978 ( https://issues.apache.org/jira/browse/ARROW-3978 ) sometimes gets brought up in reference to this. There are some things in the works which would provide this, e.g. the C++ query execution engine ( https://docs.google.com/document/d/1AyTdLU-RxA-Gsb9EsYnrQrmqPMOYMfPlWwxRi1Is1tQ/edit?usp=sharing ). There is also an internal implementation (arrow::compute::internal::Grouper) that is used for computing partitions, but I believe it was intentionally kept internal; others may be able to explain the reasoning.
>>>
>>> Expressions are (or will soon be) built on compute, so using them isn't able to provide much benefit over what is in compute. I want to say the best approach you could get with what is in compute for 4.0.0 is O(num_rows * num_unique_names). To create the mask you would use the equals function. So the whole operation would be:
>>>
>>> 1) Use unique to get the possible string values
>>> 2) For each string value:
>>>    a) Use equals to get a mask
>>>    b) Use filter to get a subarray
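>>>
>>> A rough, untested pyarrow sketch of that loop (assuming `batch` is a RecordBatch whose first column is `name`; exact signatures may need adjusting):
>>>
>>> ```
>>> import pyarrow.compute as pc
>>>
>>> names = batch.column(0)                # the `name` column
>>> for name in pc.unique(names):
>>>     mask = pc.equal(names, name)       # boolean mask over every row
>>>     subset = pc.filter(batch, mask)    # just the rows for this name
>>>     # ... process subset ...
>>> ```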
>>>
>>> So what you have may be a pretty reasonable workaround. I'd recommend comparing with what you get from compute just for the sake of comparison.
>>>
>>> So there are a few minor optimizations you can make that shouldn't be too much harder. You want to avoid GetScalar if you can, as it will make an allocation / copy for every item you access. Grab the column from the record batch and cast it to the appropriate typed array (this is only easy because it appears you have a fairly rigid schema). This will allow you to access values directly without wrapping them in a scalar. For example, in C++ (I'll leave the Cython to you :)) it would look like...
>>>
>>> auto arr = std::dynamic_pointer_cast<arrow::DoubleArray>(record_batch->column(0));
>>> std::cout << arr->Value(0) << std::endl;
>>>
>>> For the string array I believe it is...
>>>
>>> auto str_arr = std::dynamic_pointer_cast<arrow::StringArray>(record_batch->column(0));
>>> arrow::util::string_view view = str_arr->GetView(0);
>>>
>>> It may take a slight bit of finesse to figure out how to get arrow::util::string_view to work with map, but it should be doable. There is also GetString, which returns std::string and should only be slightly more expensive, and GetValue, which returns a uint8_t* and writes the length into an out parameter.
>>>
>>> On Wed, Apr 14, 2021 at 3:15 PM Xander Dunn <[email protected]> wrote:
>>>
>>>> Thanks, I did try a few things with pyarrow.compute. However, the pyarrow.compute.filter interface indicates that it takes a boolean mask to do the filtering: https://arrow.apache.org/docs/python/generated/pyarrow.compute.filter.html
>>>>
>>>> But it doesn't actually help me create the mask? I'm back to iterating through the rows, and now I would need to create a boolean array of size (num_rows) for every unique value of `name`.
>>>>
>>>> I saw in the dataset docs ( https://arrow.apache.org/docs/python/dataset.html ) some discussion of Expressions, such as `ds.field("name") == "Xander"`. However, I don't see a way of computing such an expression without loading the entire dataset into memory with `dataset.to_table()`, which doesn't work for my dataset because it's many times larger than RAM. Can an Expression be computed on a RecordBatch?
>>>>
>>>> But it's also hard to foresee how applying filter for each unique value of `name` would be more computationally efficient. The loop I posted above is O(num_rows), whereas applying filter for each name would be O(num_rows * num_unique_names). It could still be faster if my loop code is poorly implemented or if filter is multi-threaded.
>>>>
>>>> On Wed, Apr 14, 2021 at 4:45 PM, Micah Kornfield <[email protected]> wrote:
>>>>
>>>>> Have you looked at the pyarrow compute functions [1][2]?
>>>>>
>>>>> Unique and filter seem like they would help.
>>>>>
>>>>> [1] https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
>>>>> [2] https://arrow.apache.org/docs/cpp/compute.html#compute-function-list
>>>>>
>>>>> On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <[email protected]> wrote:
>>>>>
>>>>>> Thanks Weston,
>>>>>>
>>>>>> Performance is paramount here, I'm streaming through 7TB of data.
>>>>>>
>>>>>> I actually need to separate the data based on the value of the `name` column. For every unique value of `name`, I need a batch of those rows. I tried using Gandiva's filter function but can't get Gandiva installed on Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable on Ubuntu?" on this mailing list).
>>>>>>
>>>>>> Aside from that, I'm not sure of a way to separate the data faster than iterating through every row and placing the values into a map keyed on `name`:
>>>>>>
>>>>>> ```
>>>>>> cdef struct myUpdateStruct:
>>>>>>     double value
>>>>>>     int64_t checksum
>>>>>>
>>>>>> cdef iterate_dataset():
>>>>>>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>>>>>>     cdef shared_ptr[CRecordBatch] batch  # This is populated by a scanner of .parquet files
>>>>>>     cdef int64_t batch_row_index = 0
>>>>>>     while batch_row_index < batch.get().num_rows():
>>>>>>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>>             GetScalar(batch_row_index)).get()).value
>>>>>>         name = <char *>name_buffer.get().data()
>>>>>>         value = (<CDoubleScalar*>GetResultValue(values.get().\
>>>>>>             GetScalar(batch_row_index)).get()).value
>>>>>>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
>>>>>>             GetScalar(batch_row_index)).get()).value
>>>>>>         newUpdate = myUpdateStruct(value, checksum)
>>>>>>         if myUpdates.count(name) <= 0:
>>>>>>             myUpdates[name] = deque[myUpdateStruct]()
>>>>>>         myUpdates[name].push_front(newUpdate)
>>>>>>         if myUpdates[name].size() > 1024:
>>>>>>             myUpdates[name].pop_back()
>>>>>>         batch_row_index += 1
>>>>>> ```
>>>>>>
>>>>>> This takes 107 minutes to iterate through the first 290GB of data. Without accessing or filtering the data in any way, it takes only 12 min to read all the .parquet files into RecordBatches and place them into Plasma.
>>>>>>
>>>>>> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <[email protected]> wrote:
>>>>>>
>>>>>>> If you don't need the performance, you could stay in Python (use to_pylist() for the array or as_py() for scalars).
>>>>>>>
>>>>>>> If you do need the performance then you're probably better served getting the buffers and operating on them directly. Or, even better, making use of the compute kernels:
>>>>>>>
>>>>>>> import pyarrow as pa
>>>>>>> import pyarrow.compute as pc
>>>>>>>
>>>>>>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>>>>>>> desired = pa.array(['Xander'], pa.string())
>>>>>>> pc.any(pc.is_in(arr, value_set=desired)).as_py()  # True
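>>>>>>>
>>>>>>> (For illustration, "getting the buffers and operating on them directly" for a string column might look roughly like the untested sketch below, reusing the `arr` from above; it assumes a non-chunked array with a zero offset.)
>>>>>>>
>>>>>>> import numpy as np
>>>>>>>
>>>>>>> validity, offsets_buf, data_buf = arr.buffers()   # StringArray has 3 buffers
>>>>>>> offsets = np.frombuffer(offsets_buf, dtype=np.int32)[:len(arr) + 1]
>>>>>>> data = np.frombuffer(data_buf, dtype=np.uint8)
>>>>>>> # bytes of row i (null handling omitted for brevity):
>>>>>>> i = 2
>>>>>>> row_bytes = data[offsets[i]:offsets[i + 1]].tobytes()  # b'Xander'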
>>>>>>>
>>>>>>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <[email protected]> wrote:
>>>>>>>
>>>>>>>> This works for getting a c string out of the CScalar:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>>>>     GetScalar(batch_row_index)).get()).value
>>>>>>>> name = <char *>name_buffer.get().data()
>>>>>>>> ```
>>>>>>>>
>>>>>>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Here is an example code snippet from a .pyx file that successfully iterates through a CRecordBatch and ensures that the timestamps are ascending:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> while batch_row_index < batch.get().num_rows():
>>>>>>>>>     timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>>>>>>     new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>>>>>>     current_timestamp = timestamps[name]
>>>>>>>>>     if current_timestamp > new_timestamp.value:
>>>>>>>>>         abort()
>>>>>>>>>     batch_row_index += 1
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> However, I'm having difficulty operating on the values in a column of string type. Unlike CTimestampScalar, there is no CStringScalar. Although there is a StringScalar type in C++, it isn't defined in the Cython interface. There is a `CStringType` and a `c_string` type.
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> while batch_row_index < batch.get().num_rows():
>>>>>>>>>     name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>>>>>>>     name_string = <CStringType*>name.get()  # This is wrong
>>>>>>>>>     printf("%s\n", name_string)  # This prints garbage
>>>>>>>>>     if name_string == b"Xander":  # Doesn't work
>>>>>>>>>         print("found it")
>>>>>>>>>     batch_row_index += 1
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> How do I get the string value as a C type and compare it to other strings?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Xander
