This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 73038f50dd chore: Format examples in doc strings - physical expr, optimizer, and plan (#18357)
73038f50dd is described below
commit 73038f50dd7e1086a36589b4a5ee1e8db18b96f3
Author: Yu-Chuan Hung <[email protected]>
AuthorDate: Sat Nov 1 20:44:55 2025 +0800
chore: Format examples in doc strings - physical expr, optimizer, and plan (#18357)
## Which issue does this PR close?
Part of #16915
## Rationale for this change
Format code examples in documentation comments to improve readability
and maintain consistent code style across the codebase. This is part of
a multi-PR effort to format all doc comment examples and eventually
enable CI checks to enforce this formatting.
## What changes are included in this PR?
Run `cargo fmt -p <crate> -- --config format_code_in_doc_comments=true`
for the following physical crates (a hypothetical before/after snippet
follows the list):
- `datafusion-physical-expr`
- `datafusion-physical-expr-adapter`
- `datafusion-physical-expr-common`
- `datafusion-physical-optimizer`
- `datafusion-physical-plan`
Additionally, some spaces were added to preserve the indentation of ASCII art diagrams.
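For illustration, here is a hypothetical doc comment (the function and values
below are made up, not taken from this diff) before and after running the
command:

```rust
// Hypothetical example: with `format_code_in_doc_comments=true`, rustfmt
// reflows the code inside a doc comment's ``` fence like ordinary code.

// Before formatting: the example is crammed onto one overlong line.
/// ```
/// let v = vec![(1, "alpha".to_string()), (2, "beta".to_string()), (3, "gamma".to_string()), (4, "delta".to_string())];
/// assert_eq!(v.len(), 4);
/// ```
pub fn before() {}

// After formatting: the same example, reflowed with one element per line.
/// ```
/// let v = vec![
///     (1, "alpha".to_string()),
///     (2, "beta".to_string()),
///     (3, "gamma".to_string()),
///     (4, "delta".to_string()),
/// ];
/// assert_eq!(v.len(), 4);
/// ```
pub fn after() {}
```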
## Are these changes tested?
No testing needed - this is purely a formatting change with no
functional modifications.
## Are there any user-facing changes?
No - this only affects documentation formatting.
---
.../physical-expr-common/src/physical_expr.rs | 1 -
.../src/equivalence/properties/mod.rs | 9 ++++---
datafusion/physical-expr/src/expressions/case.rs | 1 -
datafusion/physical-expr/src/expressions/column.rs | 6 ++---
.../physical-expr/src/intervals/cp_solver.rs | 8 ++-----
datafusion/physical-expr/src/physical_expr.rs | 16 ++++++++-----
datafusion/physical-expr/src/projection.rs | 10 ++++----
.../src/combine_partial_final_agg.rs | 1 -
.../physical-optimizer/src/enforce_distribution.rs | 1 -
.../physical-optimizer/src/join_selection.rs | 1 -
.../src/aggregates/group_values/mod.rs | 1 -
.../aggregates/group_values/multi_group_by/mod.rs | 11 ---------
.../group_values/single_group_by/primitive.rs | 1 -
.../physical-plan/src/aggregates/order/partial.rs | 2 +-
.../physical-plan/src/aggregates/row_hash.rs | 1 -
datafusion/physical-plan/src/execution_plan.rs | 21 +++++++++++-----
.../physical-plan/src/joins/hash_join/exec.rs | 2 --
.../physical-plan/src/joins/hash_join/stream.rs | 1 -
.../src/joins/piecewise_merge_join/exec.rs | 1 -
.../physical-plan/src/joins/stream_join_utils.rs | 1 -
.../physical-plan/src/joins/symmetric_hash_join.rs | 1 -
datafusion/physical-plan/src/joins/utils.rs | 1 -
datafusion/physical-plan/src/metrics/builder.rs | 19 +++++++--------
datafusion/physical-plan/src/metrics/custom.rs | 3 ++-
datafusion/physical-plan/src/metrics/mod.rs | 25 ++++++++++---------
datafusion/physical-plan/src/projection.rs | 28 ++++++++++++----------
datafusion/physical-plan/src/recursive_query.rs | 1 -
datafusion/physical-plan/src/repartition/mod.rs | 11 ++++-----
datafusion/physical-plan/src/sorts/merge.rs | 1 -
.../physical-plan/src/sorts/multi_level_merge.rs | 1 -
datafusion/physical-plan/src/sorts/partial_sort.rs | 4 ++--
datafusion/physical-plan/src/sorts/sort.rs | 1 -
datafusion/physical-plan/src/stream.rs | 15 ++++++++----
datafusion/physical-plan/src/test/exec.rs | 1 -
datafusion/physical-plan/src/union.rs | 16 ++++++-------
datafusion/physical-plan/src/unnest.rs | 4 ----
36 files changed, 106 insertions(+), 122 deletions(-)
diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs
index e5e7d6c00f..492383663d 100644
--- a/datafusion/physical-expr-common/src/physical_expr.rs
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -341,7 +341,6 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
/// representation.
///
/// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL.
- ///
fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
/// Take a snapshot of this `PhysicalExpr`, if it is dynamic.
diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs
index 2404b8f0dd..4d919d623b 100644
--- a/datafusion/physical-expr/src/equivalence/properties/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs
@@ -123,11 +123,14 @@ use itertools::Itertools;
/// let mut eq_properties = EquivalenceProperties::new(schema);
/// eq_properties.add_constants(vec![ConstExpr::from(col_b)]);
/// eq_properties.add_ordering([
-/// PhysicalSortExpr::new_default(col_a).asc(),
-/// PhysicalSortExpr::new_default(col_c).desc(),
+/// PhysicalSortExpr::new_default(col_a).asc(),
+/// PhysicalSortExpr::new_default(col_c).desc(),
/// ]);
///
-/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], eq: [{members: [b@1], constant: (heterogeneous)}]");
+/// assert_eq!(
+/// eq_properties.to_string(),
+/// "order: [[a@0 ASC, c@2 DESC]], eq: [{members: [b@1], constant:
(heterogeneous)}]"
+/// );
/// ```
#[derive(Clone, Debug)]
pub struct EquivalenceProperties {
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index 191f5ba529..9ffb571a26 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -341,7 +341,6 @@ fn filter_array(
/// │└─────────┘│ │ 2 │ │ D │
/// └───────────┘ └─────────┘ └─────────┘
/// values indices result
-///
/// ```
fn merge(values: &[ArrayData], indices: &[PartialResultIndex]) -> Result<ArrayRef> {
#[cfg(debug_assertions)]
diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs
index c9f3fb00f0..9ca464b304 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -49,9 +49,9 @@ use datafusion_expr::ColumnarValue;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// // Schema with columns a, b, c
/// let schema = Schema::new(vec![
-/// Field::new("a", DataType::Int32, false),
-/// Field::new("b", DataType::Int32, false),
-/// Field::new("c", DataType::Int32, false),
+/// Field::new("a", DataType::Int32, false),
+/// Field::new("b", DataType::Int32, false),
+/// Field::new("c", DataType::Int32, false),
/// ]);
///
/// // reference to column b is index 1
diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs
index c44197bbbe..be0e5e1fa6 100644
--- a/datafusion/physical-expr/src/intervals/cp_solver.rs
+++ b/datafusion/physical-expr/src/intervals/cp_solver.rs
@@ -579,15 +579,11 @@ impl ExprIntervalGraph {
///
/// let mut graph = ExprIntervalGraph::try_new(expr, &schema).unwrap();
/// // Do it once, while constructing.
- /// let node_indices = graph
- /// .gather_node_indices(&[Arc::new(Column::new("gnz", 0))]);
+ /// let node_indices = graph.gather_node_indices(&[Arc::new(Column::new("gnz", 0))]);
/// let left_index = node_indices.get(0).unwrap().1;
///
/// // Provide intervals for leaf variables (here, there is only one).
- /// let intervals = vec![(
- /// left_index,
- /// Interval::make(Some(10), Some(20)).unwrap(),
- /// )];
+ /// let intervals = vec![(left_index, Interval::make(Some(10), Some(20)).unwrap())];
///
/// // Evaluate bounds for the composite expression:
/// graph.assign_intervals(&intervals);
diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs
index 2cc484ec6a..c658a8eddc 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -118,12 +118,16 @@ pub fn physical_exprs_bag_equal(
/// ]);
///
/// let sort_exprs = vec![
-/// vec![
-/// SortExpr { expr: Expr::Column(Column::new(Some("t"), "id")), asc: true, nulls_first: false }
-/// ],
-/// vec![
-/// SortExpr { expr: Expr::Column(Column::new(Some("t"), "name")), asc: false, nulls_first: true }
-/// ]
+/// vec![SortExpr {
+/// expr: Expr::Column(Column::new(Some("t"), "id")),
+/// asc: true,
+/// nulls_first: false,
+/// }],
+/// vec![SortExpr {
+/// expr: Expr::Column(Column::new(Some("t"), "name")),
+/// asc: false,
+/// nulls_first: true,
+/// }],
/// ];
/// let result = create_ordering(&schema, &sort_exprs).unwrap();
/// ```
diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs
index fc972d644e..c707d3ccff 100644
--- a/datafusion/physical-expr/src/projection.rs
+++ b/datafusion/physical-expr/src/projection.rs
@@ -166,9 +166,9 @@ impl ProjectionExprs {
/// # Example
///
/// ```rust
- /// use std::sync::Arc;
- /// use arrow::datatypes::{Schema, Field, DataType};
+ /// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_physical_expr::projection::ProjectionExprs;
+ /// use std::sync::Arc;
///
/// // Create a schema with three columns
/// let schema = Arc::new(Schema::new(vec![
@@ -234,11 +234,11 @@ impl ProjectionExprs {
/// # Example
///
/// ```rust
- /// use std::sync::Arc;
- /// use datafusion_physical_expr::projection::{ProjectionExprs, ProjectionExpr};
- /// use datafusion_physical_expr::expressions::{Column, BinaryExpr, Literal};
/// use datafusion_common::{Result, ScalarValue};
/// use datafusion_expr::Operator;
+ /// use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
+ /// use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
+ /// use std::sync::Arc;
///
/// fn main() -> Result<()> {
/// // Example from the docstring:
diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
index 86f7e73e9e..bffb2c9df9 100644
--- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
+++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
@@ -36,7 +36,6 @@ use datafusion_physical_expr::{physical_exprs_equal, PhysicalExpr};
/// into a Single AggregateExec if their grouping exprs and aggregate exprs equal.
///
/// This rule should be applied after the EnforceDistribution and EnforceSorting rules
-///
#[derive(Default, Debug)]
pub struct CombinePartialFinalAggregate {}
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 898386e2f9..e9e28fec06 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -281,7 +281,6 @@ pub type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;
/// 3) If the current plan is RepartitionExec, CoalescePartitionsExec or WindowAggExec, clear all the requirements, return the unchanged plan
/// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements
/// 5) For other types of operators, by default, pushdown the parent requirements to children.
-///
pub fn adjust_input_keys_ordering(
mut requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs
index 1db4d7b305..b55c01f62e 100644
--- a/datafusion/physical-optimizer/src/join_selection.rs
+++ b/datafusion/physical-optimizer/src/join_selection.rs
@@ -476,7 +476,6 @@ fn hash_join_convert_symmetric_subrule(
/// | Data Source |--------------| Repartition |
/// | | | |
/// +--------------+ +--------------+
-///
/// ```
pub fn hash_join_swap_subrule(
mut input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index 5f2a2faa11..4bd7f03506 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -131,7 +131,6 @@ pub trait GroupValues: Send {
/// `GroupColumn`: crate::aggregates::group_values::multi_group_by::GroupColumn
/// `GroupValuesColumn`: crate::aggregates::group_values::multi_group_by::GroupValuesColumn
/// `GroupValuesRows`: crate::aggregates::group_values::row::GroupValuesRows
-///
pub fn new_group_values(
schema: SchemaRef,
group_ordering: &GroupOrdering,
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 58bd35d640..9adf028eca 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -77,7 +77,6 @@ pub trait GroupColumn: Send + Sync {
///
/// And if found nth result in `equal_to_results` is already
/// `false`, the check for nth row will be skipped.
- ///
fn vectorized_equal_to(
&self,
lhs_rows: &[usize],
@@ -137,7 +136,6 @@ pub fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> Option<bool> {
/// +---------------------+---------------------------------------------+
///
/// `inlined flag`: 1 represents `non-inlined`, and 0 represents `inlined`
-///
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct GroupIndexView(u64);
@@ -166,7 +164,6 @@ impl GroupIndexView {
/// A [`GroupValues`] that stores multiple columns of group values,
/// and supports vectorized operators for them
-///
pub struct GroupValuesColumn<const STREAMING: bool> {
/// The output schema
schema: SchemaRef,
@@ -184,7 +181,6 @@ pub struct GroupValuesColumn<const STREAMING: bool> {
/// instead we store the `group indices` pointing to values in `GroupValues`.
/// And we use [`GroupIndexView`] to represent such `group indices` in table.
///
- ///
map: HashTable<(u64, GroupIndexView)>,
/// The size of `map` in bytes
@@ -197,7 +193,6 @@ pub struct GroupValuesColumn<const STREAMING: bool> {
///
/// The chained indices is like:
/// `latest group index -> older group index -> even older group index -> ...`
- ///
group_index_lists: Vec<Vec<usize>>,
/// When emitting first n, we need to decrease/erase group indices in
@@ -323,7 +318,6 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
///
/// `Group indices` order are against with their input order, and this will lead to error
/// in `streaming aggregation`.
- ///
fn scalarized_intern(
&mut self,
cols: &[ArrayRef],
@@ -425,7 +419,6 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
///
/// The vectorized approach can offer higher performance for avoiding row by row
/// downcast for `cols` and being able to implement even more optimizations(like simd).
- ///
fn vectorized_intern(
&mut self,
cols: &[ArrayRef],
@@ -493,7 +486,6 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
/// - Check if the `group index view` is `inlined` or `non_inlined`:
/// If it is inlined, add to `vectorized_equal_to_group_indices` directly.
/// Otherwise get all group indices from `group_index_lists`, and add them.
- ///
fn collect_vectorized_process_context(
&mut self,
batch_hashes: &[u64],
@@ -721,7 +713,6 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
/// The hash collision may be not frequent, so the fallback will indeed hardly happen.
/// In most situations, `scalarized_indices` will found to be empty after finishing to
/// preform `vectorized_equal_to`.
- ///
fn scalarized_intern_remaining(
&mut self,
cols: &[ArrayRef],
@@ -886,7 +877,6 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
/// `$v`: the vector to push the new builder into
/// `$nullable`: whether the input can contains nulls
/// `$t`: the primitive type of the builder
-///
macro_rules! instantiate_primitive {
($v:expr, $nullable:expr, $t:ty, $data_type:ident) => {
if $nullable {
@@ -1468,7 +1458,6 @@ mod tests {
/// - Group not exist + bucket not found in `map`
/// - Group not exist + not equal to inlined group view(tested in hash collision)
/// - Group not exist + not equal to non-inlined group view(tested in hash collision)
- ///
struct VectorizedTestDataSet {
test_batches: Vec<Vec<ArrayRef>>,
expected_batch: RecordBatch,
diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
index 8b1905e540..f35c580b0e 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
@@ -87,7 +87,6 @@ pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> {
/// is obvious in high cardinality group by situation.
/// More details can see:
/// <https://github.com/apache/datafusion/issues/15961>
- ///
map: HashTable<(usize, u64)>,
/// The group index of the null value if any
null_group: Option<usize>,
diff --git a/datafusion/physical-plan/src/aggregates/order/partial.rs b/datafusion/physical-plan/src/aggregates/order/partial.rs
index 3e495900f7..476551a7ca 100644
--- a/datafusion/physical-plan/src/aggregates/order/partial.rs
+++ b/datafusion/physical-plan/src/aggregates/order/partial.rs
@@ -61,7 +61,7 @@ use datafusion_expr::EmitTo;
/// group indices
/// (in group value group_values current tracks the most
/// order) recent group index
-///```
+/// ```
#[derive(Debug)]
pub struct GroupOrderingPartial {
/// State machine
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 98c8cb235c..e8d842cc85 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -298,7 +298,6 @@ impl SkipAggregationProbe {
/// later stream-merge sort on reading back the spilled data does re-grouping. Note the rows cannot
/// be grouped once spilled onto disk, the read back data needs to be re-grouped again. In addition,
/// re-grouping may cause out of memory again. Thus, re-grouping has to be a sort based aggregation.
-///
/// ```text
/// Partial Aggregation [batch_size = 2] (max memory = 3 rows)
///
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index a70cd9cb0d..00fbdde533 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -354,12 +354,15 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// fn execute(
/// &self,
/// partition: usize,
- /// context: Arc<TaskContext>
+ /// context: Arc<TaskContext>,
/// ) -> Result<SendableRecordBatchStream> {
/// // use functions from futures crate convert the batch into a stream
/// let fut = futures::future::ready(Ok(self.batch.clone()));
/// let stream = futures::stream::once(fut);
- /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
+ /// Ok(Box::pin(RecordBatchStreamAdapter::new(
+ /// self.batch.schema(),
+ /// stream,
+ /// )))
/// }
/// }
/// ```
@@ -389,11 +392,14 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// fn execute(
/// &self,
/// partition: usize,
- /// context: Arc<TaskContext>
+ /// context: Arc<TaskContext>,
/// ) -> Result<SendableRecordBatchStream> {
/// let fut = get_batch();
/// let stream = futures::stream::once(fut);
- /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
+ /// Ok(Box::pin(RecordBatchStreamAdapter::new(
+ /// self.schema.clone(),
+ /// stream,
+ /// )))
/// }
/// }
/// ```
@@ -425,13 +431,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// fn execute(
/// &self,
/// partition: usize,
- /// context: Arc<TaskContext>
+ /// context: Arc<TaskContext>,
/// ) -> Result<SendableRecordBatchStream> {
/// // A future that yields a stream
/// let fut = get_batch_stream();
/// // Use TryStreamExt::try_flatten to flatten the stream of streams
/// let stream = futures::stream::once(fut).try_flatten();
- /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
+ /// Ok(Box::pin(RecordBatchStreamAdapter::new(
+ /// self.schema.clone(),
+ /// stream,
+ /// )))
/// }
/// }
/// ```
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index b5fe5ee5cd..0a582bd911 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -238,7 +238,6 @@ impl JoinLeftData {
/// └───────┘ │                 └───────┘ │
///           │                           │
///           └───────────────────────────┘
-///
/// ```
///
/// 2. the **probe phase** where the tuples of the probe side are streamed
@@ -273,7 +272,6 @@ impl JoinLeftData {
/// └────────────┘            └────────────┘
///
/// build side probe side
-///
/// ```
///
/// # Example "Optimal" Plans
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index 88c50c2eb2..bb3465365e 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -115,7 +115,6 @@ impl BuildSide {
/// │ │
/// │ ▼
/// └─ ProcessProbeBatch
-///
/// ```
#[derive(Debug, Clone)]
pub(super) enum HashJoinStreamState {
diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
index 987f3e9df4..a9ea92f2d9 100644
--- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
@@ -156,7 +156,6 @@ use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};
/// ├──────────────────┤
/// 5 │ 400 │
/// └──────────────────┘
-///
/// ```
///
/// ## Existence Joins (Semi, Anti, Mark)
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index 9f5485ee93..3e4cbc5d33 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -655,7 +655,6 @@ pub fn combine_two_batches(
/// * `visited` - A hash set to store the visited indices.
/// * `offset` - An offset to the indices in the `PrimitiveArray`.
/// * `indices` - The input `PrimitiveArray` of type `T` which stores the indices to be recorded.
-///
pub fn record_visited_indices<T: ArrowPrimitiveType>(
visited: &mut HashSet<usize>,
offset: usize,
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index b55b7e15f1..be4646e88b 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -796,7 +796,6 @@ fn need_to_produce_result_in_final(build_side: JoinSide, join_type: JoinType) ->
/// # Returns
///
/// A tuple of two arrays of primitive types representing the build and probe indices.
-///
fn calculate_indices_by_join_type<L: ArrowPrimitiveType, R: ArrowPrimitiveType>(
build_side: JoinSide,
prune_length: usize,
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 78652d443d..9b589b674c 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -221,7 +221,6 @@ pub struct ColumnIndex {
/// Returns the output field given the input field. Outer joins may
/// insert nulls even if the input was not null
-///
fn output_join_field(old_field: &Field, join_type: &JoinType, is_left: bool) -> Field {
let force_nullable = match join_type {
JoinType::Inner => false,
diff --git a/datafusion/physical-plan/src/metrics/builder.rs b/datafusion/physical-plan/src/metrics/builder.rs
index bf59dccf66..1e86cd9d31 100644
--- a/datafusion/physical-plan/src/metrics/builder.rs
+++ b/datafusion/physical-plan/src/metrics/builder.rs
@@ -31,19 +31,18 @@ use super::{
/// case of constant strings
///
/// ```rust
-/// use datafusion_physical_plan::metrics::*;
+/// use datafusion_physical_plan::metrics::*;
///
-/// let metrics = ExecutionPlanMetricsSet::new();
-/// let partition = 1;
+/// let metrics = ExecutionPlanMetricsSet::new();
+/// let partition = 1;
///
-/// // Create the standard output_rows metric
-/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
-///
-/// // Create a operator specific counter with some labels
-/// let num_bytes = MetricBuilder::new(&metrics)
-/// .with_new_label("filename", "my_awesome_file.parquet")
-/// .counter("num_bytes", partition);
+/// // Create the standard output_rows metric
+/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
///
+/// // Create a operator specific counter with some labels
+/// let num_bytes = MetricBuilder::new(&metrics)
+/// .with_new_label("filename", "my_awesome_file.parquet")
+/// .counter("num_bytes", partition);
/// ```
pub struct MetricBuilder<'a> {
/// Location that the metric created by this builder will be added do
diff --git a/datafusion/physical-plan/src/metrics/custom.rs b/datafusion/physical-plan/src/metrics/custom.rs
index 546af6f333..4421db94dc 100644
--- a/datafusion/physical-plan/src/metrics/custom.rs
+++ b/datafusion/physical-plan/src/metrics/custom.rs
@@ -64,7 +64,8 @@ use std::{any::Any, fmt::Debug, fmt::Display, sync::Arc};
///
/// fn aggregate(&self, other: Arc<dyn CustomMetricValue>) {
/// let other = other.as_any().downcast_ref::<Self>().unwrap();
-/// self.count.fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
+/// self.count
+///     .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
/// }
///
/// fn as_any(&self) -> &dyn Any {
diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs
index c9ddbe8f89..fde748f8f3 100644
--- a/datafusion/physical-plan/src/metrics/mod.rs
+++ b/datafusion/physical-plan/src/metrics/mod.rs
@@ -47,24 +47,23 @@ pub use value::{
/// [`ExecutionPlanMetricsSet`].
///
/// ```
-/// use datafusion_physical_plan::metrics::*;
+/// use datafusion_physical_plan::metrics::*;
///
-/// let metrics = ExecutionPlanMetricsSet::new();
-/// assert!(metrics.clone_inner().output_rows().is_none());
+/// let metrics = ExecutionPlanMetricsSet::new();
+/// assert!(metrics.clone_inner().output_rows().is_none());
///
-/// // Create a counter to increment using the MetricBuilder
-/// let partition = 1;
-/// let output_rows = MetricBuilder::new(&metrics)
-/// .output_rows(partition);
+/// // Create a counter to increment using the MetricBuilder
+/// let partition = 1;
+/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
///
-/// // Counter can be incremented
-/// output_rows.add(13);
+/// // Counter can be incremented
+/// output_rows.add(13);
///
-/// // The value can be retrieved directly:
-/// assert_eq!(output_rows.value(), 13);
+/// // The value can be retrieved directly:
+/// assert_eq!(output_rows.value(), 13);
///
-/// // As well as from the metrics set
-/// assert_eq!(metrics.clone_inner().output_rows(), Some(13));
+/// // As well as from the metrics set
+/// assert_eq!(metrics.clone_inner().output_rows(), Some(13));
/// ```
///
/// [`ExecutionPlan`]: super::ExecutionPlan
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index 2c84570b33..ead2196860 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -110,18 +110,22 @@ impl ProjectionExec {
/// let b = col("b", &schema).unwrap();
/// let a_plus_b = binary(Arc::clone(&a), Operator::Plus, b, &schema).unwrap();
/// // create ProjectionExec
- /// let proj = ProjectionExec::try_new([
- /// ProjectionExpr {
- /// // expr a produces the column named "a"
- /// expr: a,
- /// alias: "a".to_string(),
- /// },
- /// ProjectionExpr {
- /// // expr: a + b produces the column named "sum_ab"
- /// expr: a_plus_b,
- /// alias: "sum_ab".to_string(),
- /// }
- /// ], input()).unwrap();
+ /// let proj = ProjectionExec::try_new(
+ /// [
+ /// ProjectionExpr {
+ /// // expr a produces the column named "a"
+ /// expr: a,
+ /// alias: "a".to_string(),
+ /// },
+ /// ProjectionExpr {
+ /// // expr: a + b produces the column named "sum_ab"
+ /// expr: a_plus_b,
+ /// alias: "sum_ab".to_string(),
+ /// },
+ /// ],
+ /// input(),
+ /// )
+ /// .unwrap();
/// # }
/// ```
pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index b4cdf2dff2..163f214444 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -247,7 +247,6 @@ impl DisplayAs for RecursiveQueryExec {
/// while batch := recursive_stream.next():
/// buffer.append(batch)
/// yield buffer
-///
struct RecursiveQueryStream {
/// The context to be used for managing handlers & executing new tasks
task_context: Arc<TaskContext>,
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 8174f71c31..2128304e07 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -476,10 +476,10 @@ impl BatchPartitioner {
/// │ │ │
/// │ │ │
/// │ │ │
-///┌───────────────┐ ┌───────────────┐ ┌───────────────┐
-///│ GroupBy │ │ GroupBy │ │ GroupBy │
-///│ (Partial) │ │ (Partial) │ │ (Partial) │
-///└───────────────┘ └───────────────┘ └───────────────┘
+/// ┌───────────────┐ ┌───────────────┐ ┌───────────────┐
+/// │ GroupBy │ │ GroupBy │ │ GroupBy │
+/// │ (Partial) │ │ (Partial) │ │ (Partial) │
+/// └───────────────┘ └───────────────┘ └───────────────┘
/// ▲ ▲ ▲
/// └──────────────────┼──────────────────┘
/// │
@@ -498,7 +498,7 @@ impl BatchPartitioner {
/// ╲ ╱ ╲ ╱
/// '─. ,─' '─. ,─'
/// `───────' `───────'
-///```
+/// ```
///
/// # Error Handling
///
@@ -2158,7 +2158,6 @@ mod test {
///
/// `$EXPECTED_PLAN_LINES`: input plan
/// `$PLAN`: the plan to optimized
- ///
macro_rules! assert_plan {
($PLAN: expr, @ $EXPECTED: expr) => {
let formatted = crate::displayable($PLAN).indent(true).to_string();
diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs
index 0b0136cd12..720a3e53e4 100644
--- a/datafusion/physical-plan/src/sorts/merge.rs
+++ b/datafusion/physical-plan/src/sorts/merge.rs
@@ -390,7 +390,6 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
///
/// Zooming in at node 2 in the loser tree as an example, we can see that
/// it takes as input the next item at (S0) and the loser of (S3, S4).
- ///
#[inline]
fn lt_leaf_node_index(&self, cursor_index: usize) -> usize {
(self.cursors.len() + cursor_index) / 2
diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs
index 58d046cc90..6e7a5e7a72 100644
--- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs
+++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs
@@ -125,7 +125,6 @@ use futures::{Stream, StreamExt};
/// available during merge operations.
/// 2. **Adaptive Buffer Sizing**: Reduces buffer sizes when memory is constrained
/// 3. **Spill-to-Disk**: Spill to disk when we cannot merge all files in memory
-///
pub(crate) struct MultiLevelMergeBuilder {
spill_manager: SpillManager,
schema: SchemaRef,
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs
index 513081e627..7a623b0c30 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -32,7 +32,7 @@
//! | 0 | 1 | 1 |
//! | 0 | 2 | 0 |
//! +---+---+---+
-//!```
+//! ```
//!
//! and required ordering for the plan is `a ASC, b ASC, d ASC`.
//! The first 3 rows(segment) can be sorted as the segment already
@@ -46,7 +46,7 @@
//! +---+---+---+
//! | 0 | 2 | 4 |
//! +---+---+---+
-//!```
+//! ```
//!
//! The plan concats incoming data with such last rows of previous input
//! and continues partial sorting of the segments.
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index bd798ab4f5..a95fad19f6 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -133,7 +133,6 @@ impl ExternalSorterMetrics {
/// └─────┘
///
/// in_mem_batches
-///
/// ```
///
/// # When data does not fit in available memory
diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs
index 100a6a7ffc..480b723d0b 100644
--- a/datafusion/physical-plan/src/stream.rs
+++ b/datafusion/physical-plan/src/stream.rs
@@ -207,7 +207,9 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
/// let schema_1 = Arc::clone(&schema);
/// builder.spawn(async move {
/// // Your task needs to send batches to the tx
-/// tx_1.send(Ok(RecordBatch::new_empty(schema_1))).await.unwrap();
+/// tx_1.send(Ok(RecordBatch::new_empty(schema_1)))
+/// .await
+/// .unwrap();
///
/// Ok(())
/// });
@@ -217,7 +219,9 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
/// let schema_2 = Arc::clone(&schema);
/// builder.spawn(async move {
/// // Your task needs to send batches to the tx
-/// tx_2.send(Ok(RecordBatch::new_empty(schema_2))).await.unwrap();
+/// tx_2.send(Ok(RecordBatch::new_empty(schema_2)))
+/// .await
+/// .unwrap();
///
/// Ok(())
/// });
@@ -417,9 +421,10 @@ impl<S> RecordBatchStreamAdapter<S> {
/// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
/// // Create stream of Result<RecordBatch>
/// let batch = record_batch!(
- /// ("a", Int32, [1, 2, 3]),
- /// ("b", Float64, [Some(4.0), None, Some(5.0)])
- /// ).expect("created batch");
+ /// ("a", Int32, [1, 2, 3]),
+ /// ("b", Float64, [Some(4.0), None, Some(5.0)])
+ /// )
+ /// .expect("created batch");
/// let schema = batch.schema();
/// let stream = futures::stream::iter(vec![Ok(batch)]);
/// // Convert the stream to a SendableRecordBatchStream
diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs
index 12ffca871f..b720181b27 100644
--- a/datafusion/physical-plan/src/test/exec.rs
+++ b/datafusion/physical-plan/src/test/exec.rs
@@ -291,7 +291,6 @@ fn clone_error(e: &DataFusionError) -> DataFusionError {
/// A Mock ExecutionPlan that does not start producing input until a
/// barrier is called
-///
#[derive(Debug)]
pub struct BarrierExec {
/// partitions to send back
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 164f17edeb..c95678dac9 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -67,14 +67,14 @@ use tokio::macros::support::thread_rng_n;
/// partitions, and then next `M` output partitions are from Input 2.
///
/// ```text
-/// ▲ ▲ ▲ ▲
-/// │ │ │ │
-/// Output │ ... │ │ │
-/// Partitions │0 │N-1 │ N │N+M-1
-///(passes through ┌────┴───────┴───────────┴─────────┴───┐
-/// the N+M input │ UnionExec │
-/// partitions) │ │
-/// └──────────────────────────────────────┘
+/// ▲ ▲ ▲ ▲
+/// │ │ │ │
+/// Output │ ... │ │ │
+/// Partitions │0 │N-1 │ N │N+M-1
+/// (passes through ┌────┴───────┴───────────┴─────────┴───┐
+/// the N+M input │ UnionExec │
+/// partitions) │ │
+/// └──────────────────────────────────────┘
/// ▲
/// │
/// │
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index 026a7fbcd0..7212c76413 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -760,7 +760,6 @@ fn build_batch(
/// ```ignore
/// longest_length: [3, 1, 1, 2]
/// ```
-///
fn find_longest_length(
list_arrays: &[ArrayRef],
options: &UnnestOptions,
@@ -881,7 +880,6 @@ fn unnest_list_arrays(
/// ```ignore
/// [1, null, 2, 3, 4, null, null, 5, null, null]
/// ```
-///
fn unnest_list_array(
list_array: &dyn ListArrayType,
length_array: &PrimitiveArray<Int64Type>,
@@ -929,7 +927,6 @@ fn unnest_list_array(
/// ```ignore
/// [0, 0, 1, 1, 1, 2]
/// ```
-///
fn create_take_indices(
length_array: &PrimitiveArray<Int64Type>,
capacity: usize,
@@ -994,7 +991,6 @@ fn create_take_indices(
/// ```ignore
/// c1: 1, null, 2, 3, 4, null, 5, 6 // Repeated using `indices`
/// c2: null, null, null, null, null, null, null, null // Replaced with nulls
-///
fn repeat_arrs_from_indices(
batch: &[ArrayRef],
indices: &PrimitiveArray<Int64Type>,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]