> Cython is 75% Python, 25% C/C++, and 100% neither. Differences in imports
> and syntax can make it very challenging to reproduce some perfectly
> functioning C++ code in Cython. I'll try to keep the Cython layer very thin
> and keep everything in either Python or C++.

This sounds like a sane approach to me :)

On Thu, Apr 15, 2021 at 11:26 AM Xander Dunn <[email protected]> wrote:

> Big thanks to both of you, this was very helpful.
>
> I moved my loop down to pure C++ and replaced my GetScalar calls with
> array type casts as Weston suggested. Time on the first 290GB of data went
> from 107min to 21min! Same functionality. You were right, I was introducing
> a lot of overhead with the allocations.
>
> As you predicted, the string_view as map keys didn't quite work out of the
> box. This may be related
> (https://stackoverflow.com/questions/35525777/use-of-string-view-for-map-lookup),
> but I haven't been able to try it yet because of some Cython silliness.
> Currently I'm just using the std::string from GetString().
>
> My data does not start out with any grouping by `name`. It's sorted by
> timestamp, and it's important that I process it in order of timestamp, so
> the `name` column is close to uniformly distributed. Although the grouping
> by `name` is not a stateful process, I am in parallel computing some
> stateful functions. You're right that at some point this dataset is going
> to become large enough that I need to learn a distributed compute
> framework and break these tasks up. For now it's convenient that the
> processing of stored data is the same code as the processing of real-time
> production data streams.
>
> I haven't used Cython in ~6 years and it's been quite a pain. I'm rusty on
> my C++ as well, but I found the same functionality much more
> straightforward to implement purely in C++. Cython is 75% Python, 25%
> C/C++, and 100% neither. Differences in imports and syntax can make it
> very challenging to reproduce some perfectly functioning C++ code in
> Cython. I'll try to keep the Cython layer very thin and keep everything in
> either Python or C++.
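
The map-lookup problem behind that StackOverflow link is commonly solved with a transparent comparator: declare the map with `std::less<>` and `find()` will accept a `std::string_view` without materializing a `std::string` per lookup. A minimal sketch, assuming C++17 (Arrow's vendored `arrow::util::string_view` would still need converting, e.g. via `data()`/`size()`); the struct and the 1024-entry cap mirror the Cython loop quoted later in the thread:

```
#include <cstdint>
#include <deque>
#include <map>
#include <string>
#include <string_view>

struct MyUpdate {
  double value;
  int64_t checksum;
};

// std::less<> is a transparent comparator, so find() accepts anything
// comparable with std::string -- including std::string_view -- without
// constructing a temporary std::string key.
std::map<std::string, std::deque<MyUpdate>, std::less<>> updates;

void AddUpdate(std::string_view name, const MyUpdate& update) {
  auto it = updates.find(name);  // no allocation on the hot path
  if (it == updates.end()) {
    // Materialize a std::string key only the first time a name is seen.
    it = updates.emplace(std::string(name), std::deque<MyUpdate>()).first;
  }
  it->second.push_front(update);
  if (it->second.size() > 1024) {
    it->second.pop_back();
  }
}
```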

> On Wed, Apr 14, 2021 at 8:05 PM, Micah Kornfield <[email protected]> wrote:
>
>> One more thought: at 7TB, your data starts entering the realm of "big".
>> You might consider doing the grouping with a distributed compute
>> framework and then outputting to Plasma.
>>
>> On Wed, Apr 14, 2021 at 7:39 PM Micah Kornfield <[email protected]> wrote:
>>
>>> +1 to everything Weston said.
>>>
>>> From your comments about Gandiva, it sounded like you were OK with the
>>> filter-based approach, but maybe you had a different idea of using
>>> Gandiva?
>>>
>>> I believe "filter" also has optimizations if your data was already
>>> mostly grouped by name.
>>>
>>> I agree that, algorithmically, the map approach is probably optimal,
>>> but as Weston alluded to, there are hidden constant overheads that
>>> might even out with different approaches.
>>>
>>> Also, if your data is already grouped, using the dataset expression
>>> might be fairly efficient, since it does row group pruning based on
>>> predicates on the underlying parquet data.
>>>
>>> -Micah
>>>
>>> On Wed, Apr 14, 2021 at 7:13 PM Weston Pace <[email protected]> wrote:
>>>
>>>> Correct, the "group by" operation you're looking for doesn't quite
>>>> exist (externally) yet (others can correct me if I'm wrong here).
>>>> ARROW-3978 <https://issues.apache.org/jira/browse/ARROW-3978>
>>>> sometimes gets brought up in reference to this. There are some things
>>>> (e.g. the C++ query execution engine
>>>> <https://docs.google.com/document/d/1AyTdLU-RxA-Gsb9EsYnrQrmqPMOYMfPlWwxRi1Is1tQ/edit?usp=sharing>)
>>>> in the works which would provide this. There is also an internal
>>>> implementation (arrow::compute::internal::Grouper) that is used for
>>>> computing partitions, but I believe it was intentionally kept
>>>> internal; others may be able to say more about the reasoning.
>>>>
>>>> Expressions are (or will soon be) built on compute, so using them
>>>> won't provide much benefit over what is in compute. I want to say the
>>>> best approach available in compute for 4.0.0 is O(num_rows *
>>>> num_unique_names). To create the mask you would use the equals
>>>> function. So the whole operation would be...
>>>>
>>>> 1) Use unique to get the possible string values
>>>> 2) For each string value
>>>>    a) Use equals to get a mask
>>>>    b) Use filter to get a subarray
>>>>
>>>> So what you have may be a pretty reasonable workaround. I'd recommend
>>>> comparing with what you get from compute just for the sake of
>>>> comparison.
>>>>
>>>> There are a few minor optimizations you can make that shouldn't be too
>>>> much harder. You want to avoid GetScalar if you can, as it will make
>>>> an allocation/copy for every item you access. Grab the column from the
>>>> record batch and cast it to the appropriate typed array (this is only
>>>> easy because it appears you have a fairly rigid schema). This will
>>>> allow you to access values directly without wrapping them in a scalar.
>>>> For example, in C++ (I'll leave the Cython to you :)) it would look
>>>> like...
>>>>
>>>> auto arr =
>>>>     std::dynamic_pointer_cast<arrow::DoubleArray>(record_batch->column(0));
>>>> std::cout << arr->Value(0) << std::endl;
>>>>
>>>> For the string array I believe it is...
>>>>
>>>> auto str_arr =
>>>>     std::dynamic_pointer_cast<arrow::StringArray>(record_batch->column(1));
>>>> arrow::util::string_view view = str_arr->GetView(0);
>>>>
>>>> It may take a slight bit of finesse to figure out how to get
>>>> arrow::util::string_view to work with map, but it should be doable.
>>>> There is also GetString, which returns std::string and should only be
>>>> slightly more expensive, and GetValue, which returns a uint8_t* and
>>>> writes the length into an out parameter.
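
Weston's three steps above translate fairly directly to C++. A rough sketch against the 4.0-era `arrow::compute` API (the column name and the error handling are assumptions, and signatures vary slightly between versions):

```
#include <arrow/api.h>
#include <arrow/compute/api.h>

#include <memory>
#include <vector>

// 1) unique over the name column, 2a) "equal" to build a boolean mask for
// each name, 2b) filter the whole batch with that mask.
arrow::Result<std::vector<std::shared_ptr<arrow::RecordBatch>>> SplitByName(
    const std::shared_ptr<arrow::RecordBatch>& batch) {
  std::vector<std::shared_ptr<arrow::RecordBatch>> groups;
  auto names = batch->GetColumnByName("name");
  ARROW_ASSIGN_OR_RAISE(auto uniques, arrow::compute::Unique(names));
  auto keys = std::static_pointer_cast<arrow::StringArray>(uniques);
  for (int64_t i = 0; i < keys->length(); ++i) {
    ARROW_ASSIGN_OR_RAISE(auto key, keys->GetScalar(i));
    ARROW_ASSIGN_OR_RAISE(auto mask,
                          arrow::compute::CallFunction("equal", {names, key}));
    ARROW_ASSIGN_OR_RAISE(auto group, arrow::compute::Filter(batch, mask));
    groups.push_back(group.record_batch());
  }
  return groups;
}
```

As the thread notes, this does a full O(num_rows) pass per unique name; the single-pass map loop quoted below avoids that factor, at the cost of the constant per-row overheads Micah mentions.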

>>>> On Wed, Apr 14, 2021 at 3:15 PM Xander Dunn <[email protected]> wrote:
>>>>
>>>>> Thanks, I did try a few things with pyarrow.compute. However, the
>>>>> pyarrow.compute.filter interface indicates that it takes a boolean
>>>>> mask to do the filtering:
>>>>> https://arrow.apache.org/docs/python/generated/pyarrow.compute.filter.html
>>>>>
>>>>> But it doesn't actually help me create the mask. I'm back to iterating
>>>>> through the rows, and now I would need to create a boolean array of
>>>>> size (num_rows) for every unique value of `name`.
>>>>>
>>>>> I saw in the dataset docs
>>>>> (https://arrow.apache.org/docs/python/dataset.html) some discussion of
>>>>> Expressions, such as `ds.field("name") == "Xander"`. However, I don't
>>>>> see a way of computing such an expression without loading the entire
>>>>> dataset into memory with `dataset.to_table()`, which doesn't work for
>>>>> my dataset because it's many times larger than RAM. Can an Expression
>>>>> be computed on a RecordBatch?
>>>>>
>>>>> It's also hard to foresee how applying filter for each unique value of
>>>>> `name` would be more computationally efficient. The loop I posted
>>>>> earlier is O(num_rows), whereas applying filter for each name would be
>>>>> O(num_rows * num_unique_names). It could still be faster if my loop
>>>>> code is poorly implemented or if filter is multi-threaded.
>>>>>
>>>>> On Wed, Apr 14, 2021 at 4:45 PM, Micah Kornfield <[email protected]> wrote:
>>>>>
>>>>>> Have you looked at the pyarrow compute functions [1][2]?
>>>>>>
>>>>>> Unique and filter seem like they would help.
>>>>>>
>>>>>> [1] https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
>>>>>> [2] https://arrow.apache.org/docs/cpp/compute.html#compute-function-list
>>>>>>
>>>>>> On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks Weston,
>>>>>>>
>>>>>>> Performance is paramount here; I'm streaming through 7TB of data.
>>>>>>>
>>>>>>> I actually need to separate the data based on the value of the
>>>>>>> `name` column. For every unique value of `name`, I need a batch of
>>>>>>> those rows. I tried using Gandiva's filter function but can't get
>>>>>>> Gandiva installed on Ubuntu (see my earlier thread "[Python]
>>>>>>> pyarrow.gandiva unavailable on Ubuntu?" on this mailing list).
>>>>>>>
>>>>>>> Aside from that, I'm not sure of a way to separate the data faster
>>>>>>> than iterating through every row and placing the values into a map
>>>>>>> keyed on `name`:
>>>>>>> ```
>>>>>>> cdef struct myUpdateStruct:
>>>>>>>     double value
>>>>>>>     int64_t checksum
>>>>>>>
>>>>>>> cdef iterate_dataset():
>>>>>>>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>>>>>>>     # batch is populated by a scanner of .parquet files;
>>>>>>>     # names, values, checksums are its columns
>>>>>>>     cdef shared_ptr[CRecordBatch] batch
>>>>>>>     cdef int64_t batch_row_index = 0
>>>>>>>     while batch_row_index < batch.get().num_rows():
>>>>>>>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>>>             GetScalar(batch_row_index)).get()).value
>>>>>>>         name = <char *>name_buffer.get().data()
>>>>>>>         value = (<CDoubleScalar*>GetResultValue(values.get().\
>>>>>>>             GetScalar(batch_row_index)).get()).value
>>>>>>>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
>>>>>>>             GetScalar(batch_row_index)).get()).value
>>>>>>>         newUpdate = myUpdateStruct(value, checksum)
>>>>>>>         if myUpdates.count(name) <= 0:
>>>>>>>             myUpdates[name] = deque[myUpdateStruct]()
>>>>>>>         myUpdates[name].push_front(newUpdate)
>>>>>>>         if myUpdates[name].size() > 1024:
>>>>>>>             myUpdates[name].pop_back()
>>>>>>>         batch_row_index += 1
>>>>>>> ```
>>>>>>> This takes 107 minutes to iterate through the first 290GB of data.
>>>>>>> Without accessing or filtering the data in any way, it takes only
>>>>>>> 12 minutes to read all the .parquet files into RecordBatches and
>>>>>>> place them into Plasma.
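
Per Xander's April 15 reply at the top of the thread, this is the loop that eventually moved to pure C++ with the typed-array casts Weston suggested, cutting the 290GB run from 107 minutes to 21. A sketch of roughly what that rewrite could look like (the column names are assumptions; `GetString()` copies, matching what the reply says is currently used):

```
#include <arrow/api.h>

#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <string>

struct MyUpdate {
  double value;
  int64_t checksum;
};

// Single pass over the batch; the per-row GetScalar allocations are replaced
// with typed-array casts so each access is a direct buffer read.
void ProcessBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
                  std::map<std::string, std::deque<MyUpdate>>* updates) {
  auto names = std::static_pointer_cast<arrow::StringArray>(
      batch->GetColumnByName("name"));
  auto values = std::static_pointer_cast<arrow::DoubleArray>(
      batch->GetColumnByName("value"));
  auto checksums = std::static_pointer_cast<arrow::Int64Array>(
      batch->GetColumnByName("checksum"));
  for (int64_t i = 0; i < batch->num_rows(); ++i) {
    // With the transparent-comparator map sketched earlier, GetView() could
    // replace GetString() and avoid this copy as well.
    auto& queue = (*updates)[names->GetString(i)];
    queue.push_front(MyUpdate{values->Value(i), checksums->Value(i)});
    if (queue.size() > 1024) {
      queue.pop_back();
    }
  }
}
```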

>>>>>>> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <[email protected]> wrote:
>>>>>>>
>>>>>>>> If you don't need the performance, you could stay in python (use
>>>>>>>> to_pylist() for the array or as_py() for scalars).
>>>>>>>>
>>>>>>>> If you do need the performance then you're probably better served
>>>>>>>> getting the buffers and operating on them directly. Or, even
>>>>>>>> better, making use of the compute kernels:
>>>>>>>>
>>>>>>>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>>>>>>>> desired = pa.array(['Xander'], pa.string())
>>>>>>>> pc.any(pc.is_in(arr, value_set=desired)).as_py()  # True
>>>>>>>>
>>>>>>>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> This works for getting a c string out of the CScalar:
>>>>>>>>> ```
>>>>>>>>> name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>>>>>     GetScalar(batch_row_index)).get()).value
>>>>>>>>> name = <char *>name_buffer.get().data()
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Here is an example code snippet from a .pyx file that
>>>>>>>>>> successfully iterates through a CRecordBatch and ensures that
>>>>>>>>>> the timestamps are ascending:
>>>>>>>>>> ```
>>>>>>>>>> while batch_row_index < batch.get().num_rows():
>>>>>>>>>>     timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>>>>>>>     new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>>>>>>>     current_timestamp = timestamps[name]
>>>>>>>>>>     if current_timestamp > new_timestamp.value:
>>>>>>>>>>         abort()
>>>>>>>>>>     batch_row_index += 1
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> However, I'm having difficulty operating on the values in a
>>>>>>>>>> column of string type. Unlike CTimestampScalar, there is no
>>>>>>>>>> CStringScalar. Although there is a StringScalar type in C++, it
>>>>>>>>>> isn't defined in the Cython interface. There is a `CStringType`
>>>>>>>>>> and a `c_string` type.
>>>>>>>>>> ```
>>>>>>>>>> while batch_row_index < batch.get().num_rows():
>>>>>>>>>>     name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>>>>>>>>     name_string = <CStringType*>name.get()  # This is wrong
>>>>>>>>>>     printf("%s\n", name_string)  # This prints garbage
>>>>>>>>>>     if name_string == b"Xander":  # Doesn't work
>>>>>>>>>>         print("found it")
>>>>>>>>>>     batch_row_index += 1
>>>>>>>>>> ```
>>>>>>>>>> How do I get the string value as a C type and compare it to
>>>>>>>>>> other strings?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Xander
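
Tying off the question that started the thread: with the typed-array cast, each row's string is available as a view that compares directly against a literal, with no per-row Scalar. A sketch under the same assumptions as above (comparison between Arrow's vendored string_view and a string literal is expected to work, though versions differ):

```
#include <arrow/api.h>

#include <cstdio>
#include <memory>

// Scan a string column for a constant value without allocating a Scalar or
// a std::string per row.
void FindName(const std::shared_ptr<arrow::RecordBatch>& batch) {
  auto names = std::static_pointer_cast<arrow::StringArray>(
      batch->GetColumnByName("name"));
  for (int64_t i = 0; i < names->length(); ++i) {
    if (!names->IsNull(i) && names->GetView(i) == "Xander") {
      std::printf("found it at row %lld\n", static_cast<long long>(i));
    }
  }
}
```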
