Hi Anton,

Another possibility is to look at our C++ CSV reader and parser (in
src/arrow/csv).  It's the only piece of Arrow that uses non-trivial
multi-threading right now (with tasks spawning new tasks dynamically,
see InferringColumnBuilder).  It's based on the ThreadPool and TaskGroup
APIs (in src/arrow/util/).  These APIs are not set in stone, so you're
free to propose changes to make them fit better with a TBB-based
implementation.

Regards

Antoine.


Le 03/05/2019 à 01:42, Malakhov, Anton a écrit :
> Thanks Wes!
> 
> Sounds like a good way to go! We'll create a demo, as you suggested, 
> implementing a parallel execution model for a simple analytics pipeline that 
> reads and processes the files. My only concern is about adding more pipeline 
> breaker nodes and compute intensive operations into this demo because min/max 
> are effectively no-ops fused into I/O scan node. What do you think about 
> adding group-by into this picture, effectively implementing NY taxi and/or 
> mortgage benchmarks? Ideally, I'd like to go even further and add sci-kit 
> learn-like stuff for processing that data in order to demonstrate the 
> co-existence side of the story. What do you think?
> So, the idea of the prototype will be to validate the parallel execution 
> model as the first step. After that, it'll help to shape API for both - 
> execution nodes and the threading backend. Does it sound right to you?
> 
> P.S. I can well understand your hesitation about using TBB directly and as 
> non-optional dependency, thus I'm suggesting threading layers approach here. 
> Please let me clarify myself, using TBB and nested parallelism is non-goal by 
> itself. The goal is to build components of efficient execution model, which 
> coexist well with each other and with all the other, external to Arrow, 
> components of an applications. However, without a rich, composable, and 
> mature parallel toolkit, it is hard to achieve and to focus on this goal. 
> Thus, I wanted to check with the community if it is an acceptable way at all 
> and what's the roadmap.
> 
> Thanks,
> // Anton
> 
> 
> -----Original Message-----
> From: Wes McKinney [mailto:wesmck...@gmail.com] 
> Sent: Thursday, May 2, 2019 13:52
> To: dev@arrow.apache.org
> Subject: Re: [DISCUSS][C++][Proposal] Threading engine for Arrow
> 
> hi Anton,
> 
> Thank you for bringing your expertise to the project -- this is a very useful 
> discussion to have.
> 
> Partly why our threading capabilities in the project are not further 
> developed is that there is not much that needs to be parallelized. It would 
> be like designing a supercharger when you don't have a car yet.
> That being said, it is worthwhile to plan ahead so we aren't trying to 
> retrofit significant pieces of software to be able to take advantage of a 
> more advanced task scheduler.
> 
> From my perspective, we have a few key practical areas of consideration:
> 
> * Computational tasks that may offer nested parallelism (e.g. an Aggregation 
> or Projection task may be able to execution in multiple
> threads)
> * IO operations performed from within tasks that appear to be computational 
> in nature (example: in the course of reading a Parquet file, both computation 
> -- decoding, decompression -- and IO -- local or remote filesystem operations 
> -- must be performed). The status quo right now is that IO performed inside a 
> task in the thread pool is not releasing any resources to other tasks.
> 
> I believe that we should design and develop a sane programming model / API 
> for implementing our software in the presence of these challenges.
> If the backend / implementation of this API uses TBB and that makes things 
> more efficient than other approaches, then that sounds great to me. I would 
> be hesitant to use TBB APIs directly in Arrow application code unless it can 
> be clearly demonstrated by that is a superior option to alternatives.
> 
> It seems useful to validate the implementation approach by starting with some 
> practical problems. Suppose, for the sake of argument, you want to read 10 
> Parquet files (constituting a single logical dataset) as fast as possible and 
> perform some simple analytics on them -- let's take something very simple 
> like computing the maximum and minimum values of each column in the dataset. 
> This problem features both problems listed above:
> 
> * Reading a single Parquet file can be parallelized (by columns -- since 
> columns can be decoded in parallel) on the global thread pool, so reading 
> multiple files in parallel would cause nested parallelism
> * Within the context of reading a single Parquet file column, IO calls are 
> performed. CPU threads sit idle while this IO is taking place, particularly 
> if the file system is high latency (e.g. HDFS)
> 
> What do you think about -- as a way of moving this project forward -- 
> developing a prototype threading backend and developer API (for people like 
> me to use to develop libraries like the Parquet library) that addresses these 
> issues? I think it could be difficult to build consensus around a threading 
> backend developed in the abstract.
> 
> Thanks
> Wes
> 
> On Tue, Apr 30, 2019 at 9:28 PM Malakhov, Anton <anton.malak...@intel.com> 
> wrote:
>>
>> Hi dear Arrow developers, Antoine,
>>
>> I'd like to kick off the discussion of the threading engine that Arrow can 
>> use underneath for implementing multicore parallelism for execution nodes, 
>> kernels, and/or all the functions, which can be optimized this way.
>> I've documented some ideas on Arrow's Confluence Wiki: 
>> https://cwiki.apache.org/confluence/display/ARROW/Parallel+Execution+E
>> ngine The bottom line is that while Arrow is moving into the right 
>> direction introducing shared thread pool, there are some questions and 
>> concerns about current implementation and the way how it is supposed to 
>> co-exist with other threaded libraries ("threading composability") while 
>> providing efficient nestable NUMA&cache-aware data and data-flow parallelism.
>> I suggest to introduce threading layers like in other libraries like MKL and 
>> Numba, starting with TBB-based layer. Or maybe even use TBB directly. In 
>> short, there are the following arguments for it:
>>
>> 1.      Designed for composability from day zero. Avoids mandatory 
>> parallelism. Provides work stealing and FIFO scheduling. Compatible with 
>> parallel depth first scheduling (a better composability research).
>>
>> 2.      TBB Flow Graph. It fits nicely into data flow and execution nodes 
>> model of SQL databases. Besides basic nodes needed for implementing an 
>> execution engine, it also provides a foundation for heterogeneous and 
>> distributed computing (async_node, opencl_node, distributed_node)
>>
>> 3.      Arrow's ThreadPool, TaskGroup, and ParallelFor have direct 
>> equivalent in TBB: task_arena, task_group, and parallel_for while providing 
>> mature and performant implementation, which solves many if not all of the 
>> XXX todo notes in the comments like exceptions, singletons and time of 
>> initialization, lock-free.
>>
>> 4.      Concurrent hash tables, queues, vector and other concurrent 
>> containers. Hash tables are required for implementing parallel versions of 
>> joins, groupby, uniq, dictionary operations. There is a contribution to 
>> integrate libcuckoo under TBB interface.
>>
>> 5.      TBB scalable malloc and memory pools, which can use any 
>> user-provided memory chunk for scalable allocation. Arrow uses jemalloc, 
>> which is slower in some cases than tbbmalloc or tcmalloc.
>>
>> 6.      OpenMP is good for NUMA with static schedule, however, there is no 
>> good answer for dynamic tasks, graphs. TBB provides tools for implementing 
>> NUMA support: task_arena, task_scheduler_observer, task affinity & 
>> priorities, committed to improve NUMA for its other customers in 2019.
>>
>> 7.      TBB is licensed under Apache 2.0, has conda-forge feedstock, 
>> supports CMake, it's adopted for CPU scheduling by other industry players, 
>> has multiple ports for other OSes and CPU arches.
>>
>> Full disclosure: I was TBB developer before its 1.0 version, responsible for 
>> multiple core components like hash tables, adaptive partitioning, interfaces 
>> of memory pools and task_arena, all of these are very relevant to Arrow. 
>> I've background in scalability and NUMA-aware performance optimization like 
>> what we did for OpenCL runtime for CPU (TBB-based). I also was behind 
>> optimizations for Intel Distribution for Python and its threading 
>> composability 
>> story<https://software.intel.com/en-us/blogs/2016/04/04/unleash-parallel-performance-of-python-programs>.
>>  Thus, I'm sincerely hope to reuse all these stuff in order to deliver the 
>> best performance for Arrow.
>>
>>
>> Best regards,
>> Anton Malakhov<http://www.linkedin.com/in/antonmalakhov>
>> IAGS Scripting Analyzers & Tools
>>
>> O: +1-512-3620-512
>> 1300 S. MoPac Expy
>> Office:  AN4-C1-D4
>> Austin, TX 78746
>> Intel Corporation | www.intel.com
>>

Reply via email to