Ok, my skill with C++ got in the way of my ability to put something
together.  First, I did not realize that C++ futures were a little
different than the definition I'm used to for futures.  By default,
C++ futures are not composable, you can't add continuations with
`then`, `when_all` or `when_any`.  There is an extension for this (not
sure if it will make it even in C++20) and there are continuations for
futures in boost's futures.  However, since arrow is currently using
its own future implementation I could not use either of these
libraries.  I spent a bit trying to add continuations to arrow's
future implementation but my lack of skill with C++ got in the way.  I
want to keep working on it but it may be a few days.  In the meantime
I will try and type up something more complete (with a few diagrams)
to explain what I'm intending.

Having looked at the code for a while I do have a better sense of what
is involved.  I think it would be a pretty extensive set of changes.
Also, it looks like C++20 is planning on adopting co-routines which
they will be using for sequential async.  So perhaps it makes more
sense to go directly to coroutines instead of moving to composable
futures and then later to coroutines at some point in the future.

Also, re: Julia, I looked into it a bit further and Julia is using
libuv under the hood for all file I/O (which is non-blocking I/O).
Also async/await are built into the bones of Julia.  As far as I can
tell from my brief examination is that there is no way to have a Julia
task that is performing blocking I/O (in the sense that a "thread pool
thread" is blocked on I/O.  You can have blocking I/O in the
async/await sense where you are awaiting on I/O to maintain sequential
semantics.

On Wed, Sep 16, 2020 at 8:10 AM Weston Pace <weston.p...@gmail.com> wrote:
>
> If you want to specifically look at the problem of dataset scanning,
> file scanning, and nested parallelism then probably the lowest effort
> improvement would be to eliminate the whole idea of "scan threads".
> You currently have...
>
>     for (size_t i = 0; i < readers.size(); ++i) {
>         ARROW_ASSIGN_OR_RAISE(futures[i], pool->Submit(ReadColumnFunc, i));
>     }
>     Status final_status;
>     for (auto& fut : futures) {
>         final_status &= fut.status();
>     }
>     // Hiding some follow-up aggregation and the next line is a bit 
> abbreviated
>     return Validate();
>
> You're already using futures so it would be pretty straightforward to
> change that to
>
>     for (size_t i = 0; i < readers.size(); ++i) {
>         ARROW_ASSIGN_OR_RAISE(futures[i], pool->Submit(ReadColumnFunc, i));
>     }
>     // Hiding some follow-up aggregation and the next line is a bit 
> abbreviated
>     return 
> std::experimental::when_all(futures).then(FollowUpAggregation).then(Validate);
>
> Dataset scans are currently using a threaded task group.  Those would
> change to std::experimental::when_all instead.  So now the dataset
> scan is not creating N threads but again just returning a composed
> future.  So if you have one dataset scan across 4 files and each file
> kicks off 10 column reader tasks then you have 40 "threads" submitted
> to your thread pool and the main calling thread waiting on the future.
> All of these thread pool threads are inner worker threads.  None of
> these thread pool threads have to wait on other threads.  There is no
> possibility of deadlock.
>
> You can do this at each level of nesting so that only your inner most
> worker threads are actually calling `pool->Submit`.  There is then
> just one outer main thread (presumably not a thread pool thread) that
> is waiting on the future.  It's not a super small change because now
> FileReaderImpl::ReadRowGroups returns a future.  That would have to
> propagate all the way up so that your dataset scan itself is returning
> a future (you can safely synchronize it at this point so your public
> API remains synchronous because no public API call is going to be
> arriving on a thread pool thread).
>
> That at least solves the deadlock problem.  It also starts to
> propagate futures throughout the code base which could be good or bad
> depending on your view of such things.  It does not solve the
> under-utilization problem because you still have threads sitting in
> the thread pool waiting on blocking I/O.
>
> The next step would be to move to non-blocking I/O.  At this point you
> have quite a few choices.
>
> On Wed, Sep 16, 2020 at 7:26 AM Wes McKinney <wesmck...@gmail.com> wrote:
> >
> > On Wed, Sep 16, 2020 at 10:31 AM Jorge Cardoso Leitão
> > <jorgecarlei...@gmail.com> wrote:
> > >
> > > Hi,
> > >
> > > I am not sure I fully understand, so I will try to give an example to
> > > check: we have a simple query that we want to write the result to some
> > > place:
> > >
> > > SELECT t1.b * t2.b FROM t1 JOIN ON t2 WHERE t1.a = t2.a
> > >
> > > At the physical plane, we need to
> > >
> > > 1. read each file in batches
> > > 2. join the batches
> > > 3. iterate over results and write them in partitions
> > >
> > > In principle, we can multi-thread them
> > >
> > > 1. multi-threaded scan
> > > 2. multi-threaded hash join (e.g. with a shared map)
> > > 3. multi-threaded write (e.g. 1 file per partition)
> > >
> > > The issue is that when we schedule this, the physical nodes themselves
> > > control how they perform their own operations, and there is no
> > > orchestration as to what resources are available and what should be
> > > prioritized. Consequently, we may have a scan of table t1 that is running
> > > with 12 threads, while the scan of table t2 is waiting for a thread to be
> > > available. This causes the computation to stall as both are required for
> > > step 2 to proceed. OTOH, if we have no multithreaded scans, then
> > > multithreading seldom helps, as we are bottlenecked by the scans'
> > > throughput. Is this the gist of the problem?
> > >
> > > If yes: the core issue here seems to be that there is no orchestrator to
> > > re-prioritize CPU to where it is needed (the scan of t2 in the example
> > > above), because each physical node has a thread.join that is not
> > > coordinated with their downstream dependencies (and so on). Isn't this a
> > > natural candidate for futures/async? We seem to need some coordination
> > > across the DAG.
> > >
> > > If not: could someone offer an example describing how the multi-threaded
> > > scan can cause a deadlock?
> >
> > Suppose that we have 4 large CSV files in Amazon S3 and a static
> > thread pool with 4 threads. If we use the thread pool to execute scan
> > tasks for all 4 files in parallel, then if any of those scan tasks
> > internally try to spawn tasks in the same thread pool (before other
> > tasks have finished) to parallelize some of their computational work
> > -- i.e. "nested parallelism" is what we call this -- then you have a
> > deadlock because our current thread pool implementation cannot
> > distinguish between task interdependencies / does not understand
> > nested parallelism.
> >
> > > Best,
> > > Jorge
> > >
> > >
> > >
> > >
> > > On Wed, Sep 16, 2020 at 4:16 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > >
> > > > hi Jacob,
> > > >
> > > > The approach taken in Julia strikes me as being motivated by the same
> > > > problems that we have in this project. It would be interesting if
> > > > partr could be used as the basis of our nested parallelism runtime.
> > > > How does Julia handle IO calls within spawned tasks? In other words,
> > > > if we have a function like:
> > > >
> > > > void MyTask() {
> > > >   DoCPUWork();
> > > >   DoSomeIO();
> > > >   DoMoreCPUWork();
> > > >   DoAdditionalIO();
> > > > }
> > > >
> > > > (or maybe you just aren't supposed to do that)
> > > >
> > > > The biggest question would be the C++ programming model (in other
> > > > words, how we have to change our approach to writing code) that we use
> > > > throughout the Arrow libraries. What I'm getting at is to figure out
> > > > how to minimize the amount of code that needs to be significantly
> > > > altered to fit in with the new approach to work scheduling. For
> > > > example, it doesn't strike me that the API that we are using to
> > > > parallelize reading Parquet files at the column level is going to work
> > > > because there are various IO calls within the tasks that are being
> > > > submitted to the thread pool
> > > >
> > > >
> > > > https://github.com/apache/arrow/blob/apache-arrow-1.0.1/cpp/src/parquet/arrow/reader.cc#L859-L875
> > > >
> > > > - Wes
> > > >
> > > > On Wed, Sep 16, 2020 at 1:37 AM Jacob Quinn <quinn.jac...@gmail.com>
> > > > wrote:
> > > > >
> > > > > My immediate thought reading the discussion points was Julia's 
> > > > > task-based
> > > > > multithreading model that has been part of the language for over a 
> > > > > year
> > > > > now. An announcement blogpost for Julia 1.3 laid out some of the 
> > > > > details
> > > > > and high-level approach:
> > > > https://julialang.org/blog/2019/07/multithreading/,
> > > > > and the multithreading code was marked stable in the recent 1.5 
> > > > > release.
> > > > >
> > > > > Kiran, one of the main contributors to the threading model in Julia,
> > > > worked
> > > > > on a separate C-based repo for the core functionality (
> > > > > https://github.com/kpamnany/partr), but I think the latest code is
> > > > embedded
> > > > > in the Julia source code now.
> > > > >
> > > > > Anyway, probably most useful as a reference, but Jameson (cc'd) also 
> > > > > does
> > > > > weekly multithreading chats (on Wednesdays), so I imagine he wouldn't
> > > > mind
> > > > > chatting about things if desired.
> > > > >
> > > > > -Jacob
> > > > >
> > > > > On Tue, Sep 15, 2020 at 8:17 PM Weston Pace <weston.p...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > My C++ is pretty rusty but I'll see if I can come up with a concrete
> > > > > > CSV example / experiment / proof of concept on Friday when I have a
> > > > > > break from work.
> > > > > >
> > > > > > On Tue, Sep 15, 2020 at 3:47 PM Wes McKinney <wesmck...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > On Tue, Sep 15, 2020 at 7:54 PM Weston Pace 
> > > > > > > <weston.p...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > Yes.  Thank you.  I am in agreement with you and 
> > > > > > > > futures/callbacks
> > > > are
> > > > > > > > one such "richer programming model for
> > > > > > > > hierarchical work scheduling".
> > > > > > > >
> > > > > > > > A scan task with a naive approach is:
> > > > > > > >
> > > > > > > >     workers = partition_files_list(files_list)
> > > > > > > >     for worker in workers:
> > > > > > > >         start_thread(worker)
> > > > > > > >     for worker in workers:
> > > > > > > >         join_thread(worker)
> > > > > > > >     return aggregate_results()
> > > > > > > >
> > > > > > > > You have N+1 threads because you have N worker threads and 1 
> > > > > > > > scan
> > > > > > > > thread.  There is the potential for deadlock if your thread pool
> > > > only
> > > > > > > > has one remaining spot and it is given to the scan thread.
> > > > > > > >
> > > > > > > > On the other hand, with a futures based approach you have:
> > > > > > > >
> > > > > > > > futures = partition_files_list(files_list)
> > > > > > > > return when_all(futures).do(aggregate_results)
> > > > > > > >
> > > > > > > > There are only N threads.  The scan thread goes away.  In fact, 
> > > > > > > > if
> > > > all
> > > > > > > > of your underlying OS/FS libraries are non-blocking then you can
> > > > > > > > completely eliminate threads in the waiting state and an entire
> > > > > > > > category of deadlocks are no longer a possibility.
> > > > > > >
> > > > > > > I don't quite follow. I think it would be most helpful to focus 
> > > > > > > on a
> > > > > > > concrete practical matter like reading Parquet or CSV files in
> > > > > > > parallel (which can be go faster through parallelism at the single
> > > > > > > file level) and devise a programming model in C++ that is 
> > > > > > > different
> > > > > > > from what we are currently doing that results in superior CPU
> > > > > > > utilization.
> > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > -Weston
> > > > > > > >
> > > > > > > > On Tue, Sep 15, 2020 at 1:21 PM Wes McKinney 
> > > > > > > > <wesmck...@gmail.com>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > hi Weston,
> > > > > > > > >
> > > > > > > > > We've discussed some of these problems in the past -- I was
> > > > > > > > > enumerating some of these issues to highlight the problems 
> > > > > > > > > that
> > > > are
> > > > > > > > > resulting from an absence of a richer programming model for
> > > > > > > > > hierarchical work scheduling. Parallel tasks originating in 
> > > > > > > > > each
> > > > > > > > > workload are submitted to a global thread pool where they are
> > > > > > > > > commingled with the tasks coming from other workloads.
> > > > > > > > >
> > > > > > > > > As an example of how this can go wrong, suppose we have a 
> > > > > > > > > static
> > > > > > > > > thread pool with 4 executors. If we submit 4 long-running 
> > > > > > > > > tasks
> > > > to
> > > > > > the
> > > > > > > > > pool, and then each of these tasks spawn additional tasks 
> > > > > > > > > that go
> > > > > > into
> > > > > > > > > the thread pool, a deadlock can occur, because the thread pool
> > > > thinks
> > > > > > > > > that it's executing tasks when in fact those tasks are 
> > > > > > > > > waiting on
> > > > > > > > > their dependent tasks to complete.
> > > > > > > > >
> > > > > > > > > A similar resource underutilization occurs when we do
> > > > > > > > > pool->Submit(ReadFile), where ReadFile needs to do some IO --
> > > > from
> > > > > > the
> > > > > > > > > thread pool's perspective, the task is "working" even though 
> > > > > > > > > it
> > > > may
> > > > > > > > > wait for one or more IO calls to complete.
> > > > > > > > >
> > > > > > > > > In the Datasets API in C++ we have both of these problems: 
> > > > > > > > > file
> > > > scan
> > > > > > > > > tasks are being pushed onto the global thread pool, and so to
> > > > prevent
> > > > > > > > > deadlocks multithreaded file parsing has been disabled.
> > > > Additionally,
> > > > > > > > > the scan tasks do IO, resulting in suboptimal performance (the
> > > > > > > > > problems caused by this will be especially exacerbated when
> > > > running
> > > > > > > > > against slower filesystems like Amazon S3)
> > > > > > > > >
> > > > > > > > > Hopefully the issues are more clear.
> > > > > > > > >
> > > > > > > > > Thanks
> > > > > > > > > Wes
> > > > > > > > >
> > > > > > > > > On Tue, Sep 15, 2020 at 2:57 PM Weston Pace <
> > > > weston.p...@gmail.com>
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > It sounds like you are describing two problems.
> > > > > > > > > >
> > > > > > > > > > 1) Idleness - Tasks are holding threads in the thread pool
> > > > while
> > > > > > they
> > > > > > > > > > wait for IO or some long running non-CPU task to complete.
> > > > These
> > > > > > > > > > threads are often in a "wait" state or something similar.
> > > > > > > > > > 2) Fairness - The ordering of tasks is causing short tasks 
> > > > > > > > > > that
> > > > > > could
> > > > > > > > > > be completed quickly from being stuck behind longer term 
> > > > > > > > > > tasks.
> > > > > > > > > > Fairness can be an issue even if all tasks are always in the
> > > > active
> > > > > > > > > > state consuming CPU time.
> > > > > > > > > >
> > > > > > > > > > Are both of these issues a problem?  Are you looking to 
> > > > > > > > > > address
> > > > > > both of them?
> > > > > > > > > >
> > > > > > > > > > I doubt it's much help as it is probably a more substantial
> > > > change
> > > > > > > > > > than what you were looking for but the popular solution to 
> > > > > > > > > > #1
> > > > these
> > > > > > > > > > days seems to be moving toward non blocking IO with
> > > > > > > > > > promises/callbacks/async.  That way threads are never in the
> > > > > > waiting
> > > > > > > > > > state (unless sitting idle in the pool).
> > > > > > > > > >
> > > > > > > > > > -Weston
> > > > > > > > > >
> > > > > > > > > > On Tue, Sep 15, 2020 at 7:00 AM Wes McKinney <
> > > > wesmck...@gmail.com>
> > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > In light of ARROW-9924, I wanted to rekindle the 
> > > > > > > > > > > discussion
> > > > > > about our
> > > > > > > > > > > approach to multithreading (especially the _programming
> > > > model_)
> > > > > > in
> > > > > > > > > > > C++. We had some discussions about this about 6 months ago
> > > > and
> > > > > > there
> > > > > > > > > > > were more discussions as I recall in summer 2019.
> > > > > > > > > > >
> > > > > > > > > > > Realistically, we are going to be consistently dealing 
> > > > > > > > > > > with
> > > > > > > > > > > independent concurrent in-process workloads that each
> > > > > > respectively can
> > > > > > > > > > > go faster by multithreading. These could be things like:
> > > > > > > > > > >
> > > > > > > > > > > * Reading file formats (CSV, Parquet, etc.) that benefit 
> > > > > > > > > > > from
> > > > > > > > > > > multithreaded parsing/decoding
> > > > > > > > > > > * Reading one or more files in parallel using the Datasets
> > > > API
> > > > > > > > > > > * Executing any number of multithreaded analytical 
> > > > > > > > > > > workloads
> > > > > > > > > > >
> > > > > > > > > > > One obvious issue with our thread scheduling is the FIFO
> > > > nature
> > > > > > of the
> > > > > > > > > > > global thread pool. If a new independent multithreaded
> > > > workload
> > > > > > shows
> > > > > > > > > > > up, it has to wait for other workloads to complete before 
> > > > > > > > > > > the
> > > > > > new work
> > > > > > > > > > > will be scheduled. Think about a Flight server serving
> > > > queries to
> > > > > > > > > > > users -- is it fair for one query to "hog" the thread pool
> > > > and
> > > > > > force
> > > > > > > > > > > other requests to wait until they can get access to some 
> > > > > > > > > > > CPU
> > > > > > > > > > > resources? You could imagine a workload that spawns 10
> > > > minutes
> > > > > > worth
> > > > > > > > > > > of CPU work, where a new workload has to wait for all of 
> > > > > > > > > > > that
> > > > > > work to
> > > > > > > > > > > complete before having any tasks scheduled for execution.
> > > > > > > > > > >
> > > > > > > > > > > The approach that's been taken in the Datasets API to 
> > > > > > > > > > > avoid
> > > > > > problems
> > > > > > > > > > > with nested parallelism (file-specific operations spawning
> > > > > > multiple
> > > > > > > > > > > tasks onto the global thread pool) is simply to disable
> > > > > > multithreading
> > > > > > > > > > > at the level of a single file. This is clearly suboptimal.
> > > > > > > > > > >
> > > > > > > > > > > We have additional problems in that some file-loading 
> > > > > > > > > > > related
> > > > > > tasks do
> > > > > > > > > > > a mixture of CPU work and IO work, and once a thread has 
> > > > > > > > > > > been
> > > > > > > > > > > dispatched to execute one of these tasks, when IO takes
> > > > place, a
> > > > > > CPU
> > > > > > > > > > > core may sit underutilized while the IO is waiting.
> > > > > > > > > > >
> > > > > > > > > > > There's more aspects we can discuss, but in general I 
> > > > > > > > > > > think
> > > > we
> > > > > > need to
> > > > > > > > > > > come up with a programming model for building our C++ 
> > > > > > > > > > > system
> > > > > > > > > > > components with the following requirements:
> > > > > > > > > > >
> > > > > > > > > > > * Deadlocks not possible by design
> > > > > > > > > > > * Any component can safely use "nested parallelism" 
> > > > > > > > > > > without
> > > > the
> > > > > > > > > > > programmer having to worry about deadlocks or one task
> > > > "hogging"
> > > > > > the
> > > > > > > > > > > thread pool. So in other words, if there's only a single
> > > > > > > > > > > multithreading-capable workload running, we "let it rip"
> > > > > > > > > > > * Resources can be reasonably fairly allocated amongst
> > > > concurrent
> > > > > > > > > > > workloads (think: independent requests coming in through
> > > > Flight,
> > > > > > or
> > > > > > > > > > > scan tasks on different Parquet files in the Datasets 
> > > > > > > > > > > API).
> > > > Limit
> > > > > > > > > > > scenarios where a new workload is blocked altogether on 
> > > > > > > > > > > the
> > > > > > completion
> > > > > > > > > > > of other workloads
> > > > > > > > > > > * A well-defined programming pattern for tasks that do a
> > > > mixture
> > > > > > of
> > > > > > > > > > > CPU work and IO work that allows CPU cores to be used 
> > > > > > > > > > > when a
> > > > > > task is
> > > > > > > > > > > waiting on IO
> > > > > > > > > > >
> > > > > > > > > > > We can't be the only project that has these problems, so 
> > > > > > > > > > > I'm
> > > > > > > > > > > interested to see what solutions have been successfully
> > > > employed
> > > > > > by
> > > > > > > > > > > others. For example, it strikes me as similar to 
> > > > > > > > > > > concurrency
> > > > > > issues
> > > > > > > > > > > inside an analytic database. How are they preventing
> > > > concurrent
> > > > > > > > > > > workload starvation problems or handling CPU/IO task
> > > > scheduling
> > > > > > to
> > > > > > > > > > > avoid CPU underutilization?
> > > > > > > > > > >
> > > > > > > > > > > Choices of which threading libraries we might use to
> > > > implement a
> > > > > > > > > > > viable solution (e.g. TBB) seem secondary to the 
> > > > > > > > > > > programming
> > > > > > model
> > > > > > > > > > > that we use to implement our components.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Wes
> > > > > >
> > > >

Reply via email to