We’ve been doing this in a few different ways at Graphistry, mostly guided by 
use case and device characteristics.

For temporary/in-memory/microservice CPU workloads, we’ll compute a set of 
valid row indices and use them as the indices side of a DictionaryVector, with 
the original table/column as the dictionary side. If we want to filter the 
filtered set,
we compute a new set of indices, and use the DictionaryVector as the dictionary 
side of another DictionaryVector. Usually we aren’t filtering more than three 
or four levels, and with this approach we can always map back to the original 
indices in a tight loop.
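
Roughly, in plain TypeScript rather than the actual Arrow JS types (the names 
here are just for illustration), each filter level only stores indices into 
the level beneath it, and mapping back to the original row is just a walk down 
the chain:

    // Each filter level stores indices into the level directly beneath it;
    // the innermost level indexes the original table/column.
    type Level = { indices: Int32Array };

    function resolveToOriginal(levels: Level[], logicalRow: number): number {
      // Walk from the outermost filter down to the original row index.
      let row = logicalRow;
      for (let lvl = levels.length - 1; lvl >= 0; lvl--) {
        row = levels[lvl].indices[row];
      }
      return row;
    }

    // Example: two levels of filtering over a 6-row column.
    const original = ['a', 'b', 'c', 'd', 'e', 'f'];
    const firstFilter: Level  = { indices: Int32Array.from([0, 2, 3, 5]) }; // kept from the original
    const secondFilter: Level = { indices: Int32Array.from([1, 3]) };       // kept from the filtered set

    for (let i = 0; i < secondFilter.indices.length; i++) {
      const row = resolveToOriginal([firstFilter, secondFilter], i);
      console.log(row, original[row]); // 2 'c', then 5 'f'
    }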

For GPU workloads we have two approaches:

The physics kernels always operate on the graph in place, with host/device 
transfers kept to a minimum (and absolutely zero reallocation). We keep two 
buffers of indices that the kernels use for row lookups into the node and edge 
tables. Even though we never resize these buffers, we do update their 
contents, and we set the tables’ “logical length” to the number of active 
indices. In our testing, coalesced global memory accesses through the index 
buffers have been more performant and predictable than validity bitmaps, given 
how irregular Barnes-Hut’s access pattern is at each step.
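
A minimal sketch of that pattern (in TypeScript rather than the actual 
CUDA/host code, with illustrative names and sizes): the index buffer is 
allocated once at full capacity, filtering only overwrites a prefix of it, and 
the kernels iterate over the logical length:

    // Allocated once at table capacity; never reallocated or resized.
    const NODE_CAPACITY = 1_000_000;              // illustrative capacity
    const activeNodeIndices = new Int32Array(NODE_CAPACITY);
    let logicalNodeCount = NODE_CAPACITY;

    function applyNodeFilter(keep: (row: number) => boolean, totalRows: number): void {
      let n = 0;
      for (let row = 0; row < totalRows; row++) {
        if (keep(row)) activeNodeIndices[n++] = row; // overwrite the prefix in place
      }
      // Kernels iterate i over [0, logicalNodeCount) and gather node rows
      // through activeNodeIndices[i].
      logicalNodeCount = n;
    }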

The other approach is our CUDA/cuDF-backed analytical services. For these I’ve 
been generating and shuttling validity bitmaps between services. These 
workloads are bursty and infrequent enough (per client) that it’s more 
important to free the GPU memory for another client. The masks are shipped back 
to the service tier, where they’re used to build indices for those 
DictionaryVectors I mentioned before.
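
The mask-to-indices step itself is a tight loop; roughly (assuming Arrow-style 
LSB-first validity bitmaps, with the function name just for illustration):

    // Turn a validity bitmap (one bit per row, least-significant bit first)
    // into the selection indices used as the indices side of a DictionaryVector.
    function bitmapToIndices(bitmap: Uint8Array, numRows: number): Int32Array {
      const out = new Int32Array(numRows);
      let n = 0;
      for (let row = 0; row < numRows; row++) {
        if ((bitmap[row >> 3] >> (row & 7)) & 1) out[n++] = row;
      }
      return out.subarray(0, n);
    }

    // e.g. rows 0, 1, and 3 selected out of 4 (matching the example quoted below):
    // bitmapToIndices(Uint8Array.of(0b1011), 4) -> Int32Array [0, 1, 3]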

For sending materialized datasets over the wire, we also have a few approaches.

For the GPU/visualization state, we have a server-side kernel that prepares the 
filtered rows as Arrow RecordBatch bodies, so the only thing the CPU has to do 
is copy them to the network interface and write the Arrow metadata.

For general-purpose (and pre-Arrow) rich-client/server APIs we use a project I 
built at Netflix called Falcor, which is essentially Netflix-flavored GraphQL. 
Clients issue queries for a subset of resources from the server’s expansive 
“virtual graph” data model, intelligently caching/invalidating as necessary. We 
read directly from those nested DictionaryVectors at the service tier, just 
copying out to small JSON payloads.

I did have a “dictionary-filter” transform stream for selecting only the 
referenced rows in the set of dictionary batches, but abandoned it shortly 
after I finished the big JS refactor. It’s probably still useful in other 
contexts, but for now we’re happy with just sending around indices/masks, and 
have opted to keep dictionaries in blob storage or shared memory (when 
necessary).
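
Conceptually it boiled down to something like this simplified sketch (not the 
original stream implementation, and ignoring the transform/streaming 
plumbing): keep only the dictionary entries the surviving rows reference, and 
remap the codes as you go:

    // Keep only referenced dictionary entries and remap codes accordingly.
    function filterDictionary(codes: Int32Array, dictionary: string[]) {
      const oldToNew = new Map<number, number>();
      const newDictionary: string[] = [];
      const newCodes = new Int32Array(codes.length);
      for (let i = 0; i < codes.length; i++) {
        let remapped = oldToNew.get(codes[i]);
        if (remapped === undefined) {
          remapped = newDictionary.length;
          oldToNew.set(codes[i], remapped);
          newDictionary.push(dictionary[codes[i]]);
        }
        newCodes[i] = remapped;
      }
      return { codes: newCodes, dictionary: newDictionary };
    }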

Paul


> On Jan 27, 2019, at 9:52 PM, Wes McKinney <wesmck...@gmail.com> wrote:
> 
> I was having a discussion recently about Arrow and the topic of
> server-side filtering vs. client-side filtering came up.
> 
> The basic problem is this:
> 
> If you have a RecordBatch that you wish to filter out some of the
> "rows", one way to track this in-memory is to create a separate array
> of true/false values instead of forcing a materialization of a
> filtered RecordBatch.
> 
> So you might have a record batch with 2 fields and 4 rows
> 
> a: [1, 2, 3, 4]
> b: ['foo', 'bar', 'baz', 'qux']
> 
> and then a filter
> 
> is_selected: [true, true, false, true]
> 
> This can be easily handled as an application-level concern. Creating a
> bitmask is generally cheap relative to materializing the filtered
> version, and some operators may support "pushing down" such filters
> (e.g. aggregations may accept a selection mask to exclude "off"
> values). I myself implemented such a scheme in the past for a query
> engine I built in the 2013-2014 time frame and it yielded material
> performance improvements in some cases.
> 
> One question is what you should do when you want to put the data on
> the wire, e.g. via RPC / Flight or IPC. Two options
> 
> * Pass the complete RecordBatch, plus the filter as a "special" field
> and attach some metadata so that you know that filter field is
> "special"
> 
> * Filter the RecordBatch before sending, send only the selected rows
> 
> The first option can of course be implemented as an application-level
> detail, much as we handle the serialization of pandas row indexes
> right now (where we have custom pandas metadata and "special"
> fields/columns for the index arrays). But it could be a common enough
> use case to merit a more well-defined standardized approach.
> 
> I'm not sure what the answer is, but I wanted to describe the problem
> as I see it and see if anyone has any thoughts about it.
> 
> I'm aware that Dremio is a user of "selection vectors" (selected
> indices, instead of boolean true/false values, so we would have [0, 1,
> 3] in the above case), so the similar discussion may apply to passing
> selection vectors on the wire.
> 
> - Wes
