alamb commented on code in PR #10286:
URL: https://github.com/apache/datafusion/pull/10286#discussion_r1583665774


##########
datafusion-examples/examples/parquet_exec_visitor.rs:
##########
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+use datafusion::execution::context::SessionContext;
+use datafusion::physical_plan::metrics::MetricValue;
+use datafusion::physical_plan::{
+    execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
+};
+use futures::StreamExt;
+
+#[tokio::main]
+async fn main() {
+    let ctx = SessionContext::new();
+
+    let test_data = datafusion::test_util::parquet_test_data();
+
+    // Configure listing options
+    let file_format = ParquetFormat::default().with_enable_pruning(true);
+    let listing_options = ListingOptions::new(Arc::new(file_format))
+        // This is a workaround for this example since `test_data` contains

Review Comment:
   I think you could avoid this workaround by passing 
`&format!("file://{test_data}/alltypes_plain.parquet"),` as the table path to 
`register_listing_table` below (I tried it locally and it seems to work).



##########
datafusion-examples/examples/parquet_exec_visitor.rs:
##########
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+use datafusion::execution::context::SessionContext;
+use datafusion::physical_plan::metrics::MetricValue;
+use datafusion::physical_plan::{
+    execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
+};
+use futures::StreamExt;
+
+#[tokio::main]
+async fn main() {
+    let ctx = SessionContext::new();
+
+    let test_data = datafusion::test_util::parquet_test_data();
+
+    // Configure listing options
+    let file_format = ParquetFormat::default().with_enable_pruning(true);
+    let listing_options = ListingOptions::new(Arc::new(file_format))
+        // This is a workaround for this example since `test_data` contains
+        // many different parquet different files,
+        // in practice use FileType::PARQUET.get_ext().
+        .with_file_extension("alltypes_plain.parquet");
+
+    // First example were we use an absolute path, which requires no 
additional setup.
+    let _ = ctx
+        .register_listing_table(
+            "my_table",
+            &format!("file://{test_data}/"),
+            listing_options.clone(),
+            None,
+            None,
+        )
+        .await;
+
+    let df = ctx.sql("SELECT * FROM my_table").await.unwrap();
+    let plan = df.create_physical_plan().await.unwrap();
+
+    // Create empty visitor
+    let mut visitor = ParquetExecVisitor {
+        file_scan_config: None,
+        bytes_scanned: None,
+    };
+
+    // Make sure you execute the plan to collect actual execution statistics.
+    // For example, in this example the `file_scan_config` is known without 
executing
+    // but the `bytes_scanned` would be None if we did not execute.
+    let mut batch_stream = execute_stream(plan.clone(), 
ctx.task_ctx()).unwrap();
+    while let Some(batch) = batch_stream.next().await {
+        println!("Do something with batch");
+    }
+
+    visit_execution_plan(plan.as_ref(), &mut visitor).unwrap();
+
+    println!("ParquetExecVisitor: {:?}", visitor);
+}
+
+/// Define a struct with fields to hold the information you want to collect
+#[derive(Debug)]
+struct ParquetExecVisitor {
+    file_scan_config: Option<FileScanConfig>,
+    bytes_scanned: Option<MetricValue>,
+}
+
+impl ExecutionPlanVisitor for ParquetExecVisitor {
+    type Error = datafusion_common::DataFusionError;
+
+    /// Based on your needs implement either `pre_visit` or `post_visit`

Review Comment:
   ```suggestion
       /// This function is called once for every node in the tree. 
       /// Based on your needs implement either `pre_visit` (visit each node before its children/inputs)
       /// or `post_visit` (visit each node after its children/inputs)
   ```



##########
datafusion-examples/examples/parquet_exec_visitor.rs:
##########
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+use datafusion::execution::context::SessionContext;
+use datafusion::physical_plan::metrics::MetricValue;
+use datafusion::physical_plan::{
+    execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
+};
+use futures::StreamExt;
+
+#[tokio::main]
+async fn main() {

Review Comment:
   I think it helps to make the examples self-contained, with a few comments 
that explain what each one shows, for example:
   
   
https://github.com/apache/datafusion/blob/0f2a68ee1676c0d141d2c7cacf4b7c21d0033870/datafusion-examples/examples/csv_sql.rs#L22-L23
   
   So in this one, perhaps we could add something like
   
   ```suggestion
   /// Example of collecting metrics after execution by visiting the `ExecutionPlan`
   #[tokio::main]
   async fn main() {
   ```



##########
datafusion-examples/examples/parquet_exec_visitor.rs:
##########
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+use datafusion::execution::context::SessionContext;
+use datafusion::physical_plan::metrics::MetricValue;
+use datafusion::physical_plan::{
+    execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
+};
+use futures::StreamExt;
+
+#[tokio::main]
+async fn main() {
+    let ctx = SessionContext::new();
+
+    let test_data = datafusion::test_util::parquet_test_data();
+
+    // Configure listing options
+    let file_format = ParquetFormat::default().with_enable_pruning(true);
+    let listing_options = ListingOptions::new(Arc::new(file_format))
+        // This is a workaround for this example since `test_data` contains
+        // many different parquet different files,
+        // in practice use FileType::PARQUET.get_ext().
+        .with_file_extension("alltypes_plain.parquet");
+
+    // First example were we use an absolute path, which requires no 
additional setup.
+    let _ = ctx
+        .register_listing_table(
+            "my_table",
+            &format!("file://{test_data}/"),
+            listing_options.clone(),
+            None,
+            None,
+        )
+        .await;
+
+    let df = ctx.sql("SELECT * FROM my_table").await.unwrap();
+    let plan = df.create_physical_plan().await.unwrap();
+
+    // Create empty visitor
+    let mut visitor = ParquetExecVisitor {
+        file_scan_config: None,
+        bytes_scanned: None,
+    };
+
+    // Make sure you execute the plan to collect actual execution statistics.
+    // For example, in this example the `file_scan_config` is known without 
executing
+    // but the `bytes_scanned` would be None if we did not execute.
+    let mut batch_stream = execute_stream(plan.clone(), 
ctx.task_ctx()).unwrap();
+    while let Some(batch) = batch_stream.next().await {
+        println!("Do something with batch");
+    }
+
+    visit_execution_plan(plan.as_ref(), &mut visitor).unwrap();
+
+    println!("ParquetExecVisitor: {:?}", visitor);

Review Comment:
   The output looks a little messy:
   
   ```shell
   Do something with batch
   ParquetExecVisitor: ParquetExecVisitor { file_scan_config: 
Some(object_store_url=ObjectStoreUrl { url: Url { scheme: "file", 
cannot_be_a_base: false, username: "", password: None, host: None, port: None, 
path: "/", query: None, fragment: None } }, statistics=Statistics { num_rows: 
Exact(8), total_byte_size: Exact(671), column_statistics: [ColumnStatistics { 
null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: 
Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: 
Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, 
max_value: Absent, min_value: Absent, distinct_count: Absent }, 
ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, 
distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: 
Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { 
null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: 
Absent }, ColumnStatistics { null_count: Absent, 
max_value: Absent, min_value: Absent, distinct_count: Absent }, 
ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, 
distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: 
Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { 
null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: 
Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: 
Absent, distinct_count: Absent }] }, file_groups={1 group: 
[[Users/andrewlamb/Software/datafusion2/parquet-testing/data/alltypes_plain.parquet]]},
 projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col]), 
bytes_scanned: Some(Count { name: "bytes_scanned", count: Count { value: 671 } 
}) }
   ```
   
   Maybe we could print out a nicer version like 
   
   ```rust
   
       let bytes_scanned = visitor.bytes_scanned.unwrap();
       println!("Total parquet bytes scanned {bytes_scanned}");
   ```
   
   ```shell
   Do something with batch
   Total parquet bytes scanned 671
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to