xudong963 commented on code in PR #17242:
URL: https://github.com/apache/datafusion/pull/17242#discussion_r2312447897


##########
datafusion/core/src/datasource/physical_plan/arrow_file.rs:
##########
@@ -52,50 +62,40 @@ impl From<ArrowSource> for Arc<dyn FileSource> {
 }
 
 impl FileSource for ArrowSource {
+    fn config(&self) -> &FileScanConfig {
+        &self.config
+    }
+
+    fn with_config(&self, config: FileScanConfig) -> Arc<dyn FileSource> {
+        let mut this = self.clone();
+        this.config = config;
+
+        Arc::new(this)
+    }

Review Comment:
   I don't see a reason not to provide a default implementation of this method
   in `FileSource`.



##########
datafusion/core/src/datasource/physical_plan/arrow_file.rs:
##########
@@ -52,50 +62,40 @@ impl From<ArrowSource> for Arc<dyn FileSource> {
 }
 
 impl FileSource for ArrowSource {
+    fn config(&self) -> &FileScanConfig {
+        &self.config
+    }
+
+    fn with_config(&self, config: FileScanConfig) -> Arc<dyn FileSource> {
+        let mut this = self.clone();
+        this.config = config;
+
+        Arc::new(this)
+    }
+
+    fn as_data_source(&self) -> Arc<dyn DataSource> {
+        Arc::new(self.clone())
+    }

Review Comment:
   ditto



##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -475,21 +475,25 @@ impl FileFormat for ParquetFormat {
     async fn create_physical_plan(
         &self,
         state: &dyn Session,
-        conf: FileScanConfig,
+        source: Arc<dyn FileSource>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let mut metadata_size_hint = None;
 
         if let Some(metadata) = self.metadata_size_hint() {
             metadata_size_hint = Some(metadata);
         }
 
-        let mut source = ParquetSource::new(self.options.clone());
+        let mut source = source
+            .as_any()
+            .downcast_ref::<ParquetSource>()
+            .unwrap()

Review Comment:
   It would be better to replace this `unwrap()` with proper error handling.



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -580,12 +616,12 @@ impl FileSource for ParquetSource {
         self
     }
 
-    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+    fn metrics_inner(&self) -> &ExecutionPlanMetricsSet {
         &self.metrics
     }
 
-    fn file_source_statistics(&self, config: &FileScanConfig) -> Statistics {
-        let statistics = config.file_source_projected_statistics.clone();
+    fn file_source_statistics(&self) -> Statistics {
+        let statistics = self.config.file_source_projected_statistics.clone();

Review Comment:
   This seems to be just a naming issue.



##########
datafusion/datasource/src/file.rs:
##########
@@ -51,28 +67,27 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) 
-> Arc<dyn FileSource>
 /// * 
[`ParquetSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ParquetSource.html)
 ///
 /// [`DataSource`]: crate::source::DataSource
-pub trait FileSource: Send + Sync {
+pub trait FileSource: fmt::Debug + Send + Sync {
+    fn config(&self) -> &FileScanConfig;
+
+    fn with_config(&self, config: FileScanConfig) -> Arc<dyn FileSource>;
+
+    fn as_data_source(&self) -> Arc<dyn DataSource>;
+
     /// Creates a `dyn FileOpener` based on given parameters
     fn create_file_opener(
         &self,
         object_store: Arc<dyn ObjectStore>,
-        base_config: &FileScanConfig,
         partition: usize,
     ) -> Arc<dyn FileOpener>;
     /// Any
     fn as_any(&self) -> &dyn Any;
-    /// Initialize new type with batch size configuration
-    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
-    /// Initialize new instance with a new schema
-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>;
-    /// Initialize new instance with projection information
-    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
-    /// Initialize new instance with projected statistics
-    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
     /// Return execution plan metrics
-    fn metrics(&self) -> &ExecutionPlanMetricsSet;
+    fn metrics_inner(&self) -> &ExecutionPlanMetricsSet;

Review Comment:
   Can we avoid the method here? It seems that we can add metrics to 
`FileScanConfig`



##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -500,13 +504,7 @@ impl FileFormat for ParquetFormat {
 
         source = self.set_source_encryption_factory(source, state)?;
 
-        // Apply schema adapter factory before building the new config
-        let file_source = source.apply_schema_adapter(&conf)?;
-
-        let conf = FileScanConfigBuilder::from(conf)
-            .with_source(file_source)
-            .build();

Review Comment:
   Note for later reviewers: these steps have been pulled up into 
`create_file_source_with_schema_adapter`.



##########
datafusion/core/src/datasource/physical_plan/arrow_file.rs:
##########
@@ -52,50 +62,40 @@ impl From<ArrowSource> for Arc<dyn FileSource> {
 }
 
 impl FileSource for ArrowSource {
+    fn config(&self) -> &FileScanConfig {
+        &self.config

Review Comment:
   ditto



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -184,17 +172,17 @@ pub struct FileScanConfig {
     pub file_compression_type: FileCompressionType,
     /// Are new lines in values supported for CSVOptions
     pub new_lines_in_values: bool,
-    /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
-    pub file_source: Arc<dyn FileSource>,
     /// Batch size while creating new batches
     /// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size.
     pub batch_size: Option<usize>,
     /// Expression adapter used to adapt filters and projections that are 
pushed down into the scan
     /// from the logical schema to the physical schema of the file.
     pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
+
+    pub file_source_projected_statistics: Statistics,

Review Comment:
   `projected_statistics` looks good to me



##########
datafusion/core/src/datasource/physical_plan/arrow_file.rs:
##########
@@ -49,23 +62,37 @@ impl From<ArrowSource> for Arc<dyn FileSource> {
 }
 
 impl FileSource for ArrowSource {
+    fn config(&self) -> &FileScanConfig {
+        &self.config
+    }
+
+    fn with_config(&self, config: FileScanConfig) -> Arc<dyn FileSource> {
+        let mut this = self.clone();
+        this.config = config;
+
+        Arc::new(this)
+    }
+
+    fn as_data_source(&self) -> Arc<dyn DataSource> {
+        Arc::new(self.clone())
+    }
+
     fn create_file_opener(
         &self,
         object_store: Arc<dyn ObjectStore>,
-        base_config: &FileScanConfig,
         _partition: usize,
     ) -> Arc<dyn FileOpener> {
         Arc::new(ArrowOpener {
             object_store,
-            projection: base_config.file_column_projection_indices(),
+            projection: self.config().file_column_projection_indices(),

Review Comment:
    Both approaches work in this case because:
   - The trait method `config()` just returns `&self.config`
   - So `self.config()` and `self.config` access the same data
   
    Use `self.config()` for consistency with the trait interface, especially 
since:
   
   1. It maintains the abstraction by using the trait method rather than 
direct field access
   2. It would keep working correctly even if the trait implementation 
changed, though that does not seem to have happened so far



##########
datafusion/datasource/src/file.rs:
##########
@@ -84,13 +99,14 @@ pub trait FileSource: Send + Sync {
     ///
     /// The default implementation uses [`FileGroupPartitioner`]. See that
     /// struct for more details.
-    fn repartitioned(
+    fn repartitioned_inner(

Review Comment:
   +1



##########
datafusion/datasource/src/file.rs:
##########
@@ -270,8 +293,29 @@ impl<T: FileSource + 'static> DataSource for T {
         SchedulingType::Cooperative
     }
 
-    fn data_source_statistics(&self) -> Result<Statistics> {
-        Ok(self.config().projected_stats(self.file_source_statistics()))
+    fn data_source_statistics(&self) -> Statistics {
+        let file_source_statistics = self.file_source_statistics();

Review Comment:
   Both `statistics()` variants look good to me



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to