On Thu, May 9, 2019 at 4:44 PM Wes McKinney <[email protected]> wrote:

> hi Ravi,
>
> It's hard to say anything without looking at some code. Are you
> working on a pull request? This seems like potentially useful
> functionality to have in the project.
>
We are currently developing on an internal Netflix fork, so I'm happy to
paste some snippets of the code here in the email, and would love to get
higher-level feedback on the ideas used in the change.
We can also have a phone call later next week to discuss how we can
contribute back to Arrow as our implementation matures and reaches a
concrete API. We first want to get to something that compares fairly
with pandas, and we aren't there yet.

From our experiments, Array::Slice seems more expensive than one would
expect. It is currently copying the vectors (of buffers), so I was slightly
thrown off by the comment "zero-copy" in the .h file. Is there a reason for
the choice of vector instead of another (specialized) container?
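
To make the concern concrete, here is a minimal sketch of the kind of cost I
mean. It is not Arrow's code: FakeBuffer, FakeArrayData and slice are
hypothetical stand-ins, and I'm assuming a slice copies a small struct that
holds a vector of shared_ptr buffers. The data bytes are shared (zero-copy in
that sense), but every slice still pays for a vector allocation and one
reference-count increment per buffer.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for a data buffer (not an Arrow type).
struct FakeBuffer {
    std::vector<uint8_t> bytes;
};

// Hypothetical stand-in for array metadata: offset, length, shared buffers.
struct FakeArrayData {
    int64_t offset = 0;
    int64_t length = 0;
    std::vector<std::shared_ptr<FakeBuffer>> buffers;
};

// "Zero-copy" slice: no data bytes are copied, but the metadata struct is.
FakeArrayData slice(const FakeArrayData& src, int64_t off, int64_t len) {
    assert(off >= 0 && len >= 0 && off + len <= src.length);
    // Copying the struct copies the vector of shared_ptrs: one heap
    // allocation for the vector plus one refcount increment per buffer.
    FakeArrayData out = src;
    out.offset = src.offset + off;
    out.length = len;
    return out;
}
```

With runs of one or two rows, that fixed per-slice overhead is paid once per
run per column, which could easily dominate the actual work.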

When the runs are of small length (<= 128), I want to understand if there
is an easy way to copy elements instead of paying the cost of Slice and
pushing many elements to the output vector (which is also expensive).
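
One way to picture the element-copy alternative is the sketch below. It uses
plain std::vector in place of Arrow arrays and builders, and filter_copy is a
hypothetical name, so treat it as an illustration of the idea rather than a
proposal for the API. Each contiguous run of selected rows is appended to a
single output buffer with one range insert, so no per-run object is created.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for one column chunk (not an Arrow type).
using Chunk = std::vector<int64_t>;

// Return values[i] for every i where mask[i] is true, preserving order.
std::vector<int64_t> filter_copy(const Chunk& values,
                                 const std::vector<bool>& mask) {
    assert(values.size() == mask.size());
    std::vector<int64_t> out;
    out.reserve(values.size());  // upper bound, avoids repeated reallocation
    std::size_t i = 0;
    while (i < mask.size()) {
        if (!mask[i]) {
            ++i;
            continue;
        }
        // Found the start of a run; advance i to one past its end.
        const std::size_t run_start = i;
        while (i < mask.size() && mask[i]) {
            ++i;
        }
        // Copy the whole run [run_start, i) in one range insert.
        out.insert(out.end(),
                   values.begin() + static_cast<std::ptrdiff_t>(run_start),
                   values.begin() + static_cast<std::ptrdiff_t>(i));
    }
    return out;
}
```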

Sorry if the email seems very high level, but I'm new to Arrow, so I'm just
looking for higher-level guidance on which API I could use and which is
more performant.

Ravi.

    void slice_from_column_chunk(
            const std::shared_ptr<arrow::ChunkedArray>& column_chunk,
            int64_t offset,
            int64_t length,
            arrow::ArrayVector& col_output) {
        // Trivial way:
        // output_chunks = column_chunk->Slice(offset, length).

        const arrow::ArrayVector& all_chunks = column_chunk->chunks();
        size_t num_chunks = all_chunks.size();

        // Find start_chunk to start slicing from.
        // TODO: Think of smart ways to avoid this linear scan since we are
        // doing contiguous slicing.
        size_t curr_chunk = 0;
        while (curr_chunk < num_chunks &&
               offset >= all_chunks[curr_chunk]->length()) {
            offset -= all_chunks[curr_chunk]->length();
            curr_chunk++;
        }

        while (curr_chunk < num_chunks && length > 0) {
            col_output.push_back(all_chunks[curr_chunk]->Slice(offset, length));
            length -= (all_chunks[curr_chunk]->length() - offset);
            offset = 0;
            curr_chunk++;
        }
    }
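
On the TODO above: one possible way to avoid the linear scan is to precompute
the starting row of each chunk once and binary-search it for each slice. The
sketch below works on chunk lengths only; chunk_starts, ChunkPos and locate
are hypothetical helpers, not Arrow functions.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// starts[i] is the global row index of the first row of chunk i; one extra
// trailing entry holds the total row count.
std::vector<int64_t> chunk_starts(const std::vector<int64_t>& chunk_lengths) {
    std::vector<int64_t> starts(chunk_lengths.size() + 1, 0);
    for (std::size_t i = 0; i < chunk_lengths.size(); ++i) {
        assert(chunk_lengths[i] >= 0);
        starts[i + 1] = starts[i] + chunk_lengths[i];
    }
    return starts;
}

// Position of a global row: which chunk, and the offset inside that chunk.
struct ChunkPos {
    std::size_t chunk;
    int64_t offset_in_chunk;
};

// Locate global row `row` (0 <= row < total rows) in O(log num_chunks).
ChunkPos locate(const std::vector<int64_t>& starts, int64_t row) {
    assert(!starts.empty() && row >= 0 && row < starts.back());
    // First start strictly greater than row; the owning chunk precedes it.
    // Empty chunks are skipped automatically because their start equals
    // the start of the next chunk.
    auto it = std::upper_bound(starts.begin(), starts.end(), row);
    const std::size_t chunk = static_cast<std::size_t>(it - starts.begin()) - 1;
    return {chunk, row - starts[chunk]};
}
```

Since the runs arrive in increasing row order, remembering the last chunk
position and scanning forward from there would be another option.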


    /*! \brief Slice array data from a table's columns

      Slice arrays from a table's columns and append the resulting array
      chunks to a vector for each column.

      @param column_chunks The chunked arrays (one per table column) to slice
      @param offset The row offset at which the slice starts
      @param length The length of the slice
      @param output One vector per column to which the sliced array chunks
      are appended

      @return None

     */

    void slice_from_table(
            const std::vector<std::shared_ptr<arrow::ChunkedArray>>& column_chunks,
            const int64_t offset,
            const int64_t length,
            std::vector<arrow::ArrayVector>& output)
    {
        assert(output.size() == column_chunks.size());

        // TODO(crk): The calls below can easily be parallelized since they
        // read/write exclusive locations, so there is little contention.
        for (size_t i = 0; i < column_chunks.size(); ++i) {
            slice_from_column_chunk(column_chunks[i], offset, length, output[i]);
        }

    }

    /*! \brief Select rows from a table given an indicator array

      Select the rows from a table that are indicated by a user-supplied
      array. Rows are copied to the new table. The indicator array must have
      the same number of elements as the table has rows.

      This function assumes that the indicator array contains no NULLs, is of
      underlying type BooleanArray, and has length equal to the number of rows
      in the input table.

      @param indicator A boolean array whose true elements indicate the rows
      to be selected
      @param table The table from which to select rows

      @return A new table with the desired rows

     */


    template <typename T>
        typename std::enable_if<array_like<T>::value,
                 std::pair<arrow::Status, std::shared_ptr<arrow::Table> > >::type
        select_rows(std::shared_ptr<T> const & indicator,
                    std::shared_ptr<arrow::Table> const & table)
        {
            // get chunked array representation
            std::shared_ptr<arrow::ChunkedArray> indicator_array =
                array_like<T>::chunked_array(indicator);

            // declare returns
            ReturnValue<std::shared_ptr<arrow::Table> > ret;

            // check that both pointers are valid
            if (!indicator_array || !table)
            {
                ret.status = arrow::Status::Invalid("Pointers must be valid");
                return ReturnValue<std::shared_ptr<arrow::Table> >::as_pair(
                        std::move(ret));
            }

            int num_columns = table->num_columns();
            // Prepare a vector that will hold all the arrays for each column
            std::vector<arrow::ArrayVector> out_arrays(num_columns);

            std::vector<std::shared_ptr<arrow::ChunkedArray>> column_chunks(
                    num_columns);
            for (int i = 0; i < num_columns; ++i) {
                column_chunks[i] = table->column(i)->data();
            }

            // loop over elements in the boolean array to find contiguous
            // blocks of true values, and slice rows from the table as we go
            int64_t offset = -1;
            int64_t data_idx = 0;
            bool in_run = false;

            for (int64_t chunk_idx = 0;
                 chunk_idx < indicator_array->num_chunks(); ++chunk_idx)
            {
                /* for each chunk:
                   - cast to boolean array and exit if fail
                   - loop over each chunks elements
                   - update run-length variables
                   - if we end a run, slice the data
                 */
                std::shared_ptr<arrow::BooleanArray> barray =
                    std::dynamic_pointer_cast<arrow::BooleanArray>(
                            indicator_array->chunk(chunk_idx));
                if (!barray)
                {
                    ret.status = arrow::Status::TypeError(
                            "Underlying array type must be BooleanArray");
                    return ReturnValue<std::shared_ptr<arrow::Table> >::as_pair(
                            std::move(ret));
                }

                // look for sequences to slice
                arrow::BooleanArray const & array = *barray;
                for (int64_t array_idx = 0; array_idx < array.length();
                     ++array_idx)
                {
                    // update offset if we are at start of a run
                    bool const is_selected = array.Value(array_idx);
                    if (is_selected && !in_run)
                    {
                        offset = data_idx;
                        in_run = true;
                    }

                    // slice data if we are at end of a run
                    else if (!is_selected && in_run)
                    {
                        int64_t const length = data_idx - offset;
                        slice_from_table(column_chunks, offset, length,
                                         out_arrays);
                        offset = -1;
                        in_run = false;
                    }

                    ++data_idx;
                }
            }

            // if we are in a run, take the last slice.
            if (in_run)
            {
                int64_t const length = data_idx - offset;
                slice_from_table(column_chunks, offset, length, out_arrays);
            }

            // resize out_arrays if nothing was sliced.
            if ((int) out_arrays.size() != table->num_columns())
            {
                out_arrays.resize(table->num_columns());
            }

            // create columns
            std::shared_ptr<arrow::Schema> schema = table->schema();
            std::vector<std::shared_ptr<arrow::Column> > column_vector(
                    schema->num_fields());
            for (uint32_t col_idx = 0; col_idx < column_vector.size(); ++col_idx)
            {
                column_vector[col_idx].reset(new arrow::Column(
                            schema->field(col_idx), out_arrays[col_idx]));
            }

            // create the table and return
            ret.value = arrow::Table::Make(schema, column_vector);
            return ReturnValue<std::shared_ptr<arrow::Table> >::as_pair(
                    std::move(ret));
        }




>
> - Wes
>
> On Thu, May 9, 2019 at 1:10 PM Ravi Kiran Chirravuri
> <[email protected]> wrote:
> >
> > Hi,
> >
> > I'm working on implementing a filter-like functionality over a
> > Table::Column given an indicator array.
> > For now, we've implemented a basic version leveraging Array::Slice while
> > iterating over the Table::Column->chunks() but this seems slow when the
> > contiguous runs are of length <= 128 (compared to pandas).
> >
> > Total rows in table:  646035
> >
> > crk: Period:  128
> >
> > Expected output rows:  640987
> >
> > PROFILE: select starting
> >
> > arrowops actual_num_rows:  0
> >
> > PROFILE: select completed in 795ms
> >
> > PROFILE: pandas starting
> >
> > pandas actual_num_rows:  640987
> >
> > PROFILE: pandas completed in 737ms
> >
> > Before I dive into more observations about my implementation, I wanted to
> > check if there are alternatives I could consider that might have
> > substantially different performance characteristics.
> >
> > For comparison - pandas always takes ~750 ms for processing the 650k rows,
> > indifferent to the indicator distribution; but our implementation, which
> > copies the array slice(s) into an output (ArrayVector), degrades quite a
> > bit as the contiguous run length reduces.
> >
> > For a degenerate case with alternating 1's and 0's in indicator:
> >
> > Expected output rows:  323017
> >
> > PROFILE: select starting
> >
> > arrowops actual_num_rows:  323017
> >
> > PROFILE: select completed in 72110ms
> >
> > PROFILE: pandas starting
> >
> > pandas actual_num_rows:  323017
> >
> > PROFILE: pandas completed in 492ms
> >
> > Thanks in advance,
> > Ravi.
>
