One more thought: at 7TB your data starts entering the realm of "big". You might consider doing the grouping with a distributed compute framework and then outputting to Plasma.
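
A rough sketch of what that suggestion could look like. This is an untested illustration, not something from the thread: Dask is just one example of a distributed framework, the paths are hypothetical, and loading the grouped output into Plasma would still be a separate step.

```
import dask.dataframe as dd

# Hypothetical input path; adjust to the real dataset layout.
df = dd.read_parquet("/data/updates/*.parquet")

# Let the distributed framework do the shuffle: one Parquet partition
# directory per unique `name` value. Plasma loading happens afterwards.
df.to_parquet("/data/updates_by_name/", partition_on=["name"])
```
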
On Wed, Apr 14, 2021 at 7:39 PM Micah Kornfield <[email protected]> wrote:

> +1 to everything Weston said.
>
> From your comments about Gandiva, it sounded like you were OK with the filter-based approach, but maybe you had a different idea of using Gandiva?
>
> I believe "filter" also has optimizations if your data is already mostly grouped by name.
>
> I agree that, algorithmically, the map approach is probably optimal, but as Weston alluded to there are hidden constant overheads that might even things out between approaches.
>
> Also, if your data is already grouped, using a dataset expression might be fairly efficient, since it does row-group pruning based on predicates on the underlying Parquet data.
>
> -Micah
>
> On Wed, Apr 14, 2021 at 7:13 PM Weston Pace <[email protected]> wrote:
>
>> Correct, the "group by" operation you're looking for doesn't quite exist (externally) yet (others can correct me if I'm wrong here). ARROW-3978 <https://issues.apache.org/jira/browse/ARROW-3978> sometimes gets brought up in reference to this. There are some things (e.g. the C++ query execution engine <https://docs.google.com/document/d/1AyTdLU-RxA-Gsb9EsYnrQrmqPMOYMfPlWwxRi1Is1tQ/edit?usp=sharing>) in the works which would provide this. There is also an internal implementation (arrow::compute::internal::Grouper) that is used for computing partitions, but I believe it was intentionally kept internal; others may be able to explain the reasoning in more detail.
>>
>> Expressions are (or will soon be) built on compute, so using them won't provide much benefit over what is in compute. I want to say the best you could get with what is in compute for 4.0.0 is O(num_rows * num_unique_names). To create the mask you would use the equals function. So the whole operation would be...
>>
>> 1) Use unique to get the possible string values
>> 2) For each string value
>>     a) Use equals to get a mask
>>     b) Use filter to get a subarray
>>
>> So what you have may be a pretty reasonable workaround. I'd still recommend comparing it against what you get from compute, just for the sake of comparison.
>>
>> There are a few minor optimizations you can make that shouldn't be too much harder. You want to avoid GetScalar if you can, as it will make an allocation / copy for every item you access. Grab the column from the record batch and cast it to the appropriate typed array (this is only easy because it appears you have a fairly rigid schema). This will allow you to access values directly without wrapping them in a scalar. For example, in C++ (I'll leave the Cython to you :)) it would look like...
>>
>> auto arr = std::dynamic_pointer_cast<arrow::DoubleArray>(record_batch->column(0));
>> std::cout << arr->Value(0) << std::endl;
>>
>> For the string array I believe it is...
>>
>> auto str_arr = std::dynamic_pointer_cast<arrow::StringArray>(record_batch->column(0));
>> arrow::util::string_view view = str_arr->GetView(0);
>>
>> It may take a slight bit of finesse to figure out how to get arrow::util::string_view to work with map, but it should be doable. There is also GetString, which returns std::string and should only be slightly more expensive, and GetValue, which returns a uint8_t* and writes the length into an out parameter.
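
A minimal pyarrow sketch of the unique / equals / filter sequence Weston outlines above. It is an untested illustration that assumes pyarrow 4.0, a RecordBatch `batch`, a string column literally named "name", and that pc.filter accepts a RecordBatch (per the filter docs linked below); as discussed, it is O(num_rows * num_unique_names).

```
import pyarrow as pa
import pyarrow.compute as pc

def split_by_name(batch: pa.RecordBatch):
    """Yield (name, sub-batch) pairs, one per unique value of the `name` column."""
    names = batch.column(batch.schema.get_field_index("name"))
    for name in pc.unique(names):                    # 1) possible string values
        mask = pc.equal(names, name)                 # 2a) boolean mask for this value
        yield name.as_py(), pc.filter(batch, mask)   # 2b) rows matching this value
```
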
>> On Wed, Apr 14, 2021 at 3:15 PM Xander Dunn <[email protected]> wrote:
>>
>>> Thanks, I did try a few things with pyarrow.compute. However, the pyarrow.compute.filter interface indicates that it takes a boolean mask to do the filtering: https://arrow.apache.org/docs/python/generated/pyarrow.compute.filter.html
>>>
>>> But it doesn't actually help me create the mask. I'm back to iterating through the rows, and now I would need to create a boolean array of size (num_rows) for every unique value of `name`.
>>>
>>> I saw in the dataset docs (https://arrow.apache.org/docs/python/dataset.html) some discussion of Expressions, such as `ds.field("name") == "Xander"`. However, I don't see a way of computing such an expression without loading the entire dataset into memory with `dataset.to_table()`, which doesn't work for my dataset because it's many times larger than RAM. Can an Expression be computed on a RecordBatch?
>>>
>>> It's also hard to foresee how applying filter for each unique value of `name` would be more computationally efficient. The loop I posted above is O(num_rows), whereas applying filter for each name would be O(num_rows * num_unique_names). It could still be faster if my loop code is poorly implemented or if filter is multi-threaded.
>>>
>>> On Wed, Apr 14, 2021 at 4:45 PM, Micah Kornfield <[email protected]> wrote:
>>>
>>>> Have you looked at the pyarrow compute functions [1][2]?
>>>>
>>>> Unique and filter seem like they would help.
>>>>
>>>> [1] https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
>>>> [2] https://arrow.apache.org/docs/cpp/compute.html#compute-function-list
>>>>
>>>> On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <[email protected]> wrote:
>>>>
>>>>> Thanks Weston,
>>>>>
>>>>> Performance is paramount here; I'm streaming through 7TB of data.
>>>>>
>>>>> I actually need to separate the data based on the value of the `name` column. For every unique value of `name`, I need a batch of those rows. I tried using Gandiva's filter function but can't get Gandiva installed on Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable on Ubuntu?" on this mailing list).
>>>>>
>>>>> Aside from that, I'm not sure of a way to separate the data faster than iterating through every row and placing the values into a map keyed on `name`:
>>>>> ```
>>>>> cdef struct myUpdateStruct:
>>>>>     double value
>>>>>     int64_t checksum
>>>>>
>>>>> cdef iterate_dataset():
>>>>>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>>>>>     cdef shared_ptr[CRecordBatch] batch  # This is populated by a scanner of .parquet files
>>>>>     cdef int64_t batch_row_index = 0
>>>>>     while batch_row_index < batch.get().num_rows():
>>>>>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().GetScalar(batch_row_index)).get()).value
>>>>>         name = <char *>name_buffer.get().data()
>>>>>         value = (<CDoubleScalar*>GetResultValue(values.get().GetScalar(batch_row_index)).get()).value
>>>>>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().GetScalar(batch_row_index)).get()).value
>>>>>         newUpdate = myUpdateStruct(value, checksum)
>>>>>         if myUpdates.count(name) <= 0:
>>>>>             myUpdates[name] = deque[myUpdateStruct]()
>>>>>         myUpdates[name].push_front(newUpdate)
>>>>>         if myUpdates[name].size() > 1024:
>>>>>             myUpdates[name].pop_back()
>>>>>         batch_row_index += 1
>>>>> ```
>>>>> This takes 107 minutes to iterate through the first 290GB of data. Without accessing or filtering the data in any way, it takes only 12 minutes to read all the .parquet files into RecordBatches and place them into Plasma.
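
On the Expression question above: a hedged sketch of what Micah's row-group-pruning point could look like in practice, assuming pyarrow 4.0 and a hypothetical dataset path. The filter is pushed into the dataset scan, so only matching rows are materialized rather than the full 7TB, though it still means one scan per name.

```
import pyarrow.dataset as ds

# Hypothetical path; the dataset reader prunes Parquet row groups using
# their statistics before applying the predicate row by row.
dataset = ds.dataset("/data/updates/", format="parquet")

# Only rows where name == "Xander" are materialized.
xander_rows = dataset.to_table(filter=ds.field("name") == "Xander")
```
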
>>>>> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <[email protected]> wrote:
>>>>>
>>>>>> If you don't need the performance, you could stay in Python (use to_pylist() for the array or as_py() for scalars).
>>>>>>
>>>>>> If you do need the performance, then you're probably better served getting the buffers and operating on them directly. Or, even better, making use of the compute kernels:
>>>>>>
>>>>>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>>>>>> desired = pa.array(['Xander'], pa.string())
>>>>>> pc.any(pc.is_in(arr, value_set=desired)).as_py()  # True
>>>>>>
>>>>>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <[email protected]> wrote:
>>>>>>
>>>>>>> This works for getting a C string out of the CScalar:
>>>>>>> ```
>>>>>>> name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().GetScalar(batch_row_index)).get()).value
>>>>>>> name = <char *>name_buffer.get().data()
>>>>>>> ```
>>>>>>>
>>>>>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <[email protected]> wrote:
>>>>>>>
>>>>>>>> Here is an example code snippet from a .pyx file that successfully iterates through a CRecordBatch and ensures that the timestamps are ascending:
>>>>>>>> ```
>>>>>>>> while batch_row_index < batch.get().num_rows():
>>>>>>>>     timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>>>>>     new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>>>>>     current_timestamp = timestamps[name]
>>>>>>>>     if current_timestamp > new_timestamp.value:
>>>>>>>>         abort()
>>>>>>>>     batch_row_index += 1
>>>>>>>> ```
>>>>>>>>
>>>>>>>> However, I'm having difficulty operating on the values in a column of string type. Unlike CTimestampScalar, there is no CStringScalar. Although there is a StringScalar type in C++, it isn't defined in the Cython interface. There is a `CStringType` and a `c_string` type.
>>>>>>>> ```
>>>>>>>> while batch_row_index < batch.get().num_rows():
>>>>>>>>     name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>>>>>>     name_string = <CStringType*>name.get()  # This is wrong
>>>>>>>>     printf("%s\n", name_string)  # This prints garbage
>>>>>>>>     if name_string == b"Xander":  # Doesn't work
>>>>>>>>         print("found it")
>>>>>>>>     batch_row_index += 1
>>>>>>>> ```
>>>>>>>> How do I get the string value as a C type and compare it to other strings?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Xander
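
Tying Weston's "stay in Python" suggestion back to the Cython buffer shown earlier, a hedged pure-Python sketch of the same per-name bounded buffer. It is an untested illustration that assumes the column names from the thread (`name`, `value`, `checksum`) and will not match the Cython version's performance.

```
from collections import defaultdict, deque

import pyarrow as pa

# Most recent 1024 (value, checksum) updates per name, mirroring the
# deque bounded at 1024 in the Cython snippet above.
updates = defaultdict(lambda: deque(maxlen=1024))

def consume(batch: pa.RecordBatch) -> None:
    schema = batch.schema
    names = batch.column(schema.get_field_index("name")).to_pylist()
    values = batch.column(schema.get_field_index("value")).to_pylist()
    checksums = batch.column(schema.get_field_index("checksum")).to_pylist()
    for name, value, checksum in zip(names, values, checksums):
        updates[name].append((value, checksum))
```
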
