This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d610a9fc77 Set DisplayAs to be a supertrait of ExecutionPlan (#6835)
d610a9fc77 is described below

commit d610a9fc771d4c20b269d4702c0d6a2b6618d713
Author: Kirill Zaborsky <[email protected]>
AuthorDate: Thu Jul 6 23:34:55 2023 +0300

    Set DisplayAs to be a supertrait of ExecutionPlan (#6835)
    
    Follow-up to https://github.com/apache/arrow-datafusion/pull/6711
---
 datafusion-examples/examples/custom_datasource.rs  |  11 +-
 .../src/datasource/physical_plan/arrow_file.rs     |  20 +--
 .../core/src/datasource/physical_plan/avro.rs      |  20 +--
 .../core/src/datasource/physical_plan/csv.rs       |  22 +--
 .../core/src/datasource/physical_plan/json.rs      |  20 +--
 .../core/src/datasource/physical_plan/parquet.rs   |  54 +++----
 .../core/src/physical_optimizer/repartition.rs     |  20 +--
 .../core/src/physical_plan/aggregates/mod.rs       | 166 +++++++++++---------
 datafusion/core/src/physical_plan/analyze.rs       |  28 ++--
 .../core/src/physical_plan/coalesce_batches.rs     |  35 +++--
 .../core/src/physical_plan/coalesce_partitions.rs  |  28 ++--
 datafusion/core/src/physical_plan/empty.rs         |  28 ++--
 datafusion/core/src/physical_plan/explain.rs       |  27 ++--
 datafusion/core/src/physical_plan/filter.rs        |  30 ++--
 datafusion/core/src/physical_plan/insert.rs        |  28 ++--
 .../core/src/physical_plan/joins/cross_join.rs     |  27 ++--
 .../core/src/physical_plan/joins/hash_join.rs      |  47 +++---
 .../src/physical_plan/joins/nested_loop_join.rs    |  38 ++---
 .../src/physical_plan/joins/sort_merge_join.rs     |  43 +++---
 .../src/physical_plan/joins/symmetric_hash_join.rs |  47 +++---
 datafusion/core/src/physical_plan/limit.rs         |  63 ++++----
 datafusion/core/src/physical_plan/memory.rs        |  42 ++---
 datafusion/core/src/physical_plan/mod.rs           |  12 +-
 datafusion/core/src/physical_plan/projection.rs    |  54 +++----
 .../core/src/physical_plan/repartition/mod.rs      |  40 ++---
 datafusion/core/src/physical_plan/sorts/sort.rs    |  42 ++---
 .../physical_plan/sorts/sort_preserving_merge.rs   |  39 ++---
 datafusion/core/src/physical_plan/streaming.rs     |  16 ++
 datafusion/core/src/physical_plan/union.rs         |  53 ++++---
 datafusion/core/src/physical_plan/unnest.rs        |  28 ++--
 datafusion/core/src/physical_plan/values.rs        |  28 ++--
 .../windows/bounded_window_agg_exec.rs             |  60 ++++----
 .../src/physical_plan/windows/window_agg_exec.rs   |  56 +++----
 datafusion/core/src/physical_planner.rs            |  20 +--
 datafusion/core/src/test/exec.rs                   | 169 +++++++++++----------
 datafusion/core/src/test_util/mod.rs               |  37 ++---
 datafusion/core/tests/custom_sources.rs            |  30 ++--
 .../provider_filter_pushdown.rs                    |  29 ++--
 .../core/tests/custom_sources_cases/statistics.rs  |  38 ++---
 .../core/tests/user_defined/user_defined_plan.rs   |  31 ++--
 40 files changed, 880 insertions(+), 746 deletions(-)

diff --git a/datafusion-examples/examples/custom_datasource.rs 
b/datafusion-examples/examples/custom_datasource.rs
index c426d9611c..a24573c860 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -27,13 +27,14 @@ use datafusion::execution::context::{SessionState, 
TaskContext};
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
-    project_schema, ExecutionPlan, SendableRecordBatchStream, Statistics,
+    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan,
+    SendableRecordBatchStream, Statistics,
 };
 use datafusion::prelude::*;
 use datafusion_expr::{Expr, LogicalPlanBuilder};
 use std::any::Any;
 use std::collections::{BTreeMap, HashMap};
-use std::fmt::{Debug, Formatter};
+use std::fmt::{self, Debug, Formatter};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 use tokio::time::timeout;
@@ -204,6 +205,12 @@ impl CustomExec {
     }
 }
 
+impl DisplayAs for CustomExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> 
std::fmt::Result {
+        write!(f, "CustomExec")
+    }
+}
+
 impl ExecutionPlan for CustomExec {
     fn as_any(&self) -> &dyn Any {
         self
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs 
b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 0003ffea8c..b497ef7f2b 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -68,6 +68,17 @@ impl ArrowExec {
     }
 }
 
+impl DisplayAs for ArrowExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "ArrowExec: ")?;
+        self.base_config.fmt_as(t, f)
+    }
+}
+
 impl ExecutionPlan for ArrowExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -132,15 +143,6 @@ impl ExecutionPlan for ArrowExec {
         Some(self.metrics.clone_inner())
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        write!(f, "ArrowExec: ")?;
-        self.base_config.fmt_as(t, f)
-    }
-
     fn statistics(&self) -> Statistics {
         self.projected_statistics.clone()
     }
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs 
b/datafusion/core/src/datasource/physical_plan/avro.rs
index df82ccca29..ec2f4db263 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -65,6 +65,17 @@ impl AvroExec {
     }
 }
 
+impl DisplayAs for AvroExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "AvroExec: ")?;
+        self.base_config.fmt_as(t, f)
+    }
+}
+
 impl ExecutionPlan for AvroExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -141,15 +152,6 @@ impl ExecutionPlan for AvroExec {
         Ok(Box::pin(stream))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        write!(f, "AvroExec: ")?;
-        self.base_config.fmt_as(t, f)
-    }
-
     fn statistics(&self) -> Statistics {
         self.projected_statistics.clone()
     }
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs 
b/datafusion/core/src/datasource/physical_plan/csv.rs
index eba51615cd..e27da2b41e 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -98,6 +98,18 @@ impl CsvExec {
     }
 }
 
+impl DisplayAs for CsvExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "CsvExec: ")?;
+        self.base_config.fmt_as(t, f)?;
+        write!(f, ", has_header={}", self.has_header)
+    }
+}
+
 impl ExecutionPlan for CsvExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -171,16 +183,6 @@ impl ExecutionPlan for CsvExec {
         Ok(Box::pin(stream) as SendableRecordBatchStream)
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        write!(f, "CsvExec: ")?;
-        self.base_config.fmt_as(t, f)?;
-        write!(f, ", has_header={}", self.has_header)
-    }
-
     fn statistics(&self) -> Statistics {
         self.projected_statistics.clone()
     }
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs 
b/datafusion/core/src/datasource/physical_plan/json.rs
index 64f7077660..e9082b8084 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -84,6 +84,17 @@ impl NdJsonExec {
     }
 }
 
+impl DisplayAs for NdJsonExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "JsonExec: ")?;
+        self.base_config.fmt_as(t, f)
+    }
+}
+
 impl ExecutionPlan for NdJsonExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -149,15 +160,6 @@ impl ExecutionPlan for NdJsonExec {
         Ok(Box::pin(stream) as SendableRecordBatchStream)
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        write!(f, "JsonExec: ")?;
-        self.base_config.fmt_as(t, f)
-    }
-
     fn statistics(&self) -> Statistics {
         self.projected_statistics.clone()
     }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs 
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 96e5ce9fa0..d8df3941f5 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -325,6 +325,34 @@ impl ParquetExec {
     }
 }
 
+impl DisplayAs for ParquetExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let predicate_string = self
+                    .predicate
+                    .as_ref()
+                    .map(|p| format!(", predicate={p}"))
+                    .unwrap_or_default();
+
+                let pruning_predicate_string = self
+                    .pruning_predicate
+                    .as_ref()
+                    .map(|pre| format!(", pruning_predicate={}", 
pre.predicate_expr()))
+                    .unwrap_or_default();
+
+                write!(f, "ParquetExec: ")?;
+                self.base_config.fmt_as(t, f)?;
+                write!(f, "{}{}", predicate_string, pruning_predicate_string,)
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for ParquetExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -413,32 +441,6 @@ impl ExecutionPlan for ParquetExec {
         Ok(Box::pin(stream))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let predicate_string = self
-                    .predicate
-                    .as_ref()
-                    .map(|p| format!(", predicate={p}"))
-                    .unwrap_or_default();
-
-                let pruning_predicate_string = self
-                    .pruning_predicate
-                    .as_ref()
-                    .map(|pre| format!(", pruning_predicate={}", 
pre.predicate_expr()))
-                    .unwrap_or_default();
-
-                write!(f, "ParquetExec: ")?;
-                self.base_config.fmt_as(t, f)?;
-                write!(f, "{}{}", predicate_string, pruning_predicate_string,)
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs 
b/datafusion/core/src/physical_optimizer/repartition.rs
index 33ebcec6a6..f941912586 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -338,7 +338,7 @@ mod tests {
     use crate::physical_plan::sorts::sort::SortExec;
     use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use crate::physical_plan::union::UnionExec;
-    use crate::physical_plan::{displayable, DisplayFormatType, Statistics};
+    use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, 
Statistics};
     use datafusion_physical_expr::PhysicalSortRequirement;
 
     fn schema() -> SchemaRef {
@@ -1136,6 +1136,16 @@ mod tests {
         }
     }
 
+    impl DisplayAs for SortRequiredExec {
+        fn fmt_as(
+            &self,
+            _t: DisplayFormatType,
+            f: &mut std::fmt::Formatter,
+        ) -> std::fmt::Result {
+            write!(f, "SortRequiredExec")
+        }
+    }
+
     impl ExecutionPlan for SortRequiredExec {
         fn as_any(&self) -> &dyn std::any::Any {
             self
@@ -1184,13 +1194,5 @@ mod tests {
         fn statistics(&self) -> Statistics {
             self.input.statistics()
         }
-
-        fn fmt_as(
-            &self,
-            _t: DisplayFormatType,
-            f: &mut std::fmt::Formatter,
-        ) -> std::fmt::Result {
-            write!(f, "SortRequiredExec")
-        }
     }
 }
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs 
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 343f7628b7..4bf5f66445 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -58,6 +58,8 @@ use datafusion_physical_expr::utils::{
     get_finer_ordering, ordering_satisfy_requirement_concrete,
 };
 
+use super::DisplayAs;
+
 /// Hash aggregate modes
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 pub enum AggregateMode {
@@ -719,6 +721,80 @@ impl AggregateExec {
     }
 }
 
+impl DisplayAs for AggregateExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "AggregateExec: mode={:?}", self.mode)?;
+                let g: Vec<String> = if self.group_by.groups.len() == 1 {
+                    self.group_by
+                        .expr
+                        .iter()
+                        .map(|(e, alias)| {
+                            let e = e.to_string();
+                            if &e != alias {
+                                format!("{e} as {alias}")
+                            } else {
+                                e
+                            }
+                        })
+                        .collect()
+                } else {
+                    self.group_by
+                        .groups
+                        .iter()
+                        .map(|group| {
+                            let terms = group
+                                .iter()
+                                .enumerate()
+                                .map(|(idx, is_null)| {
+                                    if *is_null {
+                                        let (e, alias) = 
&self.group_by.null_expr[idx];
+                                        let e = e.to_string();
+                                        if &e != alias {
+                                            format!("{e} as {alias}")
+                                        } else {
+                                            e
+                                        }
+                                    } else {
+                                        let (e, alias) = 
&self.group_by.expr[idx];
+                                        let e = e.to_string();
+                                        if &e != alias {
+                                            format!("{e} as {alias}")
+                                        } else {
+                                            e
+                                        }
+                                    }
+                                })
+                                .collect::<Vec<String>>()
+                                .join(", ");
+                            format!("({terms})")
+                        })
+                        .collect()
+                };
+
+                write!(f, ", gby=[{}]", g.join(", "))?;
+
+                let a: Vec<String> = self
+                    .aggr_expr
+                    .iter()
+                    .map(|agg| agg.name().to_string())
+                    .collect();
+                write!(f, ", aggr=[{}]", a.join(", "))?;
+
+                if let Some(aggregation_ordering) = &self.aggregation_ordering 
{
+                    write!(f, ", ordering_mode={:?}", 
aggregation_ordering.mode)?;
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
 impl ExecutionPlan for AggregateExec {
     /// Return a reference to Any that can be used for down-casting
     fn as_any(&self) -> &dyn Any {
@@ -838,78 +914,6 @@ impl ExecutionPlan for AggregateExec {
         Some(self.metrics.clone_inner())
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "AggregateExec: mode={:?}", self.mode)?;
-                let g: Vec<String> = if self.group_by.groups.len() == 1 {
-                    self.group_by
-                        .expr
-                        .iter()
-                        .map(|(e, alias)| {
-                            let e = e.to_string();
-                            if &e != alias {
-                                format!("{e} as {alias}")
-                            } else {
-                                e
-                            }
-                        })
-                        .collect()
-                } else {
-                    self.group_by
-                        .groups
-                        .iter()
-                        .map(|group| {
-                            let terms = group
-                                .iter()
-                                .enumerate()
-                                .map(|(idx, is_null)| {
-                                    if *is_null {
-                                        let (e, alias) = 
&self.group_by.null_expr[idx];
-                                        let e = e.to_string();
-                                        if &e != alias {
-                                            format!("{e} as {alias}")
-                                        } else {
-                                            e
-                                        }
-                                    } else {
-                                        let (e, alias) = 
&self.group_by.expr[idx];
-                                        let e = e.to_string();
-                                        if &e != alias {
-                                            format!("{e} as {alias}")
-                                        } else {
-                                            e
-                                        }
-                                    }
-                                })
-                                .collect::<Vec<String>>()
-                                .join(", ");
-                            format!("({terms})")
-                        })
-                        .collect()
-                };
-
-                write!(f, ", gby=[{}]", g.join(", "))?;
-
-                let a: Vec<String> = self
-                    .aggr_expr
-                    .iter()
-                    .map(|agg| agg.name().to_string())
-                    .collect();
-                write!(f, ", aggr=[{}]", a.join(", "))?;
-
-                if let Some(aggregation_ordering) = &self.aggregation_ordering 
{
-                    write!(f, ", ordering_mode={:?}", 
aggregation_ordering.mode)?;
-                }
-            }
-        }
-        Ok(())
-    }
-
     fn statistics(&self) -> Statistics {
         // TODO stats: group expressions:
         // - once expressions will be able to compute their own stats, use it 
here
@@ -1245,8 +1249,8 @@ mod tests {
     };
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::{
-        ExecutionPlan, Partitioning, RecordBatchStream, 
SendableRecordBatchStream,
-        Statistics,
+        DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream,
+        SendableRecordBatchStream, Statistics,
     };
     use crate::prelude::SessionContext;
 
@@ -1579,6 +1583,20 @@ mod tests {
         pub yield_first: bool,
     }
 
+    impl DisplayAs for TestYieldingExec {
+        fn fmt_as(
+            &self,
+            t: DisplayFormatType,
+            f: &mut std::fmt::Formatter,
+        ) -> std::fmt::Result {
+            match t {
+                DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                    write!(f, "TestYieldingExec")
+                }
+            }
+        }
+    }
+
     impl ExecutionPlan for TestYieldingExec {
         fn as_any(&self) -> &dyn Any {
             self
diff --git a/datafusion/core/src/physical_plan/analyze.rs 
b/datafusion/core/src/physical_plan/analyze.rs
index 90b7f4c02f..bba1c37e6d 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -30,7 +30,7 @@ use futures::StreamExt;
 
 use super::expressions::PhysicalSortExpr;
 use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
-use super::{Distribution, SendableRecordBatchStream};
+use super::{DisplayAs, Distribution, SendableRecordBatchStream};
 use datafusion_execution::TaskContext;
 
 /// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
@@ -56,6 +56,20 @@ impl AnalyzeExec {
     }
 }
 
+impl DisplayAs for AnalyzeExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "AnalyzeExec verbose={}", self.verbose)
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for AnalyzeExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -158,18 +172,6 @@ impl ExecutionPlan for AnalyzeExec {
         )))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "AnalyzeExec verbose={}", self.verbose)
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         // Statistics an an ANALYZE plan are not relevant
         Statistics::default()
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs 
b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 0064f353e9..9454a36af7 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -38,6 +38,7 @@ use log::trace;
 
 use super::expressions::PhysicalSortExpr;
 use super::metrics::{BaselineMetrics, MetricsSet};
+use super::DisplayAs;
 use super::{metrics::ExecutionPlanMetricsSet, Statistics};
 
 /// CoalesceBatchesExec combines small batches into larger batches for more 
efficient use of
@@ -73,6 +74,24 @@ impl CoalesceBatchesExec {
     }
 }
 
+impl DisplayAs for CoalesceBatchesExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "CoalesceBatchesExec: target_batch_size={}",
+                    self.target_batch_size
+                )
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for CoalesceBatchesExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -141,22 +160,6 @@ impl ExecutionPlan for CoalesceBatchesExec {
         }))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "CoalesceBatchesExec: target_batch_size={}",
-                    self.target_batch_size
-                )
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs 
b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index d04ffa576a..82f74a62bb 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
 use super::expressions::PhysicalSortExpr;
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::stream::{ObservedStream, RecordBatchReceiverStream};
-use super::Statistics;
+use super::{DisplayAs, Statistics};
 use crate::physical_plan::{
     DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
 };
@@ -60,6 +60,20 @@ impl CoalescePartitionsExec {
     }
 }
 
+impl DisplayAs for CoalescePartitionsExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "CoalescePartitionsExec")
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for CoalescePartitionsExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -146,18 +160,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
         }
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CoalescePartitionsExec")
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_plan/empty.rs 
b/datafusion/core/src/physical_plan/empty.rs
index 4d1b1cffc0..17bfa0af3a 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -30,7 +30,7 @@ use datafusion_common::{DataFusionError, Result};
 use log::trace;
 
 use super::expressions::PhysicalSortExpr;
-use super::{common, SendableRecordBatchStream, Statistics};
+use super::{common, DisplayAs, SendableRecordBatchStream, Statistics};
 
 use datafusion_execution::TaskContext;
 
@@ -94,6 +94,20 @@ impl EmptyExec {
     }
 }
 
+impl DisplayAs for EmptyExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "EmptyExec: produce_one_row={}", 
self.produce_one_row)
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for EmptyExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -148,18 +162,6 @@ impl ExecutionPlan for EmptyExec {
         )?))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "EmptyExec: produce_one_row={}", 
self.produce_one_row)
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         let batch = self
             .data()
diff --git a/datafusion/core/src/physical_plan/explain.rs 
b/datafusion/core/src/physical_plan/explain.rs
index 22b0817c33..d732e70f8a 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -28,6 +28,7 @@ use crate::physical_plan::{DisplayFormatType, ExecutionPlan, 
Partitioning, Stati
 use arrow::{array::StringBuilder, datatypes::SchemaRef, 
record_batch::RecordBatch};
 use log::trace;
 
+use super::DisplayAs;
 use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
 use crate::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion_execution::TaskContext;
@@ -70,6 +71,20 @@ impl ExplainExec {
     }
 }
 
+impl DisplayAs for ExplainExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "ExplainExec")
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for ExplainExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -156,18 +171,6 @@ impl ExecutionPlan for ExplainExec {
         )))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "ExplainExec")
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         // Statistics an EXPLAIN plan are not relevant
         Statistics::default()
diff --git a/datafusion/core/src/physical_plan/filter.rs 
b/datafusion/core/src/physical_plan/filter.rs
index d56f582028..d2cfc3ce8a 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -24,7 +24,9 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use super::expressions::PhysicalSortExpr;
-use super::{ColumnStatistics, RecordBatchStream, SendableRecordBatchStream, 
Statistics};
+use super::{
+    ColumnStatistics, DisplayAs, RecordBatchStream, SendableRecordBatchStream, 
Statistics,
+};
 use crate::physical_plan::{
     metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
     Column, DisplayFormatType, EquivalenceProperties, ExecutionPlan, 
Partitioning,
@@ -85,6 +87,20 @@ impl FilterExec {
     }
 }
 
+impl DisplayAs for FilterExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "FilterExec: {}", self.predicate)
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for FilterExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -157,18 +173,6 @@ impl ExecutionPlan for FilterExec {
         }))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "FilterExec: {}", self.predicate)
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_plan/insert.rs 
b/datafusion/core/src/physical_plan/insert.rs
index 8bcb9bab83..8766b62e9a 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -138,6 +138,21 @@ impl InsertExec {
     }
 }
 
+impl DisplayAs for InsertExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "InsertExec: sink=")?;
+                self.sink.fmt_as(t, f)
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for InsertExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -232,19 +247,6 @@ impl ExecutionPlan for InsertExec {
         )))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "InsertExec: sink=")?;
-                self.sink.fmt_as(t, f)
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs 
b/datafusion/core/src/physical_plan/joins/cross_join.rs
index f752d134a1..1ecbfbea95 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -26,6 +26,7 @@ use arrow::datatypes::{Fields, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use crate::physical_plan::DisplayAs;
 use crate::physical_plan::{
     coalesce_batches::concat_batches, 
coalesce_partitions::CoalescePartitionsExec,
     ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
@@ -139,6 +140,20 @@ async fn load_left_input(
     Ok((merged_batch, reservation))
 }
 
+impl DisplayAs for CrossJoinExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "CrossJoinExec")
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for CrossJoinExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -243,18 +258,6 @@ impl ExecutionPlan for CrossJoinExec {
         }))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CrossJoinExec")
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         stats_cartesian_product(
             self.left.statistics(),
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs 
b/datafusion/core/src/physical_plan/joins/hash_join.rs
index a3c553c9b3..d258e7529a 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -60,6 +60,7 @@ use crate::physical_plan::joins::utils::{
     adjust_indices_by_join_type, apply_join_filter_to_indices, 
build_batch_from_indices,
     get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide,
 };
+use crate::physical_plan::DisplayAs;
 use crate::physical_plan::{
     coalesce_batches::concat_batches,
     coalesce_partitions::CoalescePartitionsExec,
@@ -204,6 +205,30 @@ impl HashJoinExec {
     }
 }
 
+impl DisplayAs for HashJoinExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let display_filter = self.filter.as_ref().map_or_else(
+                    || "".to_string(),
+                    |f| format!(", filter={}", f.expression()),
+                );
+                let on = self
+                    .on
+                    .iter()
+                    .map(|(c1, c2)| format!("({}, {})", c1, c2))
+                    .collect::<Vec<String>>()
+                    .join(", ");
+                write!(
+                    f,
+                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}",
+                    self.mode, self.join_type, on, display_filter
+                )
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for HashJoinExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -420,28 +445,6 @@ impl ExecutionPlan for HashJoinExec {
         }))
     }
 
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let display_filter = self.filter.as_ref().map_or_else(
-                    || "".to_string(),
-                    |f| format!(", filter={}", f.expression()),
-                );
-                let on = self
-                    .on
-                    .iter()
-                    .map(|(c1, c2)| format!("({}, {})", c1, c2))
-                    .collect::<Vec<String>>()
-                    .join(", ");
-                write!(
-                    f,
-                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}",
-                    self.mode, self.join_type, on, display_filter
-                )
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs 
b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
index 6586456fd2..d5de88d933 100644
--- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
+++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
@@ -29,8 +29,8 @@ use crate::physical_plan::joins::utils::{
 };
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, 
RecordBatchStream,
-    SendableRecordBatchStream,
+    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream,
 };
 use arrow::array::{
     BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, 
UInt64Builder,
@@ -120,6 +120,24 @@ impl NestedLoopJoinExec {
     }
 }
 
+impl DisplayAs for NestedLoopJoinExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let display_filter = self.filter.as_ref().map_or_else(
+                    || "".to_string(),
+                    |f| format!(", filter={}", f.expression()),
+                );
+                write!(
+                    f,
+                    "NestedLoopJoinExec: join_type={:?}{}",
+                    self.join_type, display_filter
+                )
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for NestedLoopJoinExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -249,22 +267,6 @@ impl ExecutionPlan for NestedLoopJoinExec {
         }))
     }
 
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let display_filter = self.filter.as_ref().map_or_else(
-                    || "".to_string(),
-                    |f| format!(", filter={}", f.expression()),
-                );
-                write!(
-                    f,
-                    "NestedLoopJoinExec: join_type={:?}{}",
-                    self.join_type, display_filter
-                )
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs 
b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index bc8c686670..2a628a6146 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -46,8 +46,9 @@ use crate::physical_plan::joins::utils::{
 };
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, 
MetricsSet};
 use crate::physical_plan::{
-    metrics, DisplayFormatType, Distribution, EquivalenceProperties, 
ExecutionPlan,
-    Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, 
Statistics,
+    metrics, DisplayAs, DisplayFormatType, Distribution, EquivalenceProperties,
+    ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
+    SendableRecordBatchStream, Statistics,
 };
 use datafusion_common::DataFusionError;
 use datafusion_common::JoinType;
@@ -200,6 +201,26 @@ impl SortMergeJoinExec {
     }
 }
 
+impl DisplayAs for SortMergeJoinExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let on = self
+                    .on
+                    .iter()
+                    .map(|(c1, c2)| format!("({}, {})", c1, c2))
+                    .collect::<Vec<String>>()
+                    .join(", ");
+                write!(
+                    f,
+                    "SortMergeJoin: join_type={:?}, on=[{}]",
+                    self.join_type, on
+                )
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for SortMergeJoinExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -361,24 +382,6 @@ impl ExecutionPlan for SortMergeJoinExec {
         Some(self.metrics.clone_inner())
     }
 
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let on = self
-                    .on
-                    .iter()
-                    .map(|(c1, c2)| format!("({}, {})", c1, c2))
-                    .collect::<Vec<String>>()
-                    .join(", ");
-                write!(
-                    f,
-                    "SortMergeJoin: join_type={:?}, on=[{}]",
-                    self.join_type, on
-                )
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         // TODO stats: it is not possible in general to know the output size 
of joins
         // There are some special cases though, for example:
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs 
b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index b46aba2fb5..dfd38a20c0 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -57,6 +57,7 @@ use datafusion_physical_expr::intervals::{ExprIntervalGraph, 
Interval, IntervalB
 
 use crate::physical_plan::common::SharedMemoryReservation;
 use 
crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
+use crate::physical_plan::DisplayAs;
 use crate::physical_plan::{
     expressions::Column,
     expressions::PhysicalSortExpr,
@@ -385,6 +386,30 @@ impl SymmetricHashJoinExec {
     }
 }
 
+impl DisplayAs for SymmetricHashJoinExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let display_filter = self.filter.as_ref().map_or_else(
+                    || "".to_string(),
+                    |f| format!(", filter={}", f.expression()),
+                );
+                let on = self
+                    .on
+                    .iter()
+                    .map(|(c1, c2)| format!("({}, {})", c1, c2))
+                    .collect::<Vec<String>>()
+                    .join(", ");
+                write!(
+                    f,
+                    "SymmetricHashJoinExec: join_type={:?}, on=[{}]{}",
+                    self.join_type, on, display_filter
+                )
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for SymmetricHashJoinExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -460,28 +485,6 @@ impl ExecutionPlan for SymmetricHashJoinExec {
         )?))
     }
 
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let display_filter = self.filter.as_ref().map_or_else(
-                    || "".to_string(),
-                    |f| format!(", filter={}", f.expression()),
-                );
-                let on = self
-                    .on
-                    .iter()
-                    .map(|(c1, c2)| format!("({}, {})", c1, c2))
-                    .collect::<Vec<String>>()
-                    .join(", ");
-                write!(
-                    f,
-                    "SymmetricHashJoinExec: join_type={:?}, on=[{}]{}",
-                    self.join_type, on, display_filter
-                )
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_plan/limit.rs 
b/datafusion/core/src/physical_plan/limit.rs
index d1948cc288..572cea006c 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -34,6 +34,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchOptions};
 use datafusion_common::{DataFusionError, Result};
 
 use super::expressions::PhysicalSortExpr;
+use super::DisplayAs;
 use super::{
     metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
     RecordBatchStream, SendableRecordBatchStream, Statistics,
@@ -82,6 +83,25 @@ impl GlobalLimitExec {
     }
 }
 
+impl DisplayAs for GlobalLimitExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "GlobalLimitExec: skip={}, fetch={}",
+                    self.skip,
+                    self.fetch.map_or("None".to_string(), |x| x.to_string())
+                )
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for GlobalLimitExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -164,23 +184,6 @@ impl ExecutionPlan for GlobalLimitExec {
         )))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "GlobalLimitExec: skip={}, fetch={}",
-                    self.skip,
-                    self.fetch.map_or("None".to_string(), |x| x.to_string())
-                )
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
@@ -265,6 +268,20 @@ impl LocalLimitExec {
     }
 }
 
+impl DisplayAs for LocalLimitExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "LocalLimitExec: fetch={}", self.fetch)
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for LocalLimitExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -331,18 +348,6 @@ impl ExecutionPlan for LocalLimitExec {
         )))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "LocalLimitExec: fetch={}", self.fetch)
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_plan/memory.rs 
b/datafusion/core/src/physical_plan/memory.rs
index fa456d15ee..16c22a3b2d 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -19,7 +19,7 @@
 
 use super::expressions::PhysicalSortExpr;
 use super::{
-    common, project_schema, DisplayFormatType, ExecutionPlan, Partitioning,
+    common, project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, 
Partitioning,
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use arrow::datatypes::SchemaRef;
@@ -56,6 +56,27 @@ impl fmt::Debug for MemoryExec {
     }
 }
 
+impl DisplayAs for MemoryExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let partitions: Vec<_> =
+                    self.partitions.iter().map(|b| b.len()).collect();
+                write!(
+                    f,
+                    "MemoryExec: partitions={}, partition_sizes={:?}",
+                    partitions.len(),
+                    partitions
+                )
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for MemoryExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -102,25 +123,6 @@ impl ExecutionPlan for MemoryExec {
         )?))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let partitions: Vec<_> =
-                    self.partitions.iter().map(|b| b.len()).collect();
-                write!(
-                    f,
-                    "MemoryExec: partitions={}, partition_sizes={:?}",
-                    partitions.len(),
-                    partitions
-                )
-            }
-        }
-    }
-
     /// We recompute the statistics dynamically from the arrow metadata as it 
is pretty cheap to do so
     fn statistics(&self) -> Statistics {
         common::compute_record_batch_statistics(
diff --git a/datafusion/core/src/physical_plan/mod.rs 
b/datafusion/core/src/physical_plan/mod.rs
index 7efd5a19ee..4e1de2a6cb 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -99,7 +99,7 @@ impl Stream for EmptyRecordBatchStream {
 /// [`ExecutionPlan`] can be displayed in a simplified form using the
 /// return value from [`displayable`] in addition to the (normally
 /// quite verbose) `Debug` output.
-pub trait ExecutionPlan: Debug + Send + Sync {
+pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// Returns the execution plan as [`Any`](std::any::Any) so that it can be
     /// downcast to a specific implementation.
     fn as_any(&self) -> &dyn Any;
@@ -225,16 +225,6 @@ pub trait ExecutionPlan: Debug + Send + Sync {
         None
     }
 
-    /// Format this `ExecutionPlan` to `f` in the specified type.
-    ///
-    /// Should not include a newline
-    ///
-    /// Note this function prints a placeholder by default to preserve
-    /// backwards compatibility.
-    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
-        write!(f, "ExecutionPlan(PlaceHolder)")
-    }
-
     /// Returns the global output statistics for this `ExecutionPlan` node.
     fn statistics(&self) -> Statistics;
 }
diff --git a/datafusion/core/src/physical_plan/projection.rs 
b/datafusion/core/src/physical_plan/projection.rs
index d1e1a4f9db..c6845c7afe 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -39,7 +39,7 @@ use log::trace;
 
 use super::expressions::{Column, PhysicalSortExpr};
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
-use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream, 
Statistics};
 
 use datafusion_physical_expr::{
     normalize_out_expr_with_columns_map, project_equivalence_properties,
@@ -156,6 +156,33 @@ impl ProjectionExec {
     }
 }
 
+impl DisplayAs for ProjectionExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let expr: Vec<String> = self
+                    .expr
+                    .iter()
+                    .map(|(e, alias)| {
+                        let e = e.to_string();
+                        if &e != alias {
+                            format!("{e} as {alias}")
+                        } else {
+                            e
+                        }
+                    })
+                    .collect();
+
+                write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for ProjectionExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -260,31 +287,6 @@ impl ExecutionPlan for ProjectionExec {
         }))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let expr: Vec<String> = self
-                    .expr
-                    .iter()
-                    .map(|(e, alias)| {
-                        let e = e.to_string();
-                        if &e != alias {
-                            format!("{e} as {alias}")
-                        } else {
-                            e
-                        }
-                    })
-                    .collect();
-
-                write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs 
b/datafusion/core/src/physical_plan/repartition/mod.rs
index 3c689e97ab..cb4d5c8988 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -43,7 +43,7 @@ use self::distributor_channels::{DistributionReceiver, 
DistributionSender};
 use super::common::{AbortOnDropMany, AbortOnDropSingle, 
SharedMemoryReservation};
 use super::expressions::PhysicalSortExpr;
 use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream};
 
 use crate::physical_plan::common::transpose;
 use crate::physical_plan::metrics::BaselineMetrics;
@@ -320,6 +320,26 @@ impl RepartitionExec {
     }
 }
 
+impl DisplayAs for RepartitionExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "{}: partitioning={}, input_partitions={}",
+                    self.name(),
+                    self.partitioning,
+                    self.input.output_partitioning().partition_count()
+                )
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for RepartitionExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -515,24 +535,6 @@ impl ExecutionPlan for RepartitionExec {
         Some(self.metrics.clone_inner())
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "{}: partitioning={}, input_partitions={}",
-                    self.name(),
-                    self.partitioning,
-                    self.input.output_partitioning().partition_count()
-                )
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         self.input.statistics()
     }
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs 
b/datafusion/core/src/physical_plan/sorts/sort.rs
index 205ec706b5..f660f0acf8 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -27,8 +27,8 @@ use crate::physical_plan::metrics::{
 use crate::physical_plan::sorts::merge::streaming_merge;
 use crate::physical_plan::stream::{RecordBatchReceiverStream, 
RecordBatchStreamAdapter};
 use crate::physical_plan::{
-    DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, 
Partitioning,
-    SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, 
ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
 pub use arrow::compute::SortOptions;
 use arrow::compute::{concat_batches, lexsort_to_indices, take};
@@ -513,6 +513,26 @@ impl SortExec {
     }
 }
 
+impl DisplayAs for SortExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let expr: Vec<String> = self.expr.iter().map(|e| 
e.to_string()).collect();
+                match self.fetch {
+                    Some(fetch) => {
+                        write!(f, "SortExec: fetch={fetch}, expr=[{}]", 
expr.join(","))
+                    }
+                    None => write!(f, "SortExec: expr=[{}]", expr.join(",")),
+                }
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for SortExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -619,24 +639,6 @@ impl ExecutionPlan for SortExec {
         Some(self.metrics_set.clone_inner())
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let expr: Vec<String> = self.expr.iter().map(|e| 
e.to_string()).collect();
-                match self.fetch {
-                    Some(fetch) => {
-                        write!(f, "SortExec: fetch={fetch}, expr=[{}]", 
expr.join(","))
-                    }
-                    None => write!(f, "SortExec: expr=[{}]", expr.join(",")),
-                }
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         self.input.statistics()
     }
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs 
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 397d254162..8262a18f23 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -28,6 +28,7 @@ use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
 use crate::physical_plan::sorts::streaming_merge;
+use crate::physical_plan::DisplayAs;
 use crate::physical_plan::{
     expressions::PhysicalSortExpr, DisplayFormatType, Distribution, 
ExecutionPlan,
     Partitioning, SendableRecordBatchStream, Statistics,
@@ -107,6 +108,26 @@ impl SortPreservingMergeExec {
     }
 }
 
+impl DisplayAs for SortPreservingMergeExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let expr: Vec<String> = self.expr.iter().map(|e| 
e.to_string()).collect();
+                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))?;
+                if let Some(fetch) = self.fetch {
+                    write!(f, ", fetch={fetch}")?;
+                };
+
+                Ok(())
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for SortPreservingMergeExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -215,24 +236,6 @@ impl ExecutionPlan for SortPreservingMergeExec {
         }
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let expr: Vec<String> = self.expr.iter().map(|e| 
e.to_string()).collect();
-                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))?;
-                if let Some(fetch) = self.fetch {
-                    write!(f, ", fetch={fetch}")?;
-                };
-
-                Ok(())
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_plan/streaming.rs 
b/datafusion/core/src/physical_plan/streaming.rs
index 48f1409e14..887592f295 100644
--- a/datafusion/core/src/physical_plan/streaming.rs
+++ b/datafusion/core/src/physical_plan/streaming.rs
@@ -32,6 +32,8 @@ use crate::physical_plan::stream::RecordBatchStreamAdapter;
 use crate::physical_plan::{ExecutionPlan, Partitioning, 
SendableRecordBatchStream};
 use datafusion_execution::TaskContext;
 
+use super::{DisplayAs, DisplayFormatType};
+
 /// A partition that can be converted into a [`SendableRecordBatchStream`]
 pub trait PartitionStream: Send + Sync {
     /// Returns the schema of this partition
@@ -90,6 +92,20 @@ impl std::fmt::Debug for StreamingTableExec {
     }
 }
 
+impl DisplayAs for StreamingTableExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "StreamingTableExec")
+            }
+        }
+    }
+}
+
 #[async_trait]
 impl ExecutionPlan for StreamingTableExec {
     fn as_any(&self) -> &dyn Any {
diff --git a/datafusion/core/src/physical_plan/union.rs 
b/datafusion/core/src/physical_plan/union.rs
index ffd3f06ee8..e1f8072085 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -34,6 +34,7 @@ use futures::Stream;
 use itertools::Itertools;
 use log::{debug, trace, warn};
 
+use super::DisplayAs;
 use super::{
     expressions::PhysicalSortExpr,
     metrics::{ExecutionPlanMetricsSet, MetricsSet},
@@ -144,6 +145,20 @@ impl UnionExec {
     }
 }
 
+impl DisplayAs for UnionExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "UnionExec")
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for UnionExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -247,18 +262,6 @@ impl ExecutionPlan for UnionExec {
         )))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "UnionExec")
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
@@ -342,6 +345,20 @@ impl InterleaveExec {
     }
 }
 
+impl DisplayAs for InterleaveExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "InterleaveExec")
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for InterleaveExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -421,18 +438,6 @@ impl ExecutionPlan for InterleaveExec {
         )))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "InterleaveExec")
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_plan/unnest.rs 
b/datafusion/core/src/physical_plan/unnest.rs
index 2e5c92872f..7f8d56847e 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -38,6 +38,8 @@ use crate::physical_plan::{
 };
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 
+use super::DisplayAs;
+
 /// Unnest the given column by joining the row with each value in the nested 
type.
 #[derive(Debug)]
 pub struct UnnestExec {
@@ -60,6 +62,20 @@ impl UnnestExec {
     }
 }
 
+impl DisplayAs for UnnestExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "UnnestExec")
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for UnnestExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -126,18 +142,6 @@ impl ExecutionPlan for UnnestExec {
         }))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "UnnestExec")
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         Default::default()
     }
diff --git a/datafusion/core/src/physical_plan/values.rs 
b/datafusion/core/src/physical_plan/values.rs
index 4099cfdb36..70e00ed034 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -18,7 +18,7 @@
 //! Values execution plan
 
 use super::expressions::PhysicalSortExpr;
-use super::{common, SendableRecordBatchStream, Statistics};
+use super::{common, DisplayAs, SendableRecordBatchStream, Statistics};
 use crate::physical_plan::{
     memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, 
Partitioning,
     PhysicalExpr,
@@ -94,6 +94,20 @@ impl ValuesExec {
     }
 }
 
+impl DisplayAs for ValuesExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "ValuesExec")
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for ValuesExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -145,18 +159,6 @@ impl ExecutionPlan for ValuesExec {
         )?))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "ValuesExec")
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         let batch = self.data();
         common::compute_record_batch_statistics(&[batch], &self.schema, None)
diff --git 
a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs 
b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index 9c86abec81..1f0da4a8e6 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -28,8 +28,8 @@ use crate::physical_plan::windows::{
     calc_requirements, get_ordered_partition_by_indices, 
window_ordering_equivalence,
 };
 use crate::physical_plan::{
-    ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan, 
Partitioning,
-    RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
+    ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, 
ExecutionPlan,
+    Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, 
WindowExpr,
 };
 use datafusion_common::Result;
 use datafusion_execution::TaskContext;
@@ -201,6 +201,35 @@ impl BoundedWindowAggExec {
     }
 }
 
+impl DisplayAs for BoundedWindowAggExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "BoundedWindowAggExec: ")?;
+                let g: Vec<String> = self
+                    .window_expr
+                    .iter()
+                    .map(|e| {
+                        format!(
+                            "{}: {:?}, frame: {:?}",
+                            e.name().to_owned(),
+                            e.field(),
+                            e.get_window_frame()
+                        )
+                    })
+                    .collect();
+                let mode = &self.partition_search_mode;
+                write!(f, "wdw=[{}], mode=[{:?}]", g.join(", "), mode)?;
+            }
+        }
+        Ok(())
+    }
+}
+
 impl ExecutionPlan for BoundedWindowAggExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -299,33 +328,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
         Ok(stream)
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "BoundedWindowAggExec: ")?;
-                let g: Vec<String> = self
-                    .window_expr
-                    .iter()
-                    .map(|e| {
-                        format!(
-                            "{}: {:?}, frame: {:?}",
-                            e.name().to_owned(),
-                            e.field(),
-                            e.get_window_frame()
-                        )
-                    })
-                    .collect();
-                let mode = &self.partition_search_mode;
-                write!(f, "wdw=[{}], mode=[{:?}]", g.join(", "), mode)?;
-            }
-        }
-        Ok(())
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs 
b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 4a0648c8f1..7e7fc22965 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -26,7 +26,7 @@ use crate::physical_plan::windows::{
     calc_requirements, get_ordered_partition_by_indices, 
window_ordering_equivalence,
 };
 use crate::physical_plan::{
-    ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
+    ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, 
EquivalenceProperties,
     ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream, Statistics, WindowExpr,
 };
@@ -121,6 +121,34 @@ impl WindowAggExec {
     }
 }
 
+impl DisplayAs for WindowAggExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "WindowAggExec: ")?;
+                let g: Vec<String> = self
+                    .window_expr
+                    .iter()
+                    .map(|e| {
+                        format!(
+                            "{}: {:?}, frame: {:?}",
+                            e.name().to_owned(),
+                            e.field(),
+                            e.get_window_frame()
+                        )
+                    })
+                    .collect();
+                write!(f, "wdw=[{}]", g.join(", "))?;
+            }
+        }
+        Ok(())
+    }
+}
+
 impl ExecutionPlan for WindowAggExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -225,32 +253,6 @@ impl ExecutionPlan for WindowAggExec {
         Ok(stream)
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "WindowAggExec: ")?;
-                let g: Vec<String> = self
-                    .window_expr
-                    .iter()
-                    .map(|e| {
-                        format!(
-                            "{}: {:?}, frame: {:?}",
-                            e.name().to_owned(),
-                            e.field(),
-                            e.get_window_frame()
-                        )
-                    })
-                    .collect();
-                write!(f, "wdw=[{}]", g.join(", "))?;
-            }
-        }
-        Ok(())
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index f00f5e0d5e..92c7604c37 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1934,10 +1934,10 @@ mod tests {
     use super::*;
     use crate::datasource::file_format::options::CsvReadOptions;
     use crate::datasource::MemTable;
-    use crate::physical_plan::SendableRecordBatchStream;
     use crate::physical_plan::{
         expressions, DisplayFormatType, Partitioning, Statistics,
     };
+    use crate::physical_plan::{DisplayAs, SendableRecordBatchStream};
     use crate::physical_planner::PhysicalPlanner;
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::scalar::ScalarValue;
@@ -2500,6 +2500,16 @@ mod tests {
         schema: SchemaRef,
     }
 
+    impl DisplayAs for NoOpExecutionPlan {
+        fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
+            match t {
+                DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                    write!(f, "NoOpExecutionPlan")
+                }
+            }
+        }
+    }
+
     impl ExecutionPlan for NoOpExecutionPlan {
         /// Return a reference to Any that can be used for downcasting
         fn as_any(&self) -> &dyn Any {
@@ -2537,14 +2547,6 @@ mod tests {
             unimplemented!("NoOpExecutionPlan::execute");
         }
 
-        fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
-            match t {
-                DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                    write!(f, "NoOpExecutionPlan")
-                }
-            }
-        }
-
         fn statistics(&self) -> Statistics {
             unimplemented!("NoOpExecutionPlan::statistics");
         }
diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs
index 32e7d4bcfe..63c86084ea 100644
--- a/datafusion/core/src/test/exec.rs
+++ b/datafusion/core/src/test/exec.rs
@@ -31,11 +31,11 @@ use arrow::{
 };
 use futures::Stream;
 
-use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::{
     common, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
+use crate::physical_plan::{expressions::PhysicalSortExpr, DisplayAs};
 use crate::{
     error::{DataFusionError, Result},
     physical_plan::stream::RecordBatchReceiverStream,
@@ -153,6 +153,20 @@ impl MockExec {
     }
 }
 
+impl DisplayAs for MockExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "MockExec")
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for MockExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -225,18 +239,6 @@ impl ExecutionPlan for MockExec {
         }
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "MockExec")
-            }
-        }
-    }
-
     // Panics if one of the batches is an error
     fn statistics(&self) -> Statistics {
         let data: Result<Vec<_>> = self
@@ -295,6 +297,20 @@ impl BarrierExec {
     }
 }
 
+impl DisplayAs for BarrierExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "BarrierExec")
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for BarrierExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -352,18 +368,6 @@ impl ExecutionPlan for BarrierExec {
         Ok(builder.build())
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "BarrierExec")
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         common::compute_record_batch_statistics(&self.data, &self.schema, None)
     }
@@ -392,6 +396,20 @@ impl ErrorExec {
     }
 }
 
+impl DisplayAs for ErrorExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "ErrorExec")
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for ErrorExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -431,18 +449,6 @@ impl ExecutionPlan for ErrorExec {
         )))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "ErrorExec")
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
@@ -470,6 +476,26 @@ impl StatisticsExec {
         }
     }
 }
+
+impl DisplayAs for StatisticsExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "StatisticsExec: col_count={}, row_count={:?}",
+                    self.schema.fields().len(),
+                    self.stats.num_rows,
+                )
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for StatisticsExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -509,23 +535,6 @@ impl ExecutionPlan for StatisticsExec {
     fn statistics(&self) -> Statistics {
         self.stats.clone()
     }
-
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "StatisticsExec: col_count={}, row_count={:?}",
-                    self.schema.fields().len(),
-                    self.stats.num_rows,
-                )
-            }
-        }
-    }
 }
 
 /// Execution plan that emits streams that block forever.
@@ -563,6 +572,20 @@ impl BlockingExec {
     }
 }
 
+impl DisplayAs for BlockingExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "BlockingExec",)
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for BlockingExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -605,18 +628,6 @@ impl ExecutionPlan for BlockingExec {
         }))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "BlockingExec",)
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         unimplemented!()
     }
@@ -697,6 +708,20 @@ impl PanicExec {
     }
 }
 
+impl DisplayAs for PanicExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "PanickingExec",)
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for PanicExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -743,18 +768,6 @@ impl ExecutionPlan for PanicExec {
         }))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "PanickingExec",)
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         unimplemented!()
     }
diff --git a/datafusion/core/src/test_util/mod.rs 
b/datafusion/core/src/test_util/mod.rs
index 6715a6ba98..7f73e00dca 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -33,7 +33,7 @@ use crate::execution::context::{SessionState, TaskContext};
 use crate::execution::options::ReadOptions;
 use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
 use crate::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
RecordBatchStream,
     SendableRecordBatchStream,
 };
 use crate::prelude::{CsvReadOptions, SessionContext};
@@ -363,6 +363,25 @@ impl UnboundedExec {
         }
     }
 }
+
+impl DisplayAs for UnboundedExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "UnboundableExec: unbounded={}",
+                    self.batch_produce.is_none(),
+                )
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for UnboundedExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -406,22 +425,6 @@ impl ExecutionPlan for UnboundedExec {
         }))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "UnboundableExec: unbounded={}",
-                    self.batch_produce.is_none(),
-                )
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
diff --git a/datafusion/core/tests/custom_sources.rs 
b/datafusion/core/tests/custom_sources.rs
index 7374a056d6..0f742f7b9b 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -26,8 +26,8 @@ use datafusion::logical_expr::{
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::{
-    project_schema, ColumnStatistics, ExecutionPlan, Partitioning, 
RecordBatchStream,
-    SendableRecordBatchStream, Statistics,
+    project_schema, ColumnStatistics, DisplayAs, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use datafusion::scalar::ScalarValue;
 use datafusion::{
@@ -101,6 +101,20 @@ impl Stream for TestCustomRecordBatchStream {
     }
 }
 
+impl DisplayAs for CustomExecutionPlan {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "CustomExecutionPlan: projection={:#?}", 
self.projection)
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for CustomExecutionPlan {
     fn as_any(&self) -> &dyn Any {
         self
@@ -138,18 +152,6 @@ impl ExecutionPlan for CustomExecutionPlan {
         Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CustomExecutionPlan: projection={:#?}", 
self.projection)
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap();
         Statistics {
diff --git 
a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs 
b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index 36b15f4359..b471121d1b 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -26,7 +26,8 @@ use datafusion::logical_expr::{Expr, 
TableProviderFilterPushDown};
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, 
Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
SendableRecordBatchStream,
+    Statistics,
 };
 use datafusion::prelude::*;
 use datafusion::scalar::ScalarValue;
@@ -58,6 +59,20 @@ struct CustomPlan {
     batches: Vec<RecordBatch>,
 }
 
+impl DisplayAs for CustomPlan {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "CustomPlan: batch_size={}", self.batches.len(),)
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for CustomPlan {
     fn as_any(&self) -> &dyn std::any::Any {
         self
@@ -97,18 +112,6 @@ impl ExecutionPlan for CustomPlan {
         )))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CustomPlan: batch_size={}", self.batches.len(),)
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         // here we could provide more accurate statistics
         // but we want to test the filter pushdown not the CBOs
diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs 
b/datafusion/core/tests/custom_sources_cases/statistics.rs
index 73237fa857..167cf0e7f3 100644
--- a/datafusion/core/tests/custom_sources_cases/statistics.rs
+++ b/datafusion/core/tests/custom_sources_cases/statistics.rs
@@ -25,7 +25,7 @@ use datafusion::{
     error::Result,
     logical_expr::Expr,
     physical_plan::{
-        expressions::PhysicalSortExpr, project_schema, ColumnStatistics,
+        expressions::PhysicalSortExpr, project_schema, ColumnStatistics, 
DisplayAs,
         DisplayFormatType, ExecutionPlan, Partitioning, 
SendableRecordBatchStream,
         Statistics,
     },
@@ -111,6 +111,25 @@ impl TableProvider for StatisticsValidation {
     }
 }
 
+impl DisplayAs for StatisticsValidation {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "StatisticsValidation: col_count={}, row_count={:?}",
+                    self.schema.fields().len(),
+                    self.stats.num_rows,
+                )
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for StatisticsValidation {
     fn as_any(&self) -> &dyn Any {
         self
@@ -150,23 +169,6 @@ impl ExecutionPlan for StatisticsValidation {
     fn statistics(&self) -> Statistics {
         self.stats.clone()
     }
-
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "StatisticsValidation: col_count={}, row_count={:?}",
-                    self.schema.fields().len(),
-                    self.stats.num_rows,
-                )
-            }
-        }
-    }
 }
 
 fn init_ctx(stats: Statistics, schema: Schema) -> Result<SessionContext> {
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs 
b/datafusion/core/tests/user_defined/user_defined_plan.rs
index 493d90d91a..0771ef7f79 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -80,8 +80,9 @@ use datafusion::{
     },
     optimizer::{optimize_children, OptimizerConfig, OptimizerRule},
     physical_plan::{
-        expressions::PhysicalSortExpr, DisplayFormatType, Distribution, 
ExecutionPlan,
-        Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics,
+        expressions::PhysicalSortExpr, DisplayAs, DisplayFormatType, 
Distribution,
+        ExecutionPlan, Partitioning, RecordBatchStream, 
SendableRecordBatchStream,
+        Statistics,
     },
     physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner},
     prelude::{SessionConfig, SessionContext},
@@ -421,6 +422,20 @@ impl Debug for TopKExec {
     }
 }
 
+impl DisplayAs for TopKExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "TopKExec: k={}", self.k)
+            }
+        }
+    }
+}
+
 #[async_trait]
 impl ExecutionPlan for TopKExec {
     /// Return a reference to Any that can be used for downcasting
@@ -478,18 +493,6 @@ impl ExecutionPlan for TopKExec {
         }))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "TopKExec: k={}", self.k)
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         // to improve the optimizability of this plan
         // better statistics inference could be provided


Reply via email to