This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 73038f50dd chore: Format examples in doc strings - physical expr, optimizer, and plan (#18357)
73038f50dd is described below

commit 73038f50dd7e1086a36589b4a5ee1e8db18b96f3
Author: Yu-Chuan Hung <[email protected]>
AuthorDate: Sat Nov 1 20:44:55 2025 +0800

    chore: Format examples in doc strings - physical expr, optimizer, and plan (#18357)
    
    ## Which issue does this PR close?
    Part of #16915
    
    ## Rationale for this change
    Format code examples in documentation comments to improve readability
    and maintain consistent code style across the codebase. This is part of
    a multi-PR effort to format all doc comment examples and eventually
    enable CI checks to enforce this formatting.
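
    The eventual CI check could look roughly like this (a sketch, not part
    of this PR; rustfmt's `--check` flag fails the build on unformatted
    code):
    
    ```shell
    # Hypothetical CI step: fail if any doc-comment example is unformatted.
    cargo fmt --all -- --check --config format_code_in_doc_comments=true
    ```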
    
    ## What changes are included in this PR?
    Run `cargo fmt -p <crate> -- --config format_code_in_doc_comments=true`
    for the following physical-expr, optimizer, and plan crates (an example
    invocation is shown after the list):
      - `datafusion-physical-expr`
      - `datafusion-physical-expr-adapter`
      - `datafusion-physical-expr-common`
      - `datafusion-physical-optimizer`
      - `datafusion-physical-plan`
    
    Additionally, adjust whitespace where needed to preserve the alignment
    of ASCII-art diagrams.
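
    As a concrete instance of the command above, the invocation for the
    first crate in the list is:
    
    ```shell
    # Format code blocks inside doc comments for one crate; repeat per crate.
    cargo fmt -p datafusion-physical-expr -- --config format_code_in_doc_comments=true
    ```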
    
    ## Are these changes tested?
    No testing needed - this is purely a formatting change with no
    functional modifications.
    
    ## Are there any user-facing changes?
    No - this only affects documentation formatting.
---
 .../physical-expr-common/src/physical_expr.rs      |  1 -
 .../src/equivalence/properties/mod.rs              |  9 ++++---
 datafusion/physical-expr/src/expressions/case.rs   |  1 -
 datafusion/physical-expr/src/expressions/column.rs |  6 ++---
 .../physical-expr/src/intervals/cp_solver.rs       |  8 ++-----
 datafusion/physical-expr/src/physical_expr.rs      | 16 ++++++++-----
 datafusion/physical-expr/src/projection.rs         | 10 ++++----
 .../src/combine_partial_final_agg.rs               |  1 -
 .../physical-optimizer/src/enforce_distribution.rs |  1 -
 .../physical-optimizer/src/join_selection.rs       |  1 -
 .../src/aggregates/group_values/mod.rs             |  1 -
 .../aggregates/group_values/multi_group_by/mod.rs  | 11 ---------
 .../group_values/single_group_by/primitive.rs      |  1 -
 .../physical-plan/src/aggregates/order/partial.rs  |  2 +-
 .../physical-plan/src/aggregates/row_hash.rs       |  1 -
 datafusion/physical-plan/src/execution_plan.rs     | 21 +++++++++++-----
 .../physical-plan/src/joins/hash_join/exec.rs      |  2 --
 .../physical-plan/src/joins/hash_join/stream.rs    |  1 -
 .../src/joins/piecewise_merge_join/exec.rs         |  1 -
 .../physical-plan/src/joins/stream_join_utils.rs   |  1 -
 .../physical-plan/src/joins/symmetric_hash_join.rs |  1 -
 datafusion/physical-plan/src/joins/utils.rs        |  1 -
 datafusion/physical-plan/src/metrics/builder.rs    | 19 +++++++--------
 datafusion/physical-plan/src/metrics/custom.rs     |  3 ++-
 datafusion/physical-plan/src/metrics/mod.rs        | 25 ++++++++++---------
 datafusion/physical-plan/src/projection.rs         | 28 ++++++++++++----------
 datafusion/physical-plan/src/recursive_query.rs    |  1 -
 datafusion/physical-plan/src/repartition/mod.rs    | 11 ++++-----
 datafusion/physical-plan/src/sorts/merge.rs        |  1 -
 .../physical-plan/src/sorts/multi_level_merge.rs   |  1 -
 datafusion/physical-plan/src/sorts/partial_sort.rs |  4 ++--
 datafusion/physical-plan/src/sorts/sort.rs         |  1 -
 datafusion/physical-plan/src/stream.rs             | 15 ++++++++----
 datafusion/physical-plan/src/test/exec.rs          |  1 -
 datafusion/physical-plan/src/union.rs              | 16 ++++++-------
 datafusion/physical-plan/src/unnest.rs             |  4 ----
 36 files changed, 106 insertions(+), 122 deletions(-)

diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs
index e5e7d6c00f..492383663d 100644
--- a/datafusion/physical-expr-common/src/physical_expr.rs
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -341,7 +341,6 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
     /// representation.
     ///
     /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL.
-    ///
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
 
     /// Take a snapshot of this `PhysicalExpr`, if it is dynamic.
diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs
index 2404b8f0dd..4d919d623b 100644
--- a/datafusion/physical-expr/src/equivalence/properties/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs
@@ -123,11 +123,14 @@ use itertools::Itertools;
 /// let mut eq_properties = EquivalenceProperties::new(schema);
 /// eq_properties.add_constants(vec![ConstExpr::from(col_b)]);
 /// eq_properties.add_ordering([
-///   PhysicalSortExpr::new_default(col_a).asc(),
-///   PhysicalSortExpr::new_default(col_c).desc(),
+///     PhysicalSortExpr::new_default(col_a).asc(),
+///     PhysicalSortExpr::new_default(col_c).desc(),
 /// ]);
 ///
-/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], eq: [{members: [b@1], constant: (heterogeneous)}]");
+/// assert_eq!(
+///     eq_properties.to_string(),
+///     "order: [[a@0 ASC, c@2 DESC]], eq: [{members: [b@1], constant: (heterogeneous)}]"
+/// );
 /// ```
 #[derive(Clone, Debug)]
 pub struct EquivalenceProperties {
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index 191f5ba529..9ffb571a26 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -341,7 +341,6 @@ fn filter_array(
 /// │└─────────┘│  │    2    │                             │    D    │
 /// └───────────┘  └─────────┘                             └─────────┘
 ///    values        indices                                  result
-///
 /// ```
 fn merge(values: &[ArrayData], indices: &[PartialResultIndex]) -> Result<ArrayRef> {
     #[cfg(debug_assertions)]
diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs
index c9f3fb00f0..9ca464b304 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -49,9 +49,9 @@ use datafusion_expr::ColumnarValue;
 /// # use arrow::datatypes::{DataType, Field, Schema};
 /// // Schema with columns a, b, c
 /// let schema = Schema::new(vec![
-///    Field::new("a", DataType::Int32, false),
-///    Field::new("b", DataType::Int32, false),
-///    Field::new("c", DataType::Int32, false),
+///     Field::new("a", DataType::Int32, false),
+///     Field::new("b", DataType::Int32, false),
+///     Field::new("c", DataType::Int32, false),
 /// ]);
 ///
 /// // reference to column b is index 1
diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs
index c44197bbbe..be0e5e1fa6 100644
--- a/datafusion/physical-expr/src/intervals/cp_solver.rs
+++ b/datafusion/physical-expr/src/intervals/cp_solver.rs
@@ -579,15 +579,11 @@ impl ExprIntervalGraph {
     ///
     /// let mut graph = ExprIntervalGraph::try_new(expr, &schema).unwrap();
     /// // Do it once, while constructing.
-    /// let node_indices = graph
-    ///     .gather_node_indices(&[Arc::new(Column::new("gnz", 0))]);
+    /// let node_indices = graph.gather_node_indices(&[Arc::new(Column::new("gnz", 0))]);
     /// let left_index = node_indices.get(0).unwrap().1;
     ///
     /// // Provide intervals for leaf variables (here, there is only one).
-    /// let intervals = vec![(
-    ///     left_index,
-    ///     Interval::make(Some(10), Some(20)).unwrap(),
-    /// )];
+    /// let intervals = vec![(left_index, Interval::make(Some(10), Some(20)).unwrap())];
     ///
     /// // Evaluate bounds for the composite expression:
     /// graph.assign_intervals(&intervals);
diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs
index 2cc484ec6a..c658a8eddc 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -118,12 +118,16 @@ pub fn physical_exprs_bag_equal(
 /// ]);
 ///
 /// let sort_exprs = vec![
-///     vec![
-///         SortExpr { expr: Expr::Column(Column::new(Some("t"), "id")), asc: true, nulls_first: false }
-///     ],
-///     vec![
-///         SortExpr { expr: Expr::Column(Column::new(Some("t"), "name")), asc: false, nulls_first: true }
-///     ]
+///     vec![SortExpr {
+///         expr: Expr::Column(Column::new(Some("t"), "id")),
+///         asc: true,
+///         nulls_first: false,
+///     }],
+///     vec![SortExpr {
+///         expr: Expr::Column(Column::new(Some("t"), "name")),
+///         asc: false,
+///         nulls_first: true,
+///     }],
 /// ];
 /// let result = create_ordering(&schema, &sort_exprs).unwrap();
 /// ```
diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs
index fc972d644e..c707d3ccff 100644
--- a/datafusion/physical-expr/src/projection.rs
+++ b/datafusion/physical-expr/src/projection.rs
@@ -166,9 +166,9 @@ impl ProjectionExprs {
     /// # Example
     ///
     /// ```rust
-    /// use std::sync::Arc;
-    /// use arrow::datatypes::{Schema, Field, DataType};
+    /// use arrow::datatypes::{DataType, Field, Schema};
     /// use datafusion_physical_expr::projection::ProjectionExprs;
+    /// use std::sync::Arc;
     ///
     /// // Create a schema with three columns
     /// let schema = Arc::new(Schema::new(vec![
@@ -234,11 +234,11 @@ impl ProjectionExprs {
     /// # Example
     ///
     /// ```rust
-    /// use std::sync::Arc;
-    /// use datafusion_physical_expr::projection::{ProjectionExprs, ProjectionExpr};
-    /// use datafusion_physical_expr::expressions::{Column, BinaryExpr, Literal};
     /// use datafusion_common::{Result, ScalarValue};
     /// use datafusion_expr::Operator;
+    /// use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
+    /// use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
+    /// use std::sync::Arc;
     ///
     /// fn main() -> Result<()> {
     ///     // Example from the docstring:
diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
index 86f7e73e9e..bffb2c9df9 100644
--- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
+++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
@@ -36,7 +36,6 @@ use datafusion_physical_expr::{physical_exprs_equal, PhysicalExpr};
 /// into a Single AggregateExec if their grouping exprs and aggregate exprs equal.
 ///
 /// This rule should be applied after the EnforceDistribution and EnforceSorting rules
-///
 #[derive(Default, Debug)]
 pub struct CombinePartialFinalAggregate {}
 
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 898386e2f9..e9e28fec06 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -281,7 +281,6 @@ pub type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;
 /// 3) If the current plan is RepartitionExec, CoalescePartitionsExec or WindowAggExec, clear all the requirements, return the unchanged plan
 /// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements
 /// 5) For other types of operators, by default, pushdown the parent requirements to children.
-///
 pub fn adjust_input_keys_ordering(
     mut requirements: PlanWithKeyRequirements,
 ) -> Result<Transformed<PlanWithKeyRequirements>> {
diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs
index 1db4d7b305..b55c01f62e 100644
--- a/datafusion/physical-optimizer/src/join_selection.rs
+++ b/datafusion/physical-optimizer/src/join_selection.rs
@@ -476,7 +476,6 @@ fn hash_join_convert_symmetric_subrule(
 ///           | Data Source  |--------------| Repartition  |
 ///           |              |              |              |
 ///           +--------------+              +--------------+
-///
 /// ```
 pub fn hash_join_swap_subrule(
     mut input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index 5f2a2faa11..4bd7f03506 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -131,7 +131,6 @@ pub trait GroupValues: Send {
 /// `GroupColumn`:  crate::aggregates::group_values::multi_group_by::GroupColumn
 /// `GroupValuesColumn`: crate::aggregates::group_values::multi_group_by::GroupValuesColumn
 /// `GroupValuesRows`: crate::aggregates::group_values::row::GroupValuesRows
-///
 pub fn new_group_values(
     schema: SchemaRef,
     group_ordering: &GroupOrdering,
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 58bd35d640..9adf028eca 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -77,7 +77,6 @@ pub trait GroupColumn: Send + Sync {
     ///
     /// And if found nth result in `equal_to_results` is already
     /// `false`, the check for nth row will be skipped.
-    ///
     fn vectorized_equal_to(
         &self,
         lhs_rows: &[usize],
@@ -137,7 +136,6 @@ pub fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> Option<bool> {
 ///   +---------------------+---------------------------------------------+
 ///
 /// `inlined flag`: 1 represents `non-inlined`, and 0 represents `inlined`
-///
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 struct GroupIndexView(u64);
 
@@ -166,7 +164,6 @@ impl GroupIndexView {
 
 /// A [`GroupValues`] that stores multiple columns of group values,
 /// and supports vectorized operators for them
-///
 pub struct GroupValuesColumn<const STREAMING: bool> {
     /// The output schema
     schema: SchemaRef,
@@ -184,7 +181,6 @@ pub struct GroupValuesColumn<const STREAMING: bool> {
     /// instead we store the `group indices` pointing to values in `GroupValues`.
     /// And we use [`GroupIndexView`] to represent such `group indices` in table.
     ///
-    ///
     map: HashTable<(u64, GroupIndexView)>,
 
     /// The size of `map` in bytes
@@ -197,7 +193,6 @@ pub struct GroupValuesColumn<const STREAMING: bool> {
     ///
     /// The chained indices is like:
     ///   `latest group index -> older group index -> even older group index -> ...`
-    ///
     group_index_lists: Vec<Vec<usize>>,
 
     /// When emitting first n, we need to decrease/erase group indices in
@@ -323,7 +318,6 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
     ///
     /// `Group indices` order are against with their input order, and this will lead to error
     /// in `streaming aggregation`.
-    ///
     fn scalarized_intern(
         &mut self,
         cols: &[ArrayRef],
@@ -425,7 +419,6 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
     ///
     /// The vectorized approach can offer higher performance for avoiding row by row
     /// downcast for `cols` and being able to implement even more optimizations(like simd).
-    ///
     fn vectorized_intern(
         &mut self,
         cols: &[ArrayRef],
@@ -493,7 +486,6 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
     ///   - Check if the `group index view` is `inlined` or `non_inlined`:
     ///     If it is inlined, add to `vectorized_equal_to_group_indices` directly.
     ///     Otherwise get all group indices from `group_index_lists`, and add them.
-    ///
     fn collect_vectorized_process_context(
         &mut self,
         batch_hashes: &[u64],
@@ -721,7 +713,6 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
     /// The hash collision may be not frequent, so the fallback will indeed hardly happen.
     /// In most situations, `scalarized_indices` will found to be empty after finishing to
     /// preform `vectorized_equal_to`.
-    ///
     fn scalarized_intern_remaining(
         &mut self,
         cols: &[ArrayRef],
@@ -886,7 +877,6 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
 /// `$v`: the vector to push the new builder into
 /// `$nullable`: whether the input can contains nulls
 /// `$t`: the primitive type of the builder
-///
 macro_rules! instantiate_primitive {
     ($v:expr, $nullable:expr, $t:ty, $data_type:ident) => {
         if $nullable {
@@ -1468,7 +1458,6 @@ mod tests {
     ///   - Group not exist + bucket not found in `map`
     ///   - Group not exist + not equal to inlined group view(tested in hash collision)
     ///   - Group not exist + not equal to non-inlined group view(tested in hash collision)
-    ///
     struct VectorizedTestDataSet {
         test_batches: Vec<Vec<ArrayRef>>,
         expected_batch: RecordBatch,
diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
index 8b1905e540..f35c580b0e 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
@@ -87,7 +87,6 @@ pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> {
     /// is obvious in high cardinality group by situation.
     /// More details can see:
     /// <https://github.com/apache/datafusion/issues/15961>
-    ///
     map: HashTable<(usize, u64)>,
     /// The group index of the null value if any
     null_group: Option<usize>,
diff --git a/datafusion/physical-plan/src/aggregates/order/partial.rs b/datafusion/physical-plan/src/aggregates/order/partial.rs
index 3e495900f7..476551a7ca 100644
--- a/datafusion/physical-plan/src/aggregates/order/partial.rs
+++ b/datafusion/physical-plan/src/aggregates/order/partial.rs
@@ -61,7 +61,7 @@ use datafusion_expr::EmitTo;
 ///  group indices
 /// (in group value  group_values               current tracks the most
 ///      order)                                    recent group index
-///```
+/// ```
 #[derive(Debug)]
 pub struct GroupOrderingPartial {
     /// State machine
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 98c8cb235c..e8d842cc85 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -298,7 +298,6 @@ impl SkipAggregationProbe {
 /// later stream-merge sort on reading back the spilled data does re-grouping. Note the rows cannot
 /// be grouped once spilled onto disk, the read back data needs to be re-grouped again. In addition,
 /// re-grouping may cause out of memory again. Thus, re-grouping has to be a sort based aggregation.
-///
 /// ```text
 /// Partial Aggregation [batch_size = 2] (max memory = 3 rows)
 ///
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index a70cd9cb0d..00fbdde533 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -354,12 +354,15 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///     fn execute(
     ///         &self,
     ///         partition: usize,
-    ///         context: Arc<TaskContext>
+    ///         context: Arc<TaskContext>,
     ///     ) -> Result<SendableRecordBatchStream> {
     ///         // use functions from futures crate convert the batch into a stream
     ///         let fut = futures::future::ready(Ok(self.batch.clone()));
     ///         let stream = futures::stream::once(fut);
-    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
+    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
+    ///             self.batch.schema(),
+    ///             stream,
+    ///         )))
     ///     }
     /// }
     /// ```
@@ -389,11 +392,14 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///     fn execute(
     ///         &self,
     ///         partition: usize,
-    ///         context: Arc<TaskContext>
+    ///         context: Arc<TaskContext>,
     ///     ) -> Result<SendableRecordBatchStream> {
     ///         let fut = get_batch();
     ///         let stream = futures::stream::once(fut);
-    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
+    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
+    ///             self.schema.clone(),
+    ///             stream,
+    ///         )))
     ///     }
     /// }
     /// ```
@@ -425,13 +431,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///     fn execute(
     ///         &self,
     ///         partition: usize,
-    ///         context: Arc<TaskContext>
+    ///         context: Arc<TaskContext>,
     ///     ) -> Result<SendableRecordBatchStream> {
     ///         // A future that yields a stream
     ///         let fut = get_batch_stream();
     ///         // Use TryStreamExt::try_flatten to flatten the stream of streams
     ///         let stream = futures::stream::once(fut).try_flatten();
-    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
+    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
+    ///             self.schema.clone(),
+    ///             stream,
+    ///         )))
     ///     }
     /// }
     /// ```
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index b5fe5ee5cd..0a582bd911 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -238,7 +238,6 @@ impl JoinLeftData {
 ///            └───────┘                                                    │         └───────┘        │
 ///                                                                         │                          │
 ///                                                                         └───────────────────────────┘
-///
 /// ```
 ///
 /// 2. the **probe phase** where the tuples of the probe side are streamed
@@ -273,7 +272,6 @@ impl JoinLeftData {
 ///     └────────────┘                                            └────────────┘
 ///
 ///        build side                                                probe side
-///
 /// ```
 ///
 /// # Example "Optimal" Plans
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index 88c50c2eb2..bb3465365e 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -115,7 +115,6 @@ impl BuildSide {
 ///  │          │
 ///  │          ▼
 ///  └─ ProcessProbeBatch
-///
 /// ```
 #[derive(Debug, Clone)]
 pub(super) enum HashJoinStreamState {
diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
index 987f3e9df4..a9ea92f2d9 100644
--- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
@@ -156,7 +156,6 @@ use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};
 ///       ├──────────────────┤  
 ///     5 │       400        │
 ///       └──────────────────┘     
-///
 /// ```
 ///
 /// ## Existence Joins (Semi, Anti, Mark)
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index 9f5485ee93..3e4cbc5d33 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -655,7 +655,6 @@ pub fn combine_two_batches(
 /// * `visited` - A hash set to store the visited indices.
 /// * `offset` - An offset to the indices in the `PrimitiveArray`.
 /// * `indices` - The input `PrimitiveArray` of type `T` which stores the indices to be recorded.
-///
 pub fn record_visited_indices<T: ArrowPrimitiveType>(
     visited: &mut HashSet<usize>,
     offset: usize,
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index b55b7e15f1..be4646e88b 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -796,7 +796,6 @@ fn need_to_produce_result_in_final(build_side: JoinSide, join_type: JoinType) ->
 /// # Returns
 ///
 /// A tuple of two arrays of primitive types representing the build and probe indices.
-///
 fn calculate_indices_by_join_type<L: ArrowPrimitiveType, R: ArrowPrimitiveType>(
     build_side: JoinSide,
     prune_length: usize,
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 78652d443d..9b589b674c 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -221,7 +221,6 @@ pub struct ColumnIndex {
 
 /// Returns the output field given the input field. Outer joins may
 /// insert nulls even if the input was not null
-///
 fn output_join_field(old_field: &Field, join_type: &JoinType, is_left: bool) -> Field {
     let force_nullable = match join_type {
         JoinType::Inner => false,
diff --git a/datafusion/physical-plan/src/metrics/builder.rs b/datafusion/physical-plan/src/metrics/builder.rs
index bf59dccf66..1e86cd9d31 100644
--- a/datafusion/physical-plan/src/metrics/builder.rs
+++ b/datafusion/physical-plan/src/metrics/builder.rs
@@ -31,19 +31,18 @@ use super::{
 /// case of constant strings
 ///
 /// ```rust
-///  use datafusion_physical_plan::metrics::*;
+/// use datafusion_physical_plan::metrics::*;
 ///
-///  let metrics = ExecutionPlanMetricsSet::new();
-///  let partition = 1;
+/// let metrics = ExecutionPlanMetricsSet::new();
+/// let partition = 1;
 ///
-///  // Create the standard output_rows metric
-///  let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
-///
-///  // Create a operator specific counter with some labels
-///  let num_bytes = MetricBuilder::new(&metrics)
-///    .with_new_label("filename", "my_awesome_file.parquet")
-///    .counter("num_bytes", partition);
+/// // Create the standard output_rows metric
+/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
 ///
+/// // Create a operator specific counter with some labels
+/// let num_bytes = MetricBuilder::new(&metrics)
+///     .with_new_label("filename", "my_awesome_file.parquet")
+///     .counter("num_bytes", partition);
 /// ```
 pub struct MetricBuilder<'a> {
     /// Location that the metric created by this builder will be added do
diff --git a/datafusion/physical-plan/src/metrics/custom.rs b/datafusion/physical-plan/src/metrics/custom.rs
index 546af6f333..4421db94dc 100644
--- a/datafusion/physical-plan/src/metrics/custom.rs
+++ b/datafusion/physical-plan/src/metrics/custom.rs
@@ -64,7 +64,8 @@ use std::{any::Any, fmt::Debug, fmt::Display, sync::Arc};
 ///
 ///     fn aggregate(&self, other: Arc<dyn CustomMetricValue>) {
 ///         let other = other.as_any().downcast_ref::<Self>().unwrap();
-///         self.count.fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
+///         self.count
+///             .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
 ///     }
 ///
 ///     fn as_any(&self) -> &dyn Any {
diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs
index c9ddbe8f89..fde748f8f3 100644
--- a/datafusion/physical-plan/src/metrics/mod.rs
+++ b/datafusion/physical-plan/src/metrics/mod.rs
@@ -47,24 +47,23 @@ pub use value::{
 /// [`ExecutionPlanMetricsSet`].
 ///
 /// ```
-///  use datafusion_physical_plan::metrics::*;
+/// use datafusion_physical_plan::metrics::*;
 ///
-///  let metrics = ExecutionPlanMetricsSet::new();
-///  assert!(metrics.clone_inner().output_rows().is_none());
+/// let metrics = ExecutionPlanMetricsSet::new();
+/// assert!(metrics.clone_inner().output_rows().is_none());
 ///
-///  // Create a counter to increment using the MetricBuilder
-///  let partition = 1;
-///  let output_rows = MetricBuilder::new(&metrics)
-///      .output_rows(partition);
+/// // Create a counter to increment using the MetricBuilder
+/// let partition = 1;
+/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
 ///
-///  // Counter can be incremented
-///  output_rows.add(13);
+/// // Counter can be incremented
+/// output_rows.add(13);
 ///
-///  // The value can be retrieved directly:
-///  assert_eq!(output_rows.value(), 13);
+/// // The value can be retrieved directly:
+/// assert_eq!(output_rows.value(), 13);
 ///
-///  // As well as from the metrics set
-///  assert_eq!(metrics.clone_inner().output_rows(), Some(13));
+/// // As well as from the metrics set
+/// assert_eq!(metrics.clone_inner().output_rows(), Some(13));
 /// ```
 ///
 /// [`ExecutionPlan`]: super::ExecutionPlan
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index 2c84570b33..ead2196860 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -110,18 +110,22 @@ impl ProjectionExec {
     /// let b = col("b", &schema).unwrap();
     /// let a_plus_b = binary(Arc::clone(&a), Operator::Plus, b, &schema).unwrap();
     /// // create ProjectionExec
-    /// let proj = ProjectionExec::try_new([
-    ///     ProjectionExpr {
-    ///       // expr a produces the column named "a"
-    ///       expr: a,
-    ///       alias: "a".to_string(),
-    ///     },
-    ///     ProjectionExpr {
-    ///       // expr: a + b produces the column named "sum_ab"
-    ///       expr: a_plus_b,
-    ///       alias: "sum_ab".to_string(),
-    ///     }
-    ///   ], input()).unwrap();
+    /// let proj = ProjectionExec::try_new(
+    ///     [
+    ///         ProjectionExpr {
+    ///             // expr a produces the column named "a"
+    ///             expr: a,
+    ///             alias: "a".to_string(),
+    ///         },
+    ///         ProjectionExpr {
+    ///             // expr: a + b produces the column named "sum_ab"
+    ///             expr: a_plus_b,
+    ///             alias: "sum_ab".to_string(),
+    ///         },
+    ///     ],
+    ///     input(),
+    /// )
+    /// .unwrap();
     /// # }
     /// ```
     pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index b4cdf2dff2..163f214444 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -247,7 +247,6 @@ impl DisplayAs for RecursiveQueryExec {
 ///    while batch := recursive_stream.next():
 ///        buffer.append(batch)
 ///        yield buffer
-///
 struct RecursiveQueryStream {
     /// The context to be used for managing handlers & executing new tasks
     task_context: Arc<TaskContext>,
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 8174f71c31..2128304e07 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -476,10 +476,10 @@ impl BatchPartitioner {
 ///        │                  │                  │
 ///        │                  │                  │
 ///        │                  │                  │
-///┌───────────────┐  ┌───────────────┐  ┌───────────────┐
-///│    GroupBy    │  │    GroupBy    │  │    GroupBy    │
-///│   (Partial)   │  │   (Partial)   │  │   (Partial)   │
-///└───────────────┘  └───────────────┘  └───────────────┘
+/// ┌───────────────┐  ┌───────────────┐  ┌───────────────┐
+/// │    GroupBy    │  │    GroupBy    │  │    GroupBy    │
+/// │   (Partial)   │  │   (Partial)   │  │   (Partial)   │
+/// └───────────────┘  └───────────────┘  └───────────────┘
 ///        ▲                  ▲                  ▲
 ///        └──────────────────┼──────────────────┘
 ///                           │
@@ -498,7 +498,7 @@ impl BatchPartitioner {
 ///     ╲               ╱           ╲               ╱
 ///      '─.         ,─'             '─.         ,─'
 ///         `───────'                   `───────'
-///```
+/// ```
 ///
 /// # Error Handling
 ///
@@ -2158,7 +2158,6 @@ mod test {
     ///
     /// `$EXPECTED_PLAN_LINES`: input plan
     /// `$PLAN`: the plan to optimized
-    ///
     macro_rules! assert_plan {
         ($PLAN: expr,  @ $EXPECTED: expr) => {
             let formatted = crate::displayable($PLAN).indent(true).to_string();
diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs
index 0b0136cd12..720a3e53e4 100644
--- a/datafusion/physical-plan/src/sorts/merge.rs
+++ b/datafusion/physical-plan/src/sorts/merge.rs
@@ -390,7 +390,6 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
     ///
     /// Zooming in at node 2 in the loser tree as an example, we can see that
     /// it takes as input the next item at (S0) and the loser of (S3, S4).
-    ///
     #[inline]
     fn lt_leaf_node_index(&self, cursor_index: usize) -> usize {
         (self.cursors.len() + cursor_index) / 2
diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs
index 58d046cc90..6e7a5e7a72 100644
--- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs
+++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs
@@ -125,7 +125,6 @@ use futures::{Stream, StreamExt};
 ///    available during merge operations.
 /// 2. **Adaptive Buffer Sizing**: Reduces buffer sizes when memory is constrained
 /// 3. **Spill-to-Disk**: Spill to disk when we cannot merge all files in memory
-///
 pub(crate) struct MultiLevelMergeBuilder {
     spill_manager: SpillManager,
     schema: SchemaRef,
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs
index 513081e627..7a623b0c30 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -32,7 +32,7 @@
 //! | 0 | 1 | 1 |
 //! | 0 | 2 | 0 |
 //! +---+---+---+
-//!```
+//! ```
 //!
 //! and required ordering for the plan is `a ASC, b ASC, d ASC`.
 //! The first 3 rows(segment) can be sorted as the segment already
@@ -46,7 +46,7 @@
 //! +---+---+---+
 //! | 0 | 2 | 4 |
 //! +---+---+---+
-//!```
+//! ```
 //!
 //! The plan concats incoming data with such last rows of previous input
 //! and continues partial sorting of the segments.
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index bd798ab4f5..a95fad19f6 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -133,7 +133,6 @@ impl ExternalSorterMetrics {
 ///    └─────┘
 ///
 /// in_mem_batches
-///
 /// ```
 ///
 /// # When data does not fit in available memory
diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs
index 100a6a7ffc..480b723d0b 100644
--- a/datafusion/physical-plan/src/stream.rs
+++ b/datafusion/physical-plan/src/stream.rs
@@ -207,7 +207,9 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
 /// let schema_1 = Arc::clone(&schema);
 /// builder.spawn(async move {
 ///     // Your task needs to send batches to the tx
-///     tx_1.send(Ok(RecordBatch::new_empty(schema_1))).await.unwrap();
+///     tx_1.send(Ok(RecordBatch::new_empty(schema_1)))
+///         .await
+///         .unwrap();
 ///
 ///     Ok(())
 /// });
@@ -217,7 +219,9 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
 /// let schema_2 = Arc::clone(&schema);
 /// builder.spawn(async move {
 ///     // Your task needs to send batches to the tx
-///     tx_2.send(Ok(RecordBatch::new_empty(schema_2))).await.unwrap();
+///     tx_2.send(Ok(RecordBatch::new_empty(schema_2)))
+///         .await
+///         .unwrap();
 ///
 ///     Ok(())
 /// });
@@ -417,9 +421,10 @@ impl<S> RecordBatchStreamAdapter<S> {
     /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
     /// // Create stream of Result<RecordBatch>
     /// let batch = record_batch!(
-    ///   ("a", Int32, [1, 2, 3]),
-    ///   ("b", Float64, [Some(4.0), None, Some(5.0)])
-    /// ).expect("created batch");
+    ///     ("a", Int32, [1, 2, 3]),
+    ///     ("b", Float64, [Some(4.0), None, Some(5.0)])
+    /// )
+    /// .expect("created batch");
     /// let schema = batch.schema();
     /// let stream = futures::stream::iter(vec![Ok(batch)]);
     /// // Convert the stream to a SendableRecordBatchStream
diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs
index 12ffca871f..b720181b27 100644
--- a/datafusion/physical-plan/src/test/exec.rs
+++ b/datafusion/physical-plan/src/test/exec.rs
@@ -291,7 +291,6 @@ fn clone_error(e: &DataFusionError) -> DataFusionError {
 
 /// A Mock ExecutionPlan that does not start producing input until a
 /// barrier is called
-///
 #[derive(Debug)]
 pub struct BarrierExec {
     /// partitions to send back
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 164f17edeb..c95678dac9 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -67,14 +67,14 @@ use tokio::macros::support::thread_rng_n;
 /// partitions, and then next `M` output partitions are from Input 2.
 ///
 /// ```text
-///                       ▲       ▲           ▲         ▲
-///                       │       │           │         │
-///     Output            │  ...  │           │         │
-///   Partitions          │0      │N-1        │ N       │N+M-1
-///(passes through   ┌────┴───────┴───────────┴─────────┴───┐
-/// the N+M input    │              UnionExec               │
-///  partitions)     │                                      │
-///                  └──────────────────────────────────────┘
+///                        ▲       ▲           ▲         ▲
+///                        │       │           │         │
+///      Output            │  ...  │           │         │
+///    Partitions          │0      │N-1        │ N       │N+M-1
+/// (passes through   ┌────┴───────┴───────────┴─────────┴───┐
+///  the N+M input    │              UnionExec               │
+///   partitions)     │                                      │
+///                   └──────────────────────────────────────┘
 ///                                      ▲
 ///                                      │
 ///                                      │
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index 026a7fbcd0..7212c76413 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -760,7 +760,6 @@ fn build_batch(
 /// ```ignore
 /// longest_length: [3, 1, 1, 2]
 /// ```
-///
 fn find_longest_length(
     list_arrays: &[ArrayRef],
     options: &UnnestOptions,
@@ -881,7 +880,6 @@ fn unnest_list_arrays(
 /// ```ignore
 /// [1, null, 2, 3, 4, null, null, 5, null, null]
 /// ```
-///
 fn unnest_list_array(
     list_array: &dyn ListArrayType,
     length_array: &PrimitiveArray<Int64Type>,
@@ -929,7 +927,6 @@ fn unnest_list_array(
 /// ```ignore
 /// [0, 0, 1, 1, 1, 2]
 /// ```
-///
 fn create_take_indices(
     length_array: &PrimitiveArray<Int64Type>,
     capacity: usize,
@@ -994,7 +991,6 @@ fn create_take_indices(
 /// ```ignore
 /// c1: 1, null, 2, 3, 4, null, 5, 6  // Repeated using `indices`
 /// c2: null, null, null, null, null, null, null, null  // Replaced with nulls
-///
 fn repeat_arrs_from_indices(
     batch: &[ArrayRef],
     indices: &PrimitiveArray<Int64Type>,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

