mustafasrepo commented on code in PR #6932:
URL: https://github.com/apache/arrow-datafusion/pull/6932#discussion_r1265285395


##########
datafusion/core/src/physical_plan/aggregates/order/partial.rs:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::physical_expr::EmitTo;
+use arrow::row::{OwnedRow, RowConverter, Rows, SortField};
+use arrow_array::ArrayRef;
+use arrow_schema::Schema;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::proxy::VecAllocExt;
+use datafusion_physical_expr::PhysicalSortExpr;
+
+/// Tracks grouping state when the data is ordered by some subset of
+/// the group keys.
+///
+/// Once the next *sort key* value is seen, we will never see groups
+/// with the previous sort key again, so we can emit all groups with
+/// the previous sort key and earlier.
+///
+/// For example, given `SUM(amt) GROUP BY id, state` if the input is
+/// sorted by `state`, when a new value of `state` is seen, all groups
+/// with prior values of `state` can be emitted.
+///
+/// The state is tracked like this:
+///
+/// ```text
+///                                            ┏━━━━━━━━━━━━━━━━━┓ ┏━━━━━━━┓
+///     ┌─────┐    ┌───────────────────┐ ┌─────┃        9        ┃ ┃ "MD"  ┃
+///     │┌───┐│    │ ┌──────────────┐  │ │     ┗━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━┛
+///     ││ 0 ││    │ │  123, "MA"   │  │ │        current_sort      sort_key
+///     │└───┘│    │ └──────────────┘  │ │
+///     │ ... │    │    ...            │ │      current_sort tracks the most
+///     │┌───┐│    │ ┌──────────────┐  │ │      recent group index that had
+///     ││12 ││    │ │  765, "MA"   │  │ │      the same sort_key as current
+///     │├───┤│    │ ├──────────────┤  │ │
+///     ││12 ││    │ │  923, "MD"   │◀─┼─┘
+///     │├───┤│    │ ├──────────────┤  │        ┏━━━━━━━━━━━━━━┓
+///     ││13 ││    │ │  345, "MD"   │◀─┼────────┃      12      ┃
+///     │└───┘│    │ └──────────────┘  │        ┗━━━━━━━━━━━━━━┛
+///     └─────┘    └───────────────────┘            current
+///  group indices
+/// (in group value  group_values               current tracks the most
+///      order)                                    recent group index
+/// ```
+#[derive(Debug)]
+pub(crate) struct GroupOrderingPartial {
+    /// State machine
+    state: State,
+
+    /// The indexes of the group by columns that form the sort key.
+    /// For example if grouping by `id, state` and ordered by `state`
+    /// this would be `[1]`.
+    order_indices: Vec<usize>,
+
+    /// Converter for the sort key (used on the group columns
+    /// specified in `order_indices`)
+    row_converter: RowConverter,
+
+    /// Hash values for groups in 0..completed
+    hashes: Vec<u64>,
+}
+
+#[derive(Debug, Default)]
+enum State {
+    /// The ordering was temporarily taken.  `Self::Taken` is left
+    /// when state must be temporarily taken to satisfy the borrow
+    /// checker. If an error happens before the state can be restored,
+    /// the ordering information is lost and execution can not
+    /// proceed, but there is no undefined behavior.
+    #[default]
+    Taken,
+
+    /// Seen no input yet
+    Start,
+
+    /// Data is in progress.
+    InProgress {
+        /// first group index with the sort_key
+        current_sort: usize,
+        /// The sort key of group_index `current_sort`
+        sort_key: OwnedRow,
+        /// index of the current group for which values are being
+        /// generated
+        current: usize,
+    },
+
+    /// Seen end of input, all groups can be emitted
+    Complete,
+}
+
+impl GroupOrderingPartial {
+    pub fn try_new(
+        input_schema: &Schema,
+        order_indices: &[usize],
+        ordering: &[PhysicalSortExpr],
+    ) -> Result<Self> {
+        assert!(!order_indices.is_empty());
+        assert_eq!(order_indices.len(), ordering.len());
+
+        let fields = ordering
+            .iter()
+            .map(|sort_expr| {
+                Ok(SortField::new_with_options(
+                    sort_expr.expr.data_type(input_schema)?,
+                    sort_expr.options,
+                ))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Self {
+            state: State::Start,
+            order_indices: order_indices.to_vec(),
+            row_converter: RowConverter::new(fields)?,
+            hashes: vec![],
+        })
+    }
+
+    /// Creates sort keys from the group values
+    ///
+    /// For example, if group_values had `A, B, C` but the input was
+    /// only sorted on `B` and `C` this sould return rows for (`B`,

Review Comment:
   ```suggestion
       /// only sorted on `B` and `C` this should return rows for (`B`,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to