Re: [DataFusion] Question about Accumulator API and potential bugs
Hi,

The accumulator API is designed to accept multiple columns (e.g. the Pearson correlation takes 2 columns, not one). values[0] corresponds to the first column passed to the accumulator. All concrete implementations of accumulators in DataFusion atm only accept one column (Sum, Avg, Count, Min, Max), but the API is designed to accept multiple columns.

So, update_batch(&mut self, values: &[ArrayRef]) corresponds to: update the accumulator from N columns. For sum this would be 1, for the Pearson correlation this would be 2, and for e.g. an ML model whose weights are computed over all columns, this would be the number of input columns N of the model. For stddev you should use 1, since stddev is a function of a single column.

`update(&mut self, values: &[ScalarValue])` corresponds to updating the state with intermediary states. In a HashAggregate, we reduce each partition, and use `update` to compute the final value from the intermediary (scalar) states.

Hope this helps,
Jorge

On Tue, Jan 4, 2022 at 5:55 AM LM wrote:
> Hi All,
>
> I just started looking into DataFusion and am considering using it as the
> platform for our next gen analytics solution. To get started, I tried to
> add a few functions such as stddev. While writing the code I noticed some
> discrepancies (it may also be my unfamiliarity with the code base) in the
> Accumulator API and the implementation of some functions. The API is
> defined as follows:
>
> pub trait Accumulator: Send + Sync + Debug {
>     /// Returns the state of the accumulator at the end of the accumulation.
>     // in the case of an average on which we track `sum` and `n`, this function
>     // should return a vector of two values, sum and n.
>     fn state(&self) -> Result<Vec<ScalarValue>>;
>
>     /// updates the accumulator's state from a vector of scalars.
>     fn update(&mut self, values: &[ScalarValue]) -> Result<()>;
>
>     /// updates the accumulator's state from a vector of arrays.
>     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
>         if values.is_empty() {
>             return Ok(());
>         };
>         (0..values[0].len()).try_for_each(|index| {
>             let v = values
>                 .iter()
>                 .map(|array| ScalarValue::try_from_array(array, index))
>                 .collect::<Result<Vec<_>>>()?;
>             self.update(&v)
>         })
>     }
>
> I am only quoting the update and update_batch functions for brevity; the
> same applies to the merge functions. So here it indicates that the update
> function takes a *vector* and update_batch takes a *vector of arrays*.
>
> When reading the code of some actual implementations, for example *sum*
> and *average*, both implementations assume that when update is called
> *only one* value is passed in, and when update_batch is called *only one*
> array is passed in.
>
> impl Accumulator for AvgAccumulator {
>     fn state(&self) -> Result<Vec<ScalarValue>> {
>         Ok(vec![ScalarValue::from(self.count), self.sum.clone()])
>     }
>
>     fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
>         let values = &values[0];
>
>         self.count += (!values.is_null()) as u64;
>         self.sum = sum::sum(&self.sum, values)?;
>
>         Ok(())
>     }
>
>     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
>         let values = &values[0];
>
>         self.count += (values.len() - values.data().null_count()) as u64;
>         self.sum = sum::sum(&self.sum, &sum::sum_batch(values)?)?;
>         Ok(())
>     }
> }
>
> impl Accumulator for SumAccumulator {
>     fn state(&self) -> Result<Vec<ScalarValue>> {
>         Ok(vec![self.sum.clone()])
>     }
>
>     fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
>         // sum(v1, v2, v3) = v1 + v2 + v3
>         self.sum = sum(&self.sum, &values[0])?;
>         Ok(())
>     }
>
>     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
>         let values = &values[0];
>         self.sum = sum(&self.sum, &sum_batch(values)?)?;
>         Ok(())
>     }
> }
>
> Could someone shed some light in case I missed anything?
>
> Regards,
> Lin
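To make the multi-column point above concrete, here is a minimal, self-contained sketch of a two-column accumulator in the spirit of the trait quoted in the thread. The struct, the stand-in ScalarValue/Result types, and all names are illustrative only, not DataFusion code.

// Stand-ins so the sketch compiles on its own; in DataFusion these would be
// datafusion::scalar::ScalarValue and datafusion::error::Result.
#[derive(Debug, Clone)]
enum ScalarValue {
    Float64(Option<f64>),
}
type Result<T> = std::result::Result<T, String>;

/// Hypothetical accumulator over TWO input columns (x and y), illustrating
/// that `values` holds one entry per input column, not one per row.
#[derive(Debug, Default)]
struct SumXyAccumulator {
    sum_xy: f64,
}

impl SumXyAccumulator {
    /// Row-wise update: values[0] is this row's x, values[1] is this row's y.
    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
        if values.len() != 2 {
            return Err("expected exactly two input columns".to_string());
        }
        let ScalarValue::Float64(x) = &values[0];
        let ScalarValue::Float64(y) = &values[1];
        if let (Some(x), Some(y)) = (x, y) {
            self.sum_xy += x * y;
        }
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut acc = SumXyAccumulator::default();
    // Each call is one row; the slice length equals the number of columns (2).
    acc.update(&[ScalarValue::Float64(Some(1.0)), ScalarValue::Float64(Some(2.0))])?;
    acc.update(&[ScalarValue::Float64(Some(3.0)), ScalarValue::Float64(Some(4.0))])?;
    assert_eq!(acc.sum_xy, 14.0); // 1*2 + 3*4
    Ok(())
}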
Re: [C++] Interest in Arrow <=> Protobuf conversion capability
Hi Jacob,

I think translation between Arrow and Protobuf could be useful. Given your use-case I'd suggest considering a few things:

1. If the end-goal is to work with Parquet, then you might consider building the layer directly on top of the Parquet low-level API instead of involving Arrow. For sparsely populated nested messages, you will avoid unneeded construction and destruction (I think parquet-mr also has some optimizations to avoid cache churn in these situations). Keeping data in row form also adds the option of buffering and sorting, which can greatly improve storage and metadata pruning efficiency. Working directly with the Parquet API would also reduce the metadata you would have to plumb through if you wanted to do schema resolution using protobuf tag numbers (instead of field names).

2. If the data you are working with isn't mutable (i.e. append-only), it might be worth experimenting with code-gen that wraps Arrow Builder classes with an idiomatic, expressive API instead of adding an extra level of indirection (see the sketch after this message).

3. If we do pursue the APIs above, I think they are more naturally written in terms of RecordBatch or RecordBatchReader instead of Tables.

4. Defining the scope of protobuf handling will be important. Protobuf extensions [1] have some sharp edges to incorporate. How "oneof" fields are handled would also be something to consider.

> I'm also a bit curious to see if arrow allows faster deserialization when
> compared to a list of serialized protos on disk

It depends what you mean by this. If you want to deserialize from Parquet back to proto, I would be pretty surprised (but not completely) if going through Arrow is more efficient, especially for deeply nested "sparse" messages. The protobuf reflection implementation has a high overhead of setting individual fields, and as mentioned above, nested sparse messages probably incur some level of tax that is avoided with serialized protobufs.

[1] https://developers.google.com/protocol-buffers/docs/proto#extensions

Cheers,
Micah

On Mon, Jan 3, 2022 at 1:27 PM Jacob Huffman wrote:
> Hey all,
>
> Is there much interest in adding the capability to do Arrow <=> Protobuf
> conversion in C++?
>
> I'm working on this for a side project, but I was wondering if there is
> much interest from the broader Arrow community. If so, I might be able to
> find time to contribute it.
>
> To get the point across, here is a strawman API. In reality, we would
> likely need some sort of builder API which allows incrementally adding
> protos and a generator-like API for returning the protos from a table.
>
> """
> // Pair of functions using templates to work with any message type
> template <typename T>
> Result<std::shared_ptr<Table>> ProtosToTable(const std::vector<T>& protos);
>
> template <typename T>
> Result<std::vector<T>> TableToProtos(const std::shared_ptr<Table> table);
>
> // Pair of functions using google::protobuf::Message and polymorphism to
> // work with any message type
> Result<std::shared_ptr<Table>> ProtosToTable(
>     const std::vector<const google::protobuf::Message*>& protos);
>
> // I don't like that this returns a vector of unique pointers. Is there a
> // better way to return a vector of base classes while retaining
> // polymorphic behavior?
> Result<std::vector<std::unique_ptr<google::protobuf::Message>>> TableToProtos(
>     const std::shared_ptr<Table> table,
>     const google::protobuf::Descriptor* descriptor);
> """
>
> My particular use case for these functions is that I would like to use
> protobufs for the in-memory data representation as it provides strongly
> typed classes which are very expressive (can have nested/repeated fields)
> and a well established path for schema evolution.
> However, I would like to use parquet as the data storage layer (and arrow
> as the glue between the two) so that I can take advantage of technologies
> like presto for querying the data. I'm hoping that backwards compatible
> changes to the proto schema turn into backwards compatible changes in the
> parquet files. I'm also a bit curious to see if arrow allows faster
> deserialization when compared to a list of serialized protos on disk.
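On Micah's point 2, here is roughly what "code-gen wrapping the builder classes" could look like. This is a hand-written Rust sketch with stand-in builder types (the real Arrow builder APIs differ by language and version), intended only to show the generated wrapper layer; it is not an existing Arrow or Protobuf API.

// Stand-ins for Arrow's per-type builders (e.g. arrow::array::Int64Builder);
// stubbed here so the sketch is self-contained.
#[derive(Default)]
struct Int64Builder { values: Vec<Option<i64>> }
impl Int64Builder {
    fn append_value(&mut self, v: i64) { self.values.push(Some(v)); }
}

#[derive(Default)]
struct StringBuilder { values: Vec<Option<String>> }
impl StringBuilder {
    fn append_value(&mut self, v: &str) { self.values.push(Some(v.to_owned())); }
    fn append_null(&mut self) { self.values.push(None); }
}

// What code generated for a message like
//   message Person { int64 id = 1; optional string name = 2; }
// might look like: one builder per field, one idiomatic append per message,
// no intermediate row objects.
#[derive(Default)]
struct PersonBatchBuilder {
    id: Int64Builder,
    name: StringBuilder,
}

impl PersonBatchBuilder {
    fn append(&mut self, id: i64, name: Option<&str>) {
        self.id.append_value(id);
        match name {
            Some(n) => self.name.append_value(n),
            None => self.name.append_null(),
        }
    }
    // A real version would also have a finish() producing a RecordBatch.
}

fn main() {
    let mut b = PersonBatchBuilder::default();
    b.append(1, Some("ada"));
    b.append(2, None);
    assert_eq!(b.id.values.len(), 2);
}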
[DataFusion] Question about Accumulator API and potential bugs
Hi All,

I just started looking into DataFusion and am considering using it as the platform for our next gen analytics solution. To get started, I tried to add a few functions such as stddev. While writing the code I noticed some discrepancies (it may also be my unfamiliarity with the code base) in the Accumulator API and the implementation of some functions. The API is defined as follows:

pub trait Accumulator: Send + Sync + Debug {
    /// Returns the state of the accumulator at the end of the accumulation.
    // in the case of an average on which we track `sum` and `n`, this function
    // should return a vector of two values, sum and n.
    fn state(&self) -> Result<Vec<ScalarValue>>;

    /// updates the accumulator's state from a vector of scalars.
    fn update(&mut self, values: &[ScalarValue]) -> Result<()>;

    /// updates the accumulator's state from a vector of arrays.
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        if values.is_empty() {
            return Ok(());
        };
        (0..values[0].len()).try_for_each(|index| {
            let v = values
                .iter()
                .map(|array| ScalarValue::try_from_array(array, index))
                .collect::<Result<Vec<_>>>()?;
            self.update(&v)
        })
    }

I am only quoting the update and update_batch functions for brevity; the same applies to the merge functions. So here it indicates that the update function takes a *vector* and update_batch takes a *vector of arrays*.

When reading the code of some actual implementations, for example *sum* and *average*, both implementations assume that when update is called *only one* value is passed in, and when update_batch is called *only one* array is passed in.

impl Accumulator for AvgAccumulator {
    fn state(&self) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::from(self.count), self.sum.clone()])
    }

    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
        let values = &values[0];

        self.count += (!values.is_null()) as u64;
        self.sum = sum::sum(&self.sum, values)?;

        Ok(())
    }

    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let values = &values[0];

        self.count += (values.len() - values.data().null_count()) as u64;
        self.sum = sum::sum(&self.sum, &sum::sum_batch(values)?)?;
        Ok(())
    }
}

impl Accumulator for SumAccumulator {
    fn state(&self) -> Result<Vec<ScalarValue>> {
        Ok(vec![self.sum.clone()])
    }

    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
        // sum(v1, v2, v3) = v1 + v2 + v3
        self.sum = sum(&self.sum, &values[0])?;
        Ok(())
    }

    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let values = &values[0];
        self.sum = sum(&self.sum, &sum_batch(values)?)?;
        Ok(())
    }
}

Could someone shed some light in case I missed anything?

Regards,
Lin
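An aside on the stddev use case mentioned above: for a single input column, a good accumulator state is Welford's online recurrence for mean and variance, which is numerically stabler than accumulating sum and sum of squares. A minimal self-contained sketch with illustrative names (not the actual DataFusion implementation, and with the Accumulator trait plumbing omitted):

/// Welford's online algorithm for mean/variance. Stand-in sketch only.
#[derive(Debug, Default)]
struct StddevAccumulator {
    count: u64,
    mean: f64,
    m2: f64, // sum of squared deviations from the current mean
}

impl StddevAccumulator {
    /// Fold one non-null value into the state (what `update` would do
    /// for the single input column, values[0]).
    fn push(&mut self, x: f64) {
        self.count += 1;
        let delta = x - self.mean;
        self.mean += delta / self.count as f64;
        self.m2 += delta * (x - self.mean);
    }

    /// Sample standard deviation; None for fewer than two values.
    fn stddev(&self) -> Option<f64> {
        if self.count < 2 {
            None
        } else {
            Some((self.m2 / (self.count - 1) as f64).sqrt())
        }
    }
}

fn main() {
    let mut acc = StddevAccumulator::default();
    for x in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0] {
        acc.push(x);
    }
    // Sample stddev of this set is ~2.138.
    assert!((acc.stddev().unwrap() - 2.138).abs() < 1e-3);
}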
[C++] Interest in Arrow <=> Protobuf conversion capability
Hey all,

Is there much interest in adding the capability to do Arrow <=> Protobuf conversion in C++?

I'm working on this for a side project, but I was wondering if there is much interest from the broader Arrow community. If so, I might be able to find time to contribute it.

To get the point across, here is a strawman API. In reality, we would likely need some sort of builder API which allows incrementally adding protos and a generator-like API for returning the protos from a table.

"""
// Pair of functions using templates to work with any message type
template <typename T>
Result<std::shared_ptr<Table>> ProtosToTable(const std::vector<T>& protos);

template <typename T>
Result<std::vector<T>> TableToProtos(const std::shared_ptr<Table> table);

// Pair of functions using google::protobuf::Message and polymorphism to
// work with any message type
Result<std::shared_ptr<Table>> ProtosToTable(
    const std::vector<const google::protobuf::Message*>& protos);

// I don't like that this returns a vector of unique pointers. Is there a
// better way to return a vector of base classes while retaining polymorphic
// behavior?
Result<std::vector<std::unique_ptr<google::protobuf::Message>>> TableToProtos(
    const std::shared_ptr<Table> table,
    const google::protobuf::Descriptor* descriptor);
"""

My particular use case for these functions is that I would like to use protobufs for the in-memory data representation as it provides strongly typed classes which are very expressive (can have nested/repeated fields) and a well established path for schema evolution. However, I would like to use parquet as the data storage layer (and arrow as the glue between the two) so that I can take advantage of technologies like presto for querying the data. I'm hoping that backwards compatible changes to the proto schema turn into backwards compatible changes in the parquet files. I'm also a bit curious to see if arrow allows faster deserialization when compared to a list of serialized protos on disk.
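As background for the nested/repeated part of the proposal: the natural mapping is proto message to Arrow Struct, and proto repeated field to Arrow List. The proposal above is C++, but the mapping is compact to show in Rust with arrow-rs; the snippet below assumes arrow 6.x-era signatures (DataType::Struct taking Vec<Field>, DataType::List taking Box<Field>) and a made-up proto message.

use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Hypothetical proto, for illustration only:
    //   message Order {
    //     int64 id = 1;
    //     message Item { string sku = 1; int32 qty = 2; }
    //     repeated Item items = 2;
    //   }
    // Nested message => Struct; repeated field => List.
    let item = DataType::Struct(vec![
        Field::new("sku", DataType::Utf8, true),
        Field::new("qty", DataType::Int32, true),
    ]);
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new(
            "items",
            DataType::List(Box::new(Field::new("item", item, true))),
            true,
        ),
    ]);
    println!("{:#?}", schema);
}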
Re: [DISCUSS] [RUST] More Frequent arrow-rs release schedule
Are there parts of the library that are more stable? For example, in the C++ API the "parquet" namespace is stable and shouldn't encounter much breaking change, but the "compute" namespace is marked "experimental" (in the documentation) and is much more liable to breaking changes. If there were a way to mark particular parts of the API as "experimental", then I think those parts could have breaking changes as part of a minor release. This obeys the semver rules because your experimental APIs are not part of your "public API". That being said, I'm just backseat driving at this point, so feel free to discard the thought.

I don't know that the lockstep versioning between Rust and C++ is buying us anything. I think it is more meaningful for something like R/C++, where the former is just bindings on top of the latter. There is no guarantee that version X of C++ and version X of Rust share any common features or compatibility.

-Weston

On Mon, Jan 3, 2022 at 2:23 AM Andrew Lamb wrote:
> I think Micah and Adam have hit on the core issue -- there are inconsistent
> meanings ascribed to version numbers ("1.0.0") and "stable".
>
> Specifically, I think everyone agrees:
> * "1.0.0"+ means "stable"
> * Some software is clearly unstable (APIs change all the time, couldn't be
>   used by other software)
> * Some software is clearly stable (public API never changes)
>
> However, there are many packages that fall between the two extremes
> (arrow-rs and most of the Rust ecosystem, in my opinion): they change, but
> are "stable enough" to be used in other projects.
>
> I don't really care about the value of the version. What I care about is:
> 1. predictable regular releases, and
> 2. allowing the APIs to evolve over time without causing undue stress on
>    users (e.g. that their CI jobs start failing without them changing
>    anything)
>
> Andrew
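For the Rust side, one concrete way to "mark parts of the API as experimental" is an off-by-default Cargo feature, so the unstable surface is opt-in and visibly outside the semver-stable public API. A hypothetical sketch, not how arrow-rs is actually organized:

// In Cargo.toml (hypothetical manifest, not arrow-rs's):
//
//   [features]
//   default = []
//   experimental = []
//
// lib.rs: gate the unstable modules behind the feature.

/// Stable surface: covered by the usual semver guarantees.
pub mod stable {
    pub fn read_batch() {
        // ... stable, versioned behavior ...
    }
}

/// Experimental surface: compiled only with `--features experimental`;
/// free to break in minor releases because opting in is explicit.
#[cfg(feature = "experimental")]
pub mod experimental {
    pub fn fancy_kernel() {
        // ... unstable behavior, may change release to release ...
    }
}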
Preparing for version 7.0.0 release
The plan seems to be to cut a release in the 2nd or 3rd week of January. A new Confluence page was made to track the progress of the release (https://cwiki.apache.org/confluence/display/ARROW/Arrow+7.0.0+Release).

It would greatly help the process of preparing for the release if you could review the tickets assigned to you in the "TODO Backlog" and move those that you think you will not be able to close in ~1 week to "Version 8.0.0" in Jira, so that we can start preparing release announcements etc. with a good estimate of what's actually going to end up in the release.

Thanks everybody for the great work! Lots of great things are coming in 7.0.0.
Re: [DISCUSS] [RUST] More Frequent arrow-rs release schedule
I think Micah and Adam have hit on the core issue -- there are inconsistent meanings ascribed to version numbers ("1.0.0") and "stable".

Specifically, I think everyone agrees:
* "1.0.0"+ means "stable"
* Some software is clearly unstable (APIs change all the time, couldn't be used by other software)
* Some software is clearly stable (public API never changes)

However, there are many packages that fall between the two extremes (arrow-rs and most of the Rust ecosystem, in my opinion): they change, but are "stable enough" to be used in other projects.

I don't really care about the value of the version. What I care about is:
1. predictable regular releases, and
2. allowing the APIs to evolve over time without causing undue stress on users (e.g. that their CI jobs start failing without them changing anything)

Andrew
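Point 2 above is concretely about Cargo's default caret semantics: a requirement like arrow = "6.4" is allowed to float up to, but not including, 7.0.0, so a breaking change shipped in a later 6.x release reaches downstream CI automatically. The sketch below checks those semantics with the semver crate (the same rules Cargo applies); the version numbers are illustrative.

// Demonstrates Cargo's default ("caret") requirement semantics using the
// semver crate. Requires `semver = "1"` in Cargo.toml.
use semver::{Version, VersionReq};

fn main() {
    // `arrow = "6.4"` in Cargo.toml means `^6.4`, i.e. >=6.4.0, <7.0.0.
    let req = VersionReq::parse("^6.4").unwrap();

    // A later 6.x release is pulled in automatically on `cargo update`;
    // if it contains breaking changes, CI fails without any user change.
    assert!(req.matches(&Version::parse("6.9.0").unwrap()));

    // A major bump is opt-in, so breakage there requires a user action.
    assert!(!req.matches(&Version::parse("7.0.0").unwrap()));
}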