This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5501e8eb32 Support COPY TO Externally Defined File Formats, add FileType trait (#11060)
5501e8eb32 is described below

commit 5501e8eb326b808a9877d9a8cb743cbb428ecc8a
Author: Devin D'Angelo <[email protected]>
AuthorDate: Thu Jun 27 20:20:43 2024 -0400

    Support COPY TO Externally Defined File Formats, add FileType trait (#11060)
    
    * wip create and register ext file types with session
    
    * Add contains function, and support in datafusion substrait consumer (#10879)
    
    * adding new function contains
    
    * adding substrait test
    
    * adding doc
    
    * adding doc
    
    * Update docs/source/user-guide/sql/scalar_functions.md
    
    Co-authored-by: Alex Huang <[email protected]>
    
    * adding entry
    
    ---------
    
    Co-authored-by: Alex Huang <[email protected]>
    
    * logical planning updated
    
    * compiling
    
    * removing filetype enum
    
    * compiling
    
    * working on tests
    
    * fix some tests
    
    * test fixes
    
    * cli fix
    
    * cli fmt
    
    * Update datafusion/core/src/datasource/file_format/mod.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Update datafusion/core/src/execution/session_state.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * review comments
    
    * review comments
    
    * review comments
    
    * typo fix
    
    * fmt
    
    * fix err log style
    
    * fmt
    
    ---------
    
    Co-authored-by: Lordworms <[email protected]>
    Co-authored-by: Alex Huang <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion-cli/src/exec.rs                         |  31 +-
 .../external_dependency/dataframe-to-s3.rs         |   6 +-
 datafusion/common/src/config.rs                    |  77 ++--
 datafusion/common/src/file_options/file_type.rs    | 116 +-----
 datafusion/common/src/file_options/mod.rs          |  12 +-
 datafusion/common/src/lib.rs                       |   4 +-
 datafusion/core/src/dataframe/mod.rs               |  28 +-
 datafusion/core/src/dataframe/parquet.rs           |  19 +-
 .../core/src/datasource/file_format/arrow.rs       |  56 ++-
 datafusion/core/src/datasource/file_format/avro.rs |  56 +++
 datafusion/core/src/datasource/file_format/csv.rs  |  92 ++++-
 .../file_format/file_compression_type.rs           |  83 +----
 datafusion/core/src/datasource/file_format/json.rs |  76 +++-
 datafusion/core/src/datasource/file_format/mod.rs  |  92 ++++-
 .../core/src/datasource/file_format/parquet.rs     |  86 ++++-
 datafusion/core/src/datasource/listing/table.rs    |  97 +++--
 .../core/src/datasource/listing_table_factory.rs   |  41 +--
 .../core/src/datasource/physical_plan/csv.rs       |  24 +-
 .../core/src/datasource/physical_plan/json.rs      |  14 +-
 .../src/datasource/physical_plan/parquet/mod.rs    |   4 +-
 datafusion/core/src/execution/session_state.rs     |  81 ++++-
 datafusion/core/src/physical_planner.rs            |  42 +--
 datafusion/core/src/test/mod.rs                    |  19 +-
 datafusion/expr/src/logical_plan/builder.rs        |   6 +-
 datafusion/expr/src/logical_plan/display.rs        |   4 +-
 datafusion/expr/src/logical_plan/dml.rs            |   6 +-
 datafusion/expr/src/logical_plan/plan.rs           |   8 +-
 datafusion/expr/src/logical_plan/tree_node.rs      |   4 +-
 datafusion/proto/gen/src/main.rs                   |   1 +
 datafusion/proto/proto/datafusion.proto            |   8 +-
 datafusion/proto/src/generated/pbjson.rs           |  93 ++---
 datafusion/proto/src/generated/prost.rs            |  21 +-
 datafusion/proto/src/logical_plan/file_formats.rs  | 399 +++++++++++++++++++++
 datafusion/proto/src/logical_plan/mod.rs           |  35 +-
 datafusion/proto/src/physical_plan/from_proto.rs   |  22 +-
 datafusion/proto/src/physical_plan/to_proto.rs     |  28 +-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  62 ++--
 datafusion/sql/src/planner.rs                      |   6 +
 datafusion/sql/src/statement.rs                    |  58 +--
 datafusion/sql/tests/common/mod.rs                 |  32 +-
 40 files changed, 1305 insertions(+), 644 deletions(-)

diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index c4c92be152..b78f32e0ac 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -21,7 +21,6 @@ use std::collections::HashMap;
 use std::fs::File;
 use std::io::prelude::*;
 use std::io::BufReader;
-use std::str::FromStr;
 
 use crate::cli_context::CliSessionContext;
 use crate::helper::split_from_semicolon;
@@ -35,6 +34,7 @@ use crate::{
 
 use datafusion::common::instant::Instant;
 use datafusion::common::plan_datafusion_err;
+use datafusion::config::ConfigFileType;
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_expr::{DdlStatement, LogicalPlan};
@@ -42,7 +42,6 @@ use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties
 use datafusion::sql::parser::{DFParser, Statement};
 use datafusion::sql::sqlparser::dialect::dialect_from_str;
 
-use datafusion::common::FileType;
 use datafusion::sql::sqlparser;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
@@ -291,6 +290,15 @@ impl AdjustedPrintOptions {
     }
 }
 
+fn config_file_type_from_str(ext: &str) -> Option<ConfigFileType> {
+    match ext.to_lowercase().as_str() {
+        "csv" => Some(ConfigFileType::CSV),
+        "json" => Some(ConfigFileType::JSON),
+        "parquet" => Some(ConfigFileType::PARQUET),
+        _ => None,
+    }
+}
+
 async fn create_plan(
     ctx: &mut dyn CliSessionContext,
     statement: Statement,
@@ -302,7 +310,7 @@ async fn create_plan(
     // will raise Configuration errors.
     if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
         // To support custom formats, treat error as None
-        let format = FileType::from_str(&cmd.file_type).ok();
+        let format = config_file_type_from_str(&cmd.file_type);
         register_object_store_and_config_extensions(
             ctx,
             &cmd.location,
@@ -313,13 +321,13 @@ async fn create_plan(
     }
 
     if let LogicalPlan::Copy(copy_to) = &mut plan {
-        let format: FileType = (&copy_to.format_options).into();
+        let format = config_file_type_from_str(&copy_to.file_type.get_ext());
 
         register_object_store_and_config_extensions(
             ctx,
             &copy_to.output_url,
             &copy_to.options,
-            Some(format),
+            format,
         )
         .await?;
     }
@@ -357,7 +365,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
     ctx: &dyn CliSessionContext,
     location: &String,
     options: &HashMap<String, String>,
-    format: Option<FileType>,
+    format: Option<ConfigFileType>,
 ) -> Result<()> {
     // Parse the location URL to extract the scheme and other components
     let table_path = ListingTableUrl::parse(location)?;
@@ -374,7 +382,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
     // Clone and modify the default table options based on the provided options
     let mut table_options = ctx.session_state().default_table_options().clone();
     if let Some(format) = format {
-        table_options.set_file_format(format);
+        table_options.set_config_format(format);
     }
     table_options.alter_with_string_hash_map(options)?;
 
@@ -392,7 +400,6 @@ pub(crate) async fn register_object_store_and_config_extensions(
 mod tests {
     use super::*;
 
-    use datafusion::common::config::FormatOptions;
     use datafusion::common::plan_err;
 
     use datafusion::prelude::SessionContext;
@@ -403,7 +410,7 @@ mod tests {
         let plan = ctx.state().create_logical_plan(sql).await?;
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
-            let format = FileType::from_str(&cmd.file_type).ok();
+            let format = config_file_type_from_str(&cmd.file_type);
             register_object_store_and_config_extensions(
                 &ctx,
                 &cmd.location,
@@ -429,12 +436,12 @@ mod tests {
         let plan = ctx.state().create_logical_plan(sql).await?;
 
         if let LogicalPlan::Copy(cmd) = &plan {
-            let format: FileType = (&cmd.format_options).into();
+            let format = config_file_type_from_str(&cmd.file_type.get_ext());
             register_object_store_and_config_extensions(
                 &ctx,
                 &cmd.output_url,
                 &cmd.options,
-                Some(format),
+                format,
             )
             .await?;
         } else {
@@ -484,7 +491,7 @@ mod tests {
                 let mut plan = create_plan(&mut ctx, statement).await?;
                 if let LogicalPlan::Copy(copy_to) = &mut plan {
                     assert_eq!(copy_to.output_url, location);
-                    assert!(matches!(copy_to.format_options, FormatOptions::PARQUET(_)));
+                    assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string());
                     ctx.runtime_env()
                         .object_store_registry
                         .get_store(&Url::parse(&copy_to.output_url).unwrap())?;
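
A minimal sketch of how the new CLI helper above behaves: known extensions map
to the built-in config variants, and anything else falls through to None so
that externally defined file formats are not rejected (matches! is used since
ConfigFileType does not derive PartialEq):

    use datafusion::config::ConfigFileType;

    // Matching is case-insensitive; unknown extensions yield None
    assert!(matches!(
        config_file_type_from_str("CSV"),
        Some(ConfigFileType::CSV)
    ));
    assert!(config_file_type_from_str("avro").is_none());
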
diff --git a/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs b/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs
index 4d71ed7589..e75ba5dd53 100644
--- a/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs
+++ b/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs
@@ -20,10 +20,10 @@ use std::sync::Arc;
 
 use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::error::Result;
 use datafusion::prelude::*;
-use datafusion_common::{FileType, GetExt};
 
 use object_store::aws::AmazonS3Builder;
 use url::Url;
@@ -54,7 +54,7 @@ async fn main() -> Result<()> {
     let path = format!("s3://{bucket_name}/test_data/");
     let file_format = ParquetFormat::default().with_enable_pruning(true);
     let listing_options = ListingOptions::new(Arc::new(file_format))
-        .with_file_extension(FileType::PARQUET.get_ext());
+        .with_file_extension(ParquetFormat::default().get_ext());
     ctx.register_listing_table("test", &path, listing_options, None, None)
         .await?;
 
@@ -79,7 +79,7 @@ async fn main() -> Result<()> {
 
     let file_format = ParquetFormat::default().with_enable_pruning(true);
     let listing_options = ListingOptions::new(Arc::new(file_format))
-        .with_file_extension(FileType::PARQUET.get_ext());
+        .with_file_extension(ParquetFormat::default().get_ext());
     ctx.register_listing_table("test2", &out_path, listing_options, None, None)
         .await?;
 
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 47da14574c..b90aeffb07 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -24,7 +24,7 @@ use std::str::FromStr;
 
 use crate::error::_config_err;
 use crate::parsers::CompressionTypeVariant;
-use crate::{DataFusionError, FileType, Result};
+use crate::{DataFusionError, Result};
 
 /// A macro that wraps a configuration struct and automatically derives
 /// [`Default`] and [`ConfigField`] for it, allowing it to be used
@@ -1116,6 +1116,16 @@ macro_rules! extensions_options {
     }
 }
 
+/// These file types have special built-in behavior for configuration.
+/// Use TableOptions::Extensions for configuring other file types.
+#[derive(Debug, Clone)]
+pub enum ConfigFileType {
+    CSV,
+    #[cfg(feature = "parquet")]
+    PARQUET,
+    JSON,
+}
+
 /// Represents the configuration options available for handling different table formats within a data processing application.
 /// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
 /// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
@@ -1134,7 +1144,7 @@ pub struct TableOptions {
 
     /// The current file format that the table operations should assume. This option allows
     /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
-    pub current_format: Option<FileType>,
+    pub current_format: Option<ConfigFileType>,
 
     /// Optional extensions that can be used to extend or customize the behavior of the table
     /// options. Extensions can be registered using `Extensions::insert` and might include
@@ -1152,10 +1162,9 @@ impl ConfigField for TableOptions {
         if let Some(file_type) = &self.current_format {
             match file_type {
                 #[cfg(feature = "parquet")]
-                FileType::PARQUET => self.parquet.visit(v, "format", ""),
-                FileType::CSV => self.csv.visit(v, "format", ""),
-                FileType::JSON => self.json.visit(v, "format", ""),
-                _ => {}
+                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
+                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
+                ConfigFileType::JSON => self.json.visit(v, "format", ""),
             }
         } else {
             self.csv.visit(v, "csv", "");
@@ -1188,12 +1197,9 @@ impl ConfigField for TableOptions {
         match key {
             "format" => match format {
                 #[cfg(feature = "parquet")]
-                FileType::PARQUET => self.parquet.set(rem, value),
-                FileType::CSV => self.csv.set(rem, value),
-                FileType::JSON => self.json.set(rem, value),
-                _ => {
-                    _config_err!("Config value \"{key}\" is not supported on {}", format)
-                }
+                ConfigFileType::PARQUET => self.parquet.set(rem, value),
+                ConfigFileType::CSV => self.csv.set(rem, value),
+                ConfigFileType::JSON => self.json.set(rem, value),
             },
             _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
         }
@@ -1210,15 +1216,6 @@ impl TableOptions {
         Self::default()
     }
 
-    /// Sets the file format for the table.
-    ///
-    /// # Parameters
-    ///
-    /// * `format`: The file format to use (e.g., CSV, Parquet).
-    pub fn set_file_format(&mut self, format: FileType) {
-        self.current_format = Some(format);
-    }
-
     /// Creates a new `TableOptions` instance initialized with settings from a given session config.
     ///
     /// # Parameters
@@ -1249,6 +1246,15 @@ impl TableOptions {
         clone
     }
 
+    /// Sets the file format for the table.
+    ///
+    /// # Parameters
+    ///
+    /// * `format`: The file format to use (e.g., CSV, Parquet).
+    pub fn set_config_format(&mut self, format: ConfigFileType) {
+        self.current_format = Some(format);
+    }
+
     /// Sets the extensions for this `TableOptions` instance.
     ///
     /// # Parameters
@@ -1673,6 +1679,8 @@ config_namespace! {
     }
 }
 
+pub trait FormatOptionsExt: Display {}
+
 #[derive(Debug, Clone, PartialEq)]
 #[allow(clippy::large_enum_variant)]
 pub enum FormatOptions {
@@ -1698,28 +1706,15 @@ impl Display for FormatOptions {
     }
 }
 
-impl From<FileType> for FormatOptions {
-    fn from(value: FileType) -> Self {
-        match value {
-            FileType::ARROW => FormatOptions::ARROW,
-            FileType::AVRO => FormatOptions::AVRO,
-            #[cfg(feature = "parquet")]
-            FileType::PARQUET => FormatOptions::PARQUET(TableParquetOptions::default()),
-            FileType::CSV => FormatOptions::CSV(CsvOptions::default()),
-            FileType::JSON => FormatOptions::JSON(JsonOptions::default()),
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::any::Any;
     use std::collections::HashMap;
 
     use crate::config::{
-        ConfigEntry, ConfigExtension, ExtensionOptions, Extensions, TableOptions,
+        ConfigEntry, ConfigExtension, ConfigFileType, ExtensionOptions, Extensions,
+        TableOptions,
     };
-    use crate::FileType;
 
     #[derive(Default, Debug, Clone)]
     pub struct TestExtensionConfig {
@@ -1777,7 +1772,7 @@ mod tests {
         let mut extension = Extensions::new();
         extension.insert(TestExtensionConfig::default());
         let mut table_config = TableOptions::new().with_extensions(extension);
-        table_config.set_file_format(FileType::CSV);
+        table_config.set_config_format(ConfigFileType::CSV);
         table_config.set("format.delimiter", ";").unwrap();
         assert_eq!(table_config.csv.delimiter, b';');
         table_config.set("test.bootstrap.servers", "asd").unwrap();
@@ -1794,7 +1789,7 @@ mod tests {
     #[test]
     fn csv_u8_table_options() {
         let mut table_config = TableOptions::new();
-        table_config.set_file_format(FileType::CSV);
+        table_config.set_config_format(ConfigFileType::CSV);
         table_config.set("format.delimiter", ";").unwrap();
         assert_eq!(table_config.csv.delimiter as char, ';');
         table_config.set("format.escape", "\"").unwrap();
@@ -1807,7 +1802,7 @@ mod tests {
     #[test]
     fn parquet_table_options() {
         let mut table_config = TableOptions::new();
-        table_config.set_file_format(FileType::PARQUET);
+        table_config.set_config_format(ConfigFileType::PARQUET);
         table_config
             .set("format.bloom_filter_enabled::col1", "true")
             .unwrap();
@@ -1821,7 +1816,7 @@ mod tests {
     #[test]
     fn parquet_table_options_config_entry() {
         let mut table_config = TableOptions::new();
-        table_config.set_file_format(FileType::PARQUET);
+        table_config.set_config_format(ConfigFileType::PARQUET);
         table_config
             .set("format.bloom_filter_enabled::col1", "true")
             .unwrap();
@@ -1835,7 +1830,7 @@ mod tests {
     #[test]
     fn parquet_table_options_config_metadata_entry() {
         let mut table_config = TableOptions::new();
-        table_config.set_file_format(FileType::PARQUET);
+        table_config.set_config_format(ConfigFileType::PARQUET);
         table_config.set("format.metadata::key1", "").unwrap();
         table_config.set("format.metadata::key2", "value2").unwrap();
         table_config
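
For reference, a minimal sketch of the ConfigFileType workflow introduced
above, following the same pattern as the csv_u8_table_options test:

    use datafusion_common::config::{ConfigFileType, TableOptions};

    fn csv_options_sketch() -> datafusion_common::Result<()> {
        let mut table_options = TableOptions::new();
        // Scope subsequent "format." keys to the CSV options
        table_options.set_config_format(ConfigFileType::CSV);
        table_options.set("format.delimiter", ";")?;
        assert_eq!(table_options.csv.delimiter, b';');
        Ok(())
    }
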
diff --git a/datafusion/common/src/file_options/file_type.rs b/datafusion/common/src/file_options/file_type.rs
index fc0bb74456..2648f72897 100644
--- a/datafusion/common/src/file_options/file_type.rs
+++ b/datafusion/common/src/file_options/file_type.rs
@@ -17,11 +17,8 @@
 
 //! File type abstraction
 
-use std::fmt::{self, Display};
-use std::str::FromStr;
-
-use crate::config::FormatOptions;
-use crate::error::{DataFusionError, Result};
+use std::any::Any;
+use std::fmt::Display;
 
 /// The default file extension of arrow files
 pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow";
@@ -40,107 +37,10 @@ pub trait GetExt {
     fn get_ext(&self) -> String;
 }
 
-/// Readable file type
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum FileType {
-    /// Apache Arrow file
-    ARROW,
-    /// Apache Avro file
-    AVRO,
-    /// Apache Parquet file
-    #[cfg(feature = "parquet")]
-    PARQUET,
-    /// CSV file
-    CSV,
-    /// JSON file
-    JSON,
-}
-
-impl From<&FormatOptions> for FileType {
-    fn from(value: &FormatOptions) -> Self {
-        match value {
-            FormatOptions::CSV(_) => FileType::CSV,
-            FormatOptions::JSON(_) => FileType::JSON,
-            #[cfg(feature = "parquet")]
-            FormatOptions::PARQUET(_) => FileType::PARQUET,
-            FormatOptions::AVRO => FileType::AVRO,
-            FormatOptions::ARROW => FileType::ARROW,
-        }
-    }
-}
-
-impl GetExt for FileType {
-    fn get_ext(&self) -> String {
-        match self {
-            FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(),
-            FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
-            #[cfg(feature = "parquet")]
-            FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
-            FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
-            FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(),
-        }
-    }
-}
-
-impl Display for FileType {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        let out = match self {
-            FileType::CSV => "csv",
-            FileType::JSON => "json",
-            #[cfg(feature = "parquet")]
-            FileType::PARQUET => "parquet",
-            FileType::AVRO => "avro",
-            FileType::ARROW => "arrow",
-        };
-        write!(f, "{}", out)
-    }
-}
-
-impl FromStr for FileType {
-    type Err = DataFusionError;
-
-    fn from_str(s: &str) -> Result<Self> {
-        let s = s.to_uppercase();
-        match s.as_str() {
-            "ARROW" => Ok(FileType::ARROW),
-            "AVRO" => Ok(FileType::AVRO),
-            #[cfg(feature = "parquet")]
-            "PARQUET" => Ok(FileType::PARQUET),
-            "CSV" => Ok(FileType::CSV),
-            "JSON" | "NDJSON" => Ok(FileType::JSON),
-            _ => Err(DataFusionError::NotImplemented(format!(
-                "Unknown FileType: {s}"
-            ))),
-        }
-    }
-}
-
-#[cfg(test)]
-#[cfg(feature = "parquet")]
-mod tests {
-    use std::str::FromStr;
-
-    use crate::error::DataFusionError;
-    use crate::FileType;
-
-    #[test]
-    fn from_str() {
-        for (ext, file_type) in [
-            ("csv", FileType::CSV),
-            ("CSV", FileType::CSV),
-            ("json", FileType::JSON),
-            ("JSON", FileType::JSON),
-            ("avro", FileType::AVRO),
-            ("AVRO", FileType::AVRO),
-            ("parquet", FileType::PARQUET),
-            ("PARQUET", FileType::PARQUET),
-        ] {
-            assert_eq!(FileType::from_str(ext).unwrap(), file_type);
-        }
-
-        assert!(matches!(
-            FileType::from_str("Unknown"),
-            Err(DataFusionError::NotImplemented(_))
-        ));
-    }
+/// Defines the functionality needed for logical planning for
+/// a type of file which will be read or written to storage.
+pub trait FileType: GetExt + Display + Send + Sync {
+    /// Returns the table source as [`Any`] so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
 }
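
To illustrate the trait that replaces the removed enum, a hedged sketch of an
externally defined file type; the ExcelFileType name is hypothetical and not
part of this commit:

    use std::any::Any;
    use std::fmt::{self, Display};

    use datafusion_common::file_options::file_type::FileType;
    use datafusion_common::GetExt;

    /// Hypothetical externally defined file type, for illustration only
    #[derive(Debug)]
    struct ExcelFileType;

    impl GetExt for ExcelFileType {
        fn get_ext(&self) -> String {
            "xlsx".to_string()
        }
    }

    impl Display for ExcelFileType {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "excel")
        }
    }

    impl FileType for ExcelFileType {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }
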
diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index 59040b4290..77781457d0 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -32,10 +32,10 @@ mod tests {
 
     use super::parquet_writer::ParquetWriterOptions;
     use crate::{
-        config::TableOptions,
+        config::{ConfigFileType, TableOptions},
         file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
         parsers::CompressionTypeVariant,
-        FileType, Result,
+        Result,
     };
 
     use parquet::{
@@ -76,7 +76,7 @@ mod tests {
         option_map.insert("format.bloom_filter_ndv".to_owned(), "123".to_owned());
 
         let mut table_config = TableOptions::new();
-        table_config.set_file_format(FileType::PARQUET);
+        table_config.set_config_format(ConfigFileType::PARQUET);
         table_config.alter_with_string_hash_map(&option_map)?;
 
         let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
@@ -181,7 +181,7 @@ mod tests {
         );
 
         let mut table_config = TableOptions::new();
-        table_config.set_file_format(FileType::PARQUET);
+        table_config.set_config_format(ConfigFileType::PARQUET);
         table_config.alter_with_string_hash_map(&option_map)?;
 
         let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
@@ -284,7 +284,7 @@ mod tests {
         option_map.insert("format.delimiter".to_owned(), ";".to_owned());
 
         let mut table_config = TableOptions::new();
-        table_config.set_file_format(FileType::CSV);
+        table_config.set_config_format(ConfigFileType::CSV);
         table_config.alter_with_string_hash_map(&option_map)?;
 
         let csv_options = CsvWriterOptions::try_from(&table_config.csv)?;
@@ -306,7 +306,7 @@ mod tests {
         option_map.insert("format.compression".to_owned(), "gzip".to_owned());
 
         let mut table_config = TableOptions::new();
-        table_config.set_file_format(FileType::JSON);
+        table_config.set_config_format(ConfigFileType::JSON);
         table_config.alter_with_string_hash_map(&option_map)?;
 
         let json_options = JsonWriterOptions::try_from(&table_config.json)?;
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index e64acd0bfe..c275152642 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -54,8 +54,8 @@ pub use error::{
     SharedResult,
 };
 pub use file_options::file_type::{
-    FileType, GetExt, DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION,
-    DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
+    GetExt, DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION,
+    DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
 };
 pub use functional_dependencies::{
     aggregate_functional_dependencies, get_required_group_by_exprs_indices,
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 86e510969b..8e55da8c3a 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -26,6 +26,9 @@ use std::sync::Arc;
 
 use crate::arrow::record_batch::RecordBatch;
 use crate::arrow::util::pretty;
+use crate::datasource::file_format::csv::CsvFormatFactory;
+use crate::datasource::file_format::format_as_file_type;
+use crate::datasource::file_format::json::JsonFormatFactory;
 use crate::datasource::{provider_as_source, MemTable, TableProvider};
 use crate::error::Result;
 use crate::execution::context::{SessionState, TaskContext};
@@ -44,7 +47,7 @@ use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
 use arrow::compute::{cast, concat};
 use arrow::datatypes::{DataType, Field};
 use arrow_schema::{Schema, SchemaRef};
-use datafusion_common::config::{CsvOptions, FormatOptions, JsonOptions};
+use datafusion_common::config::{CsvOptions, JsonOptions};
 use datafusion_common::{
     plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions,
 };
@@ -1329,13 +1332,19 @@ impl DataFrame {
                 "Overwrites are not implemented for 
DataFrame::write_csv.".to_owned(),
             ));
         }
-        let props = writer_options
-            .unwrap_or_else(|| self.session_state.default_table_options().csv);
+
+        let format = if let Some(csv_opts) = writer_options {
+            Arc::new(CsvFormatFactory::new_with_options(csv_opts))
+        } else {
+            Arc::new(CsvFormatFactory::new())
+        };
+
+        let file_type = format_as_file_type(format);
 
         let plan = LogicalPlanBuilder::copy_to(
             self.plan,
             path.into(),
-            FormatOptions::CSV(props),
+            file_type,
             HashMap::new(),
             options.partition_by,
         )?
@@ -1384,13 +1393,18 @@ impl DataFrame {
             ));
         }
 
-        let props = writer_options
-            .unwrap_or_else(|| self.session_state.default_table_options().json);
+        let format = if let Some(json_opts) = writer_options {
+            Arc::new(JsonFormatFactory::new_with_options(json_opts))
+        } else {
+            Arc::new(JsonFormatFactory::new())
+        };
+
+        let file_type = format_as_file_type(format);
 
         let plan = LogicalPlanBuilder::copy_to(
             self.plan,
             path.into(),
-            FormatOptions::JSON(props),
+            file_type,
             Default::default(),
             options.partition_by,
         )?
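
A usage sketch of the updated DataFrame write path (the output path and
delimiter below are assumptions for illustration):

    use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
    use datafusion_common::config::CsvOptions;

    async fn write_example(df: DataFrame) -> datafusion::error::Result<()> {
        // Explicit writer options are wrapped in CsvFormatFactory::new_with_options;
        // passing None falls back to CsvFormatFactory::new() as shown above
        let mut csv_opts = CsvOptions::default();
        csv_opts.delimiter = b'|';
        df.write_csv("/tmp/out/", DataFrameWriteOptions::new(), Some(csv_opts))
            .await?;
        Ok(())
    }
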
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 0ec46df0ae..1abb550f5c 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -15,11 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
+use crate::datasource::file_format::{
+    format_as_file_type, parquet::ParquetFormatFactory,
+};
+
 use super::{
     DataFrame, DataFrameWriteOptions, DataFusionError, LogicalPlanBuilder, RecordBatch,
 };
 
-use datafusion_common::config::{FormatOptions, TableParquetOptions};
+use datafusion_common::config::TableParquetOptions;
 
 impl DataFrame {
     /// Execute the `DataFrame` and write the results to Parquet file(s).
@@ -57,13 +63,18 @@ impl DataFrame {
             ));
         }
 
-        let props = writer_options
-            .unwrap_or_else(|| self.session_state.default_table_options().parquet);
+        let format = if let Some(parquet_opts) = writer_options {
+            Arc::new(ParquetFormatFactory::new_with_options(parquet_opts))
+        } else {
+            Arc::new(ParquetFormatFactory::new())
+        };
+
+        let file_type = format_as_file_type(format);
 
         let plan = LogicalPlanBuilder::copy_to(
             self.plan,
             path.into(),
-            FormatOptions::PARQUET(props),
+            file_type,
             Default::default(),
             options.partition_by,
         )?
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 8c67905415..478a11d7e7 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -21,12 +21,14 @@
 
 use std::any::Any;
 use std::borrow::Cow;
+use std::collections::HashMap;
 use std::fmt::{self, Debug};
 use std::sync::Arc;
 
 use super::file_compression_type::FileCompressionType;
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
+use super::FileFormatFactory;
 use crate::datasource::file_format::FileFormat;
 use crate::datasource::physical_plan::{
     ArrowExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
@@ -40,7 +42,10 @@ use arrow::ipc::reader::FileReader;
 use arrow::ipc::writer::IpcWriteOptions;
 use arrow::ipc::{root_as_message, CompressionType};
 use arrow_schema::{ArrowError, Schema, SchemaRef};
-use datafusion_common::{not_impl_err, DataFusionError, Statistics};
+use datafusion_common::parsers::CompressionTypeVariant;
+use datafusion_common::{
+    not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
+};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
@@ -61,6 +66,38 @@ const INITIAL_BUFFER_BYTES: usize = 1048576;
 /// If the buffered Arrow data exceeds this size, it is flushed to object store
 const BUFFER_FLUSH_BYTES: usize = 1024000;
 
+#[derive(Default)]
+/// Factory struct used to create [ArrowFormat]
+pub struct ArrowFormatFactory;
+
+impl ArrowFormatFactory {
+    /// Creates an instance of [ArrowFormatFactory]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl FileFormatFactory for ArrowFormatFactory {
+    fn create(
+        &self,
+        _state: &SessionState,
+        _format_options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn FileFormat>> {
+        Ok(Arc::new(ArrowFormat))
+    }
+
+    fn default(&self) -> Arc<dyn FileFormat> {
+        Arc::new(ArrowFormat)
+    }
+}
+
+impl GetExt for ArrowFormatFactory {
+    fn get_ext(&self) -> String {
+        // Removes the dot, i.e. ".arrow" -> "arrow"
+        DEFAULT_ARROW_EXTENSION[1..].to_string()
+    }
+}
+
 /// Arrow `FileFormat` implementation.
 #[derive(Default, Debug)]
 pub struct ArrowFormat;
@@ -71,6 +108,23 @@ impl FileFormat for ArrowFormat {
         self
     }
 
+    fn get_ext(&self) -> String {
+        ArrowFormatFactory::new().get_ext()
+    }
+
+    fn get_ext_with_compression(
+        &self,
+        file_compression_type: &FileCompressionType,
+    ) -> Result<String> {
+        let ext = self.get_ext();
+        match file_compression_type.get_variant() {
+            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
+            _ => Err(DataFusionError::Internal(
+                "Arrow FileFormat does not support compression.".into(),
+            )),
+        }
+    }
+
     async fn infer_schema(
         &self,
         _state: &SessionState,
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 7b2c26a2c4..f4f9adcba7 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -18,15 +18,22 @@
 //! [`AvroFormat`] Apache Avro [`FileFormat`] abstractions
 
 use std::any::Any;
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use arrow::datatypes::Schema;
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
+use datafusion_common::parsers::CompressionTypeVariant;
+use datafusion_common::DataFusionError;
+use datafusion_common::GetExt;
+use datafusion_common::DEFAULT_AVRO_EXTENSION;
 use datafusion_physical_expr::PhysicalExpr;
 use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
 
+use super::file_compression_type::FileCompressionType;
 use super::FileFormat;
+use super::FileFormatFactory;
 use crate::datasource::avro_to_arrow::read_avro_schema_from_reader;
 use crate::datasource::physical_plan::{AvroExec, FileScanConfig};
 use crate::error::Result;
@@ -34,6 +41,38 @@ use crate::execution::context::SessionState;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
+#[derive(Default)]
+/// Factory struct used to create [AvroFormat]
+pub struct AvroFormatFactory;
+
+impl AvroFormatFactory {
+    /// Creates an instance of [AvroFormatFactory]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl FileFormatFactory for AvroFormatFactory {
+    fn create(
+        &self,
+        _state: &SessionState,
+        _format_options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn FileFormat>> {
+        Ok(Arc::new(AvroFormat))
+    }
+
+    fn default(&self) -> Arc<dyn FileFormat> {
+        Arc::new(AvroFormat)
+    }
+}
+
+impl GetExt for AvroFormatFactory {
+    fn get_ext(&self) -> String {
+        // Removes the dot, i.e. ".avro" -> "avro"
+        DEFAULT_AVRO_EXTENSION[1..].to_string()
+    }
+}
+
 /// Avro `FileFormat` implementation.
 #[derive(Default, Debug)]
 pub struct AvroFormat;
@@ -44,6 +83,23 @@ impl FileFormat for AvroFormat {
         self
     }
 
+    fn get_ext(&self) -> String {
+        AvroFormatFactory::new().get_ext()
+    }
+
+    fn get_ext_with_compression(
+        &self,
+        file_compression_type: &FileCompressionType,
+    ) -> Result<String> {
+        let ext = self.get_ext();
+        match file_compression_type.get_variant() {
+            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
+            _ => Err(DataFusionError::Internal(
+                "Avro FileFormat does not support compression.".into(),
+            )),
+        }
+    }
+
     async fn infer_schema(
         &self,
         _state: &SessionState,
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 2139b35621..92cb11e2b4 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -18,12 +18,12 @@
 //! [`CsvFormat`], Comma Separated Value (CSV) [`FileFormat`] abstractions
 
 use std::any::Any;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::fmt::{self, Debug};
 use std::sync::Arc;
 
 use super::write::orchestration::stateless_multipart_put;
-use super::FileFormat;
+use super::{FileFormat, FileFormatFactory};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::physical_plan::{
@@ -40,9 +40,11 @@ use arrow::array::RecordBatch;
 use arrow::csv::WriterBuilder;
 use arrow::datatypes::SchemaRef;
 use arrow::datatypes::{DataType, Field, Fields, Schema};
-use datafusion_common::config::CsvOptions;
+use datafusion_common::config::{ConfigField, ConfigFileType, CsvOptions};
 use datafusion_common::file_options::csv_writer::CsvWriterOptions;
-use datafusion_common::{exec_err, not_impl_err, DataFusionError};
+use datafusion_common::{
+    exec_err, not_impl_err, DataFusionError, GetExt, DEFAULT_CSV_EXTENSION,
+};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use datafusion_physical_plan::metrics::MetricsSet;
@@ -53,6 +55,63 @@ use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
 
+#[derive(Default)]
+/// Factory struct used to create [CsvFormat]
+pub struct CsvFormatFactory {
+    options: Option<CsvOptions>,
+}
+
+impl CsvFormatFactory {
+    /// Creates an instance of [CsvFormatFactory]
+    pub fn new() -> Self {
+        Self { options: None }
+    }
+
+    /// Creates an instance of [CsvFormatFactory] with customized default options
+    pub fn new_with_options(options: CsvOptions) -> Self {
+        Self {
+            options: Some(options),
+        }
+    }
+}
+
+impl FileFormatFactory for CsvFormatFactory {
+    fn create(
+        &self,
+        state: &SessionState,
+        format_options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn FileFormat>> {
+        let csv_options = match &self.options {
+            None => {
+                let mut table_options = state.default_table_options();
+                table_options.set_config_format(ConfigFileType::CSV);
+                table_options.alter_with_string_hash_map(format_options)?;
+                table_options.csv
+            }
+            Some(csv_options) => {
+                let mut csv_options = csv_options.clone();
+                for (k, v) in format_options {
+                    csv_options.set(k, v)?;
+                }
+                csv_options
+            }
+        };
+
+        Ok(Arc::new(CsvFormat::default().with_options(csv_options)))
+    }
+
+    fn default(&self) -> Arc<dyn FileFormat> {
+        Arc::new(CsvFormat::default())
+    }
+}
+
+impl GetExt for CsvFormatFactory {
+    fn get_ext(&self) -> String {
+        // Removes the dot, i.e. ".csv" -> "csv"
+        DEFAULT_CSV_EXTENSION[1..].to_string()
+    }
+}
+
 /// Character Separated Value `FileFormat` implementation.
 #[derive(Debug, Default)]
 pub struct CsvFormat {
@@ -206,6 +265,18 @@ impl FileFormat for CsvFormat {
         self
     }
 
+    fn get_ext(&self) -> String {
+        CsvFormatFactory::new().get_ext()
+    }
+
+    fn get_ext_with_compression(
+        &self,
+        file_compression_type: &FileCompressionType,
+    ) -> Result<String> {
+        let ext = self.get_ext();
+        Ok(format!("{}{}", ext, file_compression_type.get_ext()))
+    }
+
     async fn infer_schema(
         &self,
         state: &SessionState,
@@ -558,7 +629,6 @@ mod tests {
     use datafusion_common::cast::as_string_array;
     use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
-    use datafusion_common::{FileType, GetExt};
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_expr::{col, lit};
 
@@ -1060,9 +1130,9 @@ mod tests {
             .with_repartition_file_min_size(0)
             .with_target_partitions(n_partitions);
         let ctx = SessionContext::new_with_config(config);
-        let file_format = CsvFormat::default().with_has_header(false);
-        let listing_options = ListingOptions::new(Arc::new(file_format))
-            .with_file_extension(FileType::CSV.get_ext());
+        let file_format = Arc::new(CsvFormat::default().with_has_header(false));
+        let listing_options = ListingOptions::new(file_format.clone())
+            .with_file_extension(file_format.get_ext());
         ctx.register_listing_table(
             "empty",
             "tests/data/empty_files/all_empty/",
@@ -1113,9 +1183,9 @@ mod tests {
             .with_repartition_file_min_size(0)
             .with_target_partitions(n_partitions);
         let ctx = SessionContext::new_with_config(config);
-        let file_format = CsvFormat::default().with_has_header(false);
-        let listing_options = ListingOptions::new(Arc::new(file_format))
-            .with_file_extension(FileType::CSV.get_ext());
+        let file_format = Arc::new(CsvFormat::default().with_has_header(false));
+        let listing_options = ListingOptions::new(file_format.clone())
+            .with_file_extension(file_format.get_ext());
         ctx.register_listing_table(
             "empty",
             "tests/data/empty_files/some_empty",
diff --git a/datafusion/core/src/datasource/file_format/file_compression_type.rs b/datafusion/core/src/datasource/file_format/file_compression_type.rs
index c1fbe352d3..a054094822 100644
--- a/datafusion/core/src/datasource/file_format/file_compression_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_compression_type.rs
@@ -22,7 +22,7 @@ use std::str::FromStr;
 use crate::error::{DataFusionError, Result};
 
 use datafusion_common::parsers::CompressionTypeVariant::{self, *};
-use datafusion_common::{FileType, GetExt};
+use datafusion_common::GetExt;
 
 #[cfg(feature = "compression")]
 use async_compression::tokio::bufread::{
@@ -112,6 +112,11 @@ impl FileCompressionType {
         variant: UNCOMPRESSED,
     };
 
+    /// Read-only access to self.variant
+    pub fn get_variant(&self) -> &CompressionTypeVariant {
+        &self.variant
+    }
+
     /// The file is compressed or not
     pub const fn is_compressed(&self) -> bool {
         self.variant.is_compressed()
@@ -245,90 +250,16 @@ pub trait FileTypeExt {
     fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String>;
 }
 
-impl FileTypeExt for FileType {
-    fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String> {
-        let ext = self.get_ext();
-
-        match self {
-            FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
-            FileType::AVRO | FileType::ARROW => match c.variant {
-                UNCOMPRESSED => Ok(ext),
-                _ => Err(DataFusionError::Internal(
-                    "FileCompressionType can be specified for CSV/JSON FileType.".into(),
-                )),
-            },
-            #[cfg(feature = "parquet")]
-            FileType::PARQUET => match c.variant {
-                UNCOMPRESSED => Ok(ext),
-                _ => Err(DataFusionError::Internal(
-                    "FileCompressionType can be specified for CSV/JSON FileType.".into(),
-                )),
-            },
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
 
-    use crate::datasource::file_format::file_compression_type::{
-        FileCompressionType, FileTypeExt,
-    };
+    use crate::datasource::file_format::file_compression_type::FileCompressionType;
     use crate::error::DataFusionError;
 
-    use datafusion_common::file_options::file_type::FileType;
-
     use bytes::Bytes;
     use futures::StreamExt;
 
-    #[test]
-    fn get_ext_with_compression() {
-        for (file_type, compression, extension) in [
-            (FileType::CSV, FileCompressionType::UNCOMPRESSED, ".csv"),
-            (FileType::CSV, FileCompressionType::GZIP, ".csv.gz"),
-            (FileType::CSV, FileCompressionType::XZ, ".csv.xz"),
-            (FileType::CSV, FileCompressionType::BZIP2, ".csv.bz2"),
-            (FileType::CSV, FileCompressionType::ZSTD, ".csv.zst"),
-            (FileType::JSON, FileCompressionType::UNCOMPRESSED, ".json"),
-            (FileType::JSON, FileCompressionType::GZIP, ".json.gz"),
-            (FileType::JSON, FileCompressionType::XZ, ".json.xz"),
-            (FileType::JSON, FileCompressionType::BZIP2, ".json.bz2"),
-            (FileType::JSON, FileCompressionType::ZSTD, ".json.zst"),
-        ] {
-            assert_eq!(
-                file_type.get_ext_with_compression(compression).unwrap(),
-                extension
-            );
-        }
-
-        let mut ty_ext_tuple = vec![];
-        ty_ext_tuple.push((FileType::AVRO, ".avro"));
-        #[cfg(feature = "parquet")]
-        ty_ext_tuple.push((FileType::PARQUET, ".parquet"));
-
-        // Cannot specify compression for these file types
-        for (file_type, extension) in ty_ext_tuple {
-            assert_eq!(
-                file_type
-                    .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
-                    .unwrap(),
-                extension
-            );
-            for compression in [
-                FileCompressionType::GZIP,
-                FileCompressionType::XZ,
-                FileCompressionType::BZIP2,
-                FileCompressionType::ZSTD,
-            ] {
-                assert!(matches!(
-                    file_type.get_ext_with_compression(compression),
-                    Err(DataFusionError::Internal(_))
-                ));
-            }
-        }
-    }
-
     #[test]
     fn from_str() {
         for (ext, compression_type) in [
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index d5347c498c..007b084f50 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -18,13 +18,14 @@
 //! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions
 
 use std::any::Any;
+use std::collections::HashMap;
 use std::fmt;
 use std::fmt::Debug;
 use std::io::BufReader;
 use std::sync::Arc;
 
 use super::write::orchestration::stateless_multipart_put;
-use super::{FileFormat, FileScanConfig};
+use super::{FileFormat, FileFormatFactory, FileScanConfig};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::physical_plan::FileGroupDisplay;
@@ -41,9 +42,9 @@ use arrow::datatypes::SchemaRef;
 use arrow::json;
 use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
 use arrow_array::RecordBatch;
-use datafusion_common::config::JsonOptions;
+use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
 use datafusion_common::file_options::json_writer::JsonWriterOptions;
-use datafusion_common::not_impl_err;
+use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use datafusion_physical_plan::metrics::MetricsSet;
@@ -53,6 +54,63 @@ use async_trait::async_trait;
 use bytes::{Buf, Bytes};
 use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
 
+#[derive(Default)]
+/// Factory struct used to create [JsonFormat]
+pub struct JsonFormatFactory {
+    options: Option<JsonOptions>,
+}
+
+impl JsonFormatFactory {
+    /// Creates an instance of [JsonFormatFactory]
+    pub fn new() -> Self {
+        Self { options: None }
+    }
+
+    /// Creates an instance of [JsonFormatFactory] with customized default options
+    pub fn new_with_options(options: JsonOptions) -> Self {
+        Self {
+            options: Some(options),
+        }
+    }
+}
+
+impl FileFormatFactory for JsonFormatFactory {
+    fn create(
+        &self,
+        state: &SessionState,
+        format_options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn FileFormat>> {
+        let json_options = match &self.options {
+            None => {
+                let mut table_options = state.default_table_options();
+                table_options.set_config_format(ConfigFileType::JSON);
+                table_options.alter_with_string_hash_map(format_options)?;
+                table_options.json
+            }
+            Some(json_options) => {
+                let mut json_options = json_options.clone();
+                for (k, v) in format_options {
+                    json_options.set(k, v)?;
+                }
+                json_options
+            }
+        };
+
+        Ok(Arc::new(JsonFormat::default().with_options(json_options)))
+    }
+
+    fn default(&self) -> Arc<dyn FileFormat> {
+        Arc::new(JsonFormat::default())
+    }
+}
+
+impl GetExt for JsonFormatFactory {
+    fn get_ext(&self) -> String {
+        // Removes the dot, i.e. ".json" -> "json"
+        DEFAULT_JSON_EXTENSION[1..].to_string()
+    }
+}
+
 /// New line delimited JSON `FileFormat` implementation.
 #[derive(Debug, Default)]
 pub struct JsonFormat {
@@ -95,6 +153,18 @@ impl FileFormat for JsonFormat {
         self
     }
 
+    fn get_ext(&self) -> String {
+        JsonFormatFactory::new().get_ext()
+    }
+
+    fn get_ext_with_compression(
+        &self,
+        file_compression_type: &FileCompressionType,
+    ) -> Result<String> {
+        let ext = self.get_ext();
+        Ok(format!("{}{}", ext, file_compression_type.get_ext()))
+    }
+
     async fn infer_schema(
         &self,
         _state: &SessionState,
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 9462cde436..1aa93a106a 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -32,7 +32,8 @@ pub mod parquet;
 pub mod write;
 
 use std::any::Any;
-use std::fmt;
+use std::collections::HashMap;
+use std::fmt::{self, Display};
 use std::sync::Arc;
 
 use crate::arrow::datatypes::SchemaRef;
@@ -41,12 +42,29 @@ use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::{ExecutionPlan, Statistics};
 
-use datafusion_common::not_impl_err;
+use datafusion_common::file_options::file_type::FileType;
+use datafusion_common::{internal_err, not_impl_err, GetExt};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 
 use async_trait::async_trait;
+use file_compression_type::FileCompressionType;
 use object_store::{ObjectMeta, ObjectStore};
 
+/// Factory for creating [`FileFormat`] instances based on session and command level options
+///
+/// Users can provide their own `FileFormatFactory` to support arbitrary file formats
+pub trait FileFormatFactory: Sync + Send + GetExt {
+    /// Initialize a [FileFormat] and configure based on session and command level options
+    fn create(
+        &self,
+        state: &SessionState,
+        format_options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn FileFormat>>;
+
+    /// Initialize a [FileFormat] with all options set to default values
+    fn default(&self) -> Arc<dyn FileFormat>;
+}
+
 /// This trait abstracts all the file format specific implementations
 /// from the [`TableProvider`]. This helps code re-utilization across
 /// providers that support the same file formats.
@@ -58,6 +76,15 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     /// downcast to a specific implementation.
     fn as_any(&self) -> &dyn Any;
 
+    /// Returns the extension for this FileFormat, e.g. "file.csv" -> csv
+    fn get_ext(&self) -> String;
+
+    /// Returns the extension for this FileFormat when compressed, e.g. "file.csv.gz" -> csv.gz
+    fn get_ext_with_compression(
+        &self,
+        _file_compression_type: &FileCompressionType,
+    ) -> Result<String>;
+
     /// Infer the common schema of the provided objects. The objects will usually
     /// be analysed up to a given number of records or files (as specified in the
     /// format config) then give the estimated common schema. This might fail if
@@ -106,6 +133,67 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     }
 }
 
+/// A container of [FileFormatFactory] which also implements [FileType].
+/// This enables converting a dyn FileFormat to a dyn FileType.
+/// The former trait is a superset of the latter trait, which includes execution time
+/// relevant methods. [FileType] is only used in logical planning and only implements
+/// the subset of methods required during logical planning.
+pub struct DefaultFileType {
+    file_format_factory: Arc<dyn FileFormatFactory>,
+}
+
+impl DefaultFileType {
+    /// Constructs a [DefaultFileType] wrapper from a [FileFormatFactory]
+    pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
+        Self {
+            file_format_factory,
+        }
+    }
+}
+
+impl FileType for DefaultFileType {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+impl Display for DefaultFileType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.file_format_factory.default().fmt(f)
+    }
+}
+
+impl GetExt for DefaultFileType {
+    fn get_ext(&self) -> String {
+        self.file_format_factory.get_ext()
+    }
+}
+
+/// Converts a [FileFormatFactory] to a [FileType]
+pub fn format_as_file_type(
+    file_format_factory: Arc<dyn FileFormatFactory>,
+) -> Arc<dyn FileType> {
+    Arc::new(DefaultFileType {
+        file_format_factory,
+    })
+}
+
+/// Converts a [FileType] to a [FileFormatFactory].
+/// Returns an error if the [FileType] cannot be
+/// downcast to a [DefaultFileType].
+pub fn file_type_to_format(
+    file_type: &Arc<dyn FileType>,
+) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
+    match file_type
+        .as_ref()
+        .as_any()
+        .downcast_ref::<DefaultFileType>()
+    {
+        Some(source) => Ok(source.file_format_factory.clone()),
+        _ => internal_err!("FileType was not DefaultFileType"),
+    }
+}
+
 #[cfg(test)]
 pub(crate) mod test_util {
     use std::ops::Range;
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 4204593eba..44c9cc4ec4 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
 
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
-use super::{FileFormat, FileScanConfig};
+use super::{FileFormat, FileFormatFactory, FileScanConfig};
 use crate::arrow::array::RecordBatch;
 use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -39,11 +39,13 @@ use crate::physical_plan::{
 };
 
 use arrow::compute::sum;
-use datafusion_common::config::TableParquetOptions;
+use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
 use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
+use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    exec_err, internal_datafusion_err, not_impl_err, DataFusionError,
+    exec_err, internal_datafusion_err, not_impl_err, DataFusionError, GetExt,
+    DEFAULT_PARQUET_EXTENSION,
 };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::TaskContext;
@@ -53,6 +55,7 @@ use datafusion_physical_plan::metrics::MetricsSet;
 
 use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
+use hashbrown::HashMap;
 use log::debug;
 use object_store::buffered::BufWriter;
 use parquet::arrow::arrow_writer::{
@@ -75,7 +78,6 @@ use crate::datasource::physical_plan::parquet::{
     ParquetExecBuilder, StatisticsConverter,
 };
 use futures::{StreamExt, TryStreamExt};
-use hashbrown::HashMap;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
 
@@ -87,6 +89,65 @@ const INITIAL_BUFFER_BYTES: usize = 1048576;
 /// this size, it is flushed to object store
 const BUFFER_FLUSH_BYTES: usize = 1024000;
 
+#[derive(Default)]
+/// Factory struct used to create [ParquetFormat]
+pub struct ParquetFormatFactory {
+    options: Option<TableParquetOptions>,
+}
+
+impl ParquetFormatFactory {
+    /// Creates an instance of [ParquetFormatFactory]
+    pub fn new() -> Self {
+        Self { options: None }
+    }
+
+    /// Creates an instance of [ParquetFormatFactory] with customized default options
+    pub fn new_with_options(options: TableParquetOptions) -> Self {
+        Self {
+            options: Some(options),
+        }
+    }
+}
+
+impl FileFormatFactory for ParquetFormatFactory {
+    fn create(
+        &self,
+        state: &SessionState,
+        format_options: &std::collections::HashMap<String, String>,
+    ) -> Result<Arc<dyn FileFormat>> {
+        let parquet_options = match &self.options {
+            None => {
+                let mut table_options = state.default_table_options();
+                table_options.set_config_format(ConfigFileType::PARQUET);
+                table_options.alter_with_string_hash_map(format_options)?;
+                table_options.parquet
+            }
+            Some(parquet_options) => {
+                let mut parquet_options = parquet_options.clone();
+                for (k, v) in format_options {
+                    parquet_options.set(k, v)?;
+                }
+                parquet_options
+            }
+        };
+
+        Ok(Arc::new(
+            ParquetFormat::default().with_options(parquet_options),
+        ))
+    }
+
+    fn default(&self) -> Arc<dyn FileFormat> {
+        Arc::new(ParquetFormat::default())
+    }
+}
+
+impl GetExt for ParquetFormatFactory {
+    fn get_ext(&self) -> String {
+        // Removes the dot, i.e. ".parquet" -> "parquet"
+        DEFAULT_PARQUET_EXTENSION[1..].to_string()
+    }
+}
+
 /// The Apache Parquet `FileFormat` implementation
 #[derive(Debug, Default)]
 pub struct ParquetFormat {
@@ -188,6 +249,23 @@ impl FileFormat for ParquetFormat {
         self
     }
 
+    fn get_ext(&self) -> String {
+        ParquetFormatFactory::new().get_ext()
+    }
+
+    fn get_ext_with_compression(
+        &self,
+        file_compression_type: &FileCompressionType,
+    ) -> Result<String> {
+        let ext = self.get_ext();
+        match file_compression_type.get_variant() {
+            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
+            _ => Err(DataFusionError::Internal(
+                "Parquet FileFormat does not support compression.".into(),
+            )),
+        }
+    }
+
     async fn infer_schema(
         &self,
         state: &SessionState,
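
A sketch of driving the new factory directly; `demo` and the chosen zstd level
are illustrative, and the per-statement overrides map is left empty here:

    use std::collections::HashMap;

    use datafusion::datasource::file_format::parquet::ParquetFormatFactory;
    use datafusion::datasource::file_format::FileFormatFactory;
    use datafusion::prelude::SessionContext;
    use datafusion_common::config::TableParquetOptions;
    use datafusion_common::Result;

    fn demo(ctx: &SessionContext) -> Result<()> {
        // Customized defaults baked into the factory itself
        let mut parquet_options = TableParquetOptions::default();
        parquet_options.global.compression = Some("zstd(5)".into());
        let factory = ParquetFormatFactory::new_with_options(parquet_options);

        // String options (e.g. from COPY TO ... OPTIONS (...)) are layered on
        // top of the factory's defaults when the format is created
        let _format = factory.create(&ctx.state(), &HashMap::new())?;
        Ok(())
    }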
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 7f5e80c498..74aca82b3e 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -24,20 +24,11 @@ use std::{any::Any, sync::Arc};
 use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
 use super::PartitionedFile;
 
-#[cfg(feature = "parquet")]
-use crate::datasource::file_format::parquet::ParquetFormat;
 use crate::datasource::{
     create_ordering, get_statistics_with_limit, TableProvider, TableType,
 };
 use crate::datasource::{
-    file_format::{
-        arrow::ArrowFormat,
-        avro::AvroFormat,
-        csv::CsvFormat,
-        file_compression_type::{FileCompressionType, FileTypeExt},
-        json::JsonFormat,
-        FileFormat,
-    },
+    file_format::{file_compression_type::FileCompressionType, FileFormat},
     listing::ListingTableUrl,
     physical_plan::{FileScanConfig, FileSinkConfig},
 };
@@ -51,7 +42,8 @@ use crate::{
 use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
 use arrow_schema::Schema;
 use datafusion_common::{
-    internal_err, plan_err, project_schema, Constraints, FileType, SchemaExt, ToDFSchema,
+    config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
+    SchemaExt, ToDFSchema,
 };
 use datafusion_execution::cache::cache_manager::FileStatisticsCache;
 use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
@@ -119,9 +111,7 @@ impl ListingTableConfig {
         }
     }
 
-    fn infer_file_type(path: &str) -> Result<(FileType, String)> {
-        let err_msg = format!("Unable to infer file type from path: {path}");
-
+    fn infer_file_extension(path: &str) -> Result<String> {
         let mut exts = path.rsplit('.');
 
         let mut splitted = exts.next().unwrap_or("");
@@ -133,14 +123,7 @@ impl ListingTableConfig {
             splitted = exts.next().unwrap_or("");
         }
 
-        let file_type = FileType::from_str(splitted)
-            .map_err(|_| DataFusionError::Internal(err_msg.to_owned()))?;
-
-        let ext = file_type
-            .get_ext_with_compression(file_compression_type.to_owned())
-            .map_err(|_| DataFusionError::Internal(err_msg))?;
-
-        Ok((file_type, ext))
+        Ok(splitted.to_string())
     }
 
     /// Infer `ListingOptions` based on `table_path` suffix.
@@ -161,25 +144,15 @@ impl ListingTableConfig {
             .await
             .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
 
-        let (file_type, file_extension) =
-            ListingTableConfig::infer_file_type(file.location.as_ref())?;
+        let file_extension =
+            ListingTableConfig::infer_file_extension(file.location.as_ref())?;
 
-        let mut table_options = state.default_table_options();
-        table_options.set_file_format(file_type.clone());
-        let file_format: Arc<dyn FileFormat> = match file_type {
-            FileType::CSV => {
-                Arc::new(CsvFormat::default().with_options(table_options.csv))
-            }
-            #[cfg(feature = "parquet")]
-            FileType::PARQUET => {
-                Arc::new(ParquetFormat::default().with_options(table_options.parquet))
-            }
-            FileType::AVRO => Arc::new(AvroFormat),
-            FileType::JSON => {
-                Arc::new(JsonFormat::default().with_options(table_options.json))
-            }
-            FileType::ARROW => Arc::new(ArrowFormat),
-        };
+        let file_format = state
+            .get_file_format_factory(&file_extension)
+            .ok_or(config_datafusion_err!(
+                "No file_format found with extension {file_extension}"
+            ))?
+            .create(state, &HashMap::new())?;
 
         let listing_options = ListingOptions::new(file_format)
             .with_file_extension(file_extension)
@@ -1060,6 +1033,10 @@ impl ListingTable {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::datasource::file_format::avro::AvroFormat;
+    use crate::datasource::file_format::csv::CsvFormat;
+    use crate::datasource::file_format::json::JsonFormat;
+    use crate::datasource::file_format::parquet::ParquetFormat;
     #[cfg(feature = "parquet")]
     use crate::datasource::{provider_as_source, MemTable};
     use crate::execution::options::ArrowReadOptions;
@@ -1073,7 +1050,7 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use arrow_schema::SortOptions;
     use datafusion_common::stats::Precision;
-    use datafusion_common::{assert_contains, GetExt, ScalarValue};
+    use datafusion_common::{assert_contains, ScalarValue};
     use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
     use datafusion_physical_expr::PhysicalSortExpr;
     use datafusion_physical_plan::ExecutionPlanProperties;
@@ -1104,6 +1081,8 @@ mod tests {
     #[cfg(feature = "parquet")]
     #[tokio::test]
     async fn load_table_stats_by_default() -> Result<()> {
+        use crate::datasource::file_format::parquet::ParquetFormat;
+
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
         let table_path = ListingTableUrl::parse(filename).unwrap();
@@ -1128,6 +1107,8 @@ mod tests {
     #[cfg(feature = "parquet")]
     #[tokio::test]
     async fn load_table_stats_when_no_stats() -> Result<()> {
+        use crate::datasource::file_format::parquet::ParquetFormat;
+
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
         let table_path = ListingTableUrl::parse(filename).unwrap();
@@ -1162,7 +1143,10 @@ mod tests {
         let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
         let schema = options.infer_schema(&state, &table_path).await.unwrap();
 
-        use crate::physical_plan::expressions::col as physical_col;
+        use crate::{
+            datasource::file_format::parquet::ParquetFormat,
+            physical_plan::expressions::col as physical_col,
+        };
         use std::ops::Add;
 
         // (file_sort_order, expected_result)
@@ -1253,7 +1237,7 @@ mod tests {
         register_test_store(&ctx, &[(&path, 100)]);
 
         let opt = ListingOptions::new(Arc::new(AvroFormat {}))
-            .with_file_extension(FileType::AVRO.get_ext())
+            .with_file_extension(AvroFormat.get_ext())
             .with_table_partition_cols(vec![(String::from("p1"), DataType::Utf8)])
             .with_target_partitions(4);
 
@@ -1516,7 +1500,7 @@ mod tests {
             "10".into(),
         );
         helper_test_append_new_files_to_table(
-            FileType::JSON,
+            JsonFormat::default().get_ext(),
             FileCompressionType::UNCOMPRESSED,
             Some(config_map),
             2,
@@ -1534,7 +1518,7 @@ mod tests {
             "10".into(),
         );
         helper_test_append_new_files_to_table(
-            FileType::CSV,
+            CsvFormat::default().get_ext(),
             FileCompressionType::UNCOMPRESSED,
             Some(config_map),
             2,
@@ -1552,7 +1536,7 @@ mod tests {
             "10".into(),
         );
         helper_test_append_new_files_to_table(
-            FileType::PARQUET,
+            ParquetFormat::default().get_ext(),
             FileCompressionType::UNCOMPRESSED,
             Some(config_map),
             2,
@@ -1570,7 +1554,7 @@ mod tests {
             "20".into(),
         );
         helper_test_append_new_files_to_table(
-            FileType::PARQUET,
+            ParquetFormat::default().get_ext(),
             FileCompressionType::UNCOMPRESSED,
             Some(config_map),
             1,
@@ -1756,7 +1740,7 @@ mod tests {
         );
         config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
         helper_test_append_new_files_to_table(
-            FileType::PARQUET,
+            ParquetFormat::default().get_ext(),
             FileCompressionType::UNCOMPRESSED,
             Some(config_map),
             2,
@@ -1774,7 +1758,7 @@ mod tests {
             "zstd".into(),
         );
         let e = helper_test_append_new_files_to_table(
-            FileType::PARQUET,
+            ParquetFormat::default().get_ext(),
             FileCompressionType::UNCOMPRESSED,
             Some(config_map),
             2,
@@ -1787,7 +1771,7 @@ mod tests {
     }
 
     async fn helper_test_append_new_files_to_table(
-        file_type: FileType,
+        file_type_ext: String,
         file_compression_type: FileCompressionType,
         session_config_map: Option<HashMap<String, String>>,
         expected_n_files_per_insert: usize,
@@ -1824,8 +1808,8 @@ mod tests {
 
         // Register appropriate table depending on file_type we want to test
         let tmp_dir = TempDir::new()?;
-        match file_type {
-            FileType::CSV => {
+        match file_type_ext.as_str() {
+            "csv" => {
                 session_ctx
                     .register_csv(
                         "t",
@@ -1836,7 +1820,7 @@ mod tests {
                     )
                     .await?;
             }
-            FileType::JSON => {
+            "json" => {
                 session_ctx
                     .register_json(
                         "t",
@@ -1847,7 +1831,7 @@ mod tests {
                     )
                     .await?;
             }
-            FileType::PARQUET => {
+            "parquet" => {
                 session_ctx
                     .register_parquet(
                         "t",
@@ -1856,7 +1840,7 @@ mod tests {
                     )
                     .await?;
             }
-            FileType::AVRO => {
+            "avro" => {
                 session_ctx
                     .register_avro(
                         "t",
@@ -1865,7 +1849,7 @@ mod tests {
                     )
                     .await?;
             }
-            FileType::ARROW => {
+            "arrow" => {
                 session_ctx
                     .register_arrow(
                         "t",
@@ -1874,6 +1858,7 @@ mod tests {
                     )
                     .await?;
             }
+            _ => panic!("Unrecognized file extension {file_type_ext}"),
         }
 
         // Create and register the source table with the provided schema and inserted data
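
With the enum gone, inference resolves the format through whatever factories
the session has registered. A sketch, assuming the default session and an
illustrative path:

    use datafusion::datasource::listing::{ListingTableConfig, ListingTableUrl};
    use datafusion::prelude::SessionContext;
    use datafusion_common::Result;

    async fn infer(ctx: &SessionContext) -> Result<()> {
        let url = ListingTableUrl::parse("file:///data/events.parquet")?;
        // Looks up the "parquet" extension among the registered factories
        let _config = ListingTableConfig::new(url)
            .infer_options(&ctx.state())
            .await?;
        Ok(())
    }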
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 6e47498243..1d4d084818 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -18,14 +18,8 @@
 //! Factory for creating ListingTables with default options
 
 use std::path::Path;
-use std::str::FromStr;
 use std::sync::Arc;
 
-#[cfg(feature = "parquet")]
-use crate::datasource::file_format::parquet::ParquetFormat;
-use crate::datasource::file_format::{
-    arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat, FileFormat,
-};
 use crate::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
@@ -34,8 +28,8 @@ use crate::datasource::TableProvider;
 use crate::execution::context::SessionState;
 
 use arrow::datatypes::{DataType, SchemaRef};
-use datafusion_common::Result;
-use datafusion_common::{arrow_datafusion_err, DataFusionError, FileType};
+use datafusion_common::{arrow_datafusion_err, DataFusionError};
+use datafusion_common::{config_datafusion_err, Result};
 use datafusion_expr::CreateExternalTable;
 
 use async_trait::async_trait;
@@ -58,28 +52,15 @@ impl TableProviderFactory for ListingTableFactory {
         state: &SessionState,
         cmd: &CreateExternalTable,
     ) -> Result<Arc<dyn TableProvider>> {
-        let file_type = FileType::from_str(cmd.file_type.as_str()).map_err(|_| {
-            DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type))
-        })?;
-        let mut table_options = state.default_table_options();
-        table_options.set_file_format(file_type.clone());
-        table_options.alter_with_string_hash_map(&cmd.options)?;
+        let file_format = state
+            .get_file_format_factory(cmd.file_type.as_str())
+            .ok_or(config_datafusion_err!(
+                "Unable to create table with format {}! Could not find 
FileFormat.",
+                cmd.file_type
+            ))?
+            .create(state, &cmd.options)?;
 
         let file_extension = get_extension(cmd.location.as_str());
-        let file_format: Arc<dyn FileFormat> = match file_type {
-            FileType::CSV => {
-                Arc::new(CsvFormat::default().with_options(table_options.csv))
-            }
-            #[cfg(feature = "parquet")]
-            FileType::PARQUET => {
-                Arc::new(ParquetFormat::default().with_options(table_options.parquet))
-            }
-            FileType::AVRO => Arc::new(AvroFormat),
-            FileType::JSON => {
-                Arc::new(JsonFormat::default().with_options(table_options.json))
-            }
-            FileType::ARROW => Arc::new(ArrowFormat),
-        };
 
         let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
             (
@@ -166,7 +147,9 @@ mod tests {
     use std::collections::HashMap;
 
     use super::*;
-    use crate::execution::context::SessionContext;
+    use crate::{
+        datasource::file_format::csv::CsvFormat, execution::context::SessionContext,
+    };
 
     use datafusion_common::{Constraints, DFSchema, TableReference};
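
The end-to-end effect, as a sketch (table name and path are illustrative): the
factory resolves STORED AS through the registered formats rather than a
hard-coded enum, so a custom factory registered under another name works the
same way as the built-in CSV below:

    use datafusion::prelude::SessionContext;
    use datafusion_common::Result;

    async fn create_table(ctx: &SessionContext) -> Result<()> {
        ctx.sql("CREATE EXTERNAL TABLE t STORED AS CSV LOCATION './data/t.csv'")
            .await?;
        Ok(())
    }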
 
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index c06c630c45..327fbd976e 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -534,13 +534,13 @@ mod tests {
 
     use super::*;
     use crate::dataframe::DataFrameWriteOptions;
+    use crate::datasource::file_format::csv::CsvFormat;
     use crate::prelude::*;
     use crate::test::{partitioned_csv_config, partitioned_file_groups};
     use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
 
     use arrow::datatypes::*;
     use datafusion_common::test_util::arrow_test_data;
-    use datafusion_common::FileType;
 
     use object_store::chunked::ChunkedStore;
     use object_store::local::LocalFileSystem;
@@ -561,6 +561,8 @@ mod tests {
     async fn csv_exec_with_projection(
         file_compression_type: FileCompressionType,
     ) -> Result<()> {
+        use crate::datasource::file_format::csv::CsvFormat;
+
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema();
@@ -572,7 +574,7 @@ mod tests {
             path.as_str(),
             filename,
             1,
-            FileType::CSV,
+            Arc::new(CsvFormat::default()),
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )?;
@@ -627,6 +629,8 @@ mod tests {
     async fn csv_exec_with_mixed_order_projection(
         file_compression_type: FileCompressionType,
     ) -> Result<()> {
+        use crate::datasource::file_format::csv::CsvFormat;
+
         let cfg = SessionConfig::new().set_str("datafusion.catalog.has_header", "true");
         let session_ctx = SessionContext::new_with_config(cfg);
         let task_ctx = session_ctx.task_ctx();
@@ -639,7 +643,7 @@ mod tests {
             path.as_str(),
             filename,
             1,
-            FileType::CSV,
+            Arc::new(CsvFormat::default()),
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )?;
@@ -694,6 +698,8 @@ mod tests {
     async fn csv_exec_with_limit(
         file_compression_type: FileCompressionType,
     ) -> Result<()> {
+        use crate::datasource::file_format::csv::CsvFormat;
+
         let cfg = SessionConfig::new().set_str("datafusion.catalog.has_header", "true");
         let session_ctx = SessionContext::new_with_config(cfg);
         let task_ctx = session_ctx.task_ctx();
@@ -706,7 +712,7 @@ mod tests {
             path.as_str(),
             filename,
             1,
-            FileType::CSV,
+            Arc::new(CsvFormat::default()),
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )?;
@@ -759,6 +765,8 @@ mod tests {
     async fn csv_exec_with_missing_column(
         file_compression_type: FileCompressionType,
     ) -> Result<()> {
+        use crate::datasource::file_format::csv::CsvFormat;
+
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema_with_missing_col();
@@ -770,7 +778,7 @@ mod tests {
             path.as_str(),
             filename,
             1,
-            FileType::CSV,
+            Arc::new(CsvFormat::default()),
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )?;
@@ -813,6 +821,8 @@ mod tests {
     async fn csv_exec_with_partition(
         file_compression_type: FileCompressionType,
     ) -> Result<()> {
+        use crate::datasource::file_format::csv::CsvFormat;
+
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema();
@@ -824,7 +834,7 @@ mod tests {
             path.as_str(),
             filename,
             1,
-            FileType::CSV,
+            Arc::new(CsvFormat::default()),
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )?;
@@ -929,7 +939,7 @@ mod tests {
             path.as_str(),
             filename,
             1,
-            FileType::CSV,
+            Arc::new(CsvFormat::default()),
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index e97554a791..c051b5d9b5 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -384,7 +384,6 @@ mod tests {
 
     use super::*;
     use crate::dataframe::DataFrameWriteOptions;
-    use crate::datasource::file_format::file_compression_type::FileTypeExt;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::context::SessionState;
@@ -397,7 +396,6 @@ mod tests {
     use arrow::array::Array;
     use arrow::datatypes::{Field, SchemaBuilder};
     use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array};
-    use datafusion_common::FileType;
     use object_store::chunked::ChunkedStore;
     use object_store::local::LocalFileSystem;
     use rstest::*;
@@ -419,7 +417,7 @@ mod tests {
             TEST_DATA_BASE,
             filename,
             1,
-            FileType::JSON,
+            Arc::new(JsonFormat::default()),
             file_compression_type.to_owned(),
             work_dir,
         )
@@ -453,7 +451,7 @@ mod tests {
             TEST_DATA_BASE,
             filename,
             1,
-            FileType::JSON,
+            Arc::new(JsonFormat::default()),
             file_compression_type.to_owned(),
             tmp_dir.path(),
         )
@@ -472,8 +470,8 @@ mod tests {
         let path_buf = Path::new(url.path()).join(path);
         let path = path_buf.to_str().unwrap();
 
-        let ext = FileType::JSON
-            .get_ext_with_compression(file_compression_type.to_owned())
+        let ext = JsonFormat::default()
+            .get_ext_with_compression(&file_compression_type)
             .unwrap();
 
         let read_options = NdJsonReadOptions::default()
@@ -904,8 +902,8 @@ mod tests {
         let url: &Url = store_url.as_ref();
         let path_buf = Path::new(url.path()).join(path);
         let path = path_buf.to_str().unwrap();
-        let ext = FileType::JSON
-            .get_ext_with_compression(file_compression_type.to_owned())
+        let ext = JsonFormat::default()
+            .get_ext_with_compression(&file_compression_type)
             .unwrap();
 
         let read_option = NdJsonReadOptions::default()
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index ea7faac08c..9d5c64719e 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -799,7 +799,7 @@ mod tests {
     use arrow::datatypes::{Field, Schema, SchemaBuilder};
     use arrow::record_batch::RecordBatch;
     use arrow_schema::Fields;
-    use datafusion_common::{assert_contains, FileType, GetExt, ScalarValue};
+    use datafusion_common::{assert_contains, ScalarValue};
     use datafusion_expr::{col, lit, when, Expr};
     use datafusion_physical_expr::planner::logical2physical;
     use datafusion_physical_plan::ExecutionPlanProperties;
@@ -1994,7 +1994,7 @@ mod tests {
         // Configure listing options
         let file_format = ParquetFormat::default().with_enable_pruning(true);
         let listing_options = ListingOptions::new(Arc::new(file_format))
-            .with_file_extension(FileType::PARQUET.get_ext());
+            .with_file_extension(ParquetFormat::default().get_ext());
 
         // execute a simple query and write the results to parquet
         let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index d2bac134b5..2b7867e720 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -25,6 +25,13 @@ use crate::catalog::{
     MemoryCatalogProviderList,
 };
 use crate::datasource::cte_worktable::CteWorkTable;
+use crate::datasource::file_format::arrow::ArrowFormatFactory;
+use crate::datasource::file_format::avro::AvroFormatFactory;
+use crate::datasource::file_format::csv::CsvFormatFactory;
+use crate::datasource::file_format::json::JsonFormatFactory;
+#[cfg(feature = "parquet")]
+use crate::datasource::file_format::parquet::ParquetFormatFactory;
+use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
 use crate::datasource::function::{TableFunction, TableFunctionImpl};
 use crate::datasource::provider::{DefaultTableFactory, TableProviderFactory};
 use crate::datasource::provider_as_source;
@@ -41,10 +48,11 @@ use chrono::{DateTime, Utc};
 use datafusion_common::alias::AliasGenerator;
 use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
 use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
+use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::tree_node::TreeNode;
 use datafusion_common::{
-    not_impl_err, plan_datafusion_err, DFSchema, DataFusionError, ResolvedTableReference,
-    TableReference,
+    config_err, not_impl_err, plan_datafusion_err, DFSchema, DataFusionError,
+    ResolvedTableReference, TableReference,
 };
 use datafusion_execution::config::SessionConfig;
 use datafusion_execution::object_store::ObjectStoreUrl;
@@ -109,6 +117,8 @@ pub struct SessionState {
     window_functions: HashMap<String, Arc<WindowUDF>>,
     /// Deserializer registry for extensions.
     serializer_registry: Arc<dyn SerializerRegistry>,
+    /// Holds registered external FileFormat implementations
+    file_formats: HashMap<String, Arc<dyn FileFormatFactory>>,
     /// Session configuration
     config: SessionConfig,
     /// Table options
@@ -230,6 +240,7 @@ impl SessionState {
             aggregate_functions: HashMap::new(),
             window_functions: HashMap::new(),
             serializer_registry: Arc::new(EmptySerializerRegistry),
+            file_formats: HashMap::new(),
             table_options: TableOptions::default_from_session_config(config.options()),
             config,
             execution_props: ExecutionProps::new(),
@@ -238,6 +249,37 @@ impl SessionState {
             function_factory: None,
         };
 
+        #[cfg(feature = "parquet")]
+        if let Err(e) =
+            new_self.register_file_format(Arc::new(ParquetFormatFactory::new()), false)
+        {
+            log::info!("Unable to register default ParquetFormat: {e}")
+        };
+
+        if let Err(e) =
+            new_self.register_file_format(Arc::new(JsonFormatFactory::new()), false)
+        {
+            log::info!("Unable to register default JsonFormat: {e}")
+        };
+
+        if let Err(e) =
+            new_self.register_file_format(Arc::new(CsvFormatFactory::new()), false)
+        {
+            log::info!("Unable to register default CsvFormat: {e}")
+        };
+
+        if let Err(e) =
+            new_self.register_file_format(Arc::new(ArrowFormatFactory::new()), false)
+        {
+            log::info!("Unable to register default ArrowFormat: {e}")
+        };
+
+        if let Err(e) =
+            new_self.register_file_format(Arc::new(AvroFormatFactory::new()), false)
+        {
+            log::info!("Unable to register default AvroFormat: {e}")
+        };
+
         // register built in functions
         functions::register_all(&mut new_self)
             .expect("can not register built in functions");
@@ -811,6 +853,31 @@ impl SessionState {
         self.table_options.extensions.insert(extension)
     }
 
+    /// Adds or updates a [FileFormatFactory] which can be used with COPY TO or CREATE EXTERNAL TABLE statements for reading
+    /// and writing files of custom formats.
+    pub fn register_file_format(
+        &mut self,
+        file_format: Arc<dyn FileFormatFactory>,
+        overwrite: bool,
+    ) -> Result<(), DataFusionError> {
+        let ext = file_format.get_ext().to_lowercase();
+        match (self.file_formats.entry(ext.clone()), overwrite){
+            (Entry::Vacant(e), _) => {e.insert(file_format);},
+            (Entry::Occupied(mut e), true)  => {e.insert(file_format);},
+            (Entry::Occupied(_), false) => return config_err!("File type already registered for extension {ext}. Set overwrite to true to replace this extension."),
+        };
+        Ok(())
+    }
+
+    /// Retrieves a [FileFormatFactory] based on file extension which has been registered
+    /// via SessionContext::register_file_format. Extensions are not case sensitive.
+    pub fn get_file_format_factory(
+        &self,
+        ext: &str,
+    ) -> Option<Arc<dyn FileFormatFactory>> {
+        self.file_formats.get(&ext.to_lowercase()).cloned()
+    }
+
     /// Get a new TaskContext to run in this session
     pub fn task_ctx(&self) -> Arc<TaskContext> {
         Arc::new(TaskContext::from(self))
@@ -967,6 +1034,16 @@ impl<'a> ContextProvider for SessionContextProvider<'a> {
     fn udwf_names(&self) -> Vec<String> {
         self.state.window_functions().keys().cloned().collect()
     }
+
+    fn get_file_type(&self, ext: &str) -> datafusion_common::Result<Arc<dyn FileType>> {
+        self.state
+            .file_formats
+            .get(&ext.to_lowercase())
+            .ok_or(plan_datafusion_err!(
+                "There is no registered file format with ext {ext}"
+            ))
+            .map(|file_type| format_as_file_type(file_type.clone()))
+    }
 }
 
 impl FunctionRegistry for SessionState {
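
A sketch of the registration API; the cloned state is used standalone here,
and re-registering the built-in CSV factory stands in for a user-defined one:

    use std::sync::Arc;

    use datafusion::datasource::file_format::csv::CsvFormatFactory;
    use datafusion::prelude::SessionContext;
    use datafusion_common::Result;

    fn demo() -> Result<()> {
        let ctx = SessionContext::new();
        let mut state = ctx.state();
        // "csv" is pre-registered, so overwrite must be true to replace it
        state.register_file_format(Arc::new(CsvFormatFactory::new()), true)?;
        // Lookups are case-insensitive
        assert!(state.get_file_format_factory("CSV").is_some());
        Ok(())
    }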
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 15f7555575..5b8501baaa 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -22,13 +22,7 @@ use std::collections::HashMap;
 use std::fmt::Write;
 use std::sync::Arc;
 
-use crate::datasource::file_format::arrow::ArrowFormat;
-use crate::datasource::file_format::avro::AvroFormat;
-use crate::datasource::file_format::csv::CsvFormat;
-use crate::datasource::file_format::json::JsonFormat;
-#[cfg(feature = "parquet")]
-use crate::datasource::file_format::parquet::ParquetFormat;
-use crate::datasource::file_format::FileFormat;
+use crate::datasource::file_format::file_type_to_format;
 use crate::datasource::listing::ListingTableUrl;
 use crate::datasource::physical_plan::FileSinkConfig;
 use crate::datasource::source_as_provider;
@@ -74,11 +68,10 @@ use arrow::compute::SortOptions;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow_array::builder::StringBuilder;
 use arrow_array::RecordBatch;
-use datafusion_common::config::FormatOptions;
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::{
     exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
-    FileType, ScalarValue,
+    ScalarValue,
 };
 use datafusion_expr::dml::CopyTo;
 use datafusion_expr::expr::{
@@ -764,7 +757,7 @@ impl DefaultPhysicalPlanner {
             LogicalPlan::Copy(CopyTo {
                 input,
                 output_url,
-                format_options,
+                file_type,
                 partition_by,
                 options: source_option_tuples,
             }) => {
@@ -791,32 +784,9 @@ impl DefaultPhysicalPlanner {
                     table_partition_cols,
                     overwrite: false,
                 };
-                let mut table_options = session_state.default_table_options();
-                let sink_format: Arc<dyn FileFormat> = match format_options {
-                    FormatOptions::CSV(options) => {
-                        table_options.csv = options.clone();
-                        table_options.set_file_format(FileType::CSV);
-                        table_options.alter_with_string_hash_map(source_option_tuples)?;
-                        Arc::new(CsvFormat::default().with_options(table_options.csv))
-                    }
-                    FormatOptions::JSON(options) => {
-                        table_options.json = options.clone();
-                        table_options.set_file_format(FileType::JSON);
-                        table_options.alter_with_string_hash_map(source_option_tuples)?;
-                        Arc::new(JsonFormat::default().with_options(table_options.json))
-                    }
-                    #[cfg(feature = "parquet")]
-                    FormatOptions::PARQUET(options) => {
-                        table_options.parquet = options.clone();
-                        table_options.set_file_format(FileType::PARQUET);
-                        table_options.alter_with_string_hash_map(source_option_tuples)?;
-                        Arc::new(
-                            ParquetFormat::default().with_options(table_options.parquet),
-                        )
-                    }
-                    FormatOptions::AVRO => Arc::new(AvroFormat {}),
-                    FormatOptions::ARROW => Arc::new(ArrowFormat {}),
-                };
+
+                let sink_format = file_type_to_format(file_type)?
+                    .create(session_state, source_option_tuples)?;
 
                 sink_format
                     .create_writer_physical_plan(input_exec, session_state, config, None)
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 2515b8a4e0..e8550a79cb 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -24,9 +24,9 @@ use std::io::{BufReader, BufWriter};
 use std::path::Path;
 use std::sync::Arc;
 
-use crate::datasource::file_format::file_compression_type::{
-    FileCompressionType, FileTypeExt,
-};
+use crate::datasource::file_format::csv::CsvFormat;
+use crate::datasource::file_format::file_compression_type::FileCompressionType;
+use crate::datasource::file_format::FileFormat;
 use crate::datasource::listing::PartitionedFile;
 use crate::datasource::object_store::ObjectStoreUrl;
 use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
@@ -40,7 +40,7 @@ use crate::test_util::{aggr_test_schema, arrow_test_data};
 use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use datafusion_common::{DataFusionError, FileType, Statistics};
+use datafusion_common::{DataFusionError, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
 use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
@@ -87,7 +87,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<Cs
         path.as_str(),
         filename,
         partitions,
-        FileType::CSV,
+        Arc::new(CsvFormat::default()),
         FileCompressionType::UNCOMPRESSED,
         work_dir,
     )?;
@@ -108,7 +108,7 @@ pub fn partitioned_file_groups(
     path: &str,
     filename: &str,
     partitions: usize,
-    file_type: FileType,
+    file_format: Arc<dyn FileFormat>,
     file_compression_type: FileCompressionType,
     work_dir: &Path,
 ) -> Result<Vec<Vec<PartitionedFile>>> {
@@ -120,9 +120,8 @@ pub fn partitioned_file_groups(
         let filename = format!(
             "partition-{}{}",
             i,
-            file_type
-                .to_owned()
-                .get_ext_with_compression(file_compression_type.to_owned())
+            file_format
+                .get_ext_with_compression(&file_compression_type)
                 .unwrap()
         );
         let filename = work_dir.join(filename);
@@ -167,7 +166,7 @@ pub fn partitioned_file_groups(
     for (i, line) in f.lines().enumerate() {
         let line = line.unwrap();
 
-        if i == 0 && file_type == FileType::CSV {
+        if i == 0 && file_format.get_ext() == CsvFormat::default().get_ext() {
             // write header to all partitions
             for w in writers.iter_mut() {
                 w.write_all(line.as_bytes()).unwrap();
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 4564a8c71f..f87151efd8 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -49,8 +49,8 @@ use crate::{
 };
 
 use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
-use datafusion_common::config::FormatOptions;
 use datafusion_common::display::ToStringifiedPlan;
+use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{
     get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err,
     plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
@@ -272,14 +272,14 @@ impl LogicalPlanBuilder {
     pub fn copy_to(
         input: LogicalPlan,
         output_url: String,
-        format_options: FormatOptions,
+        file_type: Arc<dyn FileType>,
         options: HashMap<String, String>,
         partition_by: Vec<String>,
     ) -> Result<Self> {
         Ok(Self::from(LogicalPlan::Copy(CopyTo {
             input: Arc::new(input),
             output_url,
-            format_options,
+            file_type,
             options,
             partition_by,
         })))
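
A sketch of building a COPY TO plan against the new signature, assuming the
caller already has an input plan (the output URL and the JSON choice are
illustrative):

    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::datasource::file_format::format_as_file_type;
    use datafusion::datasource::file_format::json::JsonFormatFactory;
    use datafusion_common::Result;
    use datafusion_expr::{LogicalPlan, LogicalPlanBuilder};

    fn plan_copy(input: LogicalPlan) -> Result<LogicalPlan> {
        let file_type = format_as_file_type(Arc::new(JsonFormatFactory::new()));
        LogicalPlanBuilder::copy_to(
            input,
            "file:///tmp/out/".to_string(),
            file_type,
            HashMap::new(), // per-statement options
            vec![],         // partition_by columns
        )?
        .build()
    }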
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index 707cff8ab5..81fd03555a 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -425,7 +425,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
             LogicalPlan::Copy(CopyTo {
                 input: _,
                 output_url,
-                format_options,
+                file_type,
                 partition_by: _,
                 options,
             }) => {
@@ -437,7 +437,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
                 json!({
                     "Node Type": "CopyTo",
                     "Output URL": output_url,
-                    "Format Options": format!("{}", format_options),
+                    "File Type": format!("{}", file_type.get_ext()),
                     "Options": op_str
                 })
             }
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index 13f3759ab8..c9eef9bd34 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -21,7 +21,7 @@ use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use arrow::datatypes::{DataType, Field, Schema};
-use datafusion_common::config::FormatOptions;
+use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{DFSchemaRef, TableReference};
 
 use crate::LogicalPlan;
@@ -35,8 +35,8 @@ pub struct CopyTo {
     pub output_url: String,
     /// Determines which, if any, columns should be used for hive-style partitioned writes
     pub partition_by: Vec<String>,
-    /// File format options.
-    pub format_options: FormatOptions,
+    /// The file type describing the output format, as an extensible trait object
+    pub file_type: Arc<dyn FileType>,
     /// SQL Options that can affect the formats
     pub options: HashMap<String, String>,
 }
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 6e7efaf39e..31f830a6a1 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -857,13 +857,13 @@ impl LogicalPlan {
             LogicalPlan::Copy(CopyTo {
                 input: _,
                 output_url,
-                format_options,
+                file_type,
                 options,
                 partition_by,
             }) => Ok(LogicalPlan::Copy(CopyTo {
                 input: Arc::new(inputs.swap_remove(0)),
                 output_url: output_url.clone(),
-                format_options: format_options.clone(),
+                file_type: file_type.clone(),
                 options: options.clone(),
                 partition_by: partition_by.clone(),
             })),
@@ -1729,7 +1729,7 @@ impl LogicalPlan {
                     LogicalPlan::Copy(CopyTo {
                         input: _,
                         output_url,
-                        format_options,
+                        file_type,
                         options,
                         ..
                     }) => {
@@ -1739,7 +1739,7 @@ impl LogicalPlan {
                             .collect::<Vec<String>>()
                             .join(", ");
 
-                        write!(f, "CopyTo: format={format_options} 
output_url={output_url} options: ({op_str})")
+                        write!(f, "CopyTo: format={} output_url={output_url} 
options: ({op_str})", file_type.get_ext())
                     }
                     LogicalPlan::Ddl(ddl) => {
                         write!(f, "{}", ddl.display())
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index 86c0cffd80..a47906f203 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -256,14 +256,14 @@ impl TreeNode for LogicalPlan {
                 input,
                 output_url,
                 partition_by,
-                format_options,
+                file_type,
                 options,
             }) => rewrite_arc(input, f)?.update_data(|input| {
                 LogicalPlan::Copy(CopyTo {
                     input,
                     output_url,
                     partition_by,
-                    format_options,
+                    file_type,
                     options,
                 })
             }),
diff --git a/datafusion/proto/gen/src/main.rs b/datafusion/proto/gen/src/main.rs
index 22c16eacb0..d38a41a01a 100644
--- a/datafusion/proto/gen/src/main.rs
+++ b/datafusion/proto/gen/src/main.rs
@@ -29,6 +29,7 @@ fn main() -> Result<(), String> {
     let descriptor_path = proto_dir.join("proto/proto_descriptor.bin");
 
     prost_build::Config::new()
+        .protoc_arg("--experimental_allow_proto3_optional")
         .file_descriptor_set_path(&descriptor_path)
         .out_dir(out_dir)
         .compile_well_known_types()
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 2e7005a4cb..f2594ba103 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -251,13 +251,7 @@ message DistinctOnNode {
 message CopyToNode {
     LogicalPlanNode input = 1;
     string output_url = 2;
-    oneof format_options {
-      datafusion_common.CsvOptions csv = 8;
-      datafusion_common.JsonOptions json = 9;
-      datafusion_common.TableParquetOptions parquet = 10;
-      datafusion_common.AvroOptions avro = 11;
-      datafusion_common.ArrowOptions arrow = 12;
-    }
+    bytes file_type = 3;
     repeated string partition_by = 7;
 }
 
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 8fdc5d2e4d..e8fbe95442 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -2536,10 +2536,10 @@ impl serde::Serialize for CopyToNode {
         if !self.output_url.is_empty() {
             len += 1;
         }
-        if !self.partition_by.is_empty() {
+        if !self.file_type.is_empty() {
             len += 1;
         }
-        if self.format_options.is_some() {
+        if !self.partition_by.is_empty() {
             len += 1;
         }
         let mut struct_ser = serializer.serialize_struct("datafusion.CopyToNode", len)?;
@@ -2549,28 +2549,13 @@ impl serde::Serialize for CopyToNode {
         if !self.output_url.is_empty() {
             struct_ser.serialize_field("outputUrl", &self.output_url)?;
         }
+        if !self.file_type.is_empty() {
+            #[allow(clippy::needless_borrow)]
+            struct_ser.serialize_field("fileType", 
pbjson::private::base64::encode(&self.file_type).as_str())?;
+        }
         if !self.partition_by.is_empty() {
             struct_ser.serialize_field("partitionBy", &self.partition_by)?;
         }
-        if let Some(v) = self.format_options.as_ref() {
-            match v {
-                copy_to_node::FormatOptions::Csv(v) => {
-                    struct_ser.serialize_field("csv", v)?;
-                }
-                copy_to_node::FormatOptions::Json(v) => {
-                    struct_ser.serialize_field("json", v)?;
-                }
-                copy_to_node::FormatOptions::Parquet(v) => {
-                    struct_ser.serialize_field("parquet", v)?;
-                }
-                copy_to_node::FormatOptions::Avro(v) => {
-                    struct_ser.serialize_field("avro", v)?;
-                }
-                copy_to_node::FormatOptions::Arrow(v) => {
-                    struct_ser.serialize_field("arrow", v)?;
-                }
-            }
-        }
         struct_ser.end()
     }
 }
@@ -2584,25 +2569,18 @@ impl<'de> serde::Deserialize<'de> for CopyToNode {
             "input",
             "output_url",
             "outputUrl",
+            "file_type",
+            "fileType",
             "partition_by",
             "partitionBy",
-            "csv",
-            "json",
-            "parquet",
-            "avro",
-            "arrow",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             Input,
             OutputUrl,
+            FileType,
             PartitionBy,
-            Csv,
-            Json,
-            Parquet,
-            Avro,
-            Arrow,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -2626,12 +2604,8 @@ impl<'de> serde::Deserialize<'de> for CopyToNode {
                         match value {
                             "input" => Ok(GeneratedField::Input),
                             "outputUrl" | "output_url" => 
Ok(GeneratedField::OutputUrl),
+                            "fileType" | "file_type" => 
Ok(GeneratedField::FileType),
                             "partitionBy" | "partition_by" => 
Ok(GeneratedField::PartitionBy),
-                            "csv" => Ok(GeneratedField::Csv),
-                            "json" => Ok(GeneratedField::Json),
-                            "parquet" => Ok(GeneratedField::Parquet),
-                            "avro" => Ok(GeneratedField::Avro),
-                            "arrow" => Ok(GeneratedField::Arrow),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -2653,8 +2627,8 @@ impl<'de> serde::Deserialize<'de> for CopyToNode {
             {
                 let mut input__ = None;
                 let mut output_url__ = None;
+                let mut file_type__ = None;
                 let mut partition_by__ = None;
-                let mut format_options__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::Input => {
@@ -2669,54 +2643,27 @@ impl<'de> serde::Deserialize<'de> for CopyToNode {
                             }
                             output_url__ = Some(map_.next_value()?);
                         }
+                        GeneratedField::FileType => {
+                            if file_type__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("fileType"));
+                            }
+                            file_type__ =
+                                Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
+                            ;
+                        }
                         GeneratedField::PartitionBy => {
                             if partition_by__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("partitionBy"));
                             }
                             partition_by__ = Some(map_.next_value()?);
                         }
-                        GeneratedField::Csv => {
-                            if format_options__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("csv"));
-                            }
-                            format_options__ = map_.next_value::<::std::option::Option<_>>()?.map(copy_to_node::FormatOptions::Csv)
-;
-                        }
-                        GeneratedField::Json => {
-                            if format_options__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("json"));
-                            }
-                            format_options__ = map_.next_value::<::std::option::Option<_>>()?.map(copy_to_node::FormatOptions::Json)
-;
-                        }
-                        GeneratedField::Parquet => {
-                            if format_options__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("parquet"));
-                            }
-                            format_options__ = map_.next_value::<::std::option::Option<_>>()?.map(copy_to_node::FormatOptions::Parquet)
-;
-                        }
-                        GeneratedField::Avro => {
-                            if format_options__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("avro"));
-                            }
-                            format_options__ = map_.next_value::<::std::option::Option<_>>()?.map(copy_to_node::FormatOptions::Avro)
-;
-                        }
-                        GeneratedField::Arrow => {
-                            if format_options__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("arrow"));
-                            }
-                            format_options__ = map_.next_value::<::std::option::Option<_>>()?.map(copy_to_node::FormatOptions::Arrow)
-;
-                        }
                     }
                 }
                 Ok(CopyToNode {
                     input: input__,
                     output_url: output_url__.unwrap_or_default(),
+                    file_type: file_type__.unwrap_or_default(),
                     partition_by: partition_by__.unwrap_or_default(),
-                    format_options: format_options__,
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 036b7cff9b..93bf6c0602 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -411,27 +411,10 @@ pub struct CopyToNode {
     pub input: ::core::option::Option<::prost::alloc::boxed::Box<LogicalPlanNode>>,
     #[prost(string, tag = "2")]
     pub output_url: ::prost::alloc::string::String,
+    #[prost(bytes = "vec", tag = "3")]
+    pub file_type: ::prost::alloc::vec::Vec<u8>,
     #[prost(string, repeated, tag = "7")]
     pub partition_by: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
-    #[prost(oneof = "copy_to_node::FormatOptions", tags = "8, 9, 10, 11, 12")]
-    pub format_options: ::core::option::Option<copy_to_node::FormatOptions>,
-}
-/// Nested message and enum types in `CopyToNode`.
-pub mod copy_to_node {
-    #[allow(clippy::derive_partial_eq_without_eq)]
-    #[derive(Clone, PartialEq, ::prost::Oneof)]
-    pub enum FormatOptions {
-        #[prost(message, tag = "8")]
-        Csv(super::super::datafusion_common::CsvOptions),
-        #[prost(message, tag = "9")]
-        Json(super::super::datafusion_common::JsonOptions),
-        #[prost(message, tag = "10")]
-        Parquet(super::super::datafusion_common::TableParquetOptions),
-        #[prost(message, tag = "11")]
-        Avro(super::super::datafusion_common::AvroOptions),
-        #[prost(message, tag = "12")]
-        Arrow(super::super::datafusion_common::ArrowOptions),
-    }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs
new file mode 100644
index 0000000000..31102b728e
--- /dev/null
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -0,0 +1,399 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::{
+    datasource::file_format::{
+        arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
+        parquet::ParquetFormatFactory, FileFormatFactory,
+    },
+    prelude::SessionContext,
+};
+use datafusion_common::not_impl_err;
+
+use super::LogicalExtensionCodec;
+
+#[derive(Debug)]
+pub struct CsvLogicalExtensionCodec;
+
+// TODO! This is a placeholder for now and needs to be implemented for real.
+impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
+    fn try_decode(
+        &self,
+        _buf: &[u8],
+        _inputs: &[datafusion_expr::LogicalPlan],
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion_common::Result<datafusion_expr::Extension> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_encode(
+        &self,
+        _node: &datafusion_expr::Extension,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_decode_table_provider(
+        &self,
+        _buf: &[u8],
+        _schema: arrow::datatypes::SchemaRef,
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion_common::Result<
+        std::sync::Arc<dyn datafusion::datasource::TableProvider>,
+    > {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_encode_table_provider(
+        &self,
+        _node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_decode_file_format(
+        &self,
+        __buf: &[u8],
+        __ctx: &SessionContext,
+    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
+        Ok(Arc::new(CsvFormatFactory::new()))
+    }
+
+    fn try_encode_file_format(
+        &self,
+        __buf: &[u8],
+        __node: Arc<dyn FileFormatFactory>,
+    ) -> datafusion_common::Result<()> {
+        Ok(())
+    }
+
+    fn try_decode_udf(
+        &self,
+        name: &str,
+        __buf: &[u8],
+    ) -> datafusion_common::Result<Arc<datafusion_expr::ScalarUDF>> {
+        not_impl_err!("LogicalExtensionCodec is not provided for scalar 
function {name}")
+    }
+
+    fn try_encode_udf(
+        &self,
+        __node: &datafusion_expr::ScalarUDF,
+        __buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        Ok(())
+    }
+}
+
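
A sketch of where these codecs plug in, assuming the byte-serialization
helpers in datafusion_proto::bytes and that this new module is exported as
datafusion_proto::logical_plan::file_formats:

    use datafusion::prelude::SessionContext;
    use datafusion_common::Result;
    use datafusion_expr::LogicalPlan;
    use datafusion_proto::bytes::{
        logical_plan_from_bytes_with_extension_codec,
        logical_plan_to_bytes_with_extension_codec,
    };
    use datafusion_proto::logical_plan::file_formats::CsvLogicalExtensionCodec;

    fn round_trip(plan: &LogicalPlan, ctx: &SessionContext) -> Result<LogicalPlan> {
        let codec = CsvLogicalExtensionCodec;
        let bytes = logical_plan_to_bytes_with_extension_codec(plan, &codec)?;
        logical_plan_from_bytes_with_extension_codec(&bytes, ctx, &codec)
    }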
+#[derive(Debug)]
+pub struct JsonLogicalExtensionCodec;
+
+// TODO! This is a placeholder for now and needs to be implemented for real.
+impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
+    fn try_decode(
+        &self,
+        _buf: &[u8],
+        _inputs: &[datafusion_expr::LogicalPlan],
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion_common::Result<datafusion_expr::Extension> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_encode(
+        &self,
+        _node: &datafusion_expr::Extension,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_decode_table_provider(
+        &self,
+        _buf: &[u8],
+        _schema: arrow::datatypes::SchemaRef,
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion_common::Result<
+        std::sync::Arc<dyn datafusion::datasource::TableProvider>,
+    > {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_encode_table_provider(
+        &self,
+        _node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_decode_file_format(
+        &self,
+        _buf: &[u8],
+        _ctx: &SessionContext,
+    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
+        Ok(Arc::new(JsonFormatFactory::new()))
+    }
+
+    fn try_encode_file_format(
+        &self,
+        _buf: &[u8],
+        _node: Arc<dyn FileFormatFactory>,
+    ) -> datafusion_common::Result<()> {
+        Ok(())
+    }
+
+    fn try_decode_udf(
+        &self,
+        name: &str,
+        _buf: &[u8],
+    ) -> datafusion_common::Result<Arc<datafusion_expr::ScalarUDF>> {
+        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
+    }
+
+    fn try_encode_udf(
+        &self,
+        _node: &datafusion_expr::ScalarUDF,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub struct ParquetLogicalExtensionCodec;
+
+// TODO! This is a placeholder for now and needs to be implemented for real.
+impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
+    fn try_decode(
+        &self,
+        _buf: &[u8],
+        _inputs: &[datafusion_expr::LogicalPlan],
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion_common::Result<datafusion_expr::Extension> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_encode(
+        &self,
+        _node: &datafusion_expr::Extension,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_decode_table_provider(
+        &self,
+        _buf: &[u8],
+        _schema: arrow::datatypes::SchemaRef,
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion_common::Result<
+        std::sync::Arc<dyn datafusion::datasource::TableProvider>,
+    > {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_encode_table_provider(
+        &self,
+        _node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_decode_file_format(
+        &self,
+        _buf: &[u8],
+        _ctx: &SessionContext,
+    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
+        Ok(Arc::new(ParquetFormatFactory::new()))
+    }
+
+    fn try_encode_file_format(
+        &self,
+        _buf: &[u8],
+        _node: Arc<dyn FileFormatFactory>,
+    ) -> datafusion_common::Result<()> {
+        Ok(())
+    }
+
+    fn try_decode_udf(
+        &self,
+        name: &str,
+        _buf: &[u8],
+    ) -> datafusion_common::Result<Arc<datafusion_expr::ScalarUDF>> {
+        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
+    }
+
+    fn try_encode_udf(
+        &self,
+        _node: &datafusion_expr::ScalarUDF,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub struct ArrowLogicalExtensionCodec;
+
+// TODO! This is a placeholder for now and needs to be implemented for real.
+impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
+    fn try_decode(
+        &self,
+        _buf: &[u8],
+        _inputs: &[datafusion_expr::LogicalPlan],
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion_common::Result<datafusion_expr::Extension> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_encode(
+        &self,
+        _node: &datafusion_expr::Extension,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_decode_table_provider(
+        &self,
+        _buf: &[u8],
+        _schema: arrow::datatypes::SchemaRef,
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion_common::Result<
+        std::sync::Arc<dyn datafusion::datasource::TableProvider>,
+    > {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_encode_table_provider(
+        &self,
+        _node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_decode_file_format(
+        &self,
+        _buf: &[u8],
+        _ctx: &SessionContext,
+    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
+        Ok(Arc::new(ArrowFormatFactory::new()))
+    }
+
+    fn try_encode_file_format(
+        &self,
+        _buf: &[u8],
+        _node: Arc<dyn FileFormatFactory>,
+    ) -> datafusion_common::Result<()> {
+        Ok(())
+    }
+
+    fn try_decode_udf(
+        &self,
+        name: &str,
+        _buf: &[u8],
+    ) -> datafusion_common::Result<Arc<datafusion_expr::ScalarUDF>> {
+        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
+    }
+
+    fn try_encode_udf(
+        &self,
+        _node: &datafusion_expr::ScalarUDF,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub struct AvroLogicalExtensionCodec;
+
+// TODO! This is a placeholder for now and needs to be implemented for real.
+impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
+    fn try_decode(
+        &self,
+        _buf: &[u8],
+        _inputs: &[datafusion_expr::LogicalPlan],
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion_common::Result<datafusion_expr::Extension> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_encode(
+        &self,
+        _node: &datafusion_expr::Extension,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_decode_table_provider(
+        &self,
+        _buf: &[u8],
+        _schema: arrow::datatypes::SchemaRef,
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion_common::Result<
+        std::sync::Arc<dyn datafusion::datasource::TableProvider>,
+    > {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_encode_table_provider(
+        &self,
+        _node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        not_impl_err!("Method not implemented")
+    }
+
+    fn try_decode_file_format(
+        &self,
+        _buf: &[u8],
+        _ctx: &SessionContext,
+    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
+        // Note: this placeholder reuses the Arrow factory; an Avro-specific
+        // FileFormatFactory is not wired up here yet.
+        Ok(Arc::new(ArrowFormatFactory::new()))
+    }
+
+    fn try_encode_file_format(
+        &self,
+        _buf: &[u8],
+        _node: Arc<dyn FileFormatFactory>,
+    ) -> datafusion_common::Result<()> {
+        Ok(())
+    }
+
+    fn try_decode_udf(
+        &self,
+        name: &str,
+        _buf: &[u8],
+    ) -> datafusion_common::Result<Arc<datafusion_expr::ScalarUDF>> {
+        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
+    }
+
+    fn try_encode_udf(
+        &self,
+        _node: &datafusion_expr::ScalarUDF,
+        _buf: &mut Vec<u8>,
+    ) -> datafusion_common::Result<()> {
+        Ok(())
+    }
+}
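
For orientation, here is a minimal sketch (not part of the patch) of how one of
these codecs is used; it mirrors the round-trip tests further down and assumes a
session with a registered table `t`. The byte helpers are the existing ones from
datafusion_proto::bytes:

    use std::sync::Arc;

    use datafusion::datasource::file_format::csv::CsvFormatFactory;
    use datafusion::datasource::file_format::format_as_file_type;
    use datafusion::prelude::SessionContext;
    use datafusion_expr::{dml::CopyTo, LogicalPlan};
    use datafusion_proto::bytes::{
        logical_plan_from_bytes_with_extension_codec,
        logical_plan_to_bytes_with_extension_codec,
    };
    use datafusion_proto::logical_plan::file_formats::CsvLogicalExtensionCodec;

    async fn roundtrip_copy_to(ctx: &SessionContext) -> datafusion_common::Result<()> {
        // Build a COPY TO plan whose target format is carried as Arc<dyn FileType>.
        let input = ctx.table("t").await?.into_optimized_plan()?;
        let plan = LogicalPlan::Copy(CopyTo {
            input: Arc::new(input),
            output_url: "out.csv".to_string(),
            partition_by: vec![],
            file_type: format_as_file_type(Arc::new(CsvFormatFactory::new())),
            options: Default::default(),
        });

        // The codec reconstructs the FileFormatFactory when the plan is decoded.
        let codec = CsvLogicalExtensionCodec {};
        let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
        let round_trip = logical_plan_from_bytes_with_extension_codec(&bytes, ctx, &codec)?;
        assert_eq!(format!("{plan:?}"), format!("{round_trip:?}"));
        Ok(())
    }
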
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index ef37150a35..cdb9d5260a 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -33,6 +33,9 @@ use crate::protobuf::{proto_error, FromProtoError, ToProtoError};
 use arrow::datatypes::{DataType, Schema, SchemaRef};
 #[cfg(feature = "parquet")]
 use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::{
+    file_type_to_format, format_as_file_type, FileFormatFactory,
+};
 use datafusion::{
     datasource::{
         file_format::{avro::AvroFormat, csv::CsvFormat, FileFormat},
@@ -43,6 +46,7 @@ use datafusion::{
     datasource::{provider_as_source, source_as_provider},
     prelude::SessionContext,
 };
+use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{
     context, internal_datafusion_err, internal_err, not_impl_err, DataFusionError,
     Result, TableReference,
@@ -64,6 +68,7 @@ use prost::Message;
 
 use self::to_proto::serialize_expr;
 
+pub mod file_formats;
 pub mod from_proto;
 pub mod to_proto;
 
@@ -114,6 +119,22 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync {
         buf: &mut Vec<u8>,
     ) -> Result<()>;
 
+    fn try_decode_file_format(
+        &self,
+        _buf: &[u8],
+        _ctx: &SessionContext,
+    ) -> Result<Arc<dyn FileFormatFactory>> {
+        not_impl_err!("LogicalExtensionCodec is not provided for file format")
+    }
+
+    fn try_encode_file_format(
+        &self,
+        _buf: &[u8],
+        _node: Arc<dyn FileFormatFactory>,
+    ) -> Result<()> {
+        Ok(())
+    }
+
     fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
         not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
     }
@@ -829,12 +850,16 @@ impl AsLogicalPlan for LogicalPlanNode {
                 let input: LogicalPlan =
                     into_logical_plan!(copy.input, ctx, extension_codec)?;
 
+                let file_type: Arc<dyn FileType> = format_as_file_type(
+                    extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
+                );
+
                 Ok(datafusion_expr::LogicalPlan::Copy(
                     datafusion_expr::dml::CopyTo {
                         input: Arc::new(input),
                         output_url: copy.output_url.clone(),
                         partition_by: copy.partition_by.clone(),
-                        format_options: convert_required!(copy.format_options)?,
+                        file_type,
                         options: Default::default(),
                     },
                 ))
@@ -1609,7 +1634,7 @@ impl AsLogicalPlan for LogicalPlanNode {
             LogicalPlan::Copy(dml::CopyTo {
                 input,
                 output_url,
-                format_options,
+                file_type,
                 partition_by,
                 ..
             }) => {
@@ -1618,12 +1643,16 @@ impl AsLogicalPlan for LogicalPlanNode {
                     extension_codec,
                 )?;
 
+                let buf = Vec::new();
+                extension_codec
+                    .try_encode_file_format(&buf, file_type_to_format(file_type)?)?;
+
                 Ok(protobuf::LogicalPlanNode {
                     logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
                         protobuf::CopyToNode {
                             input: Some(Box::new(input)),
                             output_url: output_url.to_string(),
-                            format_options: Some(format_options.try_into()?),
+                            file_type: buf,
                             partition_by: partition_by.clone(),
                         },
                     ))),
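
These two hooks are the whole integration surface for an externally defined
format: a codec that can rebuild the format's factory on decode can round-trip
COPY TO plans that use it. A hedged sketch follows, where `MyFormatFactory` is a
hypothetical stand-in for an external crate's `FileFormatFactory` implementation
and the remaining required methods follow the placeholder pattern from
file_formats.rs above:

    use std::sync::Arc;

    use datafusion::datasource::file_format::FileFormatFactory;
    use datafusion::datasource::TableProvider;
    use datafusion::prelude::SessionContext;
    use datafusion_common::{not_impl_err, Result};
    use datafusion_expr::{Extension, LogicalPlan};
    use datafusion_proto::logical_plan::LogicalExtensionCodec;

    #[derive(Debug)]
    pub struct MyFormatExtensionCodec;

    impl LogicalExtensionCodec for MyFormatExtensionCodec {
        fn try_decode(
            &self,
            _buf: &[u8],
            _inputs: &[LogicalPlan],
            _ctx: &SessionContext,
        ) -> Result<Extension> {
            not_impl_err!("Method not implemented")
        }

        fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
            not_impl_err!("Method not implemented")
        }

        fn try_decode_table_provider(
            &self,
            _buf: &[u8],
            _schema: arrow::datatypes::SchemaRef,
            _ctx: &SessionContext,
        ) -> Result<Arc<dyn TableProvider>> {
            not_impl_err!("Method not implemented")
        }

        fn try_encode_table_provider(
            &self,
            _node: Arc<dyn TableProvider>,
            _buf: &mut Vec<u8>,
        ) -> Result<()> {
            not_impl_err!("Method not implemented")
        }

        // The one override an external format strictly needs: rebuild its
        // factory on decode. The default try_encode_file_format (a no-op)
        // suffices for a stateless factory.
        fn try_decode_file_format(
            &self,
            _buf: &[u8],
            _ctx: &SessionContext,
        ) -> Result<Arc<dyn FileFormatFactory>> {
            Ok(Arc::new(MyFormatFactory::new())) // hypothetical external factory
        }
    }
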
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index b636c77641..7783c15611 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -41,14 +41,13 @@ use datafusion::physical_plan::expressions::{
 };
 use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field};
 use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
-use datafusion_common::config::FormatOptions;
 use datafusion_common::{not_impl_err, DataFusionError, Result};
 use datafusion_proto_common::common::proto_error;
 
 use crate::convert_required;
 use crate::logical_plan::{self};
+use crate::protobuf;
 use crate::protobuf::physical_expr_node::ExprType;
-use crate::protobuf::{self, copy_to_node};
 
 use super::PhysicalExtensionCodec;
 
@@ -653,22 +652,3 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
         })
     }
 }
-
-impl TryFrom<&copy_to_node::FormatOptions> for FormatOptions {
-    type Error = DataFusionError;
-    fn try_from(value: &copy_to_node::FormatOptions) -> Result<Self, Self::Error> {
-        Ok(match value {
-            copy_to_node::FormatOptions::Csv(options) => {
-                FormatOptions::CSV(options.try_into()?)
-            }
-            copy_to_node::FormatOptions::Json(options) => {
-                FormatOptions::JSON(options.try_into()?)
-            }
-            copy_to_node::FormatOptions::Parquet(options) => {
-                FormatOptions::PARQUET(options.try_into()?)
-            }
-            copy_to_node::FormatOptions::Avro(_) => FormatOptions::AVRO,
-            copy_to_node::FormatOptions::Arrow(_) => FormatOptions::ARROW,
-        })
-    }
-}
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index c02b59d062..8583900e9f 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -39,12 +39,11 @@ use datafusion::{
     },
     physical_plan::expressions::LikeExpr,
 };
-use datafusion_common::config::FormatOptions;
 use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
 
 use crate::protobuf::{
-    self, copy_to_node, physical_aggregate_expr_node, physical_window_expr_node,
-    PhysicalSortExprNode, PhysicalSortExprNodeCollection,
+    self, physical_aggregate_expr_node, physical_window_expr_node, PhysicalSortExprNode,
+    PhysicalSortExprNodeCollection,
 };
 
 use super::PhysicalExtensionCodec;
@@ -728,26 +727,3 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
         })
     }
 }
-
-impl TryFrom<&FormatOptions> for copy_to_node::FormatOptions {
-    type Error = DataFusionError;
-    fn try_from(value: &FormatOptions) -> std::result::Result<Self, Self::Error> {
-        Ok(match value {
-            FormatOptions::CSV(options) => {
-                copy_to_node::FormatOptions::Csv(options.try_into()?)
-            }
-            FormatOptions::JSON(options) => {
-                copy_to_node::FormatOptions::Json(options.try_into()?)
-            }
-            FormatOptions::PARQUET(options) => {
-                copy_to_node::FormatOptions::Parquet(options.try_into()?)
-            }
-            FormatOptions::AVRO => {
-                copy_to_node::FormatOptions::Avro(protobuf::AvroOptions {})
-            }
-            FormatOptions::ARROW => {
-                copy_to_node::FormatOptions::Arrow(protobuf::ArrowOptions {})
-            }
-        })
-    }
-}
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 510ebe9a98..d54078b72b 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -26,6 +26,13 @@ use arrow::datatypes::{
     DataType, Field, Fields, Int32Type, IntervalDayTimeType, IntervalMonthDayNanoType,
     IntervalUnit, Schema, SchemaRef, TimeUnit, UnionFields, UnionMode,
 };
+use datafusion::datasource::file_format::arrow::ArrowFormatFactory;
+use datafusion::datasource::file_format::csv::CsvFormatFactory;
+use datafusion::datasource::file_format::format_as_file_type;
+use datafusion::datasource::file_format::parquet::ParquetFormatFactory;
+use datafusion_proto::logical_plan::file_formats::{
+    ArrowLogicalExtensionCodec, CsvLogicalExtensionCodec, ParquetLogicalExtensionCodec,
+};
 use prost::Message;
 
 use datafusion::datasource::provider::TableProviderFactory;
@@ -41,11 +48,11 @@ use datafusion::functions_aggregate::expr_fn::{
 };
 use datafusion::prelude::*;
 use datafusion::test_util::{TestTableFactory, TestTableProvider};
-use datafusion_common::config::{FormatOptions, TableOptions};
+use datafusion_common::config::TableOptions;
 use datafusion_common::scalar::ScalarStructBuilder;
 use datafusion_common::{
     internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, DFSchemaRef,
-    DataFusionError, FileType, Result, ScalarValue,
+    DataFusionError, Result, ScalarValue,
 };
 use datafusion_expr::dml::CopyTo;
 use datafusion_expr::expr::{
@@ -326,20 +333,20 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
     let ctx = SessionContext::new();
 
     let input = create_csv_scan(&ctx).await?;
-    let mut table_options = ctx.copied_table_options();
-    table_options.set_file_format(FileType::CSV);
-    table_options.set("format.delimiter", ";")?;
+    let file_type = format_as_file_type(Arc::new(CsvFormatFactory::new()));
 
     let plan = LogicalPlan::Copy(CopyTo {
         input: Arc::new(input),
         output_url: "test.csv".to_string(),
         partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
-        format_options: FormatOptions::CSV(table_options.csv.clone()),
+        file_type,
         options: Default::default(),
     });
 
-    let bytes = logical_plan_to_bytes(&plan)?;
-    let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+    let codec = CsvLogicalExtensionCodec {};
+    let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
+    let logical_round_trip =
+        logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?;
     assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
 
     Ok(())
@@ -364,26 +371,27 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
     parquet_format.global.dictionary_page_size_limit = 444;
     parquet_format.global.max_row_group_size = 555;
 
+    let file_type = format_as_file_type(Arc::new(ParquetFormatFactory::new()));
+
     let plan = LogicalPlan::Copy(CopyTo {
         input: Arc::new(input),
         output_url: "test.parquet".to_string(),
-        format_options: FormatOptions::PARQUET(parquet_format.clone()),
+        file_type,
         partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
         options: Default::default(),
     });
 
-    let bytes = logical_plan_to_bytes(&plan)?;
-    let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+    let codec = ParquetLogicalExtensionCodec {};
+    let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
+    let logical_round_trip =
+        logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?;
     assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
 
     match logical_round_trip {
         LogicalPlan::Copy(copy_to) => {
             assert_eq!("test.parquet", copy_to.output_url);
             assert_eq!(vec!["a", "b", "c"], copy_to.partition_by);
-            assert_eq!(
-                copy_to.format_options,
-                FormatOptions::PARQUET(parquet_format)
-            );
+            assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string());
         }
         _ => panic!(),
     }
@@ -396,22 +404,26 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
 
     let input = create_csv_scan(&ctx).await?;
 
+    let file_type = format_as_file_type(Arc::new(ArrowFormatFactory::new()));
+
     let plan = LogicalPlan::Copy(CopyTo {
         input: Arc::new(input),
         output_url: "test.arrow".to_string(),
         partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
-        format_options: FormatOptions::ARROW,
+        file_type,
         options: Default::default(),
     });
 
-    let bytes = logical_plan_to_bytes(&plan)?;
-    let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+    let codec = ArrowLogicalExtensionCodec {};
+    let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
+    let logical_round_trip =
+        logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?;
     assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
 
     match logical_round_trip {
         LogicalPlan::Copy(copy_to) => {
             assert_eq!("test.arrow", copy_to.output_url);
-            assert_eq!(FormatOptions::ARROW, copy_to.format_options);
+            assert_eq!("arrow".to_string(), copy_to.file_type.get_ext());
             assert_eq!(vec!["a", "b", "c"], copy_to.partition_by);
         }
         _ => panic!(),
@@ -437,22 +449,26 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
     csv_format.time_format = Some("HH:mm:ss".to_string());
     csv_format.null_value = Some("NIL".to_string());
 
+    let file_type = format_as_file_type(Arc::new(CsvFormatFactory::new()));
+
     let plan = LogicalPlan::Copy(CopyTo {
         input: Arc::new(input),
         output_url: "test.csv".to_string(),
         partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
-        format_options: FormatOptions::CSV(csv_format.clone()),
+        file_type,
         options: Default::default(),
     });
 
-    let bytes = logical_plan_to_bytes(&plan)?;
-    let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+    let codec = CsvLogicalExtensionCodec {};
+    let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
+    let logical_round_trip =
+        logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?;
     assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
 
     match logical_round_trip {
         LogicalPlan::Copy(copy_to) => {
             assert_eq!("test.csv", copy_to.output_url);
-            assert_eq!(FormatOptions::CSV(csv_format), copy_to.format_options);
+            assert_eq!("csv".to_string(), copy_to.file_type.get_ext());
             assert_eq!(vec!["a", "b", "c"], copy_to.partition_by);
         }
         _ => panic!(),
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 30f95170a3..63ef86446a 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
 use std::vec;
 
 use arrow_schema::*;
+use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{
     field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, SchemaError,
 };
@@ -48,6 +49,11 @@ use crate::utils::make_decimal_type;
 pub trait ContextProvider {
     /// Getter for a datasource
     fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>>;
+
+    fn get_file_type(&self, _ext: &str) -> Result<Arc<dyn FileType>> {
+        not_impl_err!("Registered file types are not supported")
+    }
+
     /// Getter for a table function
     fn get_table_function_source(
         &self,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index cb492b390c..518972545a 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -34,8 +34,8 @@ use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
     exec_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
     unqualified_field_not_found, Column, Constraints, DFSchema, DFSchemaRef,
-    DataFusionError, FileType, Result, ScalarValue, SchemaError, SchemaReference,
-    TableReference, ToDFSchema,
+    DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
+    ToDFSchema,
 };
 use datafusion_expr::dml::CopyTo;
 use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
@@ -899,31 +899,35 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             }
         }
 
-        let file_type = if let Some(file_type) = statement.stored_as {
-            FileType::from_str(&file_type).map_err(|_| {
-                DataFusionError::Configuration(format!("Unknown FileType {}", file_type))
-            })?
+        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
+            if let Ok(ext_file_type) = self.context_provider.get_file_type(stored_as) {
+                Some(ext_file_type)
+            } else {
+                None
+            }
         } else {
-            let e = || {
-                DataFusionError::Configuration(
-                    "Format not explicitly set and unable to get file 
extension! Use STORED AS to define file format."
-                        .to_string(),
-                )
-            };
-            // try to infer file format from file extension
-            let extension: &str = &Path::new(&statement.target)
-                .extension()
-                .ok_or_else(e)?
-                .to_str()
-                .ok_or_else(e)?
-                .to_lowercase();
-
-            FileType::from_str(extension).map_err(|e| {
-                DataFusionError::Configuration(format!(
-                    "{}. Use STORED AS to define file format.",
-                    e
-                ))
-            })?
+            None
+        };
+
+        let file_type = match maybe_file_type {
+            Some(ft) => ft,
+            None => {
+                let e = || {
+                    DataFusionError::Configuration(
+                        "Format not explicitly set and unable to get file 
extension! Use STORED AS to define file format."
+                            .to_string(),
+                    )
+                };
+                // try to infer file format from file extension
+                let extension: &str = &Path::new(&statement.target)
+                    .extension()
+                    .ok_or_else(e)?
+                    .to_str()
+                    .ok_or_else(e)?
+                    .to_lowercase();
+
+                self.context_provider.get_file_type(extension)?
+            }
         };
 
         let partition_by = statement
@@ -938,7 +942,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         Ok(LogicalPlan::Copy(CopyTo {
             input: Arc::new(input),
             output_url: statement.target,
-            format_options: file_type.into(),
+            file_type,
             partition_by,
             options,
         }))
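
The planner now defers format resolution entirely to the ContextProvider:
`STORED AS` is tried first, and the output path's extension is used as the
fallback. A hedged sketch of what a registry-backed provider might look like;
`MyProvider` and its `file_types` field (an assumed
HashMap<String, Arc<dyn FileType>>) are hypothetical:

    // Hypothetical: inside `impl ContextProvider for MyProvider`.
    fn get_file_type(&self, ext: &str) -> Result<Arc<dyn FileType>> {
        // Look up a previously registered format; the key is the lowercased
        // extension or STORED AS token.
        self.file_types
            .get(&ext.to_lowercase())
            .cloned()
            .ok_or_else(|| {
                DataFusionError::Configuration(format!("Unknown FileType {ext}"))
            })
    }

With such a provider, `COPY t TO 'out.csv'` and `COPY t TO 'out' STORED AS CSV`
both resolve through the same hook, as would any externally registered extension.
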
diff --git a/datafusion/sql/tests/common/mod.rs b/datafusion/sql/tests/common/mod.rs
index 893678d6b3..f5caaefb3e 100644
--- a/datafusion/sql/tests/common/mod.rs
+++ b/datafusion/sql/tests/common/mod.rs
@@ -15,16 +15,39 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::any::Any;
 #[cfg(test)]
 use std::collections::HashMap;
+use std::fmt::Display;
 use std::{sync::Arc, vec};
 
 use arrow_schema::*;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::{plan_err, Result, TableReference};
+use datafusion_common::file_options::file_type::FileType;
+use datafusion_common::{plan_err, GetExt, Result, TableReference};
 use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
 use datafusion_sql::planner::ContextProvider;
 
+struct MockCsvType {}
+
+impl GetExt for MockCsvType {
+    fn get_ext(&self) -> String {
+        "csv".to_string()
+    }
+}
+
+impl FileType for MockCsvType {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+impl Display for MockCsvType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.get_ext())
+    }
+}
+
 #[derive(Default)]
 pub(crate) struct MockContextProvider {
     options: ConfigOptions,
@@ -191,6 +214,13 @@ impl ContextProvider for MockContextProvider {
         &self.options
     }
 
+    fn get_file_type(
+        &self,
+        _ext: &str,
+    ) -> Result<Arc<dyn datafusion_common::file_options::file_type::FileType>> {
+        Ok(Arc::new(MockCsvType {}))
+    }
+
     fn create_cte_work_table(
         &self,
         _name: &str,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

