Hello Weston,
Thank you very much for your detailed explanation. I was under the
impression that the compute functions take advantage of parallelism, which
was clearly incorrect.
It sounds like Acero is the way forward for some of the functionality I need.

Also, I'd like to know whether I can implement custom kernels (for
sort_indices and take, for example) that take advantage of parallelism
along the lines of thrust::stable_sort_by_key
<https://thrust.github.io/doc/group__sorting_gaee08ac5db848d5d3a8704bd6e7f3869b.html>
and thrust::gather
<https://thrust.github.io/doc/group__gathering_ga86722e76264fb600d659c1adef5d51b2.html#ga86722e76264fb600d659c1adef5d51b2>,
perhaps utilizing TBB. Our application already uses thrust for CPU (via
TBB) and GPU parallelism.
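
For reference, this is the pattern we rely on today, sketched here with the
host policy (in our builds the TBB host backend is selected at compile time
via THRUST_HOST_SYSTEM):

```
#include <thrust/execution_policy.h>
#include <thrust/gather.h>
#include <thrust/sequence.h>
#include <thrust/sort.h>

#include <vector>

// Produce a sorted index vector with stable_sort_by_key, then reorder a
// payload column with gather.  With THRUST_HOST_SYSTEM set to TBB, the
// thrust::host policy dispatches both calls to TBB.
void SortAndGather(std::vector<double>& keys, std::vector<double>& values) {
  std::vector<long> indices(keys.size());
  thrust::sequence(thrust::host, indices.begin(), indices.end());
  thrust::stable_sort_by_key(thrust::host, keys.begin(), keys.end(),
                             indices.begin());
  std::vector<double> reordered(values.size());
  thrust::gather(thrust::host, indices.begin(), indices.end(),
                 values.begin(), reordered.begin());
  values.swap(reordered);
}
```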

Thanks,
Surya



On Sat, Apr 22, 2023 at 6:21 PM Weston Pace <[email protected]> wrote:

> I'm not entirely sure what your use case is.  If you're running compute
> functions on giant arrays then you will not get any parallelism.  Compute
> functions do not, themselves, exploit parallelism.  Instead we normally
> achieve parallelism in Acero by running batches through the compute
> functions in parallel.  So, if you have a very large table, you would get
> some parallelism by creating a plan with a table_source and a project
> node.  The table source will peel off batches of size 1Mi (by default) and
> run your expressions in parallel on those batches.  You could also do this
> parallel slicing and function execution yourself without Acero.
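>
> For example, a minimal sketch of such a plan (the column name "x" and the
> expression are placeholders, and the exact headers/namespaces vary by
> Arrow version):
>
> ```
> #include <arrow/acero/exec_plan.h>
> #include <arrow/acero/options.h>
> #include <arrow/api.h>
> #include <arrow/compute/expression.h>
>
> namespace ac = arrow::acero;
> namespace cp = arrow::compute;
>
> // Build a table_source -> project plan.  The source peels off batches
> // (1Mi rows by default) and the project expressions run on those
> // batches in parallel.
> arrow::Result<std::shared_ptr<arrow::Table>> ProjectInParallel(
>     std::shared_ptr<arrow::Table> table) {
>   ac::Declaration source{"table_source", ac::TableSourceNodeOptions{table}};
>   ac::Declaration project{
>       "project",
>       {std::move(source)},
>       ac::ProjectNodeOptions{
>           {cp::call("add", {cp::field_ref("x"), cp::literal(1)})}}};
>   return ac::DeclarationToTable(std::move(project));
> }
> ```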
>
> However, you mention `sort_indices`, which is not a scalar function (and
> thus can't be used in the project node), and not something that can easily
> be computed with a divide-and-conquer fork-join.  There is an
> order_by node in Acero but it isn't terribly optimized.  It simply collects
> all batches into one large chunked array and runs the sort_indices compute
> function.  Parallel sorting is very much possible so there is definitely
> room for improvement here.
>
>  * We could make the sort_indices kernel itself take advantage of
> parallelism.  This would be a little unique (kernel functions don't
> traditionally take advantage of parallelism) but now that we are on C++17
> this could be as simple as using the STL's parallel stable_sort (see the
> sketch after this list).
>  * Another approach to modifying the sort_indices kernel could be to
> modify the ChunkedArraySorter.  When sorting chunked arrays we sort each
> chunk individually and then merge them.  The sorting of individual chunks
> is an easily parallelizable thing but we do this serially today.
>  * Or we could leave the sort_indices kernel alone and modify the order_by
> node to sort in parallel and then merge instead of relying on a chunked
> array compute kernel.
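>
> As a sketch of that first idea, standalone and not Arrow's actual kernel
> code (amusingly, on GCC's libstdc++ the C++17 parallel algorithms are
> themselves implemented on top of TBB):
>
> ```
> #include <algorithm>
> #include <cstdint>
> #include <execution>
> #include <numeric>
> #include <vector>
>
> // Compute sort indices for a column of doubles using the C++17 parallel
> // stable sort: fill 0..n-1, then stable-sort the indices by value.
> std::vector<int64_t> SortIndices(const std::vector<double>& values) {
>   std::vector<int64_t> indices(values.size());
>   std::iota(indices.begin(), indices.end(), 0);
>   std::stable_sort(
>       std::execution::par, indices.begin(), indices.end(),
>       [&](int64_t l, int64_t r) { return values[l] < values[r]; });
>   return indices;
> }
> ```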
>
> You also mention `take`.  This is another non-scalar function.  However,
> if you have a large list of indices, you can probably implement a fork-join
> on top of this yourself.  Then again, if I recall correctly, you are
> applying many filters, so it may be that you are only using `take`
> indirectly.
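>
> Roughly, such a fork-join might look like this (a hypothetical sketch
> using std::async; a real version would use Arrow's thread pool, and error
> handling is minimal):
>
> ```
> #include <arrow/api.h>
> #include <arrow/compute/api.h>
>
> #include <algorithm>
> #include <future>
> #include <vector>
>
> namespace cp = arrow::compute;
>
> // Slice the index array, run `take` on each slice concurrently, then
> // concatenate the resulting pieces in order.
> arrow::Result<std::shared_ptr<arrow::Array>> ParallelTake(
>     std::shared_ptr<arrow::Array> values,
>     std::shared_ptr<arrow::Array> indices, int64_t num_slices) {
>   const int64_t slice_len =
>       (indices->length() + num_slices - 1) / num_slices;
>   std::vector<std::future<arrow::Result<arrow::Datum>>> futures;
>   for (int64_t i = 0; i < indices->length(); i += slice_len) {
>     auto slice =
>         indices->Slice(i, std::min(slice_len, indices->length() - i));
>     futures.push_back(std::async(std::launch::async, [values, slice] {
>       return cp::Take(values, slice);
>     }));
>   }
>   arrow::ArrayVector pieces;
>   for (auto& fut : futures) {
>     ARROW_ASSIGN_OR_RAISE(arrow::Datum piece, fut.get());
>     pieces.push_back(piece.make_array());
>   }
>   return arrow::Concatenate(pieces);
> }
> ```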
>
> On Fri, Apr 21, 2023 at 7:18 PM Surya Kiran Gullapalli <
> [email protected]> wrote:
>
>> Hi,
>> Thanks for the reply and the suggestion on custom executor. I'll take a
>> look at it.
>>
>> I was profiling my application (Windows 10), which deals with millions of
>> rows (roughly 10M) of data, and I found some places where only a few CPUs
>> were engaged while the rest sat idle. On investigating further, I found
>> that the time was spent in arrow compute calls (take, sort_indices, etc.).
>> In fact, out of 12 cores, only one core was doing work. The CPU thread
>> count was showing 24 (12 * 2). So I'm trying to find out whether there's
>> any way I can improve the performance, as my application is already using
>> TBB for some tasks.
>>
>> Any pointers in this direction would be greatly appreciated.
>>
>> Thanks,
>> Surya
>>
>> On Sat, Apr 22, 2023 at 2:27 AM Weston Pace <[email protected]>
>> wrote:
>>
>>> No, there are no build-time configuration settings to enable TBB
>>> specifically.
>>>
>>> You can, at runtime, specify a custom executor to use for most
>>> operations.  We use one thread pool for CPU tasks and one for I/O tasks.
>>> You could replace either or both with a TBB-based executor.
>>>
>>> For example, the factory method for creating a streaming CSV reader is
>>> defined as:
>>>
>>> ```
>>> static Future<std::shared_ptr<StreamingReader>> MakeAsync(
>>>     io::IOContext io_context, std::shared_ptr<io::InputStream> input,
>>>     arrow::internal::Executor* cpu_executor, const ReadOptions&,
>>>     const ParseOptions&, const ConvertOptions&);
>>> ```
>>>
>>> The `cpu_executor` parameter specifies which thread pool to use for CPU
>>> tasks.  The I/O executor is part of the `io_context`.
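>>>
>>> For example, a sketch of routing the reader's CPU work through a custom
>>> executor (the wrapper function here is hypothetical):
>>>
>>> ```
>>> #include <arrow/csv/api.h>
>>> #include <arrow/io/api.h>
>>> #include <arrow/util/thread_pool.h>
>>>
>>> namespace csv = arrow::csv;
>>>
>>> // Open a streaming CSV reader whose CPU tasks run on the given executor.
>>> arrow::Future<std::shared_ptr<csv::StreamingReader>> OpenCsv(
>>>     std::shared_ptr<arrow::io::InputStream> input,
>>>     arrow::internal::Executor* cpu_executor) {
>>>   return csv::StreamingReader::MakeAsync(
>>>       arrow::io::default_io_context(), std::move(input), cpu_executor,
>>>       csv::ReadOptions::Defaults(), csv::ParseOptions::Defaults(),
>>>       csv::ConvertOptions::Defaults());
>>> }
>>> ```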
>>>
>>> The executor interface is pretty straightforward.  Hiding the utility
>>> functions, it is:
>>>
>>> ```
>>> class ARROW_EXPORT Executor {
>>>  public:
>>>   virtual int GetCapacity() = 0;
>>>
>>>  protected:
>>>   virtual Status SpawnReal(TaskHints hints, FnOnce<void()> task,
>>>                            StopToken, StopCallback&&) = 0;
>>> };
>>> ```
>>>
>>> It shouldn't be too much work to create a custom implementation based on
>>> TBB; a rough sketch is below.  Out of curiosity, what is the motivation
>>> for using TBB?
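>>>
>>> Here is a hypothetical TBB-backed implementation against the simplified
>>> interface above (stop-token support and task-group lifetime management
>>> are elided):
>>>
>>> ```
>>> #include <arrow/util/thread_pool.h>
>>>
>>> #include <tbb/task_arena.h>
>>> #include <tbb/task_group.h>
>>>
>>> #include <memory>
>>> #include <utility>
>>>
>>> class TbbExecutor : public arrow::internal::Executor {
>>>  public:
>>>   int GetCapacity() override {
>>>     return tbb::this_task_arena::max_concurrency();
>>>   }
>>>
>>>  protected:
>>>   arrow::Status SpawnReal(arrow::internal::TaskHints hints,
>>>                           arrow::internal::FnOnce<void()> task,
>>>                           arrow::StopToken, StopCallback&&) override {
>>>     // tbb::task_group::run needs a copyable callable, so stash the
>>>     // move-only FnOnce in a shared_ptr and invoke it exactly once.
>>>     auto shared_task =
>>>         std::make_shared<arrow::internal::FnOnce<void()>>(std::move(task));
>>>     group_.run([shared_task] { std::move(*shared_task)(); });
>>>     return arrow::Status::OK();
>>>   }
>>>
>>>  private:
>>>   tbb::task_group group_;
>>> };
>>> ```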
>>>
>>> -Weston
>>>
>>> On Fri, Apr 21, 2023 at 11:04 AM Surya Kiran Gullapalli <
>>> [email protected]> wrote:
>>>
>>>> Hello,
>>>> I'm curious to know if c++ sdk of arrow compute functions can use tbb
>>>> parallelization underneath ?
>>>> The documentation mentions that arrow uses a threadpool for
>>>> parallelization. Does compute functions also use threadpool and parallelize
>>>> computation ?
>>>>
>>>> Looking at the .so file created I do not see tbb library as a
>>>> dependency for arrow library.
>>>>
>>>> Is there a configuration variable during build which can activate this
>>>> ?
>>>>
>>>> Thanks,
>>>> Surya
>>>>
>>>
