Actually, I think I understand now; I misread "extending the class
members". But I think the point got across--if I know my table has a single
chunk, then I can do the operations on the arrays and then I can wrap the
result in a ChunkedArray or Table. For each slice, I can just maintain the
results in a vector without smart pointers.

I'll definitely try this. Thanks!

Aldrin Montana
Computer Science PhD Student
UC Santa Cruz


On Thu, Mar 10, 2022 at 11:35 PM Aldrin <[email protected]> wrote:

> I think there's one minor misunderstanding, but I like the essence of the
> feedback.
>
> To clarify, the MeanAggr::Accumulate function is used to gather over
> points of a sample, where a row is considered a sample, and columns are
> corresponding values, e.g.:
>
> columns (values) |  c0  |  c1  |  c2 |  c3 |   c4
> row 0 (sample 0) |   1  |   2  |   3 |   4 |     5
> row 1 (sample 1) |   1  |   4  |  27 | 256 |  3125
>
> For this tiny example, applying Accumulate "by slice" means that I apply
> it once on row 0, then again on row 1, and I add the times together. "By
> Table" means that I concatenate row 0 and row 1, then apply Accumulate on
> the resulting table. Combine isn't currently being considered (it's for
> when I split on columns). You can sort of see this in [1], but it also
> illustrates sequential calls of Accumulate instead of using Combine. I will
> explain this more in a reproducible example.
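The "by slice" and "by table" modes described above can be sketched with standard-library stand-ins. This is only an illustration, not the skytether code: `Row`, `Table`, `AccumulateRowMeans`, `BySlice` and `ByTable` are hypothetical names, and the sketch assumes that Accumulate yields one mean per row, taken across that row's columns.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a table: a list of rows (samples),
// each holding one value per column.
using Row = std::vector<double>;
using Table = std::vector<Row>;

// Assumed behaviour of Accumulate for this sketch: one mean per row,
// taken across that row's columns.
std::vector<double> AccumulateRowMeans(const Table& table) {
  std::vector<double> means;
  means.reserve(table.size());
  for (const Row& row : table) {
    assert(!row.empty());
    double sum = 0.0;
    for (double value : row) sum += value;
    means.push_back(sum / static_cast<double>(row.size()));
  }
  return means;
}

// "By slice": call the function once per slice and collect the results.
std::vector<double> BySlice(const std::vector<Table>& slices) {
  std::vector<double> means;
  for (const Table& slice : slices) {
    std::vector<double> partial = AccumulateRowMeans(slice);
    means.insert(means.end(), partial.begin(), partial.end());
  }
  return means;
}

// "By table": concatenate the slices first, then call the function once.
std::vector<double> ByTable(const std::vector<Table>& slices) {
  Table whole;
  for (const Table& slice : slices) {
    whole.insert(whole.end(), slice.begin(), slice.end());
  }
  return AccumulateRowMeans(whole);
}
```

With row 0 and row 1 from the tiny example as two one-row slices, both functions return the same two means (3 and 682.6); only the number of calls differs, which is what the timing comparison measures.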
>
> Given the clarification, I am not sure if the suggested local calculations
> are helpful, but maybe you mean I shouldn't use so many shared pointers?
> Although, I do think I'll try reducing the code path by using Arrays when
> I'm applying to a Table that I know has only 1 chunk (because I have
> specified it that way). This seems like it should help isolate some of the
> overhead.
>
> Thanks for the feedback!
>
> [1]:
> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/fb688531169421a5b5985d2cbfee100e793cae2f/resources/assets/TStatistic_Diagram.png
>
> Aldrin Montana
> Computer Science PhD Student
> UC Santa Cruz
>
>
> On Thu, Mar 10, 2022 at 7:49 PM Niranda Perera <[email protected]>
> wrote:
>
>> Okay, one thing I immediately see is that there are a lot of memory
>> allocations/deallocations happening in the approach you have given, IMO.
>> arrow::compute functions do not modify their inputs, so each result is
>> allocated freshly in memory, and when you update an existing shared_ptr,
>> you deallocate the previous buffers. This happens in both
>> MeanAggr::Combine and MeanAggr::Accumulate, and it could be a reason why
>> the split version is slower. The single-table version only has to go
>> through MeanAggr::Accumulate.
>>
>> If I may suggest an alternative approach, I'd do this for the variance
>> calculation:
>>
>> class MeanAggr {
>>   int64_t count_;
>>   vector<Array> sums_;         // one entry per accumulated chunk
>>   vector<Array> sum_squares_;  // one entry per accumulated chunk
>> };
>>
>> At every Accumulate, I would calculate the local sums and sums of
>> squares, and extend the class members with the resulting ChunkedArray's
>> chunks (which are Arrays).
>> At the end, I'd create ChunkedArrays from these vectors and use
>> E(x^2)-E(x)^2 to calculate the variance. I feel like this might reduce the
>> number of extra allocs and deallocs.
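A minimal sketch of the accumulation idea above, using only the standard library. Everything here is an assumption made for illustration: `VarianceAggr`, `Row` and `Table` are hypothetical names, plain `std::vector<double>` chunks stand in for Arrow Arrays, and each row is treated as one sample whose values run across the columns.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

using Row = std::vector<double>;
using Table = std::vector<Row>;

// Hypothetical aggregate, loosely following the class outline above.
class VarianceAggr {
 public:
  // Compute local sums and sums of squares for one slice and append them
  // as a new chunk. Assumes every row has the same number of columns.
  void Accumulate(const Table& slice) {
    std::vector<double> local_sums;
    std::vector<double> local_sum_squares;
    for (const Row& row : slice) {
      assert(!row.empty());
      if (count_ == 0) count_ = static_cast<std::int64_t>(row.size());
      assert(static_cast<std::int64_t>(row.size()) == count_);
      double sum = 0.0;
      double sum_sq = 0.0;
      for (double x : row) {
        sum += x;
        sum_sq += x * x;
      }
      local_sums.push_back(sum);
      local_sum_squares.push_back(sum_sq);
    }
    sums_.push_back(std::move(local_sums));
    sum_squares_.push_back(std::move(local_sum_squares));
  }

  // Population variance per row, computed at the end as E(x^2) - E(x)^2.
  std::vector<double> Variances() const {
    std::vector<double> out;
    const double n = static_cast<double>(count_);
    for (std::size_t c = 0; c < sums_.size(); ++c) {
      for (std::size_t i = 0; i < sums_[c].size(); ++i) {
        const double mean = sums_[c][i] / n;
        out.push_back(sum_squares_[c][i] / n - mean * mean);
      }
    }
    return out;
  }

 private:
  std::int64_t count_ = 0;                        // values per row
  std::vector<std::vector<double>> sums_;         // one chunk per Accumulate
  std::vector<std::vector<double>> sum_squares_;  // one chunk per Accumulate
};
```

Each Accumulate call only appends a chunk, so earlier results are never reallocated or replaced. Note that E(x^2)-E(x)^2 can lose precision when the mean is large relative to the spread of the values.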
>>
>> On Thu, Mar 10, 2022 at 9:47 PM Aldrin <[email protected]> wrote:
>>
>>> You're correct with the first clarification. I am not (currently)
>>> slicing column-wise.
>>>
>>> And yes, I am calculating variance, mean, etc. so that I can calculate
>>> the t-statistic.
>>>
>>> Aldrin Montana
>>> Computer Science PhD Student
>>> UC Santa Cruz
>>>
>>>
>>> On Thu, Mar 10, 2022 at 5:16 PM Niranda Perera <[email protected]>
>>> wrote:
>>>
>>>> Or are you slicing column-wise?
>>>>
>>>> On Thu, Mar 10, 2022 at 8:14 PM Niranda Perera <
>>>> [email protected]> wrote:
>>>>
>>>>> From the looks of it, you are trying to calculate variance, mean, etc.
>>>>> over rows, is that right?
>>>>>
>>>>> I need to clarify a bit on this statement.
>>>>> "Where "by slice" is total time, summed from running the function on
>>>>> each slice and "by table" is the time of just running the function on the
>>>>> table concatenated from each slice."
>>>>> So, I assume you are originally using a `vector<shared_ptr<Table>>
>>>>> slices`. For the former case, you are passing each slice to
>>>>> `MeanAggr::Accumulate`, and for the latter case, you are calling
>>>>> arrow::Concatenate(slices) and passing the result as a single table?
>>>>>
>>>>> On Thu, Mar 10, 2022 at 7:41 PM Aldrin <[email protected]> wrote:
>>>>>
>>>>>> Oh, but the short answer is that I'm using: Add, Subtract, Divide,
>>>>>> Multiply, Power, and Absolute. Sometimes with both inputs being
>>>>>> ChunkedArrays, sometimes with 1 input being a ChunkedArray and the other
>>>>>> being a scalar.
>>>>>>
>>>>>> Aldrin Montana
>>>>>> Computer Science PhD Student
>>>>>> UC Santa Cruz
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 10, 2022 at 4:38 PM Aldrin <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Niranda!
>>>>>>>
>>>>>>> Sure thing, I've linked to my code. [1] is essentially the function
>>>>>>> being called, and [2] is an example of a wrapper function (more in that
>>>>>>> file) I wrote to reduce boilerplate (to make [1] more readable). But,
>>>>>>> now that I look at [2] again, which I wrote before I really knew much
>>>>>>> about smart pointers, I wonder if some of what I benchmarked is overhead
>>>>>>> from misusing C++ structures?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>>
>>>>>>> [1]:
>>>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/statops.cpp#L96
>>>>>>> [2]:
>>>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/numops.cpp#L18
>>>>>>>
>>>>>>> Aldrin Montana
>>>>>>> Computer Science PhD Student
>>>>>>> UC Santa Cruz
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 10, 2022 at 4:30 PM Niranda Perera <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Aldrin,
>>>>>>>>
>>>>>>>> It would be helpful to know what sort of compute operators you are
>>>>>>>> using.
>>>>>>>>
>>>>>>>> On Thu, Mar 10, 2022, 19:12 Aldrin <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I will work on a reproducible example.
>>>>>>>>>
>>>>>>>>> As a sneak peek, what I was seeing was the following (pasted in
>>>>>>>>> gmail, see [1] for markdown version):
>>>>>>>>>
>>>>>>>>> Table ID      | Columns | Rows  | Rows (slice) | Slice count | Time (ms) total; by slice | Time (ms) total; by table
>>>>>>>>> E-GEOD-100618 |     415 | 20631 |          299 |          69 |                   644.065 |                       410
>>>>>>>>> E-GEOD-76312  |    2152 | 27120 |           48 |         565 |                 25607.927 |                      2953
>>>>>>>>> E-GEOD-106540 |    2145 | 24480 |           45 |         544 |                 25193.507 |                      3088
>>>>>>>>>
>>>>>>>>> Where "by slice" is total time, summed from running the function
>>>>>>>>> on each slice and "by table" is the time of just running the function
>>>>>>>>> on the table concatenated from each slice.
>>>>>>>>>
>>>>>>>>> The difference was large (but not *so* large) for ~70 iterations
>>>>>>>>> (1.5x); but for ~550 iterations (and 6x fewer rows, 5x more columns)
>>>>>>>>> the difference became significant (~10x).
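The ratios quoted above can be recomputed from the timing table. The sketch below is only arithmetic on those three rows; `Timing`, `Slowdown` and `MsPerSlice` are hypothetical helper names. From the table's figures the slowdowns come out at roughly 1.6x, 8.7x and 8.2x, and the average time per slice at roughly 9.3 ms, 45.3 ms and 46.3 ms.

```cpp
#include <cassert>

// One row of the timing table above.
struct Timing {
  int slice_count;
  double by_slice_ms;  // total time summed over all slices
  double by_table_ms;  // time for one call on the concatenated table
};

// How many times slower the per-slice run is than the single-table run.
double Slowdown(const Timing& t) {
  assert(t.by_table_ms > 0.0);
  return t.by_slice_ms / t.by_table_ms;
}

// Average time spent on one slice, in milliseconds.
double MsPerSlice(const Timing& t) {
  assert(t.slice_count > 0);
  return t.by_slice_ms / t.slice_count;
}
```

For example, `Slowdown(Timing{565, 25607.927, 2953.0})` gives the ratio for E-GEOD-76312.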
>>>>>>>>>
>>>>>>>>> I will follow up here when I have a more reproducible example. I
>>>>>>>>> also started doing this before tensors were available, so I'll try to
>>>>>>>>> see how that changes performance.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]: https://gist.github.com/drin/4b2e2ea97a07c9ad54647bcdc462611a
>>>>>>>>>
>>>>>>>>> Aldrin Montana
>>>>>>>>> Computer Science PhD Student
>>>>>>>>> UC Santa Cruz
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 10, 2022 at 2:32 PM Weston Pace <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> As far as I know (and my knowledge here may be dated) the compute
>>>>>>>>>> kernels themselves do not do any concurrency.  There are certainly
>>>>>>>>>> compute kernels that could benefit from concurrency in this manner
>>>>>>>>>> (many kernels naively so) and I think things are set up so that, if
>>>>>>>>>> we decide to tackle this feature, we could do so in a systematic way
>>>>>>>>>> (instead of writing something for each kernel).
>>>>>>>>>>
>>>>>>>>>> I believe that kernels, if given a unique kernel context, should
>>>>>>>>>> be thread safe.
>>>>>>>>>>
>>>>>>>>>> The streaming compute engine, on the other hand, does support
>>>>>>>>>> concurrency.  It is mostly driven by the scanner at the moment (e.g.
>>>>>>>>>> each batch we fetch from the scanner gets a fresh thread task for
>>>>>>>>>> running through the execution plan) but there is some intra-node
>>>>>>>>>> concurrency in the hash join and (I think) the hash aggregate nodes.
>>>>>>>>>> This has been sufficient to saturate cores on the benchmarks we run.
>>>>>>>>>> I know there is ongoing interest in understanding and improving our
>>>>>>>>>> concurrency here.
>>>>>>>>>>
>>>>>>>>>> The scanner supports concurrency.  It will typically fetch multiple
>>>>>>>>>> files at once and, for each file, it will fetch multiple batches at
>>>>>>>>>> once (assuming the file has more than one batch).
>>>>>>>>>>
>>>>>>>>>> > I see a large difference between the total time to apply
>>>>>>>>>> > compute functions to a single table (concatenated from many small
>>>>>>>>>> > tables) compared to applying compute functions to each sub-table in
>>>>>>>>>> > the composition.
>>>>>>>>>>
>>>>>>>>>> Which one is better?  Can you share a reproducible example?
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 10, 2022 at 12:01 PM Aldrin <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>> >
>>>>>>>>>> > Hello!
>>>>>>>>>> >
>>>>>>>>>> > I'm wondering if there's any documentation that describes the
>>>>>>>>>> > concurrency/parallelism architecture for the compute API. I'd also be
>>>>>>>>>> > interested if there are recommended approaches for seeing performance
>>>>>>>>>> > of threads used by Arrow--should I try to check a processor ID and
>>>>>>>>>> > infer performance or are there particular tools that the community
>>>>>>>>>> > uses?
>>>>>>>>>> >
>>>>>>>>>> > Specifically, I am wondering if the concurrency is going to be
>>>>>>>>>> > different when using a ChunkedArray as an input compared to an Array
>>>>>>>>>> > or for ChunkedArrays with various chunk sizes (1 chunk vs tens or
>>>>>>>>>> > hundreds). I see a large difference between the total time to apply
>>>>>>>>>> > compute functions to a single table (concatenated from many small
>>>>>>>>>> > tables) compared to applying compute functions to each sub-table in
>>>>>>>>>> > the composition. I'm trying to figure out where that difference may
>>>>>>>>>> > come from and I'm wondering if it's related to parallelism within
>>>>>>>>>> > Arrow.
>>>>>>>>>> >
>>>>>>>>>> > I tried using the GitHub issues and JIRA issues (e.g.  [1]) as a
>>>>>>>>>> > way to sleuth the info, but I couldn't find anything. The pyarrow
>>>>>>>>>> > API seems to have functions I could try and use to figure it out
>>>>>>>>>> > (cpu_count and set_cpu_count), but that seems like a vague road.
>>>>>>>>>> >
>>>>>>>>>> > [1]: https://issues.apache.org/jira/browse/ARROW-12726
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > Thank you!
>>>>>>>>>> >
>>>>>>>>>> > Aldrin Montana
>>>>>>>>>> > Computer Science PhD Student
>>>>>>>>>> > UC Santa Cruz
>>>>>>>>>>
>>>>>>>>>
>>>>>
>>>>> --
>>>>> Niranda Perera
>>>>> https://niranda.dev/
>>>>> @n1r44 <https://twitter.com/N1R44>
>>>>>
>>>>>
