Re: [DataFusion] Question about Accumulator API and maybe potential bugs

2022-01-03 Thread Jorge Cardoso Leitão
Hi,

The accumulator API is designed to accept multiple columns (e.g. the
pearson correlation takes 2 columns, not one). `values[0]` corresponds to
the first column passed to the accumulator. All concrete implementations of
accumulators in DataFusion atm only accept one column (Sum, Avg, Count,
Min, Max), but the API is designed to work with multiple columns.

So, `update_batch(&mut self, values: &[ArrayRef])` corresponds to: update the
accumulator from n columns. For sum, this would be 1; for pearson
correlation this would be 2; for e.g. a ML model whose weights are computed
over all columns, this would be the number of input columns N of the model.
For stddev, you should use 1, since stddev is a function of a single
column.
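
As an illustration, here is a minimal, self-contained sketch of that contract
(not actual DataFusion code: plain f64 values stand in for ScalarValue and
ArrayRef, the names are hypothetical, and error handling is elided). The point
is only that `values[i]` in update_batch is the i-th input column:

// Simplified mirror of the multi-column contract described above.
trait ToyAccumulator {
    /// one column of values per input column of the aggregate expression
    fn update_batch(&mut self, values: &[Vec<f64>]);
}

/// Pearson-style accumulator over two columns: values[0] is x, values[1] is y.
#[derive(Debug, Default)]
struct CorrelationAccumulator {
    n: u64,
    sum_x: f64,
    sum_y: f64,
    sum_xy: f64,
}

impl ToyAccumulator for CorrelationAccumulator {
    fn update_batch(&mut self, values: &[Vec<f64>]) {
        // both input columns carry the same number of rows
        let (xs, ys) = (&values[0], &values[1]);
        for (x, y) in xs.iter().zip(ys.iter()) {
            self.n += 1;
            self.sum_x += x;
            self.sum_y += y;
            self.sum_xy += x * y;
        }
    }
}

A single-column aggregate like stddev would only ever look at values[0].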

`update(&mut self, values: &[ScalarValue])` corresponds to updating the
state with intermediary states. In a HashAggregate, we reduce each
partition, and use `update` to compute the final value from the
intermediary (scalar) states.

Hope this helps,
Jorge



On Tue, Jan 4, 2022 at 5:55 AM LM  wrote:

> Hi All,
>
> I just started looking into DataFusion and am considering using it as the
> platform for our next gen analytics solution. To get started, I tried to
> add a few functions such as stddev. While writing the code I noticed some
> discrepancies (it may also be my unfamiliarity with the code base) in the
> Accumulator API and the implementation of some functions. The API is
> defined as follows:
>
> pub trait Accumulator: Send + Sync + Debug {
>     /// Returns the state of the accumulator at the end of the accumulation.
>     // in the case of an average on which we track `sum` and `n`, this function
>     // should return a vector of two values, sum and n.
>     fn state(&self) -> Result<Vec<ScalarValue>>;
>
>     /// updates the accumulator's state from a vector of scalars.
>     fn update(&mut self, values: &[ScalarValue]) -> Result<()>;
>
>     /// updates the accumulator's state from a vector of arrays.
>     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
>         if values.is_empty() {
>             return Ok(());
>         };
>         (0..values[0].len()).try_for_each(|index| {
>             let v = values
>                 .iter()
>                 .map(|array| ScalarValue::try_from_array(array, index))
>                 .collect::<Result<Vec<_>>>()?;
>             self.update(&v)
>         })
>     }
> I am only quoting the update and update_batch functions for brevity; the same
> applies to the merge functions. So here it indicates that the update function
> takes a *vector of scalars* and update_batch takes a *vector of arrays*.
>
> When reading the code of some actual implementations, for example *sum* and
> *average*, both implementations assume that when update is called *only one*
> value is passed in, and when update_batch is called *only one* array is
> passed in.
>
> impl Accumulator for AvgAccumulator {
>     fn state(&self) -> Result<Vec<ScalarValue>> {
>         Ok(vec![ScalarValue::from(self.count), self.sum.clone()])
>     }
>
>     fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
>         let values = &values[0];
>
>         self.count += (!values.is_null()) as u64;
>         self.sum = sum::sum(&self.sum, values)?;
>
>         Ok(())
>     }
>
>     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
>         let values = &values[0];
>
>         self.count += (values.len() - values.data().null_count()) as u64;
>         self.sum = sum::sum(&self.sum, &sum::sum_batch(values)?)?;
>         Ok(())
>     }
> }
>
> impl Accumulator for SumAccumulator {
>     fn state(&self) -> Result<Vec<ScalarValue>> {
>         Ok(vec![self.sum.clone()])
>     }
>
>     fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
>         // sum(v1, v2, v3) = v1 + v2 + v3
>         self.sum = sum(&self.sum, &values[0])?;
>         Ok(())
>     }
>
>     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
>         let values = &values[0];
>         self.sum = sum(&self.sum, &sum_batch(values)?)?;
>         Ok(())
>     }
> }
>
> Could someone shed some light in case I missed anything?
>
> Regards,
> Lin
>


Re: [C++] Interest Arrow <=> Protobuf conversion capability

2022-01-03 Thread Micah Kornfield
Hi Jacob,
I think translation between Arrow and Protobuf could be useful.  Given your
use-case I'd suggest considering a few things:

1.  If the end-goal is to work with Parquet then you might consider
building the layer directly on top of the Parquet low-level API instead of
involving Arrow.  For sparsely populated nested messages, you will avoid
unneeded construction and destruction (I think parquet-MR also has some
optimizations to avoid cache churn in these situations).  Keeping data in
row form also adds the option of buffering and sorting efficiently, which
can greatly improve storage and metadata pruning.  Working directly with
the Parquet API would also reduce the metadata you would have to plumb
through if you wanted to do schema resolution using protobuf tag numbers
(instead of field names).

2.  If the data you are working with isn't mutable (i.e. Append only), it
might be worth experimenting with code-gen that wraps Arrow Builder classes
with an idiomatic expressive API instead of adding an extra level of
indirection.

3.  If we do pursue the APIs above, I think APIs like this are likely more
naturally written in terms of RecordBatch or RecordBatchReader instead of
Tables.

4.  Defining the scope of protobuf handling will be important.   Protobuf
extensions [1] have some sharp edges to incorporate. How "oneof" fields are
handled would also be something to consider.

> I'm also a bit curious to see if arrow allows faster deserialization when
> compared to a list of serialized protos on disk

It depends what you mean by this.  If you want to deserialize from Parquet
back to proto, I would be pretty surprised (but not completely) if going
through Arrow is more efficient, especially for deeply nested
"sparse" messages. The protobuf reflection implementation has a high
overhead of setting individual fields, and as mentioned above nested sparse
messages probably incur some level of tax that is avoided with serialized
protobufs.

[1] https://developers.google.com/protocol-buffers/docs/proto#extensions

Cheers,
Micah

On Mon, Jan 3, 2022 at 1:27 PM Jacob Huffman 
wrote:

> Hey all,
>
> Is there much interest in adding the capability to do Arrow <=> Protobuf
> conversion in C++?
>
> I'm working on this for a side project, but I was wondering if there is
> much interest from the broader Arrow community. If so, I might be able to
> find time to contribute it.
>
> To get the point across, here is a strawman API. In reality, we would
> likely need some sort of builder API which allows incrementally adding
> protos and a generator-like API for returning the protos from a table.
>
> """
> // Functions of functions using templates to work with any message type
> template 
> Result> ProtosToTable(const std::vector& protos);
>
> template 
> Result> TableToProtos(const std::shared_prt table);
>
> // Pair of functions using google::protobuf::Message and polymorphism to
> work with any message type
> Result> ProtosToTable( const
> std::vector& protos);
>
> // I don't like that this returns a vector of unique pointers. Is there a
> better way to return a vector of base classes while retaining polymorphic
> behavior?
> Result>>
> TableToProtos (const std::shared_prt table, const
> google::protobuf:Descriptor* descriptor);
> """
>
> My particular use case for these functions is that I would like to use
> protobufs for the in-memory data representation as it provides strongly
> typed classes which are very expressive (can have nested/repeated fields)
> and a well established path for schema evolution. However, I would like to
> use parquet as the data storage layer (and arrow as the glue between the
> two) so that I can take advantage of technologies like presto for querying
> the data. I'm hoping that backwards compatible changes to the proto schema
> turn into backwards compatible changes in the parquet files. I'm also a bit
> curious to see if arrow allows faster deserialization when compared to a
> list of serialized protos on disk.
>


[DataFusion] Question about Accumulator API and maybe potential bugs

2022-01-03 Thread LM
Hi All,

I just started looking into DataFusion and am considering using it as the
platform for our next gen analytics solution. To get started, I tried to
add a few functions such as stddev. While writing the code I noticed some
discrepancies (it may also be my unfamiliarity with the code base) in the
Accumulator API and the implementation of some functions. The API is
defined as follows:

pub trait Accumulator: Send + Sync + Debug {
    /// Returns the state of the accumulator at the end of the accumulation.
    // in the case of an average on which we track `sum` and `n`, this function
    // should return a vector of two values, sum and n.
    fn state(&self) -> Result<Vec<ScalarValue>>;

    /// updates the accumulator's state from a vector of scalars.
    fn update(&mut self, values: &[ScalarValue]) -> Result<()>;

    /// updates the accumulator's state from a vector of arrays.
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        if values.is_empty() {
            return Ok(());
        };
        (0..values[0].len()).try_for_each(|index| {
            let v = values
                .iter()
                .map(|array| ScalarValue::try_from_array(array, index))
                .collect::<Result<Vec<_>>>()?;
            self.update(&v)
        })
    }
I am only quoting the update and update_batch functions for brevity; the same
applies to the merge functions. So here it indicates that the update function
takes a *vector of scalars* and update_batch takes a *vector of arrays*.

When reading the code of some actual implementations, for example *sum* and
*average*, both implementations assume that when update is called *only one*
value is passed in, and when update_batch is called *only one* array is
passed in.

impl Accumulator for AvgAccumulator {
    fn state(&self) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::from(self.count), self.sum.clone()])
    }

    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
        let values = &values[0];

        self.count += (!values.is_null()) as u64;
        self.sum = sum::sum(&self.sum, values)?;

        Ok(())
    }

    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let values = &values[0];

        self.count += (values.len() - values.data().null_count()) as u64;
        self.sum = sum::sum(&self.sum, &sum::sum_batch(values)?)?;
        Ok(())
    }
}

impl Accumulator for SumAccumulator {
    fn state(&self) -> Result<Vec<ScalarValue>> {
        Ok(vec![self.sum.clone()])
    }

    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
        // sum(v1, v2, v3) = v1 + v2 + v3
        self.sum = sum(&self.sum, &values[0])?;
        Ok(())
    }

    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let values = &values[0];
        self.sum = sum(&self.sum, &sum_batch(values)?)?;
        Ok(())
    }
}

Could someone shed some light in case I missed anything?

Regards,
Lin


[C++] Interest Arrow <=> Protobuf conversion capability

2022-01-03 Thread Jacob Huffman
Hey all,

Is there much interest in adding the capability to do Arrow <=> Protobuf
conversion in C++?

I'm working on this for a side project, but I was wondering if there is
much interest from the broader Arrow community. If so, I might be able to
find time to contribute it.

To get the point across, here is a strawman API. In reality, we would
likely need some sort of builder API which allows incrementally adding
protos and a generator-like API for returning the protos from a table.

"""
// Functions of functions using templates to work with any message type
template 
Result> ProtosToTable(const std::vector& protos);

template 
Result> TableToProtos(const std::shared_prt table);

// Pair of functions using google::protobuf::Message and polymorphism to
work with any message type
Result> ProtosToTable( const
std::vector& protos);

// I don't like that this returns a vector of unique pointers. Is there a
better way to return a vector of base classes while retaining polymorphic
behavior?
Result>>
TableToProtos (const std::shared_prt table, const
google::protobuf:Descriptor* descriptor);
"""

My particular use case for these functions is that I would like to use
protobufs for the in-memory data representation as it provides strongly
typed classes which are very expressive (can have nested/repeated fields)
and a well established path for schema evolution. However, I would like to
use parquet as the data storage layer (and arrow as the glue between the
two) so that I can take advantage of technologies like presto for querying
the data. I'm hoping that backwards compatible changes to the proto schema
turn into backwards compatible changes in the parquet files. I'm also a bit
curious to see if arrow allows faster deserialization when compared to a
list of serialized protos on disk.


Re: [DISCUSS] [RUST] More Frequent arrow-rs release schedule

2022-01-03 Thread Weston Pace
Are there parts of the library that are more stable?  For example, in
the C++ API the "parquet" namespace is stable and shouldn't encounter
much breaking change, but the "compute" namespace is marked
"experimental" (in documentation) and is much more liable to breaking
changes.  If there were a way to mark particular parts of the API as
"experimental" then I think these parts could have breaking changes as
part of a minor release.  This obeys the semver rules because your
experimental APIs are not part of your "public API".
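
For what it's worth, one lightweight way to express that in Rust is to gate
the unstable surface behind an opt-in Cargo feature. A minimal sketch,
assuming a hypothetical feature named "experimental" declared in Cargo.toml
(not something arrow-rs does today):

// lib.rs
/// Stable API: covered by the normal semver guarantees.
pub mod stable_api {
    pub fn read_footer(bytes: &[u8]) -> usize {
        // placeholder logic for the sketch
        bytes.len()
    }
}

/// Experimental API: only compiled when the `experimental` feature is
/// enabled, so it may change or break in a minor release.
#[cfg(feature = "experimental")]
pub mod compute_experimental {
    pub fn sum(values: &[i64]) -> i64 {
        values.iter().sum()
    }
}

Users who want the experimental pieces would opt in explicitly (e.g.
features = ["experimental"] in their own Cargo.toml), so breaking those
pieces in a minor release wouldn't fail the CI of anyone using only the
default surface.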

That being said, I'm just backseat driving at this point, so feel free
to discard the thought.  I don't know that the lockstep versioning
between Rust and C++ is buying us anything.  I think it is more
meaningful for something like R/C++ where the former is just bindings
upon the latter.  There is no guarantee that version X of C++ and
version X of Rust share any common features or compatibility.

-Weston

On Mon, Jan 3, 2022 at 2:23 AM Andrew Lamb  wrote:
>
> I think Micah and Adam have hit on the core issue -- there are inconsistent
> meanings ascribed to version numbers "1.0.0" and "stable"
>
> Specifically, I think everyone agrees
> * "1.0.0"+ means "stable"
> * Some software clearly is unstable (APIs change all the time, couldn't be
> used by other software)
> * Some software is clearly stable (public API never changes)
>
> However, there are many packages that fall between the two extremes
> (arrow-rs and most of the rust ecosystem in my opinion): that change but
> are "stable enough" to be used in other projects.
>
> I don't really care about the value of the version. What I care about are
> 1. predictable regular releases and
> 2. allowing the APIs to evolve over time without causing undue stress on
> users (e.g. that their CI jobs start failing without them changing anything)
>
> Andrew


Preparing for version 7.0.0 release

2022-01-03 Thread Alessandro Molina
The plan seems to be to cut a release the 2nd or 3rd week of January, a new
confluence page was made to track progress of the release (
https://cwiki.apache.org/confluence/display/ARROW/Arrow+7.0.0+Release ).

It would greatly help in the process of preparing for the release if you
could review tickets that are assigned to you in the "TODO Backlog" and
move those that you think you will not be able to close in ~1 week to
"Version 8.0.0" in Jira, so that we can start preparing release
announcements etc with a good estimate of what's actually going to end up
in the release.

Thanks everybody for the great work! Lots of great things are coming in
7.0.0


Re: [DISCUSS] [RUST] More Frequent arrow-rs release schedule

2022-01-03 Thread Andrew Lamb
I think Micah and Adam have hit on the core issue -- there are inconsistent
meanings ascribed to version numbers "1.0.0" and "stable"

Specifically, I think everyone agrees
* "1.0.0"+ means "stable"
* Some software clearly is unstable (APIs change all the time, couldn't be
used by other software)
* Some software is clearly stable (public API never changes)

However, there are many packages that fall between the two extremes
(arrow-rs and most of the rust ecosystem in my opinion): that change but
are "stable enough" to be used in other projects.

I don't really care about the value of the version. What I care about are
1. predictable regular releases and
2. allowing the APIs to evolve over time without causing undue stress on
users (e.g. that their CI jobs start failing without them changing anything)

Andrew