Re: [DISCUSS] Rethinking our approach to scheduling CPU and IO work in C++?

2020-09-22 Thread Matthias Vallentin
We are building a highly concurrent database for security data with Arrow
as data plane (VAST), so I thought I'd share our view on this, since we
went over pretty much all of the above-mentioned questions. I'm not trying
to say "you should do it this way" but instead share our journey, in the
hope that you can draw some insight from it.

It sounds like there are several challenges to be solved, ranging from
non-blocking I/O to efficient task-based scheduling - for some notion of
task. We found that the actor model solved all of these challenges. In
particular, we rely on CAF, the C++ Actor Framework, as the concrete
implementation. The basic abstraction is the actor, which is effectively a
heavy-weight task (100-200 bytes) that can be scheduled in two ways: on a
dedicated thread or in a thread pool. The thread pool is driven either by a
work-stealing or a work-sharing scheduler, depending on the deployment
environment. Here's how we solve some concrete problems with these
abstractions:

   1. *Asynchronous I/O*: we have one "detached" filesystem actor that
   lives in its own thread and does all I/O. All reads and writes (mmap
   too, but let's set that aside) go through this actor. You request a
   write with a contiguous chunk of data and get a response back when the
   operation succeeds. Any actor can interact with the filesystem actor,
   including actors that are scheduled in the pool. The point of this
   abstraction is that I/O operations can block, which is why you never
   want to execute them in the thread pool: otherwise all other
   tasks/actors scheduled right behind the I/O operation might get stalled.
   Sure, work-stealing will alleviate this, but not when steals occur
   frequently. (See the sketch after this list.)
   2. *Concurrent task execution*: if the work can be *overdecomposed* into
   smaller chunks such that there are more chunks than CPU cores, a thread
   pool plus scheduler will do a good job at exploiting the available
   system concurrency. In our use case, we build dozens of indexes in
   parallel, with one actor being responsible for one column. They all run
   in parallel and independently of each other, but operate on the same
   (immutable) data, so there are no data races by design. Once an indexer
   actor completes a set of record batches, it builds a flatbuffer and
   ships it to the I/O actor to persist it. At that point we're back at
   point (1), asynchronous I/O. The two play together nicely.
   3. *Network transparency*: since the actor model's communication
   abstraction is message passing, it's easy to hide an actor's location,
   be it in the same process or remotely reachable via IPC. The actor
   runtime takes care of either just passing a pointer with the message
   contents or transparently serializing them. This makes it very easy to
   build a distributed system if need be, or to run everything in a single
   process.
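
To make the pattern above concrete, here is a minimal sketch using plain
CAF primitives (0.17/0.18-era API). This is not our production code: the
actor names, the message layouts, and the toy write routine are made up for
illustration, and error handling is omitted.

#include <caf/all.hpp>

#include <fstream>
#include <string>

using namespace caf;

// Runs detached (own thread), so blocking I/O never stalls the worker pool.
behavior filesystem_actor(event_based_actor*) {
  return {
    [](const std::string& path, const std::string& bytes) -> bool {
      std::ofstream out{path, std::ios::binary};
      out.write(bytes.data(), static_cast<std::streamsize>(bytes.size()));
      return static_cast<bool>(out); // the return value becomes the response
    },
  };
}

// Pool-scheduled worker: ships a finished chunk to the I/O actor and
// resumes asynchronously once the write has completed.
behavior indexer_actor(event_based_actor* self, actor fs) {
  return {
    [=](const std::string& column, const std::string& serialized_index) {
      self->request(fs, infinite, column + ".idx", serialized_index)
        .then([=](bool ok) {
          aout(self) << "persisted " << column << ": " << ok << std::endl;
          self->send_exit(fs, exit_reason::user_shutdown); // demo-only shutdown
          self->quit();
        });
    },
  };
}

int main() {
  actor_system_config cfg;
  actor_system sys{cfg};
  auto fs = sys.spawn<detached>(filesystem_actor);  // dedicated thread
  auto idx = sys.spawn(indexer_actor, fs);          // shared thread pool
  anon_send(idx, std::string{"column_a"}, std::string{"...index bytes..."});
  // The actor_system destructor blocks until all actors are done.
}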

We could have gone with a lower-level abstraction, e.g., a thread pool with
coroutines, but decided that we get more mileage from an actor runtime. We
see coroutines just as syntactic sugar that makes the inversion of control
easier to follow as straight-line code; they are not a new messaging
capability in the context of the actor model implementation we use, which
already allows for arbitrary messaging and synchronization patterns - all
fully asynchronous, i.e., non-blocking by yielding to the scheduler.
Calling .then(...) when a message arrives is effectively a future, and we
frequently create response promises and message delegation patterns, often
implicitly through the runtime simply by returning a value in a lambda. We
also challenged the overhead of an actor compared to a light-weight task,
but found that even creating millions of actors in parallel in CAF still
doesn't cause memory pressure or substantially more cache misses. In the
end, we could not find a reason to *not* go with CAF, and we don't regret
this choice to date. We currently work with the experimental credit-based
streaming feature of CAF that gives Flink-like streaming semantics through
actor-based backpressure. Once again, a single powerful abstraction
addresses pretty much all our needs for scalable distributed systems while
staying close to the hardware.
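
As an illustration of that last point, here is a hedged sketch of the three
response styles in CAF: returning a value from the handler, answering later
via a response promise, and delegating to another actor. The adder/front
actors and the integer messages are hypothetical, not taken from our code
base.

#include <caf/all.hpp>

#include <iostream>

using namespace caf;

// 1) Returning a value from the handler implicitly produces the response.
behavior adder(event_based_actor*) {
  return {
    [](int x, int y) { return x + y; },
  };
}

// 2) A response promise decouples receiving a request from answering it,
//    e.g. when the answer depends on other actors.
behavior deferred_adder(event_based_actor* self) {
  return {
    [=](int x, int y) {
      auto rp = self->make_response_promise<int>();
      // ... could talk to other actors here before answering ...
      rp.deliver(x + y);
      return rp;
    },
  };
}

// 3) Delegation forwards the request; the worker's reply goes straight
//    back to the original requester.
behavior front(event_based_actor* self, actor worker) {
  return {
    [=](int x, int y) { return self->delegate(worker, x, y); },
  };
}

int main() {
  actor_system_config cfg;
  actor_system sys{cfg};
  scoped_actor self{sys};
  auto worker = sys.spawn(adder);
  auto proxy = sys.spawn(front, worker);
  self->request(proxy, infinite, 40, 2).receive(
    [&](int z) { std::cout << z << std::endl; },  // prints 42
    [&](error& e) { std::cout << to_string(e) << std::endl; });
  self->send_exit(proxy, exit_reason::user_shutdown);
  self->send_exit(worker, exit_reason::user_shutdown);
}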

I hope this helps you make better decisions in finding the right
abstractions for your use case. I've used the actor model as a vehicle for
my arguments, but there are other isomorphic models and vocabularies (e.g.,
CSP). The main point I want to get across is that thinking too low-level
and single-solution (only thread pools, just coroutines, etc.) may result
in a local instead of a global optimum.

Matthias


On Mon, Sep 21, 2020 at 9:38 PM Ben Kietzman  wrote:

> FWIW boost.coroutine and boost.asio provide composable coroutines,
> non blocking IO, and configurable scheduling for CPU work out of the box.
>
> The boost libra

Re: Floating-point order

2020-09-02 Thread Matthias Vallentin
I agree that users should have the ability to choose their own behavior,
but with Arrow moving more in the direction of providing compute building
blocks (kernels), a default choice must be made, e.g., when it comes to
sorting, computing a mean, etc.

Certainly -Inf and Inf are easy to place in the order, but NaN is the odd
one out. At the very left, at the very right, between -0 and 0? There are
many possibilities, and I'm sure one could argue in depth about what makes
the most sense. I think as long as the total order is explicit, users can
work with it.

I'm not sure how easy it will be to give the user a choice with respect to
the ordering. If the API allows for specifying a custom comparison
function, then the flexibility is there. Would that be possible with
Gandiva?
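
For illustration, here is one possible total order in plain C++, along the
lines of the PostgreSQL rule quoted further down in this thread: all NaNs
compare equal to each other and greater than every non-NaN value, and -0
and 0 tie. This is just one convention, not something the Arrow spec
prescribes.

#include <algorithm>
#include <cmath>
#include <cstdio>
#include <vector>

// All NaNs compare equal to each other and greater than any non-NaN value.
bool total_less(double a, double b) {
  if (std::isnan(a))
    return false;  // a NaN is never less than anything
  if (std::isnan(b))
    return true;   // every non-NaN value sorts before NaN
  return a < b;    // ordinary IEEE comparison otherwise
}

int main() {
  std::vector<double> xs = {1.0, NAN, -INFINITY, 0.0, INFINITY, NAN, -0.0};
  std::sort(xs.begin(), xs.end(), total_less);
  for (double x : xs)
    std::printf("%g ", x);  // -inf first, the NaNs last, the zeros tie
  std::printf("\n");
}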

On Wed, Sep 2, 2020 at 11:50 AM Antoine Pitrou  wrote:

>
> Well, Inf and -Inf are already ordered ;-)  NaN is, as usual, a can of
> worms.  Ordering probably doesn't belong in the Arrow spec (which is
> only concerned with representing data, not processing it).
>
> In any case, I agree that it makes sense to handle all NaNs as equal
> when implementing comparison-based functions: we're trying to do that on
> the C++ side.  It also makes sense to choose a well-defined ordering for
> them (for example "consider all NaNs smaller than non-NaNs", or larger).
>
> In some cases, you may also want to let the user alter the behaviour.
> For example, when summing an array, you may want a NaN in the input to
> force the result to NaN (as the IEEE spec would say), or you may want
> NaNs to be ignored.  NumPy has two functions (`sum` and `nansum`) for
> these two different behaviours.
>
> Regards
>
> Antoine.
>
>
> Le 02/09/2020 à 11:40, Matthias Vallentin a écrit :
> > Would it perhaps make sense to define the total order for non-numbers
> (NaN,
> > Inf, -Inf) globally (i.e., in the spec or in Arrow itself) so that the
> > behavior is the same across all languages?
> >
> > On Fri, Aug 28, 2020 at 7:42 PM Andy Grove 
> wrote:
> >
> >> Hi Jörn,
> >>
> >> I agree with your concerns about NaN. There was a discussion about this
> in
> >> https://github.com/apache/arrow/pull/7193
> >>
> >> I will try and make time this weekend to look at the current
> implementation
> >> and your suggestions around DictionaryArray.
> >>
> >> Hopefully, other contributors that are more familiar with that code can
> >> respond as well.
> >>
> >> Thanks,
> >>
> >> Andy.
> >>
> >>
> >>
> >> On Fri, Aug 28, 2020 at 8:34 AM Jörn Horstmann <
> >> joern.horstm...@signavio.com>
> >> wrote:
> >>
> >>> I ran into a few issues with the rust sort kernels and would like to
> >>> discuss possible solutions.
> >>>
> >>> 1. When sorting by multiple columns (lexsort_to_indices) the Float32
> >>> and Float64 data types are not supported because the implementation
> >>> relies on the OrdArray trait. This trait is not implemented because
> >>> f64/f32 only implements PartialOrd. The sort function for a single
> >>> column (sort_to_indices) has some special logic which looks like it
> >>> wants to treat NaN the same as null, but I'm also not convinced this
> >>> is the correct way. For example postgres does the following
> >>> (
> https://www.postgresql.org/docs/12/datatype-numeric.html#DATATYPE-FLOAT
> >> )
> >>>
> >>> "In order to allow floating-point values to be sorted and used in
> >>> tree-based indexes, PostgreSQL treats NaN values as equal, and greater
> >>> than all non-NaN values."
> >>>
> >>> I propose to do the same in an OrdArray impl for
> >>> Float64Array/Float32Array and then simplifying the sort_to_indices
> >>> function accordingly.
> >>>
> >>> 2. Sorting for dictionary encoded strings. The problem here is that
> >>> DictionaryArray does not have a generic parameter for the value type
> >>> so it is not currently possible to only implement OrdArray for string
> >>> dictionaries. Again for the single column case, the value data type
> >>> could be checked and a sort could be implemented by looking up each
> >>> key in the dictionary. An optimization could be to check the is_sorted
> >>> flag of DictionaryArray (which does not seem to be used really) and
> >>> then directly sort by the keys. For the general case I see roughly two
> >>> options
> >>>
> >>> - Somehow impleme

Re: Rust/Datafusion sort kernel issues

2020-09-02 Thread Matthias Vallentin
Would it perhaps make sense to define the total order for non-numbers (NaN,
Inf, -Inf) globally (i.e., in the spec or in Arrow itself) so that the
behavior is the same across all languages?

On Fri, Aug 28, 2020 at 7:42 PM Andy Grove  wrote:

> Hi Jörn,
>
> I agree with your concerns about NaN. There was a discussion about this in
> https://github.com/apache/arrow/pull/7193
>
> I will try and make time this weekend to look at the current implementation
> and your suggestions around DictionaryArray.
>
> Hopefully, other contributors that are more familiar with that code can
> respond as well.
>
> Thanks,
>
> Andy.
>
>
>
> On Fri, Aug 28, 2020 at 8:34 AM Jörn Horstmann <
> joern.horstm...@signavio.com>
> wrote:
>
> > I ran into a few issues with the rust sort kernels and would like to
> > discuss possible solutions.
> >
> > 1. When sorting by multiple columns (lexsort_to_indices) the Float32
> > and Float64 data types are not supported because the implementation
> > relies on the OrdArray trait. This trait is not implemented because
> > f64/f32 only implements PartialOrd. The sort function for a single
> > column (sort_to_indices) has some special logic which looks like it
> > wants to treat NaN the same as null, but I'm also not convinced this
> > is the correct way. For example postgres does the following
> > (https://www.postgresql.org/docs/12/datatype-numeric.html#DATATYPE-FLOAT
> )
> >
> > "In order to allow floating-point values to be sorted and used in
> > tree-based indexes, PostgreSQL treats NaN values as equal, and greater
> > than all non-NaN values."
> >
> > I propose to do the same in an OrdArray impl for
> > Float64Array/Float32Array and then simplifying the sort_to_indices
> > function accordingly.
> >
> > 2. Sorting for dictionary encoded strings. The problem here is that
> > DictionaryArray does not have a generic parameter for the value type
> > so it is not currently possible to only implement OrdArray for string
> > dictionaries. Again for the single column case, the value data type
> > could be checked and a sort could be implemented by looking up each
> > key in the dictionary. An optimization could be to check the is_sorted
> > flag of DictionaryArray (which does not seem to be used really) and
> > then directly sort by the keys. For the general case I see roughly two
> > options
> >
> > - Somehow implement an OrdArray view of the dictionary array. This
> > could be easier if OrdArray did not extend Array but was a completely
> > separate trait.
> > - Change the lexicographic sort impl to not use dynamic calls but
> > instead sort multiple times. So for a query `ORDER BY a, b`, first
> > sort by b and afterwards sort again by a. With a stable sort
> > implementation this should result in the same ordering. I'm curious
> > about the performance, it could avoid dynamic method calls for each
> > comparison, but it would process the indices vector multiple times.
> >
> > Looking forward to any other suggestions or feedback.
> >
> > --
> > Jörn Horstmann | Senior Backend Engineer
> >
> > www.signavio.com
> > Kurfürstenstraße 111, 10787 Berlin, Germany
> >
>


Re: [DISCUSS] Plasma appears to have been forked, consider deprecating pyarrow.serialization

2020-08-18 Thread Matthias Vallentin
We are very interested in Plasma as a stand-alone project. The fork would
hit us doubly hard, because it reduces the appeal of both our
Arrow-specific use case and our planned Ray integration.
We are effectively developing a database for network activity data that
runs with Arrow as data plane. See https://github.com/tenzir/vast for
details. One of our upcoming features is supporting a 1:N output channel
using Plasma, where multiple downstream tools (Python/Pandas, R, Spark) can
process the same data set that is materialized in memory exactly once. We
currently don't have the developer bandwidth to prioritize this effort, but
the concurrent, multi-tool processing capability was one of the main
strategic reasons to go with Arrow as data plane. If Plasma has no future,
Arrow has reduced appeal for us in the medium term.
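
For context, this is roughly what a downstream consumer looks like in that
1:N setup: attach to the Plasma store, map the sealed object without
copying, and read it as an Arrow IPC stream. It is only a sketch against
the Plasma C++ client that ships with Arrow; the store socket path and the
20-byte object ID are placeholders, and error handling is reduced to
ARROW_CHECK_OK.

#include <arrow/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>
#include <arrow/util/logging.h>
#include <plasma/client.h>

#include <iostream>
#include <memory>
#include <vector>

int main() {
  plasma::PlasmaClient client;
  ARROW_CHECK_OK(client.Connect("/tmp/plasma"));  // store socket (placeholder)
  // 20-byte object ID agreed upon with the producer (placeholder).
  auto id = plasma::ObjectID::from_binary("00000000000000000000");
  // Get() maps the sealed object into this process; the data is not copied.
  std::vector<plasma::ObjectBuffer> objects;
  ARROW_CHECK_OK(client.Get({id}, /*timeout_ms=*/-1, &objects));
  // Interpret the shared buffer as an Arrow IPC stream (schema + batches).
  auto input = std::make_shared<arrow::io::BufferReader>(objects[0].data);
  auto reader = arrow::ipc::RecordBatchStreamReader::Open(input).ValueOrDie();
  std::shared_ptr<arrow::RecordBatch> batch;
  ARROW_CHECK_OK(reader->ReadNext(&batch));
  std::cout << batch->num_rows() << " rows shared via Plasma" << std::endl;
  ARROW_CHECK_OK(client.Disconnect());
}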

We also have Ray as a data consumer on our roadmap, but the dependency
chain now seems inverted. If we have to do costly custom plumbing for Ray,
with a custom version of Plasma, the Ray integration will lose quite a bit
of appeal because it doesn't fit into the existing 1:N model. That is, even
though the fork may make sense from a Ray-internal point of view, it
decreases the appeal of Ray from the outside. (Again, speaking only about
the shared data plane here.)

In the future, we're happy to contribute cycles when it comes to keeping
Plasma a useful standalone project. We recently made sure that static
builds work as expected. As of now, we unfortunately cannot commit to
anything specific, but our interest extends to Gandiva, Flight, and lots of
other parts of the Arrow ecosystem.

On Tue, Aug 18, 2020 at 4:02 AM Robert Nishihara 
wrote:

> To answer Wes's question, the Plasma inside of Ray is not currently usable
> in a C++ library context, though it wouldn't be impossible to make that
> happen.
>
> I (or someone) could conduct a simple poll via Google Forms on the user
> mailing list to gauge demand if we are concerned about breaking a lot of
> people's workflow.
>
> On Mon, Aug 17, 2020 at 3:21 AM Antoine Pitrou  wrote:
>
> > Le 15/08/2020 à 17:56, Wes McKinney a écrit :
> > >
> > > What isn't clear is whether the Plasma that's in Ray is usable in a
> > > C++ library context (e.g. what we currently ship as libplasma-dev e.g.
> > > on Ubuntu/Debian). That seems still useful, but if the project isn't
> > > being actively maintained / developed (which, given the series of
> > > stale PRs over the last year or two, it doesn't seem to be) it's
> > > unclear whether we want to keep shipping it.
> >
> > At least on GitHub, the C++ API seems to be getting little use.  Most
> > search results below are forks/copies of the Arrow or Ray codebases.
> > There are also a couple stale experiments:
> > https://github.com/search?l=C%2B%2B&p=1&q=PlasmaClient&type=Code
> >
> > Regards
> >
> > Antoine.


Performance of ArrowJS in the DOM

2020-07-02 Thread Matthias Vallentin
Hi folks,

We are reaching out to better understand the performance of ArrowJS when it
comes to viewing large amounts of data (> 1M records) in the browser’s DOM.
Our backend (https://github.com/tenzir/vast) spits out record batches,
which we are accumulating in the frontend with a RecordBatchReader.

At first, we only want to render the data fast, line by line, with minimal
markup according to its types from the schema. We use a virtual scrolling
window to avoid overloading the DOM, that is, we lazily convert the record
batch data to DOM elements according to a scroll window defined by the
user. As the user scrolls, elements outside the window get removed and new
ones added.

The data consists of one or more Tables that we are pulling in through the
RecordBatchReader. We use the Async Iterator interface to go over the
record batches and convert them into rows. This API feels suboptimal for
our use case, where we want random access to the data. Is there a
faster/better way to do this?

Does anyone have any experience worth sharing with doing something similar?
The DOM is the main bottleneck, but if there are some clever things we can
do with Arrow to pull out the data in the most efficient way, that would be
nice.

Matthias


[jira] [Created] (ARROW-4319) plasma/store.h pulls in flatbuffer dependency

2019-01-22 Thread Matthias Vallentin (JIRA)
Matthias Vallentin created ARROW-4319:
-

 Summary: plasma/store.h pulls in flatbuffer dependency
 Key: ARROW-4319
 URL: https://issues.apache.org/jira/browse/ARROW-4319
 Project: Apache Arrow
  Issue Type: Bug
  Components: C++
Affects Versions: 0.12.0
Reporter: Matthias Vallentin


For our unit testing, we'd like to use the plasma store programmatically by 
including *plasma/store.h*. However, this header pulls in flatbuffers via 
*src/plasma/common_generated.h*. Is this a necessary include or would a forward 
declaration suffice?

Installing just flatbuffers didn't solve the problem, though. It looks like a 
specific version is needed:
{noformat}
 In file included from /Users/mavam/code/src/arrow/cpp/src/plasma/store.h:30:
 In file included from 
/Users/mavam/code/src/arrow/cpp/src/plasma/eviction_policy.h:27:
 In file included from /Users/mavam/code/src/arrow/cpp/src/plasma/plasma.h:41:
 /Users/mavam/code/src/arrow/cpp/src/plasma/common_generated.h:65:21: error: no 
matching member function for call to 'Verify'
 verifier.Verify(object_id()) &&
 /usr/local/include/flatbuffers/flatbuffers.h:1896:29: note: candidate template 
ignored: couldn't infer template argument 'T'
 template<typename T> bool Verify(size_t elem) const {
 /usr/local/include/flatbuffers/flatbuffers.h:1905:29: note: candidate function 
template not viable: requires 2 arguments, but 1 was provided
 template<typename T> bool Verify(const uint8_t *base, voffset_t elem_off)
 /usr/local/include/flatbuffers/flatbuffers.h:1880:8: note: candidate function 
not viable: requires 2 arguments, but 1 was provided
 bool Verify(size_t elem, size_t elem_len) const {
 /usr/local/include/flatbuffers/flatbuffers.h:1901:8: note: candidate function 
not viable: requires 3 arguments, but 1 was provided
 bool Verify(const uint8_t *base, voffset_t elem_off, size_t elem_len) const {




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: General questions about Arrow & Plasma

2017-11-25 Thread Matthias Vallentin

Here are some more examples on how to interact between Plasma and Arrow:
http://arrow.apache.org/docs/python/plasma.html, see also the C++
documentation: http://arrow.apache.org/docs/cpp/md_tutorials_plasma.html


I'm browsing through the C++ API documentation and have trouble finding
the right API to copy an arrow::Table into a Plasma object buffer.
Concretely:

   (1) How do I get the size of the underlying buffer of an
   arrow::Table needed to construct a Plasma object?

   (2) Thereafter, how do I obtain a pointer to the table data so that
   I can copy the table into the Plasma object buffer?

I also tried to look at the RecordBatch API, but couldn't find anything
there either. Both APIs seem only to provide columnar access.

   Matthias


Re: General questions about Arrow & Plasma

2017-11-18 Thread Matthias Vallentin
Thanks for your answers, Philipp. 

It sounds like the memcpy in the two-step procedure (first constructing and 
finishing an Arrow record batch, then copying it into the Plasma buffer and 
sealing the object) does not constitute a performance concern. The batch 
construction itself or downstream operations (I/O, etc.) probably dominate. 
That's good to hear, as this model will also make sure that Plasma buffers 
don't waste any space (modulo page/alignment size).
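
For the archive, here is a hedged sketch of that two-step flow in C++,
mirroring what pyarrow's plasma.pyx does: measure the exact IPC stream size
with a MockOutputStream, create a Plasma object of that size, stream the
batch into the shared buffer through a FixedSizeBufferWriter, and seal it.
It targets the Plasma client and IPC writer APIs of a more recent Arrow
release than the one this thread refers to (signatures have shifted over
time); the socket path and object ID are placeholders.

#include <arrow/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/writer.h>
#include <arrow/util/logging.h>
#include <plasma/client.h>

#include <memory>

int main() {
  // Step 1: build and finish a record batch with a regular builder.
  arrow::Int64Builder builder;
  ARROW_CHECK_OK(builder.AppendValues({1, 2, 3}));
  std::shared_ptr<arrow::Array> values;
  ARROW_CHECK_OK(builder.Finish(&values));
  auto schema = arrow::schema({arrow::field("x", arrow::int64())});
  auto batch = arrow::RecordBatch::Make(schema, values->length(), {values});

  // Step 2a: measure the exact size of the IPC stream (schema + batch).
  arrow::io::MockOutputStream measure;
  {
    auto writer = arrow::ipc::MakeStreamWriter(&measure, schema).ValueOrDie();
    ARROW_CHECK_OK(writer->WriteRecordBatch(*batch));
    ARROW_CHECK_OK(writer->Close());
  }
  int64_t size = measure.GetExtentBytesWritten();

  // Step 2b: create a Plasma object of exactly that size, stream the batch
  // into the shared buffer, and seal it so consumers can Get() it.
  plasma::PlasmaClient client;
  ARROW_CHECK_OK(client.Connect("/tmp/plasma"));  // placeholder socket
  auto id = plasma::ObjectID::from_binary("00000000000000000000");
  std::shared_ptr<arrow::Buffer> shared;
  ARROW_CHECK_OK(client.Create(id, size, /*metadata=*/nullptr, 0, &shared));
  arrow::io::FixedSizeBufferWriter sink{shared};  // writes into shared memory
  auto writer = arrow::ipc::MakeStreamWriter(&sink, schema).ValueOrDie();
  ARROW_CHECK_OK(writer->WriteRecordBatch(*batch));
  ARROW_CHECK_OK(writer->Close());
  ARROW_CHECK_OK(client.Seal(id));  // wakes up consumers blocked in Get()
  ARROW_CHECK_OK(client.Disconnect());
}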


The additional benefits of Plasma come in handy as we develop our 
prototype. Since we're also interested in using Ray down the line, we 
might as well get used to Plasma from the get-go. (The only reason we were 
debating is that we'd like to keep dependencies not under our control to a 
minimum. But since Plasma ships with Arrow, and the project is in great 
health and under active development, we feel good about this technology 
investment. :-)


   Matthias

On Thu, Nov 16, 2017 at 10:37:20AM -0800, Philipp Moritz wrote:

Here are some more examples on how to interact between Plasma and Arrow:
http://arrow.apache.org/docs/python/plasma.html, see also the C++
documentation: http://arrow.apache.org/docs/cpp/md_tutorials_plasma.html

On Thu, Nov 16, 2017 at 10:31 AM, Philipp Moritz  wrote:


Hey Matthias,

1. The way it is done is as in
https://github.com/apache/arrow/blob/c6295f3b74bcc2fa9ea1b9442f922bf564669b8e/python/pyarrow/plasma.pyx#L394:
You first create the arrow object (using the builder from C++ or the python
functions), get its size, create a plasma object of the required size, use
the FixedSizeBufferWriter to copy the data into shared memory (this is
doing a multithreaded memcopy which is pretty fast, for large objects we
measure 15GB/s), and then seal the object. Both of these can be done with
the C++ and Python APIs.

2. Using mmap by hand works and if you just want to exchange some data via
a POSIX file system interface it might be a good solution. Using Plasma has
a number of advantages:
a) It takes care of object lifetime management on a per object basis
between the runtimes for you
b) It can be used to synchronize object access between processes
(plasma.get yields when the creator calls plasma.seal)
c) It supports small objects of a few bytes to a few hundred bytes
efficiently by letting them share memory mapped files
d) If combined with the plasma manager from Ray, it allows to ship objects
between machines easily and also has some more object synchronization via
plasma.wait

We plan to do some improvements to the C++ API and make it so
plasma::Create return an arrow ResizableBuffer object, then from C++ it
will be easy to create arrow data with builders without copies and our
Python serialization will also be able to take limited advantage of this.

-- Philipp

On Thu, Nov 16, 2017 at 7:30 AM, Matthias Vallentin  wrote:


Two questions about Plasma; my use case is sharing Arrow data between a
C++ and Python application (eventually also R).

1. What's the typical memory allocation procedure when using Plasma and
 Arrow? Do I first construct a builder, populate it, finish it, and
 *then* copy it into the mmapped buffer? Or do I obtain the mmapped buffer
 from Plasma first, in which the builder operates incrementally until it's
 full? If I understand it correctly, a Plasma buffer has a fixed size, so
 I wonder how you accommodate the fact that the Arrow builder constructs a
 record batch incrementally, while at the same time avoiding extra copying
 of large memory chunks after finishing the builder.

2. Do I need Plasma to exchange the mmapped buffers between the two
 apps? Or could I mmap my Arrow data manually and tell pyarrow through a
 different mechanism to obtain the shared buffer?
   Matthias






General questions about Arrow & Plasma

2017-11-16 Thread Matthias Vallentin
Two questions about Plasma; my use case is sharing Arrow data between a 
C++ and Python application (eventually also R). 

1. What's the typical memory allocation procedure when using Plasma and 
  Arrow? Do I first construct a builder, populate it, finish it, and 
  *then* copy it into the mmapped buffer? Or do I obtain the mmapped 
  buffer from Plasma first, in which the builder operates incrementally 
  until it's full? If I understand it correctly, a Plasma buffer has a 
  fixed size, so I wonder how you accommodate the fact that the Arrow 
  builder constructs a record batch incrementally, while at the same time 
  avoiding extra copying of large memory chunks after finishing the 
  builder.

2. Do I need Plasma to exchange the mmapped buffers between the two 
  apps? Or could I mmap my Arrow data manually and tell pyarrow through 
  a different mechanism to obtain the shared buffer? 


   Matthias