Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-23 Thread Wes McKinney
Useful read on this topic today from the Julia language

https://julialang.org/blog/2019/07/multithreading

On Tue, Jul 23, 2019, 12:22 AM Jacques Nadeau  wrote:

> There are two main things that have been important to us in Dremio around
> threading:
>
> Separate threading model from algorithms. We chose to do parallelization at
> the engine level instead of the operation level. This allows us to
> substantially increase parallelization while still maintaining a strong
> thread prioritization model. This contrasts with some systems like Apache
> Impala, which chose to implement threading at the operation level. This has
> ultimately hurt the ability of individual workloads to scale out within
> a node. See the experimental features around MT_DOP when they tried to
> retreat from this model and struggled to do so. It serves as an example of
> the challenges if you don't separate data algorithms from threading early
> on in design [1]. This intention was core to how we designed Gandiva, where
> an external driver makes decisions around threading and the actual
> algorithm only does small amounts of work before yielding to the driver.
> This allows a driver to make parallelization and scheduling decisions
> without having to know the internals of the algorithm. (In Dremio, these
> are all covered under the interfaces described in Operator [2] and its
> subclasses, which together provide a very simple set of operation states
> for the driver to understand.)
>
> The second is that the majority of the data we work with these days is
> primarily in high latency cloud storage. While we may stage data locally, a
> huge amount of reads are impacted by the performance of cloud stores. To
> cover these performance behaviors we did two things: the first was to
> introduce a very simple-to-use async reading interface for data, seen at
> [3], and the second was to introduce a collaborative way for individual
> tasks to declare their blocking state to a central coordinator [4]. Happy
> to cover these in more detail if people are interested. In general, using
> these techniques has allowed us to tune many systems to a situation where
> the (highly)
> variable latency of cloud stores like S3 and ADLS can be mostly cloaked by
> aggressive read-ahead and what we call predictive pipelining (where reading
> is guided based on latency performance characteristics along with knowledge
> of columnar formats like Parquet).
>
> [1]
>
> https://www.cloudera.com/documentation/enterprise/latest/topics/impala_mt_dop.html#mt_dop
> [2]
>
> https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/sabot/op/spi/Operator.java
> [3]
>
> https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/exec/store/dfs/async/AsyncByteReader.java
> [4]
>
> https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/sabot/threads/sharedres/SharedResourceManager.java
>
> On Mon, Jul 22, 2019 at 9:56 AM Antoine Pitrou  wrote:
>
> >
> > On 22/07/2019 at 18:52, Wes McKinney wrote:
> > >
> > > Probably the way forward is to introduce async-capable read APIs into the file
> > > interfaces. For example:
> > >
> > > file->ReadAsyncBlock(thread_ctx, ...);
> > >
> > > That way the file implementation can decide whether asynchronous logic
> > > is actually needed.
> > > I doubt very much that a one-size-fits-all
> > > concurrency solution can be developed -- in some applications
> > > coarse-grained IO and CPU task scheduling may be warranted, but we
> > > need to have a solution for finer-grained scenarios where
> > >
> > > * In the memory-mapped case, there is no overhead and
> > > * The programming model is not too burdensome to the library developer
> >
> > Well, the asynchronous I/O programming model *will* be burdensome at
> > least until C++ gets coroutines (which may happen in C++20, and
> > therefore be usable somewhere around 2024 for Arrow?).
> >
> > Regards
> >
> > Antoine.
> >
>


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-22 Thread Jacques Nadeau
There are two main things that have been important to us in Dremio around
threading:

Separate threading model from algorithms. We chose to do parallelization at
the engine level instead of the operation level. This allows us to
substantially increase parallelization while still maintaining a strong
thread prioritization model. This contrasts with some systems like Apache
Impala, which chose to implement threading at the operation level. This has
ultimately hurt the ability of individual workloads to scale out within
a node. See the experimental features around MT_DOP when they tried to
retreat from this model and struggled to do so. It serves as an example of
the challenges if you don't separate data algorithms from threading early
on in design [1]. This intention was core to how we designed Gandiva, where
an external driver makes decisions around threading and the actual
algorithm only does small amounts of work before yielding to the driver.
This allows a driver to make parallelization and scheduling decisions
without having to know the internals of the algorithm. (In Dremio, these
are all covered under the interfaces described in Operator [2] and its
subclasses, which together provide a very simple set of operation states
for the driver to understand.)
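
A minimal C++ sketch of this driver/operator separation (all names here are
hypothetical; the actual Dremio interfaces are the Java ones linked below):

// Hypothetical sketch of the driver/operator split described above: the
// operator reports a coarse state, the driver owns all threading decisions.
enum class OperatorState { kCanProduce, kNeedsInput, kBlocked, kDone };

class Operator {
 public:
  virtual ~Operator() = default;
  // Do a small unit of work, then yield back to the driver.
  virtual OperatorState DoWork() = 0;
};

class Driver {
 public:
  void Run(Operator* op) {
    // The driver decides parallelization and scheduling; the operator
    // never blocks and never spawns threads of its own.
    while (true) {
      switch (op->DoWork()) {
        case OperatorState::kDone:
          return;
        case OperatorState::kBlocked:
          Yield();  // hand this thread to another runnable task
          break;
        default:
          break;  // keep driving the operator
      }
    }
  }

 private:
  void Yield() { /* scheduler-specific */ }
};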

The second is that the majority of the data we work with these days is
primarily in high latency cloud storage. While we may stage data locally, a
huge amount of reads are impacted by the performance of cloud stores. To
cover these performance behaviors we did two things: the first was to
introduce a very simple-to-use async reading interface for data, seen at
[3], and the second was to introduce a collaborative way for individual
tasks to declare their blocking state to a central coordinator [4]. Happy
to cover these in more detail if people are interested. In general, using
these techniques has allowed us to tune many systems to a situation where
the (highly)
variable latency of cloud stores like S3 and ADLS can be mostly cloaked by
aggressive read-ahead and what we call predictive pipelining (where reading
is guided based on latency performance characteristics along with knowledge
of columnar formats like Parquet).
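
A rough C++ analogue of these two interfaces, for illustration only (the
linked Dremio code below is Java; these signatures are assumptions):

// Illustrative only -- not the actual Dremio APIs.
#include <cstdint>
#include <functional>

class AsyncByteReader {
 public:
  virtual ~AsyncByteReader() = default;
  // Start a read and return immediately; invoke on_done when bytes arrive.
  virtual void ReadAsync(int64_t offset, int64_t length,
                         std::function<void(const uint8_t*)> on_done) = 0;
};

class SharedResource {
 public:
  // A task declares that it is blocked (e.g. on a cloud read) so a central
  // coordinator can schedule other tasks onto the freed-up thread.
  void MarkBlocked() { blocked_ = true; }
  void MarkAvailable() { blocked_ = false; }
  bool blocked() const { return blocked_; }

 private:
  bool blocked_ = false;
};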

[1]
https://www.cloudera.com/documentation/enterprise/latest/topics/impala_mt_dop.html#mt_dop
[2]
https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/sabot/op/spi/Operator.java
[3]
https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/exec/store/dfs/async/AsyncByteReader.java
[4]
https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/sabot/threads/sharedres/SharedResourceManager.java

On Mon, Jul 22, 2019 at 9:56 AM Antoine Pitrou  wrote:

>
> On 22/07/2019 at 18:52, Wes McKinney wrote:
> >
> > Probably the way forward is to introduce async-capable read APIs into the file
> > interfaces. For example:
> >
> > file->ReadAsyncBlock(thread_ctx, ...);
> >
> > That way the file implementation can decide whether asynchronous logic
> > is actually needed.
> > I doubt very much that a one-size-fits-all
> > concurrency solution can be developed -- in some applications
> > coarse-grained IO and CPU task scheduling may be warranted, but we
> > need to have a solution for finer-grained scenarios where
> >
> > * In the memory-mapped case, there is no overhead and
> > * The programming model is not too burdensome to the library developer
>
> Well, the asynchronous I/O programming model *will* be burdensome at
> least until C++ gets coroutines (which may happen in C++20, and
> therefore be usable somewhere around 2024 for Arrow?).
>
> Regards
>
> Antoine.
>


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-22 Thread Antoine Pitrou


On 22/07/2019 at 18:52, Wes McKinney wrote:
> 
> Probably the way forward is to introduce async-capable read APIs into the file
> interfaces. For example:
> 
> file->ReadAsyncBlock(thread_ctx, ...);
> 
> That way the file implementation can decide whether asynchronous logic
> is actually needed.
> I doubt very much that a one-size-fits-all
> concurrency solution can be developed -- in some applications
> coarse-grained IO and CPU task scheduling may be warranted, but we
> need to have a solution for finer-grained scenarios where
> 
> * In the memory-mapped case, there is no overhead and
> * The programming model is not too burdensome to the library developer

Well, the asynchronous I/O programming model *will* be burdensome at
least until C++ gets coroutines (which may happen in C++20, and
therefore be usable somewhere around 2024 for Arrow?).

Regards

Antoine.


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-22 Thread Wes McKinney
On Mon, Jul 22, 2019 at 11:42 AM Antoine Pitrou  wrote:
>
> On Mon, 22 Jul 2019 11:07:43 -0500
> Wes McKinney  wrote:
> >
> > Right, which is why I'm suggesting a simple model to let threads
> > that are waiting on IO allow other threads to execute.
>
> If you are doing memory-mapped IO, how do you plan to tell whether and
> when you'll be going to wait for IO?
>

Probably the way forward is to introduce async-capable read APIs into the file
interfaces. For example:

file->ReadAsyncBlock(thread_ctx, ...);

That way the file implementation can decide whether asynchronous logic
is actually needed. I doubt very much that a one-size-fits-all
concurrency solution can be developed -- in some applications
coarse-grained IO and CPU task scheduling may be warranted, but we
need to have a solution for finer-grained scenarios where

* In the memory-mapped case, there is no overhead and
* The programming model is not too burdensome to the library developer
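
As a sketch of the shape such an API could take (ReadFuture and ThreadContext
are assumed placeholder types; none of this exists in Arrow today):

#include <cstdint>

struct ThreadContext {};                 // carries scheduler/IO-wait state
struct ReadFuture { /* e.g. Wait() returning a buffer */ };

class RandomAccessFile {
 public:
  virtual ~RandomAccessFile() = default;
  // A memory-mapped implementation can complete this synchronously with a
  // zero-copy buffer slice (no overhead); an S3-backed implementation can
  // dispatch the read to an IO thread and return a pending future.
  virtual ReadFuture ReadAsyncBlock(ThreadContext* thread_ctx,
                                    int64_t offset, int64_t nbytes) = 0;
};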

> Regards
>
> Antoine.
>
>


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-22 Thread Antoine Pitrou
On Mon, 22 Jul 2019 11:07:43 -0500
Wes McKinney  wrote:
> 
> Right, which is why I'm suggesting a simple model to let threads
> that are waiting on IO allow other threads to execute.

If you are doing memory-mapped IO, how do you plan to tell whether and
when you'll be going to wait for IO?

Regards

Antoine.




Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-22 Thread Wes McKinney
On Mon, Jul 22, 2019 at 10:49 AM Antoine Pitrou  wrote:
>
>
> On 18/07/2019 at 00:25, Wes McKinney wrote:
> >
> > * We look forward in the stream until we find a complete Thrift data
> > page header. This may trigger 0 or more (possibly multiple) Read calls
> > to the underlying "file" handle. In the default case, the data is all
> > actually in memory so the reads are zero-copy buffer slices.
>
> If the file is memory-mapped, it doesn't mean everything is in RAM.
> Starting to read a page may incur a page fault and some unexpected
> blocking I/O.
>
> The solution to hide I/O costs could be to use madvise() (in which case
> the background read is done by the kernel without any need for
> user-visible IO threads).  Similarly, on a regular file one can use
> fadvise().  This may mean that the whole issue of "how to hide I/O for a
> given source" may be stream-specific (for example, if a file is
> S3-backed, perhaps you want to issue a HTTP fetch in background?).
>

I think we need to be designing around remote filesystems with
unpredictable latency and throughput. Anyone involved in data
warehousing systems in the cloud is going to be intimately familiar
with these issues -- a system that's designed around local disk and
memory-mapping generally isn't going to adapt well to remote
filesystems.

> > # Model B (CPU and IO work split into tasks that execute on different
> > thread queues)
> >
> > Pros
> > - Not sure
> >
> > Cons
> > - Could cause performance issues if the IO tasks are mostly free (e.g.
> > due to buffering)
>
> In the model B, the decision of whether to use a background thread or
> some other means of hiding I/O costs could also be pushed down into the
> stream implementation.
>
> > I think we need to investigate some asynchronous C++ programming libraries 
> > like
> >
> > https://github.com/facebook/folly/tree/master/folly/fibers
> >
> > to see how organizations with mature C++ practices are handling these
> > issues from a programming model standpoint
>
> Well, right now our model is synchronous I/O.  If we want to switch to
> asynchronous I/O we'll have to redesign a lot of APIs.  Also, since C++
> doesn't have a convenient story for asynchronous I/O or coroutines
> > (yet), this will similarly make programming significantly more painful,
> which is (IMO) something we'd like to avoid.  And I'm not mentioning the
> problem of mapping the C++ asynchronous I/O model on the corresponding
> Python primitives...
>

Right, which is why I'm suggesting a simple model to let threads
that are waiting on IO allow other threads to execute. Currently
they block.

>
> More generally, I'm wary of significantly complicating our I/O handling
> until we have reliable reproducers of I/O-originated performance issues
> with Arrow.
>

If it helps, I can spend some time implementing Model A as it relates
to reading Parquet files in parallel. If you introduce a small amount
of latency into reads (10-50ms per read call -- such as you would
experience using Amazon S3) the current synchronous approach will have
significant IO-wait-related performance issues.

> Regards
>
> Antoine.


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-22 Thread Antoine Pitrou


On 18/07/2019 at 00:25, Wes McKinney wrote:
> 
> * We look forward in the stream until we find a complete Thrift data
> page header. This may trigger 0 or more (possibly multiple) Read calls
> to the underlying "file" handle. In the default case, the data is all
> actually in memory so the reads are zero-copy buffer slices.

If the file is memory-mapped, it doesn't mean everything is in RAM.
Starting to read a page may incur a page fault and some unexpected
blocking I/O.

The solution to hide I/O costs could be to use madvise() (in which case
the background read is done by the kernel without any need for
user-visible IO threads).  Similarly, on a regular file one can use
fadvise().  This may mean that the whole issue of "how to hide I/O for a
given source" may be stream-specific (for example, if a file is
S3-backed, perhaps you want to issue a HTTP fetch in background?).
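
For reference, the madvise() approach looks roughly like this on POSIX
systems (a sketch with error handling elided, not Arrow code):

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

void* MapWithReadahead(const char* path, size_t* out_size) {
  int fd = open(path, O_RDONLY);
  struct stat st;
  fstat(fd, &st);
  void* addr = mmap(nullptr, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
  // Ask the kernel to prefetch pages sequentially in the background so the
  // reading thread sees fewer blocking page faults.
  posix_madvise(addr, st.st_size, POSIX_MADV_SEQUENTIAL);
  // On a regular (unmapped) file descriptor the analogue would be:
  //   posix_fadvise(fd, 0, st.st_size, POSIX_FADV_SEQUENTIAL);
  close(fd);  // the mapping remains valid after close
  *out_size = st.st_size;
  return addr;
}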

> # Model B (CPU and IO work split into tasks that execute on different
> thread queues)
> 
> Pros
> - Not sure
> 
> Cons
> - Could cause performance issues if the IO tasks are mostly free (e.g.
> due to buffering)

In the model B, the decision of whether to use a background thread or
some other means of hiding I/O costs could also be pushed down into the
stream implementation.

> I think we need to investigate some asynchronous C++ programming libraries 
> like
> 
> https://github.com/facebook/folly/tree/master/folly/fibers
> 
> to see how organizations with mature C++ practices are handling these
> issues from a programming model standpoint

Well, right now our model is synchronous I/O.  If we want to switch to
asynchronous I/O we'll have to redesign a lot of APIs.  Also, since C++
doesn't have a convenient story for asynchronous I/O or coroutines
(yet), this will similarly make programming significantly more painful,
which is (IMO) something we'd like to avoid.  And I'm not mentioning the
problem of mapping the C++ asynchronous I/O model on the corresponding
Python primitives...


More generally, I'm wary of significantly complicating our I/O handling
until we have reliable reproducers of I/O-originated performance issues
with Arrow.

Regards

Antoine.


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-17 Thread Wes McKinney
I've been looking a little bit at this in the context of Parquet files.

One of the read hot paths in cpp/src/parquet is the function that
reads and decompresses data pages from the stream:

(SerializedPageReader::NextPage)
https://github.com/apache/arrow/blob/master/cpp/src/parquet/column_reader.cc#L143

The control flow goes like this:

* We look forward in the stream until we find a complete Thrift data
page header. This may trigger 0 or more (possibly multiple) Read calls
to the underlying "file" handle. In the default case, the data is all
actually in memory so the reads are zero-copy buffer slices. The
reason we don't _always_ do zero copy is that some users want to
reduce the RAM footprint of the decoder (so we don't hold the whole
compressed column chunk in memory all at once)
* We deserialize the data page header
* We perform a Read to read the data page body
* We decompress the data page body
* Control returns to the main decoding loop that materializes values
from each data page into the output buffer

Under the programming models proposed:

# Model A (CPU threads signal "idleness", causing a temporary increase
in the number of running tasks)

Pros:
- relatively simple for the developer. Instead of writing

stream_->Peek(allowed_page_size, &buffer);

we write something like

exec_ctx_->WaitIO([&]() { stream_->Peek(allowed_page_size, &buffer); });

The IO-wait signal could also be pushed down into the stream_'s
implementation, so that in the zero-copy case there is no overhead.

Cons
- Not sure about the context-switching implications, since a hot loop might
cause jumps between CPU cores (I'm really out of my depth here...). It
makes me wonder if we need to look at something optimized for high
performance asynchronous task scheduling.
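
A minimal sketch of how such a WaitIO wrapper could interact with a soft
task limit (an assumption about the API shape, not existing code):

#include <atomic>
#include <functional>

// Hypothetical execution context implementing the Model A idea: while a
// task is blocked in IO, the pool may run one extra task in its place.
class ExecContext {
 public:
  void WaitIO(const std::function<void()>& io_call) {
    extra_tasks_allowed_.fetch_add(1);  // raise the pool's soft limit
    io_call();                          // the blocking Peek/Read happens here
    extra_tasks_allowed_.fetch_sub(1);  // restore the limit on return
  }

  int extra_tasks_allowed() const { return extra_tasks_allowed_.load(); }

 private:
  std::atomic<int> extra_tasks_allowed_{0};
};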

# Model B (CPU and IO work split into tasks that execute on different
thread queues)

Pros
- Not sure

Cons
- Could cause performance issues if the IO tasks are mostly free (e.g.
due to buffering)

The ideal approach could actually be a hybrid of Models A and B --
there's no particular reason that the programming models cannot
coexist (except that code using the Model A approach might make code
that has optimized itself for Model B slower).

I think we need to investigate some asynchronous C++ programming libraries like

https://github.com/facebook/folly/tree/master/folly/fibers

to see how organizations with mature C++ practices are handling these
issues from a programming model standpoint

On Mon, Jul 15, 2019 at 3:15 PM Wes McKinney  wrote:
>
> On Mon, Jul 15, 2019 at 12:01 PM Antoine Pitrou  wrote:
> >
> > On Mon, 15 Jul 2019 11:49:56 -0500
> > Wes McKinney  wrote:
> > >
> > > For example, suppose we had a thread pool with a limit of 8 concurrent
> > > tasks. Now 4 of them perform IO calls. Hypothetically this should
> > > happen:
> > >
> > > * Thread pool increments a "soft limit" to allow 4 more tasks to
> > > spawn, so at this point technically we have 12 active tasks
> > > * When each IO call returns, the soft limit is decremented
> > > * The soft limit can be constrained to be some multiple of the hard
> > > limit. So if we have a hard limit of 8 CPU-bound threads, then we
> > > might allow an additional 8 tasks to be spawned if a CPU bound thread
> > > indicates that it's waiting for IO
> >
> > Well, there are two approaches to this:
> >
> > * the approach you are proposing
> > * the approach where IO is done in separate worker threads so that we
> >   needn't resize the main thread pool when IO is done
> >
> > Advantages of the second approach:
> >
> > * No need to dynamically resize the main thread pool (which may
> >   be difficult to achieve in an efficient manner).
> > * CPU-bound threads can stay pinned on the same HW cores and threads
> >   most of the time, which is probably good for cache locality and to
> >   avoid migration costs.
> >
> > Advantages of the first approach:
> >
> > * The programming model is probably simpler.
> >
> > Also, the first approach is not workable if e.g. TBB doesn't support it
> > (?).
>
> Agreed with both points. I'd like to investigate these approaches to
> see what makes the most sense from a programming model and efficiency
> / performance standpoint.
>
> Currently we have lots of code that looks like (pseudocode)
>
> function Func(State* mutable_state) {
>CPUTask1(mutable_state);
>IOTask1(mutable_state);
> >CPUTask2(mutable_state);
>IOTask2(mutable_state);
>CPUTask3(mutable_state);
>...
> }
>
> Either approach is going to require us to develop a programming model
> where a task scheduler is passed into many functions, so such code has
> to be refactored to push work into the scheduler rather than doing the
> work in the current thread. You could certainly argue that we should
> elect for an API which maximizes our flexibility with regards to
> scheduling work (e.g. having separate thread pools for IO and CPU).
>
> Task scheduling may also need to be aware of IO resource identities to
> control concurrent reads of sources that are sensitive to that (e.g.
> some filesystems may work fine accessed by 16 threads in parallel,
> where others will not).

Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-15 Thread Wes McKinney
On Mon, Jul 15, 2019 at 12:01 PM Antoine Pitrou  wrote:
>
> On Mon, 15 Jul 2019 11:49:56 -0500
> Wes McKinney  wrote:
> >
> > For example, suppose we had a thread pool with a limit of 8 concurrent
> > tasks. Now 4 of them perform IO calls. Hypothetically this should
> > happen:
> >
> > * Thread pool increments a "soft limit" to allow 4 more tasks to
> > spawn, so at this point technically we have 12 active tasks
> > * When each IO call returns, the soft limit is decremented
> > * The soft limit can be constrained to be some multiple of the hard
> > limit. So if we have a hard limit of 8 CPU-bound threads, then we
> > might allow an additional 8 tasks to be spawned if a CPU bound thread
> > indicates that it's waiting for IO
>
> Well, there are two approaches to this:
>
> * the approach you are proposing
> * the approach where IO is done in separate worker threads so that we
>   needn't resize the main thread pool when IO is done
>
> Advantages of the second approach:
>
> * No need to dynamically resize the main thread pool (which may
>   be difficult to achieve in an efficient manner).
> * CPU-bound threads can stay pinned on the same HW cores and threads
>   most of the time, which is probably good for cache locality and to
>   avoid migration costs.
>
> Advantages of the first approach:
>
> * The programming model is probably simpler.
>
> Also, the first approach is not workable if e.g. TBB doesn't support it
> (?).

Agreed with both points. I'd like to investigate these approaches to
see what makes the most sense from a programming model and efficiency
/ performance standpoint.

Currently we have lots of code that looks like (pseudocode)

function Func(State* mutable_state) {
   CPUTask1(mutable_state);
   IOTask1(mutable_state);
   CPUTask2(mutable_state);
   IOTask2(mutable_state);
   CPUTask3(mutable_state);
   ...
}

Either approach is going to require us to develop a programming model
where a task scheduler is passed into many functions, so such code has
to be refactored to push work into the scheduler rather than doing the
work in the current thread. You could certainly argue that we should
elect for an API which maximizes our flexibility with regards to
scheduling work (e.g. having separate thread pools for IO and CPU).

Task scheduling may also need to be aware of IO resource identities to
control concurrent reads of sources that are sensitive to that (e.g.
some filesystems may work fine accessed by 16 threads in parallel,
where others will not).

Probably we need to figure out at least what the programming model
ought to look like so we can start refactoring old code (e.g.
parquet-cpp internals) and writing new code in a more
concurrency-minded way.
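
As a sketch of what that might look like, continuing the pseudocode above
(Scheduler and its methods are hypothetical, and real code would also have
to express the ordering between tasks):

#include <functional>

// Hypothetical: the scheduler owns the threads; Func only submits work.
class Scheduler {
 public:
  virtual ~Scheduler() = default;
  virtual void SubmitCPU(std::function<void()> task) = 0;
  virtual void SubmitIO(std::function<void()> task) = 0;
};

void Func(Scheduler* scheduler, State* mutable_state) {
  scheduler->SubmitCPU([=] { CPUTask1(mutable_state); });
  scheduler->SubmitIO([=] { IOTask1(mutable_state); });
  scheduler->SubmitCPU([=] { CPUTask2(mutable_state); });
  scheduler->SubmitIO([=] { IOTask2(mutable_state); });
  scheduler->SubmitCPU([=] { CPUTask3(mutable_state); });
  // In practice these tasks depend on each other, so they would be chained
  // as continuations rather than submitted eagerly like this.
}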

>
> Regards
>
> Antoine.
>
>


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-15 Thread Antoine Pitrou
On Mon, 15 Jul 2019 11:49:56 -0500
Wes McKinney  wrote:
> 
> For example, suppose we had a thread pool with a limit of 8 concurrent
> tasks. Now 4 of them perform IO calls. Hypothetically this should
> happen:
> 
> * Thread pool increments a "soft limit" to allow 4 more tasks to
> spawn, so at this point technically we have 12 active tasks
> * When each IO call returns, the soft limit is decremented
> * The soft limit can be constrained to be some multiple of the hard
> limit. So if we have a hard limit of 8 CPU-bound threads, then we
> might allow an additional 8 tasks to be spawned if a CPU bound thread
> indicates that it's waiting for IO

Well, there are two approaches to this:

* the approach you are proposing
* the approach where IO is done in separate worker threads so that we
  needn't resize the main thread pool when IO is done

Advantages of the second approach:

* No need to dynamically resize the main thread pool (which may
  be difficult to achieve in an efficient manner).
* CPU-bound threads can stay pinned on the same HW cores and threads
  most of the time, which is probably good for cache locality and to
  avoid migration costs.

Advantages of the first approach:

* The programming model is probably simpler.

Also, the first approach is not workable if e.g. TBB doesn't support it
(?).

Regards

Antoine.




Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-15 Thread Wes McKinney
On Mon, Jul 15, 2019 at 11:38 AM Antoine Pitrou  wrote:
>
>
> Hi Anton,
>
> On 12/07/2019 at 23:21, Malakhov, Anton wrote:
> >
> > The result is that all these execution nodes scale well enough and run
> > under 100 milliseconds on my 2 x Xeon E5-2650 v4 @ 2.20GHz, 128GB RAM,
> > while the CSV reader takes several seconds to complete even when reading
> > from an in-memory file (8GB); thus it is not IO bound yet, even with good
> > consumer-grade SSDs. So my focus recently has been on optimization of the
> > CSV parser, where I have achieved a 50% improvement by substituting all
> > the small object allocations with the TBB scalable allocator and using a
> > TBB-based memory pool instead of the default one, with pre-allocated huge
> > (2MB) memory pages (echo 3 > /proc/sys/vm/nr_hugepages). I have not yet
> > found a way to do both of these tricks with jemalloc, so please try to
> > beat or meet my times without the TBB allocator.
>
> That sounds interesting, though optimizing memory allocations is
> probably not the most enticing use case for TBB.  Memory allocators can
> fare differently on different workloads, and just because TBB is better
> in some situation doesn't mean it'll always be better.  Similarly,
> jemalloc is not the best for every use case.
>
> Note that, as Arrow is a library, we don't want to impose a memory
> allocator on the user, hence why jemalloc is merely optional.
>
> (one reason we added the jemalloc option is that jemalloc has
> non-standard APIs for aligned allocation and reallocation, btw)
>
> > I also see other hotspots and opportunities for optimizations; some
> > examples are memset being heavily used while resizing buffers (why and
> > why?) and the column builder thrashing caches by not using streaming
> > stores.
>
> Could you open JIRA issues with your investigations?  I'd be interested
> to know what the actual execution bottlenecks are in the CSV reader.
>
> > I used TBB directly to make the execution nodes parallel; however, I have
> > also implemented a simple TBB-based ThreadPool and TaskGroup, as you can
> > see in this PR: https://github.com/aregm/arrow/pull/6
> > I see consistent improvement (up to 1200%!) on the BM_ThreadedTaskGroup
> > and BM_ThreadPoolSpawn microbenchmarks; however, applying it to the
> > real-world task of the CSV reader, I don't see any improvements yet.
>
> One thing you could try is to shrink the block size in the CSV reader and see
> when performance starts to fall significantly.  With the current
> TaskGroup overhead, small block sizes will suffer a lot.  I expect TBB
> to fare better.
>
> (and / or try a CSV file with a hundred columns or so)
>
> > Or even worse, while reading the file, TBB wastes some cycles spinning.
>
> That doesn't sound good (but is a separate issue from the main TaskGroup
> usage, IMHO).  TBB doesn't provide a facility for background IO threads
> perhaps?
>

I think we need to spend some design effort on a programming model /
API for these code paths that do a mix of IO and deserialization. This
is also a problem with Parquet files -- a CPU thread that is
deserializing a column will sit idle while it waits for IO. IMHO such
IO calls need to be able to signal to the concurrency manager that
another task can be started.

For example, suppose we had a thread pool with a limit of 8 concurrent
tasks. Now 4 of them perform IO calls. Hypothetically this should
happen:

* Thread pool increments a "soft limit" to allow 4 more tasks to
spawn, so at this point technically we have 12 active tasks
* When each IO call returns, the soft limit is decremented
* The soft limit can be constrained to be some multiple of the hard
limit. So if we have a hard limit of 8 CPU-bound threads, then we
might allow an additional 8 tasks to be spawned if a CPU bound thread
indicates that it's waiting for IO

I think that any code in the codebase that does a mix of CPU and IO
should be retrofitted with some kind of object to allow code to signal
that it's about to wait for IO.
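
One possible shape for that object, sketched as an RAII guard against a
hypothetical pool (all names are assumptions, not existing Arrow APIs):

#include <mutex>

class ThreadPool {
 public:
  explicit ThreadPool(int hard_limit) : hard_limit_(hard_limit) {}

  void EnterIOWait() {
    std::lock_guard<std::mutex> lock(mu_);
    ++io_waiters_;  // effective soft limit becomes hard_limit_ + io_waiters_
  }
  void ExitIOWait() {
    std::lock_guard<std::mutex> lock(mu_);
    --io_waiters_;
  }
  bool CanSpawnTask() {
    std::lock_guard<std::mutex> lock(mu_);
    // A cap such as 2 * hard_limit_ could bound the soft limit, as above.
    return running_tasks_ < hard_limit_ + io_waiters_;
  }

 private:
  std::mutex mu_;
  const int hard_limit_;
  int io_waiters_ = 0;
  int running_tasks_ = 0;
};

// Held by a task for the duration of a blocking IO call.
class IOWaitGuard {
 public:
  explicit IOWaitGuard(ThreadPool* pool) : pool_(pool) { pool_->EnterIOWait(); }
  ~IOWaitGuard() { pool_->ExitIOWait(); }

 private:
  ThreadPool* pool_;
};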

> > I'll be looking into applying more sophisticated NUMA and locality-aware 
> > tricks as I'll be cleaning paths for the data streams in the parser.
>
> Hmm, as a first approach, I don't think we should waste time trying such
> sophisticated optimizations (well, of course, you are free to do so :-)).
>
> Regards
>
> Antoine.


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-15 Thread Antoine Pitrou


Hi Anton,

On 12/07/2019 at 23:21, Malakhov, Anton wrote:
> 
> The result is that all these execution nodes scale well enough and run under 
> 100 milliseconds on my 2 x Xeon E5-2650 v4 @ 2.20GHz, 128GB RAM, while the CSV 
> reader takes several seconds to complete even when reading from an in-memory 
> file (8GB); thus it is not IO bound yet, even with good consumer-grade SSDs. 
> So my focus recently has been on optimization of the CSV parser, where I have 
> achieved a 50% improvement by substituting all the small object allocations 
> with the TBB scalable allocator and using a TBB-based memory pool instead of 
> the default one, with pre-allocated huge (2MB) memory pages (echo 3 > 
> /proc/sys/vm/nr_hugepages). I have not yet found a way to do both of these 
> tricks with jemalloc, so please try to beat or meet my times without the TBB 
> allocator.

That sounds interesting, though optimizing memory allocations is
probably not the most enticing use case for TBB.  Memory allocators can
fare differently on different workloads, and just because TBB is better
in some situation doesn't mean it'll always be better.  Similarly,
jemalloc is not the best for every use case.

Note that, as Arrow is a library, we don't want to impose a memory
allocator on the user, hence why jemalloc is merely optional.

(one reason we added the jemalloc option is that jemalloc has
non-standard APIs for aligned allocation and reallocation, btw)

> I also see other hotspots and opportunities for optimizations; some examples 
> are memset being heavily used while resizing buffers (why and why?) and 
> the column builder thrashing caches by not using streaming stores.

Could you open JIRA issues with your investigations?  I'd be interested
to know what the actual execution bottlenecks are in the CSV reader.

> I used TBB directly to make the execution nodes parallel; however, I have also 
> implemented a simple TBB-based ThreadPool and TaskGroup, as you can see in 
> this PR: https://github.com/aregm/arrow/pull/6
> I see consistent improvement (up to 1200%!) on the BM_ThreadedTaskGroup and 
> BM_ThreadPoolSpawn microbenchmarks; however, applying it to the real-world 
> task of the CSV reader, I don't see any improvements yet.

One thing you could try is to shrink the block size in the CSV reader and see
when performance starts to fall significantly.  With the current
TaskGroup overhead, small block sizes will suffer a lot.  I expect TBB
to fare better.

(and / or try a CSV file with a hundred columns or so)

> Or even worse, while reading the file, TBB wastes some cycles spinning.

That doesn't sound good (but is a separate issue from the main TaskGroup
usage, IMHO).  TBB doesn't provide a facility for background IO threads
perhaps?

> I'll be looking into applying more sophisticated NUMA and locality-aware 
> tricks as I'll be cleaning paths for the data streams in the parser.

Hmm, as a first approach, I don't think we should waste time trying such
sophisticated optimizations (well, of course, you are free to do so :-)).

Regards

Antoine.


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-13 Thread Wes McKinney
hi Anton,

Ideally PRs like https://github.com/aregm/arrow/pull/6 would be made
into apache/arrow where the community can see them. I had no idea this
PR existed.

I have looked at the demo repository a little bit, but I'm not sure
what conclusions it will help reach. What we are missing at the moment
is a richer programming model / API for multi-threaded workflows that
do a mix of CPU and IO. If that programming model can make use of TBB
for better performance (but without exposing a lot of TBB-specific
details to the Arrow library developer), then that is great.

In any case, I'd prefer to collaborate in the context of the Arrow C++
library and the specific problems we are solving there. I haven't had
much time lately to write new code, but one of the things I'm most
interested in working on in the near future is the C++ Data Frame
library: 
https://docs.google.com/document/d/1XHe_j87n2VHGzEbnLe786GHbbcbrzbjgG8D0IXWAeHg/edit?usp=sharing.
That might be a good place to experiment with a better threading /
work-scheduling API to make things simpler for implementers.

- Wes

On Fri, Jul 12, 2019 at 4:21 PM Malakhov, Anton
 wrote:
>
> Hi, folks
>
> We were discussing improvements for the threading engine back in May and 
> agreed to implement benchmarks (sorry, I've lost the original mail thread, 
> here is the link: 
> https://lists.apache.org/thread.html/c690253d0bde643a5b644af70ec1511c6e510ebc86cc970aa8d5252e@%3Cdev.arrow.apache.org%3E
>  )
>
> Here is an update on what's going on with this effort.
> We've implemented a rough prototype for group_by, aggregate, and transform 
> execution nodes on top of Arrow (along with studying the whole data analytics 
> domain along the way :-) ) and made them parallel, as you can see in this 
> repository: https://github.com/anton-malakhov/nyc_taxi
>
> The result is that all these execution nodes scale well enough and run under 
> 100 milliseconds on my 2 x Xeon E5-2650 v4 @ 2.20GHz, 128GB RAM, while the CSV 
> reader takes several seconds to complete even when reading from an in-memory 
> file (8GB); thus it is not IO bound yet, even with good consumer-grade SSDs. 
> So my focus recently has been on optimization of the CSV parser, where I have 
> achieved a 50% improvement by substituting all the small object allocations 
> with the TBB scalable allocator and using a TBB-based memory pool instead of 
> the default one, with pre-allocated huge (2MB) memory pages (echo 3 > 
> /proc/sys/vm/nr_hugepages). I have not yet found a way to do both of these 
> tricks with jemalloc, so please try to beat or meet my times without the TBB 
> allocator. I also see other hotspots and opportunities for optimizations; 
> some examples are memset being heavily used while resizing buffers (why and 
> why?) and the column builder thrashing caches by not using streaming stores.
>
> I used TBB directly to make the execution nodes parallel; however, I have also 
> implemented a simple TBB-based ThreadPool and TaskGroup, as you can see in 
> this PR: https://github.com/aregm/arrow/pull/6
> I see consistent improvement (up to 1200%!) on the BM_ThreadedTaskGroup and 
> BM_ThreadPoolSpawn microbenchmarks; however, applying it to the real-world 
> task of the CSV reader, I don't see any improvements yet. Or even worse, while 
> reading the file, TBB wastes some cycles spinning... probably because of the 
> read-ahead thread, which oversubscribes the machine. Arrow's threading 
> interacts better with the OS scheduler and thus shows better performance. So, 
> this simple approach to TBB without a deeper redesign didn't help. I'll be 
> looking into applying more sophisticated NUMA and locality-aware tricks as 
> I'll be cleaning paths for the data streams in the parser. Though I'll take 
> some time off before returning to this effort. See you in September!
>
>
> Regards,
> // Anton
>


RE: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-07-12 Thread Malakhov, Anton
Hi, folks

We were discussing improvements for the threading engine back in May and agreed 
to implement benchmarks (sorry, I've lost the original mail thread, here is the 
link: 
https://lists.apache.org/thread.html/c690253d0bde643a5b644af70ec1511c6e510ebc86cc970aa8d5252e@%3Cdev.arrow.apache.org%3E
 )

Here is an update on what's going on with this effort.
We've implemented a rough prototype for group_by, aggregate, and transform 
execution nodes on top of Arrow (along with studying the whole data analytics 
domain along the way :-) ) and made them parallel, as you can see in this 
repository: https://github.com/anton-malakhov/nyc_taxi

The result is that all these execution nodes scale well enough and run under 
100 milliseconds on my 2 x Xeon E5-2650 v4 @ 2.20GHz, 128GB RAM, while the CSV 
reader takes several seconds to complete even when reading from an in-memory 
file (8GB); thus it is not IO bound yet, even with good consumer-grade SSDs. So 
my focus recently has been on optimization of the CSV parser, where I have 
achieved a 50% improvement by substituting all the small object allocations 
with the TBB scalable allocator and using a TBB-based memory pool instead of 
the default one, with pre-allocated huge (2MB) memory pages (echo 3 > 
/proc/sys/vm/nr_hugepages). I have not yet found a way to do both of these 
tricks with jemalloc, so please try to beat or meet my times without the TBB 
allocator. I also see other hotspots and opportunities for optimizations; some 
examples are memset being heavily used while resizing buffers (why and why?) 
and the column builder thrashing caches by not using streaming stores.
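
For readers who want to reproduce the allocator substitution, the TBB side
looks roughly like this (a sketch; the huge-page step additionally requires
the nr_hugepages setting shown above):

#include <vector>
#include <tbb/scalable_allocator.h>
#define TBB_PREVIEW_MEMORY_POOL 1  // memory_pool was a preview feature
#include <tbb/memory_pool.h>

// 1. Route a container's small allocations through tbbmalloc.
using ScalableBuffer = std::vector<char, tbb::scalable_allocator<char>>;

void Demo() {
  ScalableBuffer buf(1024);

  // 2. Carve short-lived allocations out of a TBB memory pool.
  tbb::memory_pool<tbb::scalable_allocator<char>> pool;
  void* scratch = pool.malloc(4096);
  pool.free(scratch);

  // Huge pages in tbbmalloc are enabled via the environment variable
  // TBB_MALLOC_USE_HUGE_PAGES=1 (on top of the nr_hugepages setting).
}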

I used TBB directly to make the execution nodes parallel; however, I have also 
implemented a simple TBB-based ThreadPool and TaskGroup, as you can see in this 
PR: https://github.com/aregm/arrow/pull/6
I see consistent improvement (up to 1200%!) on the BM_ThreadedTaskGroup and 
BM_ThreadPoolSpawn microbenchmarks; however, applying it to the real-world task 
of the CSV reader, I don't see any improvements yet. Or even worse, while 
reading the file, TBB wastes some cycles spinning... probably because of the 
read-ahead thread, which oversubscribes the machine. Arrow's threading 
interacts better with the OS scheduler and thus shows better performance. So, 
this simple approach to TBB without a deeper redesign didn't help. I'll be 
looking into applying more sophisticated NUMA and locality-aware tricks as 
I'll be cleaning paths for the data streams in the parser. Though I'll take 
some time off before returning to this effort. See you in September!


Regards,
// Anton



RE: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-07 Thread Malakhov, Anton
> From: Jed Brown [mailto:j...@jedbrown.org]
> Sent: Monday, May 6, 2019 16:35

> Nice paper, thanks!  Did you investigate latency impact from the IPC counting
> semaphore?  Is your test code available?
Not that deep. Basically, I was only looking at whether its positive effect is 
enough to overcome the impact of oversubscription or not. It is, but not in all 
cases. It is also hard to separate one impact/effect from another, e.g. some 
parallel regions ask for all the threads but use only a few, which results in
undersubscription when serializing parallel regions in OpenMP. IPC for 
coordinating TBB processes solves the resource exhaustion problem and gives 
additional performance in some cases. However, Linux is usually good enough for 
scheduling multiple multithreaded processes. I guess it's because it sees how 
threads are grouped, which is not the case for multiple concurrent parallel 
regions with OpenMP threads in the same process.
All the results from the blog, paper, talks, and demo are available at 
https://github.com/IntelPython/composability_bench

Regards
// Anton


RE: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-06 Thread Jed Brown
"Malakhov, Anton"  writes:

> Jed,
>
>> From: Jed Brown [mailto:j...@jedbrown.org]
>> Sent: Friday, May 3, 2019 12:41
>
>> You linked to a NumPy discussion
>> (https://github.com/numpy/numpy/issues/11826) that is encountering the same
>> issues, but proposing solutions based on the global environment.
>> That is perhaps acceptable for typical Python callers due to the GIL, but C++
>> callers may be using threads themselves.  A typical example:
>> 
>> App:
>>   calls libB sequentially:
>> calls Arrow sequentially (wants to use threads)
>>   calls libC sequentially:
>> omp parallel (creates threads somehow):
>>   calls Arrow from threads (Arrow should not create more)
>>   omp parallel:
>> calls libD from threads:
>>   calls Arrow (Arrow should not create more)
>
> That's not a correct assumption about Python. The GIL is used for
> synchronization of Python's interpreter state, i.e. its C-API data
> structures. When Python calls a C extension like Numpy, the latter is
> not restricted from doing its own internal parallelism (like what
> OpenBLAS and MKL do). Moreover, Numpy and other libraries usually
> release the GIL before going into a long compute region, which allows a
> concurrent thread to start a compute region in parallel.

Thanks, I wasn't aware under what conditions NumPy (or other callers)
would release the GIL.

> So, there is not much difference between Python and C++ in what you
> can get in terms of nested parallelism (the difference is in overheads
> and scalability). If there is app-level parallelism (like for libD)
> and/or other nesting (like in your libC), which can be implemented
> e.g. with Dask, Numpy will still create a parallel region inside for
> each call from the outermost thread or process (Python and Dask support
> both). And this is exactly the problem I'm solving, and the reason
> I started this discussion, so thanks for sharing my concerns. For more
> information, please refer to my Scipy2017 talk and later paper where
> we introduced 3 approaches to the problem (TBB, settings
> orchestration, OpenMP extension):
> http://conference.scipy.org/proceedings/scipy2018/pdfs/anton_malakhov.pdf

Nice paper, thanks!  Did you investigate latency impact from the IPC
counting semaphore?  Is your test code available?


RE: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-06 Thread Malakhov, Anton
Jed,

> From: Jed Brown [mailto:j...@jedbrown.org]
> Sent: Friday, May 3, 2019 12:41

> You linked to a NumPy discussion
> (https://github.com/numpy/numpy/issues/11826) that is encountering the same
> issues, but proposing solutions based on the global environment.
> That is perhaps acceptable for typical Python callers due to the GIL, but C++
> callers may be using threads themselves.  A typical example:
> 
> App:
>   calls libB sequentially:
> calls Arrow sequentially (wants to use threads)
>   calls libC sequentially:
> omp parallel (creates threads somehow):
>   calls Arrow from threads (Arrow should not create more)
>   omp parallel:
> calls libD from threads:
>   calls Arrow (Arrow should not create more)

That's not a correct assumption about Python. The GIL is used for 
synchronization of Python's interpreter state, i.e. its C-API data structures. 
When Python calls a C extension like Numpy, the latter is not restricted from 
doing its own internal parallelism (like what OpenBLAS and MKL do). Moreover, 
Numpy and other libraries usually release the GIL before going into a long 
compute region, which allows a concurrent thread to start a compute region in 
parallel. So, there is not much difference between Python and C++ in what you 
can get in terms of nested parallelism (the difference is in overheads and 
scalability). If there is app-level parallelism (like for libD) and/or other 
nesting (like in your libC), which can be implemented e.g. with Dask, Numpy 
will still create a parallel region inside for each call from the outermost 
thread or process (Python and Dask support both). And this is exactly the 
problem I'm solving, and the reason I started this discussion, so thanks for 
sharing my concerns. For more 
information, please refer to my Scipy2017 talk and later paper where we 
introduced 3 approaches to the problem (TBB, settings orchestration, OpenMP 
extension): 
http://conference.scipy.org/proceedings/scipy2018/pdfs/anton_malakhov.pdf

> Arrow doesn't need to know the difference between the libC and libD cases, but
> it may make a difference to the implementation of those libraries.  In both of
> these cases, the user may desire that Arrow create tasks for load balancing
> reasons (but no new threads) so long as they can run on the specified thread
> team.

Exactly, tasks are one way to solve it. This is what TBB does as a good first 
approximation for the solution: a global task scheduler, no mandatory 
threads/parallel regions, and wide adoption in numeric libraries (MKL, DAAL, 
Numba, soon PyTorch and others). And that's the first step I'm proposing.
Though we know based on past experience that it is still not sufficient, 
because NUMA effects are not accounted for: tasks are randomly distributed. 
That's where other threading layer implementations can work better in some 
cases and where a more elaborate TBB-based NUMA-aware implementation is needed.

> Global solutions like this one (linked by Antoine)
> 
>   https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/thread-pool.cc#L268
> 
> imply that threading mode is global and set via an environment variable,
> neither of which are true in cases such as the above (and many simpler cases).
Right. I wrote about the problem with this implementation in the proposal. First, 
we should not mimic OpenMP for something completely irrelevant; it causes 
confusion and is hard to control in more complex cases.

Regards,
// Anton



RE: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-06 Thread Melik-Adamyan, Areg
> The question is whether you want to spend at least a month or more of
> intense development on something else (a basic query engine, as we've been
> discussing in [1]) before we are able to develop consensus about the
> approach to threading. Personally, I would not make this choice given that
> there are good options available to move along the discussion of the
> parallelism API. I think Antoine's point about the CSV reader as an example
> of a non-trivial processing pipeline that also includes IO operations is a
> good one.
> 
> - Wes
> 
> [1]:
> https://docs.google.com/document/d/10RoUZmiMQRi_J1FcPeVAUAMJ6d_ZuiEbaM2Y33sNPu4/edit?usp=sharing
> 
[>] Anton's point is just an example. Next month we are spending on kernel and 
QE execution node design according to Wes' document. After that, if the 
community feels that it is time to discuss parallelism, we will return to 
that discussion. 


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-06 Thread Wes McKinney
Anton, per your comment:

> Sounds like a good way to go! We'll create a demo, as you suggested, 
> implementing a parallel execution model for a simple analytics pipeline that 
> reads and processes the files. My only concern is about adding more pipeline 
> breaker nodes and compute intensive operations into this demo because min/max 
> are effectively no-ops fused into I/O scan node. What do you think about 
> adding group-by into this picture, effectively implementing NY taxi and/or 
> mortgage benchmarks? Ideally, I'd like to go even further and add 
> scikit-learn-like stuff for processing that data in order to demonstrate the 
> co-existence side of the story. What do you think? So, the idea of the 
> prototype will be to validate the parallel execution model as the first step. 
> After that, it'll help to shape the API for both the execution nodes and the 
> threading backend. Does it sound right to you?

The question is whether you want to spend at least a month or more of
intense development on something else (a basic query engine, as we've
been discussing in [1]) before we are able to develop consensus about
the approach to threading. Personally, I would not make this choice
given that there are good options available to move along the
discussion of the parallelism API. I think Antoine's point about the
CSV reader as an example of a non-trivial processing pipeline that
also includes IO operations is a good one.

- Wes

[1]: 
https://docs.google.com/document/d/10RoUZmiMQRi_J1FcPeVAUAMJ6d_ZuiEbaM2Y33sNPu4/edit?usp=sharing

On Fri, May 3, 2019 at 12:41 PM Jed Brown  wrote:
>
> "Malakhov, Anton"  writes:
>
> >> > the library creates threads internally.  It's a disaster for managing
> >> > oversubscription and affinity issues among groups of threads and/or
> >> > multiple processes (e.g., MPI).
> >
> > This is exactly what I'm talking about when referring to issues with threading 
> > composability! OpenMP is not easy to have inside a library. I described it 
> > in this document: 
> > https://cwiki.apache.org/confluence/display/ARROW/Parallel+Execution+Engine
>
> Thanks for this document.  I'm no great fan of OpenMP, but it's being
> billed by most vendors (especially Intel) as the way to go in the
> scientific computing space and has become relatively popular (much more
> so than TBB).
>
> You linked to a NumPy discussion
> (https://github.com/numpy/numpy/issues/11826) that is encountering the
> same issues, but proposing solutions based on the global environment.
> That is perhaps acceptable for typical Python callers due to the GIL,
> but C++ callers may be using threads themselves.  A typical example:
>
> App:
>   calls libB sequentially:
> calls Arrow sequentially (wants to use threads)
>   calls libC sequentially:
> omp parallel (creates threads somehow):
>   calls Arrow from threads (Arrow should not create more)
>   omp parallel:
> calls libD from threads:
>   calls Arrow (Arrow should not create more)
>
> Arrow doesn't need to know the difference between the libC and libD
> cases, but it may make a difference to the implementation of those
> libraries.  In both of these cases, the user may desire that Arrow
> create tasks for load balancing reasons (but no new threads) so long as
> they can run on the specified thread team.
>
> I have yet to see a complete solution to this problem, but we should
> work out which modes are worth supporting and how that interface would
> look.
>
>
> Global solutions like this one (linked by Antoine)
>
>   
> https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/thread-pool.cc#L268
>
> imply that threading mode is global and set via an environment variable,
> neither of which are true in cases such as the above (and many simpler
> cases).


RE: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-03 Thread Jed Brown
"Malakhov, Anton"  writes:

>> > the library creates threads internally.  It's a disaster for managing
>> > oversubscription and affinity issues among groups of threads and/or
>> > multiple processes (e.g., MPI).
>
> This is exactly what I'm talking about when referring to issues with threading 
> composability! OpenMP is not easy to have inside a library. I described it in 
> this document: 
> https://cwiki.apache.org/confluence/display/ARROW/Parallel+Execution+Engine

Thanks for this document.  I'm no great fan of OpenMP, but it's being
billed by most vendors (especially Intel) as the way to go in the
scientific computing space and has become relatively popular (much more
so than TBB).

You linked to a NumPy discussion
(https://github.com/numpy/numpy/issues/11826) that is encountering the
same issues, but proposing solutions based on the global environment.
That is perhaps acceptable for typical Python callers due to the GIL,
but C++ callers may be using threads themselves.  A typical example:

App:
  calls libB sequentially:
calls Arrow sequentially (wants to use threads)
  calls libC sequentially:
omp parallel (creates threads somehow):
  calls Arrow from threads (Arrow should not create more)
  omp parallel:
calls libD from threads:
  calls Arrow (Arrow should not create more)

Arrow doesn't need to know the difference between the libC and libD
cases, but it may make a difference to the implementation of those
libraries.  In both of these cases, the user may desire that Arrow
create tasks for load balancing reasons (but no new threads) so long as
they can run on the specified thread team.

I have yet to see a complete solution to this problem, but we should
work out which modes are worth supporting and how that interface would
look.
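
One interface shape that would support both the libC and libD cases is an
explicit executor parameter, sketched below (hypothetical names, not an
existing Arrow proposal):

#include <functional>

// The caller owns the thread team; the library only creates tasks.
class Executor {
 public:
  virtual ~Executor() = default;
  virtual void Submit(std::function<void()> task) = 0;
  virtual int parallelism() const = 0;  // size of the caller's thread team
};

// A serial executor makes "Arrow should not create more threads" trivial:
// tasks run inline on the calling (e.g. OpenMP-managed) thread.
class SerialExecutor : public Executor {
 public:
  void Submit(std::function<void()> task) override { task(); }
  int parallelism() const override { return 1; }
};

// A hypothetical library entry point: parallel when handed a real pool,
// thread-free when handed a SerialExecutor from inside an omp parallel.
void ReadTable(Executor* exec /*, ... */);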


Global solutions like this one (linked by Antoine)

  
https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/thread-pool.cc#L268

imply that threading mode is global and set via an environment variable,
neither of which are true in cases such as the above (and many simpler
cases).


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-03 Thread Antoine Pitrou


On 03/05/2019 at 17:57, Jed Brown wrote:
> 
>>> The library is then free to use constructs like omp taskgroup/taskloop
>>> as granularity warrants; it will never utilize threads that the
>>> application didn't explicitly give it.
>>
>> I don't think we're planning to use OpenMP in Arrow, though Wes probably
>> has a better answer.
> 
> I was just using it to demonstrate the semantic.  Regardless of what
> Arrow uses internally, there will be a cohort of users who are
> interested in using Arrow with OpenMP.

I know next to nothing about OpenMP, but we have some code that's
supposed to enable cooperation with OpenMP here:
https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/thread-pool.cc#L268

If that doesn't work as intended, feel free to open an issue and
describe the problem.

Regards

Antoine.


RE: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-03 Thread Malakhov, Anton
Thanks for your answers,

> -Original Message-
> From: Antoine Pitrou [mailto:anto...@python.org]
> Sent: Friday, May 3, 2019 03:54

> On 03/05/2019 at 05:47, Jed Brown wrote:
> > I would caution to please not commit to the MKL/BLAS model in which
I'm actually talking about the threading-layers model, where MKL supports several 
OpenMP runtimes (Intel, GNU, PGI) and TBB, as well as a non-threaded version. It 
even supports dynamic selection; please refer to: 
https://software.intel.com/en-us/mkl-macos-developer-guide-dynamically-selecting-the-interface-and-threading-layer
We implemented the same approach in Numba (#2245): 
https://numba.pydata.org/numba-doc/dev/user/threading-layer.html

> > the library creates threads internally.  It's a disaster for managing
> > oversubscription and affinity issues among groups of threads and/or
> > multiple processes (e.g., MPI).
This is exactly what I'm talking about when referring to issues with threading 
composability! OpenMP is not easy to have inside a library. I described it in 
this document: 
https://cwiki.apache.org/confluence/display/ARROW/Parallel+Execution+Engine

> Implicit multi-threading is important for user-friendliness reasons 
> (especially in higher-level bindings such as the Python-bindings).
Cannot agree more! There might not be enough parallelism at the application 
level; adding parallelism from DSLs is important for better CPU utilization, 
but it is also tricky because of these incompatibility issues.

> > The library is then free to use constructs like omp taskgroup/taskloop
> > as granularity warrants; it will never utilize threads that the
> > application didn't explicitly give it.
> 
> I don't think we're planning to use OpenMP in Arrow, though Wes probably has a
> better answer.
I'd not exclude OpenMP from consideration completely. I want to start with 
TBB, but nothing composes better with OpenMP than OpenMP itself. The same MKL 
(i.e. Numpy) defaults to OpenMP threading. BTW, there is no longer a 
compatibility layer between TBB and OpenMP; it was removed from the latter.


> -Original Message-
> From: Antoine Pitrou [mailto:anto...@python.org]
> Sent: Friday, May 3, 2019 03:49
> 
> Another possibility is to look at our C++ CSV reader and parser (in
> src/arrow/csv).  It's the only piece of Arrow that uses non-trivial 
> multi-threading
> right now (with tasks spawning new tasks dynamically, see
> InferringColumnBuilder).  It's based on the ThreadPool and TaskGroup APIs (in
> src/arrow/util/).  These APIs are not set in stone, so you're free to propose
> changes to make them fit better with a TBB-based implementation.
Great! This is what I was looking for!


// Anton



Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-03 Thread Jed Brown
Antoine Pitrou  writes:

> Hi Jed,
>
> On 03/05/2019 05:47, Jed Brown wrote:
>> I would caution against committing to the MKL/BLAS model in which the
>> library creates threads internally.  It's a disaster for managing
>> oversubscription and affinity issues among groups of threads and/or
>> multiple processes (e.g., MPI).
>
> Implicit multi-threading is important for user-friendliness reasons
> (especially in higher-level bindings such as the Python bindings).

I would argue that can be tucked into bindings versus making it
all-or-nothing in the C++ interface.  It's at least worthy of
discussion.

>> The library is then free to use constructs like omp taskgroup/taskloop
>> as granularity warrants; it will never utilize threads that the
>> application didn't explicitly give it.
>
> I don't think we're planning to use OpenMP in Arrow, though Wes probably
> has a better answer.

I was just using it to demonstrate the semantic.  Regardless of what
Arrow uses internally, there will be a cohort of users who are
interested in using Arrow with OpenMP.


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-03 Thread Antoine Pitrou


Hi Jed,

On 03/05/2019 05:47, Jed Brown wrote:
> I would caution against committing to the MKL/BLAS model in which the
> library creates threads internally.  It's a disaster for managing
> oversubscription and affinity issues among groups of threads and/or
> multiple processes (e.g., MPI).

Implicit multi-threading is important for user-friendliness reasons
(especially in higher-level bindings such as the Python bindings).

> The library is then free to use constructs like omp taskgroup/taskloop
> as granularity warrants; it will never utilize threads that the
> application didn't explicitly give it.

I don't think we're planning to use OpenMP in Arrow, though Wes probably
has a better answer.

Regards

Antoine.


Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-03 Thread Antoine Pitrou


Hi Anton,

Another possibility is to look at our C++ CSV reader and parser (in
src/arrow/csv).  It's the only piece of Arrow that uses non-trivial
multi-threading right now (with tasks spawning new tasks dynamically,
see InferringColumnBuilder).  It's based on the ThreadPool and TaskGroup
APIs (in src/arrow/util/).  These APIs are not set in stone, so you're
free to propose changes to make them fit better with a TBB-based
implementation.
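
For orientation, here is the "tasks spawning tasks" pattern reduced to a toy, 
with plain std::async standing in for the ThreadPool/TaskGroup machinery 
(illustrative only; the names and structure are invented, not the actual CSV 
reader code):

  #include <future>
  #include <vector>

  // Stand-in for converting one chunk of a column; returns its row count.
  int ConvertChunk(int chunk) { return 100 + chunk; }

  // A parse task discovers chunks at runtime and spawns one conversion
  // sub-task per chunk -- dynamic spawning in the spirit of
  // InferringColumnBuilder.
  int ParseColumn(int num_chunks) {
    std::vector<std::future<int>> subtasks;
    for (int i = 0; i < num_chunks; ++i)
      subtasks.push_back(std::async(std::launch::async, ConvertChunk, i));
    int rows = 0;
    for (auto& f : subtasks) rows += f.get();  // join all sub-tasks
    return rows;
  }

  int main() { return ParseColumn(8) > 0 ? 0 : 1; }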

Regards

Antoine.


On 03/05/2019 01:42, Malakhov, Anton wrote:
> Thanks Wes!
> 
> Sounds like a good way to go! We'll create a demo, as you suggested, 
> implementing a parallel execution model for a simple analytics pipeline that 
> reads and processes the files. My only concern is about adding more 
> pipeline-breaker nodes and compute-intensive operations to this demo, because 
> min/max are effectively no-ops fused into the I/O scan node. What do you 
> think about adding group-by into this picture, effectively implementing the 
> NY taxi and/or mortgage benchmarks? Ideally, I'd like to go even further and 
> add scikit-learn-like processing of that data in order to demonstrate the 
> co-existence side of the story. What do you think?
> So, the idea of the prototype is to validate the parallel execution model as 
> the first step. After that, it'll help to shape the API for both the 
> execution nodes and the threading backend. Does that sound right to you?
> 
> P.S. I can well understand your hesitation about using TBB directly and as a 
> non-optional dependency; that is why I'm suggesting the threading-layers 
> approach here. Let me clarify: using TBB and nested parallelism is not a goal 
> in itself. The goal is to build components of an efficient execution model 
> that coexist well with each other and with all the other components of an 
> application external to Arrow. However, without a rich, composable, and 
> mature parallel toolkit, this goal is hard to achieve and to focus on. Thus, 
> I wanted to check with the community whether this is an acceptable way at 
> all and what the roadmap is.
> 
> Thanks,
> // Anton
> 
> 
> -Original Message-
> From: Wes McKinney [mailto:wesmck...@gmail.com] 
> Sent: Thursday, May 2, 2019 13:52
> To: dev@arrow.apache.org
> Subject: Re: [DISCUSS][C++][Proposal] Threading engine for Arrow
> 
> hi Anton,
> 
> Thank you for bringing your expertise to the project -- this is a very useful 
> discussion to have.
> 
> Partly why our threading capabilities in the project are not further 
> developed is that there is not much that needs to be parallelized. It would 
> be like designing a supercharger when you don't have a car yet.
> That being said, it is worthwhile to plan ahead so we aren't trying to 
> retrofit significant pieces of software to be able to take advantage of a 
> more advanced task scheduler.
> 
> From my perspective, we have a few key practical areas of consideration:
> 
> * Computational tasks that may offer nested parallelism (e.g. an Aggregation 
> or Projection task may be able to execute in multiple threads)
> * IO operations performed from within tasks that appear to be computational 
> in nature (example: in the course of reading a Parquet file, both computation 
> -- decoding, decompression -- and IO -- local or remote filesystem operations 
> -- must be performed). The status quo right now is that IO performed inside a 
> task in the thread pool is not releasing any resources to other tasks.
> 
> I believe that we should design and develop a sane programming model / API 
> for implementing our software in the presence of these challenges.
> If the backend / implementation of this API uses TBB and that makes things 
> more efficient than other approaches, then that sounds great to me. I would 
> be hesitant to use TBB APIs directly in Arrow application code unless it can 
> be clearly demonstrated that it is a superior option to alternatives.
> 
> It seems useful to validate the implementation approach by starting with some 
> practical problems. Suppose, for the sake of argument, you want to read 10 
> Parquet files (constituting a single logical dataset) as fast as possible and 
> perform some simple analytics on them -- let's take something very simple 
> like computing the maximum and minimum values of each column in the dataset. 
> This problem features both problems listed above:
> 
> * Reading a single Parquet file can be parallelized (by columns -- since 
> columns can be decoded in parallel) on the global thread pool, so reading 
> multiple files in parallel would cause nested parallelism
> * Within the context of reading a single Parquet file column, IO calls are 
> performed. CPU threads sit idle while this IO is taking place, particularly 
> if the file system is high latency (e.g. HDFS)

RE: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-02 Thread Jed Brown
I would caution against committing to the MKL/BLAS model in which the
library creates threads internally.  It's a disaster for managing
oversubscription and affinity issues among groups of threads and/or
multiple processes (e.g., MPI).  For example, a composable OpenMP
technique is for the caller who wants threaded execution to create the
thread team and explicitly give it to the library,

  #pragma omp parallel
  #pragma omp single
  {
library_calls();
  }

The library is then free to use constructs like omp taskgroup/taskloop
as granularity warrants; it will never utilize threads that the
application didn't explicitly give it.
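
For instance, the library side might look like this (a minimal sketch under 
those assumptions; ordinary OpenMP tasking, not Arrow code):

  #include <cstdio>

  double results[1000];

  // Uses tasking constructs only, so it never creates threads itself: it
  // runs on whatever team the caller provides, or serially if called
  // outside a parallel region.
  void library_calls() {
    #pragma omp taskloop
    for (int i = 0; i < 1000; ++i) results[i] = i * 0.5;
  }

  int main() {
    #pragma omp parallel
    #pragma omp single
    { library_calls(); }
    std::printf("%f\n", results[999]);
    return 0;
  }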

I know Intel has put work into making TBB and OpenMP thread pools
interoperate, though this historically hasn't worked cleanly on other
platforms and thus may be a liability for Arrow to consider.


"Malakhov, Anton"  writes:

> Thanks Wes!
>
> Sounds like a good way to go! We'll create a demo, as you suggested, 
> implementing a parallel execution model for a simple analytics pipeline that 
> reads and processes the files. My only concern is about adding more 
> pipeline-breaker nodes and compute-intensive operations to this demo, because 
> min/max are effectively no-ops fused into the I/O scan node. What do you 
> think about adding group-by into this picture, effectively implementing the 
> NY taxi and/or mortgage benchmarks? Ideally, I'd like to go even further and 
> add scikit-learn-like processing of that data in order to demonstrate the 
> co-existence side of the story. What do you think?
> So, the idea of the prototype is to validate the parallel execution model as 
> the first step. After that, it'll help to shape the API for both the 
> execution nodes and the threading backend. Does that sound right to you?
>
> P.S. I can well understand your hesitation about using TBB directly and as a 
> non-optional dependency; that is why I'm suggesting the threading-layers 
> approach here. Let me clarify: using TBB and nested parallelism is not a goal 
> in itself. The goal is to build components of an efficient execution model 
> that coexist well with each other and with all the other components of an 
> application external to Arrow. However, without a rich, composable, and 
> mature parallel toolkit, this goal is hard to achieve and to focus on. Thus, 
> I wanted to check with the community whether this is an acceptable way at 
> all and what the roadmap is.
>
> Thanks,
> // Anton
>
>
> -Original Message-----
> From: Wes McKinney [mailto:wesmck...@gmail.com] 
> Sent: Thursday, May 2, 2019 13:52
> To: dev@arrow.apache.org
> Subject: Re: [DISCUSS][C++][Proposal] Threading engine for Arrow
>
> hi Anton,
>
> Thank you for bringing your expertise to the project -- this is a very useful 
> discussion to have.
>
> Partly why our threading capabilities in the project are not further 
> developed is that there is not much that needs to be parallelized. It would 
> be like designing a supercharger when you don't have a car yet.
> That being said, it is worthwhile to plan ahead so we aren't trying to 
> retrofit significant pieces of software to be able to take advantage of a 
> more advanced task scheduler.
>
> From my perspective, we have a few key practical areas of consideration:
>
> * Computational tasks that may offer nested parallelism (e.g. an Aggregation 
> or Projection task may be able to execute in multiple threads)
> * IO operations performed from within tasks that appear to be computational 
> in nature (example: in the course of reading a Parquet file, both computation 
> -- decoding, decompression -- and IO -- local or remote filesystem operations 
> -- must be performed). The status quo right now is that IO performed inside a 
> task in the thread pool is not releasing any resources to other tasks.
>
> I believe that we should design and develop a sane programming model / API 
> for implementing our software in the presence of these challenges.
> If the backend / implementation of this API uses TBB and that makes things 
> more efficient than other approaches, then that sounds great to me. I would 
> be hesitant to use TBB APIs directly in Arrow application code unless it can 
> be clearly demonstrated that it is a superior option to alternatives.
>
> It seems useful to validate the implementation approach by starting with some 
> practical problems. Suppose, for the sake of argument, you want to read 10 
> Parquet files (constituting a single logical dataset) as fast as possible and 
> perform some simple analytics on them -- let's take something very simple 
> like computing the maximum and minimum values of each column in the dataset. 
> This problem features both problems listed above:

RE: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-02 Thread Malakhov, Anton
Thanks Wes!

Sounds like a good way to go! We'll create a demo, as you suggested, 
implementing a parallel execution model for a simple analytics pipeline that 
reads and processes the files. My only concern is about adding more 
pipeline-breaker nodes and compute-intensive operations to this demo, because 
min/max are effectively no-ops fused into the I/O scan node. What do you think 
about adding group-by into this picture, effectively implementing the NY taxi 
and/or mortgage benchmarks? Ideally, I'd like to go even further and add 
scikit-learn-like processing of that data in order to demonstrate the 
co-existence side of the story. What do you think?
So, the idea of the prototype is to validate the parallel execution model as 
the first step. After that, it'll help to shape the API for both the execution 
nodes and the threading backend. Does that sound right to you?

P.S. I can well understand your hesitation about using TBB directly and as a 
non-optional dependency; that is why I'm suggesting the threading-layers 
approach here. Let me clarify: using TBB and nested parallelism is not a goal 
in itself. The goal is to build components of an efficient execution model that 
coexist well with each other and with all the other components of an 
application external to Arrow. However, without a rich, composable, and mature 
parallel toolkit, this goal is hard to achieve and to focus on. Thus, I wanted 
to check with the community whether this is an acceptable way at all and what 
the roadmap is.

Thanks,
// Anton


-Original Message-
From: Wes McKinney [mailto:wesmck...@gmail.com] 
Sent: Thursday, May 2, 2019 13:52
To: dev@arrow.apache.org
Subject: Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

hi Anton,

Thank you for bringing your expertise to the project -- this is a very useful 
discussion to have.

Partly why our threading capabilities in the project are not further developed 
is that there is not much that needs to be parallelized. It would be like 
designing a supercharger when you don't have a car yet.
That being said, it is worthwhile to plan ahead so we aren't trying to retrofit 
significant pieces of software to be able to take advantage of a more advanced 
task scheduler.

From my perspective, we have a few key practical areas of consideration:

* Computational tasks that may offer nested parallelism (e.g. an Aggregation or 
Projection task may be able to execute in multiple threads)
* IO operations performed from within tasks that appear to be computational in 
nature (example: in the course of reading a Parquet file, both computation -- 
decoding, decompression -- and IO -- local or remote filesystem operations -- 
must be performed). The status quo right now is that IO performed inside a task 
in the thread pool is not releasing any resources to other tasks.

I believe that we should design and develop a sane programming model / API for 
implementing our software in the presence of these challenges.
If the backend / implementation of this API uses TBB and that makes things more 
efficient than other approaches, then that sounds great to me. I would be 
hesitant to use TBB APIs directly in Arrow application code unless it can be 
clearly demonstrated that it is a superior option to alternatives.

It seems useful to validate the implementation approach by starting with some 
practical problems. Suppose, for the sake of argument, you want to read 10 
Parquet files (constituting a single logical dataset) as fast as possible and 
perform some simple analytics on them -- let's take something very simple like 
computing the maximum and minimum values of each column in the dataset. This 
problem features both problems listed above:

* Reading a single Parquet file can be parallelized (by columns -- since 
columns can be decoded in parallel) on the global thread pool, so reading 
multiple files in parallel would cause nested parallelism
* Within the context of reading a single Parquet file column, IO calls are 
performed. CPU threads sit idle while this IO is taking place, particularly if 
the file system is high latency (e.g. HDFS)

What do you think about -- as a way of moving this project forward -- 
developing a prototype threading backend and developer API (for people like me 
to use to develop libraries like the Parquet library) that addresses these 
issues? I think it could be difficult to build consensus around a threading 
backend developed in the abstract.

Thanks
Wes

On Tue, Apr 30, 2019 at 9:28 PM Malakhov, Anton  
wrote:
>
> Hi dear Arrow developers, Antoine,
>
> I'd like to kick off the discussion of the threading engine that Arrow can 
> use underneath for implementing multicore parallelism for execution nodes, 
> kernels, and/or all the functions, which can be optimized this way.
> I've documented some ideas on Arrow's Confluence Wiki: 
> https://cwiki.apache.org/confluence/display/ARROW/Parallel+Execution+Engine

Re: [DISCUSS][C++][Proposal] Threading engine for Arrow

2019-05-02 Thread Wes McKinney
hi Anton,

Thank you for bringing your expertise to the project -- this is a very
useful discussion to have.

Partly why our threading capabilities in the project are not further
developed is that there is not much that needs to be parallelized. It
would be like designing a supercharger when you don't have a car yet.
That being said, it is worthwhile to plan ahead so we aren't trying to
retrofit significant pieces of software to be able to take advantage
of a more advanced task scheduler.

From my perspective, we have a few key practical areas of consideration:

* Computational tasks that may offer nested parallelism (e.g. an
Aggregation or Projection task may be able to execute in multiple threads)
* IO operations performed from within tasks that appear to be
computational in nature (example: in the course of reading a Parquet
file, both computation -- decoding, decompression -- and IO -- local
or remote filesystem operations -- must be performed). The status quo
right now is that IO performed inside a task in the thread pool is not
releasing any resources to other tasks.
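
A toy illustration of that status quo, with std::async standing in for a pool 
worker (everything here is invented for illustration, not Arrow code):

  #include <chrono>
  #include <future>
  #include <string>
  #include <thread>

  // Stand-in for a high-latency filesystem read.
  std::string SlowRead() {
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    return "page data";
  }

  void Decode(const std::string&) { /* CPU-bound work */ }

  int main() {
    // IO and decode fused into one task: the worker running it is pinned
    // for the whole 200 ms wait, doing no useful CPU work and releasing
    // nothing to other tasks in the meantime.
    auto task = std::async(std::launch::async, [] { Decode(SlowRead()); });
    task.get();
    return 0;
  }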

I believe that we should design and develop a sane programming model /
API for implementing our software in the presence of these challenges.
If the backend / implementation of this API uses TBB and that makes
things more efficient than other approaches, then that sounds great to
me. I would be hesitant to use TBB APIs directly in Arrow application
code unless it can be clearly demonstrated that it is a superior
option to alternatives.

It seems useful to validate the implementation approach by starting
with some practical problems. Suppose, for the sake of argument, you
want to read 10 Parquet files (constituting a single logical dataset)
as fast as possible and perform some simple analytics on them -- let's
take something very simple like computing the maximum and minimum
values of each column in the dataset. This problem features both
problems listed above:

* Reading a single Parquet file can be parallelized (by columns --
since columns can be decoded in parallel) on the global thread pool,
so reading multiple files in parallel would cause nested parallelism
* Within the context of reading a single Parquet file column, IO calls
are performed. CPU threads sit idle while this IO is taking place,
particularly if the file system is high latency (e.g. HDFS)
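
To make the shape of the problem concrete, here is a deliberately naive 
stand-alone sketch (std::async instead of any Arrow pool; file and column 
counts arbitrary):

  #include <future>
  #include <vector>

  void DecodeColumn(int file, int col) { /* CPU-bound decode work */ }

  // Inner level of parallelism: one task per column of a file.
  void ReadFile(int file, int num_cols) {
    std::vector<std::future<void>> cols;
    for (int c = 0; c < num_cols; ++c)
      cols.push_back(std::async(std::launch::async, DecodeColumn, file, c));
    for (auto& f : cols) f.get();
  }

  int main() {
    // Outer level of parallelism: one task per file. 10 files x 16 columns
    // can put well over a hundred threads in flight on a machine with far
    // fewer cores -- exactly the nested-parallelism problem above.
    std::vector<std::future<void>> files;
    for (int f = 0; f < 10; ++f)
      files.push_back(std::async(std::launch::async, ReadFile, f, 16));
    for (auto& f : files) f.get();
    return 0;
  }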

What do you think about -- as a way of moving this project forward --
developing a prototype threading backend and developer API (for people
like me to use to develop libraries like the Parquet library) that
addresses these issues? I think it could be difficult to build
consensus around a threading backend developed in the abstract.

Thanks
Wes

On Tue, Apr 30, 2019 at 9:28 PM Malakhov, Anton
 wrote:
>
> Hi dear Arrow developers, Antoine,
>
> I'd like to kick off the discussion of the threading engine that Arrow can 
> use underneath for implementing multicore parallelism for execution nodes, 
> kernels, and/or all the functions that can be optimized this way.
> I've documented some ideas on Arrow's Confluence Wiki: 
> https://cwiki.apache.org/confluence/display/ARROW/Parallel+Execution+Engine
> The bottom line is that while Arrow is moving in the right direction by 
> introducing a shared thread pool, there are some questions and concerns about 
> the current implementation and how it is supposed to co-exist with other 
> threaded libraries ("threading composability") while providing efficient, 
> nestable, NUMA- and cache-aware data and data-flow parallelism.
> I suggest introducing threading layers, as in other libraries such as MKL 
> and Numba, starting with a TBB-based layer. Or maybe even use TBB directly. 
> In short, there are the following arguments for it:
>
> 1.  Designed for composability from day zero. Avoids mandatory 
> parallelism. Provides work stealing and FIFO scheduling. Compatible with 
> parallel depth-first scheduling (a promising direction in composability 
> research).
>
> 2.  TBB Flow Graph. It fits nicely into the data-flow and execution-node 
> model of SQL databases. Besides the basic nodes needed for implementing an 
> execution engine, it also provides a foundation for heterogeneous and 
> distributed computing (async_node, opencl_node, distributed_node)
>
> 3.  Arrow's ThreadPool, TaskGroup, and ParallelFor have direct equivalents 
> in TBB (task_arena, task_group, and parallel_for), which provide a mature and 
> performant implementation that resolves many if not all of the XXX/TODO notes 
> in the comments, such as exceptions, singletons and initialization time, and 
> lock-freedom.
>
> 4.  Concurrent hash tables, queues, vectors, and other concurrent 
> containers. Hash tables are required for implementing parallel versions of 
> join, group-by, unique, and dictionary operations. There is a contribution 
> to integrate libcuckoo under the TBB interface.
>
> 5.  TBB scalable malloc and memory pools, which can use any user-provided 
> memory chunk for scalable allocation. Arrow uses jemalloc, which is slower in 
> some cases than tbbmalloc or tcmalloc.

[DISCUSS][C++][Proposal] Threading engine for Arrow

2019-04-30 Thread Malakhov, Anton
Hi dear Arrow developers, Antoine,

I'd like to kick off the discussion of the threading engine that Arrow can use 
underneath for implementing multicore parallelism for execution nodes, kernels, 
and/or all the functions that can be optimized this way.
I've documented some ideas on Arrow's Confluence Wiki: 
https://cwiki.apache.org/confluence/display/ARROW/Parallel+Execution+Engine
The bottom line is that while Arrow is moving in the right direction by 
introducing a shared thread pool, there are some questions and concerns about 
the current implementation and how it is supposed to co-exist with other 
threaded libraries ("threading composability") while providing efficient, 
nestable, NUMA- and cache-aware data and data-flow parallelism.
I suggest introducing threading layers, as in other libraries such as MKL and 
Numba, starting with a TBB-based layer. Or maybe even use TBB directly. In 
short, there are the following arguments for it:

1.  Designed for composability from day zero. Avoids mandatory parallelism. 
Provides work stealing and FIFO scheduling. Compatible with parallel 
depth-first scheduling (a promising direction in composability research).

2.  TBB Flow Graph. It fits nicely into the data-flow and execution-node model 
of SQL databases. Besides the basic nodes needed for implementing an execution 
engine, it also provides a foundation for heterogeneous and distributed 
computing (async_node, opencl_node, distributed_node)

3.  Arrow's ThreadPool, TaskGroup, and ParallelFor have direct equivalents in 
TBB (task_arena, task_group, and parallel_for), which provide a mature and 
performant implementation that resolves many if not all of the XXX/TODO notes 
in the comments, such as exceptions, singletons and initialization time, and 
lock-freedom. (See the sketch right after this list.)

4.  Concurrent hash tables, queues, vectors, and other concurrent containers. 
Hash tables are required for implementing parallel versions of join, group-by, 
unique, and dictionary operations. There is a contribution to integrate 
libcuckoo under the TBB interface.

5.  TBB scalable malloc and memory pools, which can use any user-provided 
memory chunk for scalable allocation. Arrow uses jemalloc, which is slower in 
some cases than tbbmalloc or tcmalloc.

6.  OpenMP is good for NUMA with a static schedule; however, there is no good 
answer for dynamic tasks and graphs. TBB provides tools for implementing NUMA 
support (task_arena, task_scheduler_observer, task affinity & priorities) and 
is committed to improving NUMA support for its other customers in 2019.

7.  TBB is licensed under Apache 2.0, has a conda-forge feedstock, supports 
CMake, is adopted for CPU scheduling by other industry players, and has 
multiple ports to other OSes and CPU architectures.
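
To make point 3 concrete, here is a minimal sketch of those TBB equivalents 
(plain TBB usage for illustration; not proposed Arrow code):

  #include <tbb/parallel_for.h>
  #include <tbb/task_arena.h>
  #include <tbb/task_group.h>

  int main() {
    // Rough mapping: task_arena ~ ThreadPool, task_group ~ TaskGroup,
    // parallel_for ~ ParallelFor.
    tbb::task_arena arena(4);  // constrain this work to at most 4 threads
    arena.execute([] {
      tbb::task_group tg;
      tg.run([] {  // a task that itself spawns nested parallel work
        tbb::parallel_for(0, 1000, [](int i) { /* per-element kernel */ });
      });
      tg.wait();  // join all tasks spawned in the group
    });
    return 0;
  }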

Full disclosure: I was a TBB developer before its 1.0 version, responsible for 
multiple core components like hash tables, adaptive partitioning, and the 
interfaces of memory pools and task_arena, all of which are very relevant to 
Arrow. I have a background in scalability and NUMA-aware performance 
optimization, like what we did for the (TBB-based) OpenCL runtime for CPU. I 
was also behind the optimizations for Intel Distribution for Python and its 
threading composability story. Thus, I sincerely hope to reuse all of this in 
order to deliver the best performance for Arrow.


Best regards,
Anton Malakhov
IAGS Scripting Analyzers & Tools

O: +1-512-3620-512
1300 S. MoPac Expy
Office:  AN4-C1-D4
Austin, TX 78746
Intel Corporation | www.intel.com