On #2, I think this discussion might be overly speculative until a
full-fledged multithreaded hash aggregation is implemented in the
Apache Arrow C++ library. There are other analytic database systems in
the wild which might provide a blueprint for the way that we should
approach this, and I don't know that it should be constrained by the
particular details of the existing kernel machinery we have
implemented.

To be clear, it would be great to see this work happen in the Apache
project rather than in third-party projects.

On Wed, Nov 18, 2020 at 10:20 AM Benjamin Kietzman <bengil...@gmail.com> wrote:
>
> 1: Excellent!
>
> 2: The open JIRA for grouped aggregation is
> https://issues.apache.org/jira/browse/ARROW-4124
> (though it's out of date since it predates the addition of
> ScalarAggregateKernel).
> To summarize: for *grouped* aggregation we want the kernel to do the
> work of evaluating group condition(s), and the corresponding KernelState
> should include any hash tables and a columnar store of pre-finalized
> results (for mean, this would be an array of
> struct<sum: T, count: int64>).
> Pushing this into the kernel yields significant perf wins since the
> layout of state can be controlled more freely and we avoid talking
> across a virtual/type-erased interface inside a hot loop.
> A sketch of how ARROW-4124 could be resolved within the
> compute::{Function,Kernel} interface:
> - add compute functions like "grouped_mean" which are binary.
> - the first argument is the array to aggregate while the second is the
>   grouping condition
> - compute kernels for these functions can probably still be
>   ScalarAggregateKernels (though after this we may need to rename them);
>   the interface to implement would be ScalarAggregator, which is in
>   compute/kernels/aggregate_internal.h
> - The new ScalarAggregator would be able to reuse existing arrow
>   internals such as our HashTable and could store pre-finalized results
>   in a mutable ArrayData (which would make finalizing pretty trivial).
>
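As a rough illustration of the layout described above (one hash table plus a columnar store of pre-finalized sum/count values), here is a minimal single-pass grouped mean. This is only a sketch and not Arrow's API: `GroupedMeanState` and its `SlotFor`, `Consume`, `Merge`, and `Finalize` members are hypothetical names, `std::unordered_map` stands in for Arrow's internal HashTable, and plain `std::vector`s stand in for a mutable ArrayData.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a grouped-mean kernel state; not an Arrow class.
// std::unordered_map substitutes for Arrow's internal HashTable.
struct GroupedMeanState {
  std::unordered_map<int64_t, std::size_t> group_index;  // key -> slot
  std::vector<int64_t> keys;    // slot -> key, in first-seen order
  std::vector<double> sums;     // columnar "sum" field
  std::vector<int64_t> counts;  // columnar "count" field

  // Find the slot for a key, creating a zeroed slot on first sight.
  std::size_t SlotFor(int64_t key) {
    auto it = group_index.find(key);
    if (it != group_index.end()) return it->second;
    std::size_t slot = keys.size();
    group_index.emplace(key, slot);
    keys.push_back(key);
    sums.push_back(0.0);
    counts.push_back(0);
    return slot;
  }

  // Consume one batch: values[i] belongs to group group_keys[i].
  void Consume(const std::vector<double>& values,
               const std::vector<int64_t>& group_keys) {
    assert(values.size() == group_keys.size());
    for (std::size_t i = 0; i < values.size(); ++i) {
      std::size_t slot = SlotFor(group_keys[i]);
      sums[slot] += values[i];
      counts[slot] += 1;
    }
  }

  // Fold another partial state (e.g. from another thread) into this one.
  void Merge(const GroupedMeanState& other) {
    for (std::size_t s = 0; s < other.keys.size(); ++s) {
      std::size_t slot = SlotFor(other.keys[s]);
      sums[slot] += other.sums[s];
      counts[slot] += other.counts[s];
    }
  }

  // Finalize: one mean per group, aligned with `keys`.
  std::vector<double> Finalize() const {
    std::vector<double> means(keys.size());
    for (std::size_t s = 0; s < keys.size(); ++s) {
      means[s] = sums[s] / static_cast<double>(counts[s]);
    }
    return means;
  }
};
```

Because the per-group state lives in two flat vectors, the hot loop in `Consume` touches no virtual interface, and `Finalize` is a single pass over the columns.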
> This returns to your original question about how to get access to arrow
> internals like ScalarAggregator outside the arrow repo. However, hopefully
> after this discussion it seems feasible and preferable to add the
> functionality you need upstream.
>
> On Tue, Nov 17, 2020 at 9:58 PM Niranda Perera <niranda.per...@gmail.com>
> wrote:
>
> > 1. This is great. I will follow this JIRA. (better yet, I'll see if I can
> > make that contribution)
> >
> > 2. If we forget about the multithreading case for a moment, this
> > requirement came up while implementing a "groupby + aggregation" operation
> > (single-threaded). Let's assume that a table is not sorted. So, the
> > simplest approach would be to keep the intermediate state in a container
> > (map/vector) and update the state while traversing the table. This approach
> > becomes important when there is a large number of groups and there aren't
> > enough rows with the same key to use 'consume' vector aggregation (on a
> > sorted table).
> >
> >
> >
> > On Tue, Nov 17, 2020 at 10:54 AM Benjamin Kietzman <bengil...@gmail.com>
> > wrote:
> >
> >> Hi Niranda,
> >>
> >> hastebin: That looks generally correct, though I should warn you that a
> >> recent PR ( https://github.com/apache/arrow/pull/8574 ) changed the
> >> return type of DispatchExact to Kernel so you'll need to insert an
> >> explicit cast to ScalarAggregateKernel.
> >>
> >> 1: This seems like a feature which might be generally useful to
> >> consumers of the compute module, so it's probably worth adding to the
> >> KernelState interface in some way rather than exposing individual
> >> implementations. I've opened
> >> https://issues.apache.org/jira/browse/ARROW-10630 to track this feature.
> >>
> >> 2: I would not expect your container to contain a large number of
> >> KernelStates. Specifically: why would you need more than one per local
> >> thread? Additionally, for the specific case of aggregation I'd expect
> >> that KernelStates not actively in use would be `merge`d and no longer
> >> stored. With small numbers of instances I would expect the memory
> >> overhead due to polymorphism to be negligible.
> >>
> >> On Mon, Nov 16, 2020 at 7:03 PM Niranda Perera <niranda.per...@gmail.com>
> >> wrote:
> >>
> >>> Hi Ben and Wes,
> >>> Based on our discussion, I did the following.
> >>> https://hastebin.com/ajadonados.cpp
> >>>
> >>> It seems to be working fine. Would love to get your feedback on this!
> >>> :-)
> >>>
> >>> But I have a couple of concerns.
> >>> 1. Say I want to communicate the intermediate state data across multiple
> >>> processes. Unfortunately, the KernelState struct does not expose the data
> >>> pointer to the outside. If, say, SumState were exposed, we could access
> >>> that data, couldn't we? WDYT?
> >>> 2. Polymorphism and virtual functions - Intuitively, a mean aggregation
> >>> intermediate state would be a {T, int64} tuple. But I believe the size of
> >>> struct "SumImpl : public ScalarAggregator (: public KernelState)" would be
> >>> sizeof(T) + sizeof(int64) + sizeof(ScalarAggregator) + sizeof(vptr),
> >>> wouldn't it? So, if I am managing a compute state container, this means
> >>> that my memory requirement would be higher than simply using a
> >>> {T, int64} tuple.
> >>> Please correct me if I am wrong. I am not sure if there is a better
> >>> solution to this, but just want to discuss it with you.
> >>>
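To get a feel for the overhead in question, one can compare a plain {sum, count} struct with a polymorphic equivalent. The sketch below uses hypothetical types (`PlainMeanState`, `StateBase`, `PolymorphicMeanState`) that are not Arrow classes. Exact sizes are implementation-defined; on common 64-bit ABIs the polymorphic version is typically one pointer (the vptr) larger, and an empty base class adds nothing beyond that.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical types for illustration only; these are not Arrow classes.
struct PlainMeanState {
  double sum;
  int64_t count;
};

struct StateBase {
  virtual ~StateBase() = default;  // makes derived objects carry a vptr
};

struct PolymorphicMeanState : StateBase {
  double sum = 0;
  int64_t count = 0;
};

// Total extra bytes for `n` states stored by value, compared with storing
// plain {sum, count} structs. The per-state difference is platform-specific.
std::size_t TotalOverhead(std::size_t n) {
  // Guard against an exotic ABI where the polymorphic type is not larger.
  assert(sizeof(PolymorphicMeanState) >= sizeof(PlainMeanState));
  return n * (sizeof(PolymorphicMeanState) - sizeof(PlainMeanState));
}
```

With one state per thread the total is a handful of bytes; the difference only becomes noticeable if a separate polymorphic object is kept per group, in which case each object would usually also be a separate heap allocation.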
> >>>
> >>> On Tue, Nov 10, 2020 at 9:44 AM Wes McKinney <wesmck...@gmail.com>
> >>> wrote:
> >>>
> >>>> Yes, open a Jira and propose a PR implementing the changes you need
> >>>>
> >>>> On Mon, Nov 9, 2020 at 8:31 PM Niranda Perera <niranda.per...@gmail.com>
> >>>> wrote:
> >>>> >
> >>>> > @wes How should I proceed with this nevertheless? Should I open a
> >>>> > JIRA?
> >>>> >
> >>>> > On Mon, Nov 9, 2020 at 11:09 AM Wes McKinney <wesmck...@gmail.com> wrote:
> >>>> >
> >>>> > > On Mon, Nov 9, 2020 at 9:32 AM Niranda Perera <niranda.per...@gmail.com> wrote:
> >>>> > > >
> >>>> > > > @Ben
> >>>> > > > Thank you very much for the feedback. But unfortunately, I was
> >>>> > > > unable to find a header that exposes a SumAggregateKernel in
> >>>> > > > v2.0.0. Maybe I am checking it wrong. I remember accessing them
> >>>> > > > in v0.16 IINM.
> >>>> > > >
> >>>> > > > @Wes
> >>>> > > > Yes, that would be great. How about adding a CMake compilation
> >>>> > > > flag for such dev use cases?
> >>>> > > >
> >>>> > >
> >>>> > > This seems like it could cause more problems -- I think it would be
> >>>> > > sufficient to use an "internal::" C++ namespace and always install
> >>>> > > the relevant header file.
> >>>> > >
> >>>> > > >
> >>>> > > >
> >>>> > > > On Sun, Nov 8, 2020 at 9:14 PM Wes McKinney <wesmck...@gmail.com> wrote:
> >>>> > > >
> >>>> > > > > I'm not opposed to installing headers that provide access to
> >>>> > > > > some of the kernel implementation internals (with the caveat
> >>>> > > > > that changes won't go through a deprecation cycle, so caveat
> >>>> > > > > emptor). It might be more sustainable to think about what kind
> >>>> > > > > of stable-ish public API could be exported to support
> >>>> > > > > applications like Cylon.
> >>>> > > > >
> >>>> > > > > On Sun, Nov 8, 2020 at 10:37 AM Ben Kietzman <b...@ursacomputing.com> wrote:
> >>>> > > > > >
> >>>> > > > > > Hi Niranda,
> >>>> > > > > >
> >>>> > > > > > SumImpl is a subclass of KernelState. Given a SumAggregateKernel,
> >>>> > > > > > one can produce zeroed KernelState using the `init` member, then
> >>>> > > > > > operate on data using the `consume`, `merge`, and `finalize`
> >>>> > > > > > members. You can look at ScalarAggExecutor for an example of how
> >>>> > > > > > to get from a compute function to kernels and kernel state. Will
> >>>> > > > > > that work for you?
> >>>> > > > > >
> >>>> > > > > > Ben Kietzman
> >>>> > > > > >
> >>>> > > > > > On Sun, Nov 8, 2020, 11:21 Niranda Perera <niranda.per...@gmail.com> wrote:
> >>>> > > > > >
> >>>> > > > > > > Hi Ben,
> >>>> > > > > > >
> >>>> > > > > > > We are building a distributed table abstraction on top of Arrow
> >>>> > > > > > > dataframes called Cylon (https://github.com/cylondata/cylon).
> >>>> > > > > > > Currently we have a simple aggregation and group-by operation
> >>>> > > > > > > implementation. But we felt that we could offer more
> >>>> > > > > > > functionality if we could import arrow kernels and states into
> >>>> > > > > > > the corresponding cylon distributed kernels.
> >>>> > > > > > > Ex: For distributed mean, we would have to communicate the local
> >>>> > > > > > > arrow SumState, then do a SumImpl::MergeFrom() and then call
> >>>> > > > > > > Finalize.
> >>>> > > > > > > Is there any other way to access these intermediate states from
> >>>> > > > > > > compute operations?
> >>>> > > > > > >
> >>>> > > > > > > On Sun, Nov 8, 2020 at 11:11 AM Ben Kietzman <b...@ursacomputing.com> wrote:
> >>>> > > > > > >
> >>>> > > > > > > > Hi Niranda,
> >>>> > > > > > > >
> >>>> > > > > > > > What is the context of your work? If you're working inside the
> >>>> > > > > > > > arrow repository you shouldn't need to install headers before
> >>>> > > > > > > > using them, and we welcome PRs for new kernels. Otherwise, could
> >>>> > > > > > > > you provide some details about how your work is using Arrow as a
> >>>> > > > > > > > dependency?
> >>>> > > > > > > >
> >>>> > > > > > > > Ben Kietzman
> >>>> > > > > > > >
> >>>> > > > > > > > On Sun, Nov 8, 2020, 10:57 Niranda Perera <niranda.per...@gmail.com> wrote:
> >>>> > > > > > > >
> >>>> > > > > > > > > Hi,
> >>>> > > > > > > > >
> >>>> > > > > > > > > I was wondering if I could use the
> >>>> > > > > > > > > arrow/compute/kernels/*internal.h headers in my work? I would
> >>>> > > > > > > > > like to reuse some of the kernel implementations and kernel
> >>>> > > > > > > > > states.
> >>>> > > > > > > > >
> >>>> > > > > > > > > With -DARROW_COMPUTE=ON, those headers are not added into the
> >>>> > > > > > > > > include dir. I see that the *internal.h headers are
> >>>> > > > > > > > > unfortunately skipped by the ARROW_INSTALL_ALL_HEADERS cmake
> >>>> > > > > > > > > function.
> >>>> > > > > > > > >
> >>>> > > > > > > > > Best
> >>>> > > > > > > > > --
> >>>> > > > > > > > > Niranda Perera
> >>>> > > > > > > > > @n1r44 <https://twitter.com/N1R44>
> >>>> > > > > > > > > +1 812 558 8884 / +94 71 554 8430
> >>>> > > > > > > > > https://www.linkedin.com/in/niranda
> >>>> > > > > > > > >
> >>>> > > > > > > >
> >>>> > > > > > >
> >>>> > > > >
> >>>> > > >
> >>>> > >
> >>>> >
> >>>>
> >>>
> >>>
> >>
> >
> >
