drusso commented on pull request #8222:
URL: https://github.com/apache/arrow/pull/8222#issuecomment-699164097


   @andygrove (cc @jorgecarleitao):
   
   My apologies, I don't have the changes ready yet, though I did have some 
time today to look into integrating this with #8172/master. 
   
   I'm still digesting the upstream changes, and I see accumulators now have 
partial "state" that they emit and ingest across stages. This is great, and I 
think it would work well for a distinct count accumulator implementation.
   
   However, I think there's a small roadblock with porting the current 
implementation.
   
   I believe the approach would be to implement `state_fields()` to return one 
field for each input field. Each state field is a `LargeList`, and each list's 
subtype mirrors the type from the corresponding input field. Then, `state()` 
returns, for each field, the distinct values.
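   
   A minimal sketch of that mapping, using simplified stand-ins for Arrow's `DataType` and `Field` rather than the real types. The `prefix` parameter and the naming scheme are hypothetical, only there to make the example readable:
   
   ```rust
   // Simplified stand-ins for Arrow's `DataType` and `Field` (illustration only,
   // not the real arrow crate types).
   #[allow(dead_code)]
   #[derive(Debug, Clone, PartialEq)]
   enum DataType {
       Utf8,
       Int64,
       LargeList(Box<DataType>),
   }
   
   #[derive(Debug, Clone, PartialEq)]
   struct Field {
       name: String,
       data_type: DataType,
   }
   
   /// Returns one state field per input field. Each state field is a
   /// `LargeList` whose element type mirrors the input field's type.
   /// The `prefix` and the resulting names are assumptions of this sketch.
   fn state_fields(prefix: &str, inputs: &[Field]) -> Vec<Field> {
       inputs
           .iter()
           .map(|f| Field {
               name: format!("{}_{}", prefix, f.name),
               data_type: DataType::LargeList(Box::new(f.data_type.clone())),
           })
           .collect()
   }
   ```
   
   For two `Int64` inputs this yields two `LargeList(Int64)` state fields, one per input.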
   
   As an example, given a table:
   
   ```
   | c1 | c2 | c3 |
   |----|----|----|
   | a  |  1 |  5 |
   | a  |  1 |  5 |
   | a  |  2 |  5 |
   | a  |  1 |  6 |
   | a  |  1 |  5 |
   | b  |  1 |  8 |
   ```
   
   And given a query:
   
   ```
   SELECT c1, COUNT(DISTINCT c2, c3) as d FROM t1 GROUP BY c1
   ```
   
   Then the set of output `Array`s from the first `HashAggregateExec` would be:
   
   ```
   c1   = [ a         b ]
   d_c1 = [ [ 1 2 1 ] [ 1 ] ]
   d_c2 = [ [ 5 5 6 ] [ 8 ] ]
   ```
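   
   To make the derivation of those lists concrete, here is a rough sketch in plain Rust (no Arrow types). It treats the second and third columns of the table as the counted columns, which is what the lists above correspond to. The function name and the tuple-based row representation are hypothetical:
   
   ```rust
   use std::collections::{BTreeMap, HashSet};
   
   /// Rows are (c1, c2, c3). For each group key c1, keep every distinct
   /// (c2, c3) pair in first-seen order, stored as one list per counted column.
   fn partial_distinct_state(
       rows: &[(&str, i64, i64)],
   ) -> BTreeMap<String, (Vec<i64>, Vec<i64>)> {
       let mut seen: HashSet<(String, i64, i64)> = HashSet::new();
       let mut state: BTreeMap<String, (Vec<i64>, Vec<i64>)> = BTreeMap::new();
       for &(c1, c2, c3) in rows {
           // `insert` returns true only the first time this combination appears.
           if seen.insert((c1.to_string(), c2, c3)) {
               let lists = state.entry(c1.to_string()).or_default();
               lists.0.push(c2);
               lists.1.push(c3);
           }
       }
       state
   }
   ```
   
   Within a single partition the distinct count for a group is the length of either list. A later stage merging several partitions would still need to de-duplicate across them.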
   
   Assuming this all looks correct so far, the issue is that `state()` is 
limited to returning a single scalar per field, whereas we would need to allow 
for a list of scalars per field.
   
   There are a number of paths forward. Brainstorming some solutions:
   
   * What if the responsibility of `Array` building is moved away from 
`HashAggregateExec` (currently 
[here](https://github.com/apache/arrow/blob/97ade8115ab52d8f591da04ea46a283f2c377ab1/rust/datafusion/src/physical_plan/hash_aggregate.rs#L596))
 to somewhere more specific to an aggregate and its accumulator? Then each 
aggregate has the flexibility to create its own arrays as needed. Maybe 
`finalize_aggregation()` would supply the accumulators, and receive back 
`ArrayRef` (one per field)?
   
   * Introduce a new enum to distinguish between single and multiple values. 
Then `state()` would return `Result<Vec<Value>>`, assuming `Value` is defined as:
   
   ```rust
   enum Value {
       Single(ScalarValue),
       Multi(Vec<ScalarValue>),
   }
   ```
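   
   A rough sketch of how the second option could look from both sides. Everything here apart from `Value` is an assumption for illustration: `ScalarValue` is a cut-down stand-in for DataFusion's type, the `Accumulator` trait is reduced to `state()` only, `String` stands in for the real error type, and `entries()` is a hypothetical helper the executor might use when building arrays:
   
   ```rust
   // Cut-down stand-in for DataFusion's `ScalarValue` (illustration only).
   #[derive(Debug, Clone, PartialEq)]
   enum ScalarValue {
       Int64(i64),
       UInt64(u64),
   }
   
   #[derive(Debug, Clone, PartialEq)]
   enum Value {
       Single(ScalarValue),
       Multi(Vec<ScalarValue>),
   }
   
   // Reduced to `state()` only; `String` stands in for the real error type.
   trait Accumulator {
       fn state(&self) -> Result<Vec<Value>, String>;
   }
   
   // A plain count keeps emitting a single scalar.
   struct CountAccumulator {
       count: u64,
   }
   
   impl Accumulator for CountAccumulator {
       fn state(&self) -> Result<Vec<Value>, String> {
           Ok(vec![Value::Single(ScalarValue::UInt64(self.count))])
       }
   }
   
   // A distinct count emits one list of scalars per counted column.
   struct DistinctCountAccumulator {
       columns: Vec<Vec<i64>>,
   }
   
   impl Accumulator for DistinctCountAccumulator {
       fn state(&self) -> Result<Vec<Value>, String> {
           Ok(self
               .columns
               .iter()
               .map(|col| Value::Multi(col.iter().map(|v| ScalarValue::Int64(*v)).collect()))
               .collect())
       }
   }
   
   /// Hypothetical helper: how many scalars a state value carries.
   fn entries(value: &Value) -> usize {
       match value {
           Value::Single(_) => 1,
           Value::Multi(values) => values.len(),
       }
   }
   ```
   
   Existing accumulators would only need to wrap their scalars in `Value::Single`, while the array-building code would branch on the variant to decide between a primitive array and a list array.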
   
   Let me know what you think and how we can proceed. Thanks!
   

