Sorry it took a while, but here is a repository I put together that should
reproducibly illustrate what I am seeing, and what I'd like to understand
better (if not improve) [1].

The linked source code [2] shows 2 places where I am collecting times
(using std::chrono::steady_clock in C++). The unexpected behavior is
mentioned at the end of the README.md [3]:

    Applying a function on a Table:              "Executed in 4.08 secs"
    Applying the function on each "table chunk": "Executed in 58.67 secs"

For the test data I included in this repository, the Table has the
following properties:
* 27118 rows
* 2152 columns
* 565 "table chunks"
* 48 rows per "table chunk"

Every column of the table should be identically chunked, so a "table
chunk" just refers to the same chunk of every column in the table. I call
it a "table chunk" in this email because I'm not sure whether it is the
same as a RecordBatch, or whether a RecordBatch lays the columns out
differently. The reason each column is chunked identically is so that the
whole table can be split across many key-values (written to a key-value
store).

In the code I provide in the repository, the table is decomposed via:
Table -> TableBatchReader -> RecordBatch -> Table::FromRecordBatches
(vector of 1 RecordBatch) [4].
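In outline, the decomposition looks like this (a simplified sketch of the
repository code, with error handling reduced to Arrow's Status/Result
macros; `SplitTable` is just an illustrative name):

    #include <arrow/api.h>

    // Split a Table into single-RecordBatch Tables, one per "table
    // chunk". Each batch read from the TableBatchReader is wrapped back
    // into a one-chunk Table via Table::FromRecordBatches.
    arrow::Result<std::vector<std::shared_ptr<arrow::Table>>>
    SplitTable(const std::shared_ptr<arrow::Table>& table) {
      std::vector<std::shared_ptr<arrow::Table>> chunk_tables;

      arrow::TableBatchReader batch_reader(*table);
      std::shared_ptr<arrow::RecordBatch> batch;

      ARROW_RETURN_NOT_OK(batch_reader.ReadNext(&batch));
      while (batch != nullptr) {
        ARROW_ASSIGN_OR_RAISE(auto chunk_table,
                              arrow::Table::FromRecordBatches({batch}));
        chunk_tables.push_back(chunk_table);
        ARROW_RETURN_NOT_OK(batch_reader.ReadNext(&batch));
      }

      return chunk_tables;
    }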
I have found that applying functions to Tables and to ChunkedArrays is
more ergonomic than RecordBatches and Arrays (at least as of ~5.0.0). So,
what I want to understand is: what contributes to the difference in
performance?

Thanks for any help or advice you can give!

P.S. I will try to also add tensors to this for my own curiosity.

[1]: https://github.com/drin/arrow-partial-aggr
[2]: https://github.com/drin/arrow-partial-aggr/blob/mainline/src/simple.cpp#L108
[3]: https://github.com/drin/arrow-partial-aggr/blob/mainline/README.md
[4]: https://github.com/drin/arrow-partial-aggr/blob/mainline/src/simple.cpp#L123

Aldrin Montana
Computer Science PhD Student
UC Santa Cruz


On Fri, Mar 11, 2022 at 6:44 AM Niranda Perera <[email protected]> wrote:

> Sorry, I think I've missed the smart pointers in my response. It
> *should* be smart pointers, otherwise you'll lose the allocation when
> you go out of scope. It should have been:
>
>     class MeanAggr {
>       int64_t count_;
>       vector<shared_ptr<Array>> sums_;
>       vector<shared_ptr<Array>> sum_squares_;
>     };
>
> On Fri, Mar 11, 2022 at 3:16 AM Aldrin <[email protected]> wrote:
>
>> Actually, I think I understand now; I misread "extending the class
>> members". But I think the point got across: if I know my table has a
>> single chunk, then I can do the operations on the Arrays and then wrap
>> the results in a ChunkedArray or Table. For each slice, I can just
>> maintain the results in a vector without smart pointers.
>>
>> I'll definitely try this. Thanks!
>>
>> Aldrin Montana
>> Computer Science PhD Student
>> UC Santa Cruz
>>
>>
>> On Thu, Mar 10, 2022 at 11:35 PM Aldrin <[email protected]> wrote:
>>
>>> I think there's one minor misunderstanding, but I like the essence of
>>> the feedback.
>>>
>>> To clarify, the MeanAggr::Accumulate function is used to gather over
>>> points of a sample, where a row is considered a sample and the
>>> columns are the corresponding values, e.g.:
>>>
>>>     columns (values) | c0 | c1 | c2 | c3  | c4
>>>     row 0 (sample 0) |  1 |  2 |  3 |   4 |    5
>>>     row 1 (sample 1) |  1 |  4 | 27 | 256 | 3125
>>>
>>> For this tiny example, applying Accumulate "by slice" means that I
>>> apply it once on row 0, then again on row 1, and I add the times
>>> together. "By table" means that I concatenate row 0 and row 1, then
>>> apply Accumulate on the resulting table. Combine isn't currently
>>> being considered (it's for when I split on columns).
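>>>
>>> In code, the comparison is roughly the following (a sketch, not the
>>> exact benchmark code; `slices`, `aggr_by_slice`, and `aggr_by_table`
>>> are stand-ins for the variables in my code):
>>>
>>>     // "by slice": apply Accumulate to each slice; sum the times
>>>     for (const auto& slice : slices) {
>>>       aggr_by_slice.Accumulate(slice);
>>>     }
>>>
>>>     // "by table": apply Accumulate once to the concatenated table
>>>     auto whole_table = arrow::ConcatenateTables(slices).ValueOrDie();
>>>     aggr_by_table.Accumulate(whole_table);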
>>>
>>> You can sort of see this in [1], but it also illustrates sequential
>>> calls of Accumulate instead of using Combine. I will explain this
>>> more in a reproducible example.
>>>
>>> Given the clarification, I am not sure if the suggested local
>>> calculations are helpful, but maybe you mean I shouldn't use so many
>>> shared pointers? Although, I do think I'll try reducing the code path
>>> by using Arrays when I'm applying to a Table that I know has only 1
>>> chunk (because I have specified it that way). This seems like it
>>> should help isolate some of the overhead.
>>>
>>> Thanks for the feedback!
>>>
>>> [1]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/fb688531169421a5b5985d2cbfee100e793cae2f/resources/assets/TStatistic_Diagram.png
>>>
>>> Aldrin Montana
>>> Computer Science PhD Student
>>> UC Santa Cruz
>>>
>>>
>>> On Thu, Mar 10, 2022 at 7:49 PM Niranda Perera <[email protected]> wrote:
>>>
>>>> Okay, one thing I immediately see is that there are a lot of memory
>>>> allocations/deallocations happening in the approach you have given,
>>>> IMO. arrow::compute methods do not mutate their inputs, so when you
>>>> get an answer it is freshly allocated in memory, and when you update
>>>> an existing shared_ptr you deallocate the previous buffers. This
>>>> happens in both MeanAggr::Combine and MeanAggr::Accumulate, and it
>>>> could be a reason why the split version is slower: the single-table
>>>> version only has to go through MeanAggr::Accumulate.
>>>>
>>>> If I may suggest an alternative approach, I'd do this for the
>>>> variance calculation:
>>>>
>>>>     class MeanAggr {
>>>>       int64_t count_;
>>>>       vector<Array> sums_;
>>>>       vector<Array> sum_squares_;
>>>>     };
>>>>
>>>> At every Accumulate, I would calculate local sums and sum squares,
>>>> and extend the class members with the resultant ChunkedArray's
>>>> chunks (which are Arrays). At the end, I'd create ChunkedArrays from
>>>> these vectors and use E(x^2) - E(x)^2 to calculate the variance. I
>>>> feel like this might reduce the number of extra allocs and deallocs.
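>>>>
>>>> Roughly, the final step could look like this (a sketch with my own
>>>> naming, using the smart-pointer members from the correction above,
>>>> and assuming floating-point columns):
>>>>
>>>>     namespace cp = arrow::compute;
>>>>
>>>>     // wrap the accumulated per-chunk results into ChunkedArrays
>>>>     arrow::Datum sums(std::make_shared<arrow::ChunkedArray>(sums_));
>>>>     arrow::Datum sqrs(std::make_shared<arrow::ChunkedArray>(sum_squares_));
>>>>
>>>>     // E(x) and E(x^2) from the accumulated sums and sum squares
>>>>     auto mean_x  = cp::CallFunction("divide", {sums, arrow::Datum(count_)}).ValueOrDie();
>>>>     auto mean_x2 = cp::CallFunction("divide", {sqrs, arrow::Datum(count_)}).ValueOrDie();
>>>>
>>>>     // Var(x) = E(x^2) - E(x)^2
>>>>     auto mean_x_sq = cp::CallFunction("multiply", {mean_x, mean_x}).ValueOrDie();
>>>>     auto variance  = cp::CallFunction("subtract", {mean_x2, mean_x_sq}).ValueOrDie();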
>>>>
>>>> On Thu, Mar 10, 2022 at 9:47 PM Aldrin <[email protected]> wrote:
>>>>
>>>>> You're correct with the first clarification. I am not (currently)
>>>>> slicing column-wise.
>>>>>
>>>>> And yes, I am calculating variance, mean, etc. so that I can
>>>>> calculate the t-statistic.
>>>>>
>>>>> Aldrin Montana
>>>>> Computer Science PhD Student
>>>>> UC Santa Cruz
>>>>>
>>>>>
>>>>> On Thu, Mar 10, 2022 at 5:16 PM Niranda Perera <[email protected]> wrote:
>>>>>
>>>>>> Or are you slicing column-wise?
>>>>>>
>>>>>> On Thu, Mar 10, 2022 at 8:14 PM Niranda Perera <[email protected]> wrote:
>>>>>>
>>>>>>> From the looks of it, you are trying to calculate variance, mean,
>>>>>>> etc. over rows, is that right?
>>>>>>>
>>>>>>> I need to clarify this statement a bit:
>>>>>>> "Where 'by slice' is total time, summed from running the function
>>>>>>> on each slice, and 'by table' is the time of just running the
>>>>>>> function on the table concatenated from each slice."
>>>>>>> So, I assume you are originally using a
>>>>>>> `vector<shared_ptr<Table>> slices`. For the former case, you are
>>>>>>> passing each slice to `MeanAggr::Accumulate`, and for the latter
>>>>>>> case, you are calling `arrow::ConcatenateTables(slices)` and
>>>>>>> passing the result as a single table?
>>>>>>>
>>>>>>> On Thu, Mar 10, 2022 at 7:41 PM Aldrin <[email protected]> wrote:
>>>>>>>
>>>>>>>> Oh, but the short answer is that I'm using: Add, Subtract,
>>>>>>>> Divide, Multiply, Power, and Absolute. Sometimes with both
>>>>>>>> inputs being ChunkedArrays, sometimes with 1 input being a
>>>>>>>> ChunkedArray and the other being a scalar.
>>>>>>>>
>>>>>>>> Aldrin Montana
>>>>>>>> Computer Science PhD Student
>>>>>>>> UC Santa Cruz
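>>>>>>>>
>>>>>>>> The calls have roughly this shape (a sketch, not my exact code;
>>>>>>>> `col` and `other_col` stand in for my ChunkedArrays):
>>>>>>>>
>>>>>>>>     namespace cp = arrow::compute;
>>>>>>>>
>>>>>>>>     // ChunkedArray <op> ChunkedArray
>>>>>>>>     auto diff = cp::CallFunction(
>>>>>>>>         "subtract", {arrow::Datum(col), arrow::Datum(other_col)}
>>>>>>>>     ).ValueOrDie();
>>>>>>>>
>>>>>>>>     // ChunkedArray <op> Scalar
>>>>>>>>     auto squared = cp::CallFunction(
>>>>>>>>         "power", {diff, arrow::Datum(2.0)}
>>>>>>>>     ).ValueOrDie();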
>>>>>>>>
>>>>>>>> On Thu, Mar 10, 2022 at 4:38 PM Aldrin <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Niranda!
>>>>>>>>>
>>>>>>>>> Sure thing, I've linked to my code. [1] is essentially the
>>>>>>>>> function being called, and [2] is an example of a wrapper
>>>>>>>>> function (more in that file) I wrote to reduce boilerplate (to
>>>>>>>>> make [1] more readable). But, now that I look at [2] again,
>>>>>>>>> which I wrote before I really knew much about smart pointers,
>>>>>>>>> I wonder if some of what I benchmarked is overhead from
>>>>>>>>> misusing C++ structures?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> [1]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/statops.cpp#L96
>>>>>>>>> [2]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/numops.cpp#L18
>>>>>>>>>
>>>>>>>>> Aldrin Montana
>>>>>>>>> Computer Science PhD Student
>>>>>>>>> UC Santa Cruz
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 10, 2022 at 4:30 PM Niranda Perera <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Aldrin,
>>>>>>>>>>
>>>>>>>>>> It would be helpful to know what sort of compute operators
>>>>>>>>>> you are using.
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 10, 2022, 19:12 Aldrin <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I will work on a reproducible example.
>>>>>>>>>>>
>>>>>>>>>>> As a sneak peek, what I was seeing was the following (pasted
>>>>>>>>>>> in Gmail; see [1] for the markdown version):
>>>>>>>>>>>
>>>>>>>>>>>     Table ID      | Columns | Rows  | Rows (slice) | Slice count | Time (ms) total; by slice | Time (ms) total; by table
>>>>>>>>>>>     E-GEOD-100618 |     415 | 20631 |          299 |          69 |                   644.065 |                       410
>>>>>>>>>>>     E-GEOD-76312  |    2152 | 27120 |           48 |         565 |                 25607.927 |                      2953
>>>>>>>>>>>     E-GEOD-106540 |    2145 | 24480 |           45 |         544 |                 25193.507 |                      3088
>>>>>>>>>>>
>>>>>>>>>>> Where "by slice" is total time, summed from running the
>>>>>>>>>>> function on each slice, and "by table" is the time of just
>>>>>>>>>>> running the function on the table concatenated from each
>>>>>>>>>>> slice.
>>>>>>>>>>>
>>>>>>>>>>> The difference was large (but not *so* large) for ~70 slices
>>>>>>>>>>> (1.5x); but for ~550 slices (with 6x fewer rows per slice
>>>>>>>>>>> and 5x more columns) the difference became significant
>>>>>>>>>>> (~10x).
>>>>>>>>>>>
>>>>>>>>>>> I will follow up here when I have a more reproducible
>>>>>>>>>>> example. I also started doing this before tensors were
>>>>>>>>>>> available, so I'll try to see how that changes performance.
>>>>>>>>>>>
>>>>>>>>>>> [1]: https://gist.github.com/drin/4b2e2ea97a07c9ad54647bcdc462611a
>>>>>>>>>>>
>>>>>>>>>>> Aldrin Montana
>>>>>>>>>>> Computer Science PhD Student
>>>>>>>>>>> UC Santa Cruz
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 10, 2022 at 2:32 PM Weston Pace <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> As far as I know (and my knowledge here may be dated), the
>>>>>>>>>>>> compute kernels themselves do not do any concurrency. There
>>>>>>>>>>>> are certainly compute kernels that could benefit from
>>>>>>>>>>>> concurrency in this manner (many kernels naively so), and I
>>>>>>>>>>>> think things are set up so that, if we decide to tackle
>>>>>>>>>>>> this feature, we could do so in a systematic way (instead
>>>>>>>>>>>> of writing something for each kernel).
>>>>>>>>>>>>
>>>>>>>>>>>> I believe that kernels, if given a unique kernel context,
>>>>>>>>>>>> should be thread safe.
>>>>>>>>>>>>
>>>>>>>>>>>> The streaming compute engine, on the other hand, does
>>>>>>>>>>>> support concurrency. It is mostly driven by the scanner at
>>>>>>>>>>>> the moment (e.g. each batch we fetch from the scanner gets
>>>>>>>>>>>> a fresh thread task for running through the execution
>>>>>>>>>>>> plan), but there is some intra-node concurrency in the hash
>>>>>>>>>>>> join and (I think) the hash aggregate nodes. This has been
>>>>>>>>>>>> sufficient to saturate cores on the benchmarks we run. I
>>>>>>>>>>>> know there is ongoing interest in understanding and
>>>>>>>>>>>> improving our concurrency here.
>>>>>>>>>>>>
>>>>>>>>>>>> The scanner supports concurrency. It will typically fetch
>>>>>>>>>>>> multiple files at once and, for each file, it will fetch
>>>>>>>>>>>> multiple batches at once (assuming the file has more than
>>>>>>>>>>>> one batch).
>>>>>>>>>>>>
>>>>>>>>>>>> > I see a large difference between the total time to apply
>>>>>>>>>>>> compute functions to a single table (concatenated from many
>>>>>>>>>>>> small tables) compared to applying compute functions to
>>>>>>>>>>>> each sub-table in the composition.
>>>>>>>>>>>>
>>>>>>>>>>>> Which one is better? Can you share a reproducible example?
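>>>>>>>>>>>>
>>>>>>>>>>>> (If you want to rule Arrow's CPU thread pool in or out
>>>>>>>>>>>> while benchmarking, you can pin its capacity; a sketch
>>>>>>>>>>>> using the C++ counterparts of pyarrow's cpu_count and
>>>>>>>>>>>> set_cpu_count:)
>>>>>>>>>>>>
>>>>>>>>>>>>     #include <arrow/util/thread_pool.h>
>>>>>>>>>>>>
>>>>>>>>>>>>     // how many threads the global CPU thread pool may use
>>>>>>>>>>>>     int capacity = arrow::GetCpuThreadPoolCapacity();
>>>>>>>>>>>>
>>>>>>>>>>>>     // force single-threaded execution for an A/B comparison
>>>>>>>>>>>>     arrow::Status st = arrow::SetCpuThreadPoolCapacity(1);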
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 10, 2022 at 12:01 PM Aldrin <[email protected]> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > Hello!
>>>>>>>>>>>> >
>>>>>>>>>>>> > I'm wondering if there's any documentation that describes
>>>>>>>>>>>> the concurrency/parallelism architecture for the compute
>>>>>>>>>>>> API. I'd also be interested if there are recommended
>>>>>>>>>>>> approaches for observing the performance of threads used by
>>>>>>>>>>>> Arrow--should I try to check a processor ID and infer
>>>>>>>>>>>> performance, or are there particular tools that the
>>>>>>>>>>>> community uses?
>>>>>>>>>>>> >
>>>>>>>>>>>> > Specifically, I am wondering if the concurrency is going
>>>>>>>>>>>> to be different when using a ChunkedArray as an input
>>>>>>>>>>>> compared to an Array, or for ChunkedArrays with various
>>>>>>>>>>>> chunk counts (1 chunk vs. tens or hundreds). I see a large
>>>>>>>>>>>> difference between the total time to apply compute
>>>>>>>>>>>> functions to a single table (concatenated from many small
>>>>>>>>>>>> tables) compared to applying compute functions to each
>>>>>>>>>>>> sub-table in the composition. I'm trying to figure out
>>>>>>>>>>>> where that difference may come from, and I'm wondering if
>>>>>>>>>>>> it's related to parallelism within Arrow.
>>>>>>>>>>>> >
>>>>>>>>>>>> > I tried using the GitHub issues and JIRA issues (e.g.
>>>>>>>>>>>> [1]) as a way to sleuth the info, but I couldn't find
>>>>>>>>>>>> anything. The pyarrow API seems to have functions I could
>>>>>>>>>>>> try to use to figure it out (cpu_count and set_cpu_count),
>>>>>>>>>>>> but that seems like a vague road.
>>>>>>>>>>>> >
>>>>>>>>>>>> > [1]: https://issues.apache.org/jira/browse/ARROW-12726
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thank you!
>>>>>>>>>>>> >
>>>>>>>>>>>> > Aldrin Montana
>>>>>>>>>>>> > Computer Science PhD Student
>>>>>>>>>>>> > UC Santa Cruz
>
> --
> Niranda Perera
> https://niranda.dev/
> @n1r44 <https://twitter.com/N1R44>
