avantgardnerio commented on code in PR #7192:
URL: https://github.com/apache/arrow-datafusion/pull/7192#discussion_r1305966661

##########
datafusion/core/src/physical_plan/aggregates/priority_queue.rs:
##########
@@ -0,0 +1,235 @@
+use uuid::Uuid;
+use std::collections::{BTreeMap, HashMap};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use arrow::row::{OwnedRow, RowConverter, SortField};
+use arrow::util::pretty::print_batches;
+use arrow_array::{RecordBatch};
+use arrow_schema::{DataType, SchemaRef, SortOptions};
+use futures::stream::{Stream, StreamExt};
+use hashbrown::HashSet;
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{PhysicalExpr};
+use crate::physical_plan::aggregates::{aggregate_expressions, AggregateExec, evaluate_group_by, evaluate_many, group_schema, PhysicalGroupBy};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_common::Result;
+use datafusion_physical_expr::expressions::{Max, Min};
+
+pub(crate) struct GroupedPriorityQueueAggregateStream {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+    group_by: PhysicalGroupBy,
+    group_converter: RowConverter,
+    value_converter: RowConverter, // TODO: use accumulators
+    group_to_val: HashMap<OwnedRow, OwnedRow>, // TODO: BTreeMap->BinaryHeap, OwnedRow->Rows
+    val_to_group: BTreeMap<OwnedRow, HashSet<OwnedRow>>,

Review Comment:
   I swapped out the `BTreeSet` for a binary heap because that allowed me to decouple the types and support all permutations of `<K,V>`, while keeping approximately the same level of performance.
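
For readers following the thread, here is a rough sketch of the general idea behind pairing a hash map with a binary heap while keeping the group type `K` and the value type `V` independent. It is not the code in this PR: `top_k_max` is a hypothetical helper, it uses only `std::collections::BinaryHeap` and plain generic types rather than arrow's `OwnedRow`, and it builds the heap after all rows are seen instead of updating entries in place while streaming.

```rust
use std::cmp::Reverse;
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::hash::Hash;

/// Hypothetical helper (not from the PR): returns the `limit` groups with the
/// largest per-group maximum, ordered from largest to smallest value.
/// `K` and `V` are independent type parameters; `K: Ord` is only needed to
/// break ties between equal values deterministically.
fn top_k_max<K, V>(rows: impl IntoIterator<Item = (K, V)>, limit: usize) -> Vec<(K, V)>
where
    K: Hash + Eq + Ord,
    V: Ord,
{
    // Step 1: group -> running MAX(value).
    let mut group_to_val: HashMap<K, V> = HashMap::new();
    for (group, val) in rows {
        match group_to_val.entry(group) {
            Entry::Occupied(mut e) => {
                if val > *e.get() {
                    e.insert(val);
                }
            }
            Entry::Vacant(e) => {
                e.insert(val);
            }
        }
    }

    // Step 2: bounded min-heap. `Reverse` turns std's max-heap into a
    // min-heap, so `pop` evicts the smallest candidate once we exceed `limit`.
    let mut heap: BinaryHeap<Reverse<(V, K)>> = BinaryHeap::new();
    for (group, val) in group_to_val {
        heap.push(Reverse((val, group)));
        if heap.len() > limit {
            heap.pop();
        }
    }

    // Ascending order of `Reverse<T>` is descending order of `T`,
    // so the largest value comes first.
    heap.into_sorted_vec()
        .into_iter()
        .map(|Reverse((val, group))| (group, val))
        .collect()
}
```

Calling `top_k_max` with `(&str, i32)` rows or with `(u8, String)` rows works without any change to the function, which is the kind of `<K,V>` decoupling the comment refers to. The heap never holds more than `limit + 1` entries.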