Ooh, [2] was very interesting to read. I am also adding parameters to combine a specified number of chunks together (this also uses CombineChunks) and to invoke the function on a limited number of columns. If desirable, I can add some of these descriptions to the JIRA and maybe use it as an anchor to dig further into the backing mechanisms.
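Roughly the shape of what I mean for the chunk-combining parameter (a sketch only; CombineEveryN and group_size are names I'm making up here):

  #include <algorithm>
  #include <arrow/api.h>
  #include <arrow/array/concatenate.h>

  // Sketch: re-chunk a ChunkedArray so that every `group_size` consecutive
  // input chunks are concatenated into a single output chunk.
  arrow::Result<std::shared_ptr<arrow::ChunkedArray>>
  CombineEveryN(const std::shared_ptr<arrow::ChunkedArray>& input, size_t group_size) {
    const auto& chunks = input->chunks();
    arrow::ArrayVector combined;

    for (size_t start = 0; start < chunks.size(); start += group_size) {
      size_t stop = std::min(start + group_size, chunks.size());

      // concatenate one group of chunks into a single contiguous Array
      arrow::ArrayVector group(chunks.begin() + start, chunks.begin() + stop);
      ARROW_ASSIGN_OR_RAISE(auto merged, arrow::Concatenate(group));
      combined.push_back(merged);
    }

    return std::make_shared<arrow::ChunkedArray>(std::move(combined), input->type());
  }

The column limit would just pass the first N column indices to Table::SelectColumns before invoking the function.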
I'll also move this discussion to a GitHub issue or dev@; I forgot that I asked the initial questions on user@. Thanks for taking the time to look at the repo and for the response!

Aldrin Montana
Computer Science PhD Student
UC Santa Cruz

On Wed, Mar 23, 2022 at 2:59 PM Weston Pace <[email protected]> wrote:

> Thank you for providing such an interesting (and enlightening) reproduction. In both cases you are calling the compute functions in exactly the same way; for example, you are adding two chunked arrays in both cases. The key difference appears to be that you are either making <1 call to `Add` with a chunked array with 565 chunks> or <565 calls to `Add` with a chunked array with 1 chunk>.
>
> One could theorize that the underlying code is somehow allocating or iterating more efficiently when given a large batch of input data. However, running it through a profiler, that does not appear to be the case.
>
> Instead, I think the C++ implementation is simply inefficient at the moment and our function overhead is too high. The cost is spread very diffusely throughout the entire compute module. I have created [1] to get a better grasp on this. The details of the fixes can be left for JIRA and/or the dev@ mailing list, but reducing this overhead is pretty important for my current work, so I hope to be able to look into it. I'll also add that this is a pretty similar concern to [2], which Wes raised last summer.
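> To make the two call patterns concrete, the comparison is roughly this (a sketch, not the code from your repo; `lhs` and `rhs` stand in for the two identically chunked inputs):
>
>   // 1 call to "add": ChunkedArray inputs carrying all 565 chunks
>   std::shared_ptr<arrow::ChunkedArray> lhs, rhs;
>   arrow::Result<arrow::Datum> whole =
>       arrow::compute::CallFunction("add", {arrow::Datum(lhs), arrow::Datum(rhs)});
>
>   // 565 calls to "add": the per-call overhead (dispatch, kernel setup)
>   // is paid once per chunk
>   for (int i = 0; i < lhs->num_chunks(); ++i) {
>     arrow::Result<arrow::Datum> part = arrow::compute::CallFunction(
>         "add", {arrow::Datum(lhs->chunk(i)), arrow::Datum(rhs->chunk(i))});
>   }
>
> Both produce the same values; only the number of trips through the function-dispatch machinery differs.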
> -Weston
>
> [1] https://issues.apache.org/jira/browse/ARROW-16014
> [2] https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd
>
> On Sun, Mar 20, 2022 at 9:20 PM Aldrin <[email protected]> wrote:
>
>> Sorry it took a while, but here is a repository I put together that should reproducibly illustrate what I am seeing and what I'd like to understand better (if not improve) [1].
>>
>> The linked source code [2] shows the 2 places where I am collecting times (using std::chrono::steady_clock in C++). The unexpected behavior is mentioned at the end of the README.md [3]:
>>
>> Applying a function on a Table: "Executed in 4.08 secs"
>> Applying the function on each "table chunk": "Executed in 58.67 secs"
>>
>> For the test data I included in this repository, the Table has the following properties:
>> * 27118 rows
>> * 2152 columns
>> * 565 "table chunks"
>> * 48 rows per "table chunk"
>>
>> Every column of the table should be identically chunked, so a "table chunk" just refers to the same chunk of every column in the table. I call it a "table chunk" in this email because I'm not sure whether it is the same as a RecordBatch or whether a RecordBatch lays the columns out differently. The reason each column is chunked identically is so that the whole table can be split across many key-values (written to a key-value store). In the code I provide in the repository, the table is decomposed via: Table -> TableReader -> RecordBatch -> Table::FromRecordBatches (vector of 1 RecordBatch) [4]. I have found that applying functions to Tables and ChunkedArrays is more ergonomic than to RecordBatches and Arrays (at least as of ~5.0.0).
>>
>> So, what I want to understand is: what contributes to the difference in performance?
>>
>> Thanks for any help or advice you can give!
>>
>> P.S. I will try to also add tensors to this for my own curiosity.
>>
>> [1]: https://github.com/drin/arrow-partial-aggr
>> [2]: https://github.com/drin/arrow-partial-aggr/blob/mainline/src/simple.cpp#L108
>> [3]: https://github.com/drin/arrow-partial-aggr/blob/mainline/README.md
>> [4]: https://github.com/drin/arrow-partial-aggr/blob/mainline/src/simple.cpp#L123
>>
>> Aldrin Montana
>> Computer Science PhD Student
>> UC Santa Cruz
>>
>> On Fri, Mar 11, 2022 at 6:44 AM Niranda Perera <[email protected]> wrote:
>>
>>> Sorry, I think I've missed the smart pointers in my response. It *should* be smart pointers; otherwise you'll lose the allocation when you go out of scope. It should have been:
>>>
>>>   class MeanAggr {
>>>     int64_t count_;
>>>     vector<shared_ptr<Array>> sums_;
>>>     vector<shared_ptr<Array>> sum_squares_;
>>>   };
>>>
>>> On Fri, Mar 11, 2022 at 3:16 AM Aldrin <[email protected]> wrote:
>>>
>>>> Actually, I think I understand now; I misread "extending the class members". But I think the point got across: if I know my table has a single chunk, then I can do the operations on the Arrays and then wrap the result in a ChunkedArray or Table. For each slice, I can just maintain the results in a vector without smart pointers.
>>>>
>>>> I'll definitely try this. Thanks!
>>>>
>>>> Aldrin Montana
>>>> Computer Science PhD Student
>>>> UC Santa Cruz
>>>>
>>>> On Thu, Mar 10, 2022 at 11:35 PM Aldrin <[email protected]> wrote:
>>>>
>>>>> I think there's one minor misunderstanding, but I like the essence of the feedback.
>>>>>
>>>>> To clarify, the MeanAggr::Accumulate function is used to aggregate over the points of a sample, where a row is considered a sample and the columns are its corresponding values, e.g.:
>>>>>
>>>>>   columns (values) | c0 | c1 | c2 | c3  | c4
>>>>>   row 0 (sample 0) |  1 |  2 |  3 |   4 |    5
>>>>>   row 1 (sample 1) |  1 |  4 | 27 | 256 | 3125
>>>>>
>>>>> For this tiny example, applying Accumulate "by slice" means that I apply it once on row 0, then again on row 1, and add the times together. "By table" means that I concatenate row 0 and row 1, then apply Accumulate once on the resulting table. Combine isn't currently being considered (it's for when I split on columns). You can sort of see this in [1], though it also illustrates sequential calls of Accumulate instead of using Combine. I will explain this more in a reproducible example.
>>>>>
>>>>> Given the clarification, I am not sure whether the suggested local calculations are helpful, but maybe you mean I shouldn't use so many shared pointers? I do think I'll try reducing the code path by using Arrays when I'm applying a function to a Table that I know has only 1 chunk (because I have specified it that way). This seems like it should help isolate some of the overhead.
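>>>>> In code, the two timings compare roughly this (a sketch, assuming `slices` is the vector<shared_ptr<Table>> of single-chunk tables and that Accumulate takes a table):
>>>>>
>>>>>   MeanAggr aggr_slice, aggr_table;
>>>>>
>>>>>   // "by slice": one Accumulate call per slice; I sum the per-call times
>>>>>   for (const auto& slice : slices) { aggr_slice.Accumulate(slice); }
>>>>>
>>>>>   // "by table": concatenate once, then make a single Accumulate call
>>>>>   auto merged = arrow::ConcatenateTables(slices).ValueOrDie();
>>>>>   aggr_table.Accumulate(merged);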
>>>>> Thanks for the feedback!
>>>>>
>>>>> [1]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/fb688531169421a5b5985d2cbfee100e793cae2f/resources/assets/TStatistic_Diagram.png
>>>>>
>>>>> Aldrin Montana
>>>>> Computer Science PhD Student
>>>>> UC Santa Cruz
>>>>>
>>>>> On Thu, Mar 10, 2022 at 7:49 PM Niranda Perera <[email protected]> wrote:
>>>>>
>>>>>> Okay, one thing I immediately see is that there are a lot of memory allocations/deallocations happening in the approach you have given, IMO. arrow::compute methods are immutable, so when you get an answer, it is freshly allocated in memory, and when you update an existing shared_ptr, you deallocate the previous buffers. This happens in both MeanAggr::Combine and MeanAggr::Accumulate, and it could be a reason why the split version is slower: the single-table version only has to go through MeanAggr::Accumulate.
>>>>>>
>>>>>> If I may suggest an alternative approach, I'd do this for the variance calculation:
>>>>>>
>>>>>>   class MeanAggr {
>>>>>>     int64_t count_;
>>>>>>     vector<Array> sums_;
>>>>>>     vector<Array> sum_squares_;
>>>>>>   };
>>>>>>
>>>>>> At every Accumulate, I would calculate local sums and sum squares, and extend the class members with the resultant ChunkedArray's chunks (which are Arrays). At the end, I'd create ChunkedArrays from these vectors and use E(x^2) - E(x)^2 to calculate the variance. I feel like this might reduce the number of extra allocs and deallocs.
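>>>>>> A rough sketch of one way to read that, for a single column (illustrative only; it uses the corrected shared_ptr members from my follow-up, the "local" quantities are just the raw and squared chunks, and the expectations are taken once at the end):
>>>>>>
>>>>>>   #include <arrow/api.h>
>>>>>>   #include <arrow/compute/api.h>
>>>>>>
>>>>>>   class MeanAggr {
>>>>>>    public:
>>>>>>     // Only append freshly computed chunks; nothing is overwritten, so
>>>>>>     // no intermediate buffers get deallocated along the way.
>>>>>>     arrow::Status Accumulate(const std::shared_ptr<arrow::ChunkedArray>& x) {
>>>>>>       count_ += x->length();
>>>>>>       ARROW_ASSIGN_OR_RAISE(
>>>>>>           arrow::Datum x_sq,
>>>>>>           arrow::compute::CallFunction("multiply", {arrow::Datum(x), arrow::Datum(x)}));
>>>>>>       for (const auto& chunk : x->chunks()) sums_.push_back(chunk);
>>>>>>       for (const auto& chunk : x_sq.chunked_array()->chunks()) sum_squares_.push_back(chunk);
>>>>>>       return arrow::Status::OK();
>>>>>>     }
>>>>>>
>>>>>>     // Final pass: Var(x) = E(x^2) - E(x)^2 over all collected chunks.
>>>>>>     arrow::Result<double> Variance() {
>>>>>>       auto xs  = std::make_shared<arrow::ChunkedArray>(sums_);
>>>>>>       auto x2s = std::make_shared<arrow::ChunkedArray>(sum_squares_);
>>>>>>       ARROW_ASSIGN_OR_RAISE(arrow::Datum e_x,  arrow::compute::Mean(arrow::Datum(xs)));
>>>>>>       ARROW_ASSIGN_OR_RAISE(arrow::Datum e_x2, arrow::compute::Mean(arrow::Datum(x2s)));
>>>>>>       double mean    = e_x.scalar_as<arrow::DoubleScalar>().value;
>>>>>>       double mean_sq = e_x2.scalar_as<arrow::DoubleScalar>().value;
>>>>>>       return mean_sq - mean * mean;
>>>>>>     }
>>>>>>
>>>>>>    private:
>>>>>>     int64_t count_ = 0;
>>>>>>     arrow::ArrayVector sums_;
>>>>>>     arrow::ArrayVector sum_squares_;
>>>>>>   };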
>>>>>> On Thu, Mar 10, 2022 at 9:47 PM Aldrin <[email protected]> wrote:
>>>>>>
>>>>>>> You're correct with the first clarification. I am not (currently) slicing column-wise.
>>>>>>>
>>>>>>> And yes, I am calculating variance, mean, etc. so that I can calculate the t-statistic.
>>>>>>>
>>>>>>> Aldrin Montana
>>>>>>> Computer Science PhD Student
>>>>>>> UC Santa Cruz
>>>>>>>
>>>>>>> On Thu, Mar 10, 2022 at 5:16 PM Niranda Perera <[email protected]> wrote:
>>>>>>>
>>>>>>>> Or are you slicing column-wise?
>>>>>>>>
>>>>>>>> On Thu, Mar 10, 2022 at 8:14 PM Niranda Perera <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> From the looks of it, you are trying to calculate variance, mean, etc. over rows, aren't you?
>>>>>>>>>
>>>>>>>>> I need to clarify one statement a bit: "Where 'by slice' is total time, summed from running the function on each slice, and 'by table' is the time of just running the function on the table concatenated from each slice." So, I assume you are originally using a `vector<shared_ptr<Table>> slices`. For the former case you are passing each slice to `MeanAggr::Accumulate`, and for the latter case you are calling `arrow::ConcatenateTables(slices)` and passing the result as a single table?
>>>>>>>>>
>>>>>>>>> On Thu, Mar 10, 2022 at 7:41 PM Aldrin <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Oh, but the short answer is that I'm using: Add, Subtract, Divide, Multiply, Power, and Absolute. Sometimes both inputs are ChunkedArrays; sometimes one input is a ChunkedArray and the other is a scalar.
>>>>>>>>>>
>>>>>>>>>> Aldrin Montana
>>>>>>>>>> Computer Science PhD Student
>>>>>>>>>> UC Santa Cruz
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 10, 2022 at 4:38 PM Aldrin <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Niranda!
>>>>>>>>>>>
>>>>>>>>>>> Sure thing, I've linked to my code. [1] is essentially the function being called, and [2] is an example of a wrapper function I wrote to reduce boilerplate (there are more in that file), to make [1] more readable. But now that I look at [2] again, which I wrote before I really knew much about smart pointers, I wonder if some of what I benchmarked is overhead from misusing C++ structures?
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> [1]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/statops.cpp#L96
>>>>>>>>>>> [2]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/numops.cpp#L18
>>>>>>>>>>>
>>>>>>>>>>> Aldrin Montana
>>>>>>>>>>> Computer Science PhD Student
>>>>>>>>>>> UC Santa Cruz
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 10, 2022 at 4:30 PM Niranda Perera <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Aldrin,
>>>>>>>>>>>>
>>>>>>>>>>>> It would be helpful to know what sort of compute operators you are using.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 10, 2022, 19:12 Aldrin <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I will work on a reproducible example.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As a sneak peek, here is what I was seeing (pasted in Gmail; see [1] for the markdown version):
>>>>>>>>>>>>>
>>>>>>>>>>>>>   Table ID      | Columns | Rows  | Rows (slice) | Slice count | Time (ms, by slice) | Time (ms, by table)
>>>>>>>>>>>>>   E-GEOD-100618 |     415 | 20631 |          299 |          69 |             644.065 |                 410
>>>>>>>>>>>>>   E-GEOD-76312  |    2152 | 27120 |           48 |         565 |           25607.927 |                2953
>>>>>>>>>>>>>   E-GEOD-106540 |    2145 | 24480 |           45 |         544 |           25193.507 |                3088
>>>>>>>>>>>>>
>>>>>>>>>>>>> "By slice" is the total time, summed from running the function on each slice, and "by table" is the time of just running the function on the table concatenated from each slice.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The difference was large (but not *so* large) for ~70 iterations (1.5x), but for ~550 iterations (and 6x fewer rows per slice, 5x more columns) the difference became significant (~10x).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I will follow up here when I have a more reproducible example. I also started doing this before tensors were available, so I'll try to see how that changes performance.
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]: https://gist.github.com/drin/4b2e2ea97a07c9ad54647bcdc462611a
>>>>>>>>>>>>>
>>>>>>>>>>>>> Aldrin Montana
>>>>>>>>>>>>> Computer Science PhD Student
>>>>>>>>>>>>> UC Santa Cruz
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 10, 2022 at 2:32 PM Weston Pace <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> As far as I know (and my knowledge here may be dated), the compute kernels themselves do not use any concurrency. There are certainly compute kernels that could benefit from concurrency in this manner (many kernels naively so), and I think things are set up so that, if we decide to tackle this feature, we could do so in a systematic way (instead of writing something for each kernel).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I believe that kernels, if given a unique kernel context, should be thread safe.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The streaming compute engine, on the other hand, does support concurrency.
>>>>>>>>>>>>>> It is mostly driven by the scanner at the moment (e.g. each batch we fetch from the scanner gets a fresh thread task for running through the execution plan), but there is some intra-node concurrency in the hash join and (I think) the hash aggregate nodes. This has been sufficient to saturate cores on the benchmarks we run. I know there is ongoing interest in understanding and improving our concurrency here.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The scanner supports concurrency. It will typically fetch multiple files at once and, for each file, it will fetch multiple batches at once (assuming the file has more than one batch).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > I see a large difference between the total time to apply compute functions to a single table (concatenated from many small tables) compared to applying compute functions to each sub-table in the composition.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Which one is better? Can you share a reproducible example?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Mar 10, 2022 at 12:01 PM Aldrin <[email protected]> wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Hello!
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I'm wondering if there's any documentation that describes the concurrency/parallelism architecture for the compute API. I'd also be interested to know if there are recommended approaches for observing the performance of threads used by Arrow: should I try to check a processor ID and infer performance, or are there particular tools that the community uses?
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Specifically, I am wondering if the concurrency is going to be different when using a ChunkedArray as an input compared to an Array, or for ChunkedArrays with various chunk counts (1 chunk vs. tens or hundreds). I see a large difference between the total time to apply compute functions to a single table (concatenated from many small tables) compared to applying compute functions to each sub-table in the composition. I'm trying to figure out where that difference may come from, and I'm wondering if it's related to parallelism within Arrow.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I tried using the GitHub issues and JIRA issues (e.g. [1]) to sleuth out the info, but I couldn't find anything. The pyarrow API seems to have functions I could use to figure it out (cpu_count and set_cpu_count), but that seems like a vague road.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > [1]: https://issues.apache.org/jira/browse/ARROW-12726
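>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > For what it's worth, the C++ counterparts to those pyarrow functions appear to live in arrow/util/thread_pool.h, so I could at least pin the CPU thread pool size while measuring (a sketch of what I'd try):
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >   #include <arrow/util/thread_pool.h>
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >   // report, then constrain, Arrow's global CPU thread pool
>>>>>>>>>>>>>> >   int capacity = arrow::GetCpuThreadPoolCapacity();
>>>>>>>>>>>>>> >   arrow::Status st = arrow::SetCpuThreadPoolCapacity(1);  // force serial execution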
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thank you!
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Aldrin Montana
>>>>>>>>>>>>>> > Computer Science PhD Student
>>>>>>>>>>>>>> > UC Santa Cruz
