This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b137f60b9b Make COPY TO align with CREATE EXTERNAL TABLE (#9604)
b137f60b9b is described below

commit b137f60b9b6132d389efa9911b929d7b4d285b3c
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Tue Mar 19 01:45:26 2024 +0300

    Make COPY TO align with CREATE EXTERNAL TABLE (#9604)
---
 datafusion-cli/src/catalog.rs                      |   2 +-
 datafusion-cli/src/exec.rs                         |   6 +-
 datafusion/common/src/config.rs                    | 221 ++++++++++++++++-----
 datafusion/common/src/file_options/mod.rs          |  85 ++++----
 datafusion/core/src/dataframe/mod.rs               |   9 +-
 datafusion/core/src/dataframe/parquet.rs           |   5 +-
 .../core/src/datasource/file_format/options.rs     |   2 +-
 datafusion/core/src/datasource/listing/table.rs    |  40 ++--
 .../core/src/datasource/listing_table_factory.rs   |   6 +-
 datafusion/core/src/execution/context/mod.rs       |  15 +-
 datafusion/core/src/physical_planner.rs            |   2 +-
 datafusion/core/src/test_util/parquet.rs           |   2 +-
 datafusion/core/tests/sql/sql_api.rs               |  12 +-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |   9 +-
 datafusion/sql/src/parser.rs                       | 206 +++++++++++++++----
 datafusion/sql/src/statement.rs                    | 139 +++++--------
 datafusion/sql/tests/sql_integration.rs            |  26 ++-
 datafusion/sqllogictest/test_files/copy.slt        | 159 +++++++--------
 .../test_files/create_external_table.slt           |   4 +-
 datafusion/sqllogictest/test_files/csv_files.slt   |  10 +-
 datafusion/sqllogictest/test_files/group_by.slt    |   8 +-
 datafusion/sqllogictest/test_files/parquet.slt     |   8 +-
 datafusion/sqllogictest/test_files/repartition.slt |   2 +-
 .../sqllogictest/test_files/repartition_scan.slt   |   8 +-
 .../sqllogictest/test_files/schema_evolution.slt   |   8 +-
 docs/source/user-guide/sql/dml.md                  |   2 +-
 26 files changed, 598 insertions(+), 398 deletions(-)

diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs
index a8ecb98637..46dd8bb00f 100644
--- a/datafusion-cli/src/catalog.rs
+++ b/datafusion-cli/src/catalog.rs
@@ -189,7 +189,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
                     &state,
                     table_url.scheme(),
                     url,
-                    state.default_table_options(),
+                    &state.default_table_options(),
                 )
                 .await?;
                 state.runtime_env().register_object_store(url, store);
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index b11f1c2022..ea765ee8ec 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -412,7 +412,7 @@ mod tests {
             )
         })?;
         for location in locations {
-            let sql = format!("copy (values (1,2)) to '{}';", location);
+            let sql = format!("copy (values (1,2)) to '{}' STORED AS 
PARQUET;", location);
             let statements = DFParser::parse_sql_with_dialect(&sql, 
dialect.as_ref())?;
             for statement in statements {
                 //Should not fail
@@ -438,8 +438,8 @@ mod tests {
         let location = "s3://bucket/path/file.parquet";
 
         // Missing region, use object_store defaults
-        let sql = format!("COPY (values (1,2)) TO '{location}'
-            (format parquet, 'aws.access_key_id' '{access_key_id}', 
'aws.secret_access_key' '{secret_access_key}')");
+        let sql = format!("COPY (values (1,2)) TO '{location}' STORED AS 
PARQUET
+            OPTIONS ('aws.access_key_id' '{access_key_id}', 
'aws.secret_access_key' '{secret_access_key}')");
         copy_to_table_test(location, &sql).await?;
 
         Ok(())
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 68b9ec9dab..968d8215ca 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1109,58 +1109,163 @@ macro_rules! extensions_options {
     }
 }
 
+/// Represents the configuration options available for handling different 
table formats within a data processing application.
+/// This struct encompasses options for various file formats including CSV, 
Parquet, and JSON, allowing for flexible configuration
+/// of parsing and writing behaviors specific to each format. Additionally, it 
supports extending functionality through custom extensions.
 #[derive(Debug, Clone, Default)]
 pub struct TableOptions {
+    /// Configuration options for CSV file handling. This includes settings 
like the delimiter,
+    /// quote character, and whether the first row is considered as headers.
     pub csv: CsvOptions,
+
+    /// Configuration options for Parquet file handling. This includes 
settings for compression,
+    /// encoding, and other Parquet-specific file characteristics.
     pub parquet: TableParquetOptions,
+
+    /// Configuration options for JSON file handling.
     pub json: JsonOptions,
+
+    /// The current file format that the table operations should assume. This 
option allows
+    /// for dynamic switching between the supported file types (e.g., CSV, 
Parquet, JSON).
     pub current_format: Option<FileType>,
-    /// Optional extensions registered using [`Extensions::insert`]
+
+    /// Optional extensions that can be used to extend or customize the 
behavior of the table
+    /// options. Extensions can be registered using `Extensions::insert` and 
might include
+    /// custom file handling logic, additional configuration parameters, or 
other enhancements.
     pub extensions: Extensions,
 }
 
 impl ConfigField for TableOptions {
+    /// Visits configuration settings for the current file format, or all 
formats if none is selected.
+    ///
+    /// This method adapts the behavior based on whether a file format is 
currently selected in `current_format`.
+    /// If a format is selected, it visits only the settings relevant to that 
format. Otherwise,
+    /// it visits all available format settings.
     fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: 
&'static str) {
-        self.csv.visit(v, "csv", "");
-        self.parquet.visit(v, "parquet", "");
-        self.json.visit(v, "json", "");
+        if let Some(file_type) = &self.current_format {
+            match file_type {
+                #[cfg(feature = "parquet")]
+                FileType::PARQUET => self.parquet.visit(v, "format", ""),
+                FileType::CSV => self.csv.visit(v, "format", ""),
+                FileType::JSON => self.json.visit(v, "format", ""),
+                _ => {}
+            }
+        } else {
+            self.csv.visit(v, "csv", "");
+            self.parquet.visit(v, "parquet", "");
+            self.json.visit(v, "json", "");
+        }
     }
 
+    /// Sets a configuration value for a specific key within `TableOptions`.
+    ///
+    /// This method delegates setting configuration values to the specific 
file format configurations,
+    /// based on the current format selected. If no format is selected, it 
returns an error.
+    ///
+    /// # Parameters
+    ///
+    /// * `key`: The configuration key specifying which setting to adjust, 
prefixed with the format (e.g., "format.delimiter")
+    /// for CSV format.
+    /// * `value`: The value to set for the specified configuration key.
+    ///
+    /// # Returns
+    ///
+    /// A result indicating success or an error if the key is not recognized, 
if a format is not specified,
+    /// or if setting the configuration value fails for the specific format.
     fn set(&mut self, key: &str, value: &str) -> Result<()> {
         // Extensions are handled in the public `ConfigOptions::set`
         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
+        let Some(format) = &self.current_format else {
+            return _config_err!("Specify a format for TableOptions");
+        };
         match key {
-            "csv" => self.csv.set(rem, value),
-            "parquet" => self.parquet.set(rem, value),
-            "json" => self.json.set(rem, value),
+            "format" => match format {
+                #[cfg(feature = "parquet")]
+                FileType::PARQUET => self.parquet.set(rem, value),
+                FileType::CSV => self.csv.set(rem, value),
+                FileType::JSON => self.json.set(rem, value),
+                _ => {
+                    _config_err!("Config value \"{key}\" is not supported on 
{}", format)
+                }
+            },
             _ => _config_err!("Config value \"{key}\" not found on 
TableOptions"),
         }
     }
 }
 
 impl TableOptions {
-    /// Creates a new [`ConfigOptions`] with default values
+    /// Constructs a new instance of `TableOptions` with default settings.
+    ///
+    /// # Returns
+    ///
+    /// A new `TableOptions` instance with default configuration values.
     pub fn new() -> Self {
         Self::default()
     }
 
+    /// Sets the file format for the table.
+    ///
+    /// # Parameters
+    ///
+    /// * `format`: The file format to use (e.g., CSV, Parquet).
     pub fn set_file_format(&mut self, format: FileType) {
         self.current_format = Some(format);
     }
 
+    /// Creates a new `TableOptions` instance initialized with settings from a 
given session config.
+    ///
+    /// # Parameters
+    ///
+    /// * `config`: A reference to the session `ConfigOptions` from which to 
derive initial settings.
+    ///
+    /// # Returns
+    ///
+    /// A new `TableOptions` instance with settings applied from the session 
config.
     pub fn default_from_session_config(config: &ConfigOptions) -> Self {
-        let mut initial = TableOptions::default();
-        initial.parquet.global = config.execution.parquet.clone();
+        let initial = TableOptions::default();
+        initial.combine_with_session_config(config);
         initial
     }
 
-    /// Set extensions to provided value
+    /// Updates the current `TableOptions` with settings from a given session 
config.
+    ///
+    /// # Parameters
+    ///
+    /// * `config`: A reference to the session `ConfigOptions` whose settings 
are to be applied.
+    ///
+    /// # Returns
+    ///
+    /// A new `TableOptions` instance with updated settings from the session 
config.
+    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
+        let mut clone = self.clone();
+        clone.parquet.global = config.execution.parquet.clone();
+        clone
+    }
+
+    /// Sets the extensions for this `TableOptions` instance.
+    ///
+    /// # Parameters
+    ///
+    /// * `extensions`: The `Extensions` instance to set.
+    ///
+    /// # Returns
+    ///
+    /// A new `TableOptions` instance with the specified extensions applied.
     pub fn with_extensions(mut self, extensions: Extensions) -> Self {
         self.extensions = extensions;
         self
     }
 
-    /// Set a configuration option
+    /// Sets a specific configuration option.
+    ///
+    /// # Parameters
+    ///
+    /// * `key`: The configuration key (e.g., "format.delimiter").
+    /// * `value`: The value to set for the specified key.
+    ///
+    /// # Returns
+    ///
+    /// A result indicating success or failure in setting the configuration 
option.
     pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
         let (prefix, _) = key.split_once('.').ok_or_else(|| {
             DataFusionError::Configuration(format!(
@@ -1168,28 +1273,7 @@ impl TableOptions {
             ))
         })?;
 
-        if prefix == "csv" || prefix == "json" || prefix == "parquet" {
-            if let Some(format) = &self.current_format {
-                match format {
-                    FileType::CSV if prefix != "csv" => {
-                        return Err(DataFusionError::Configuration(format!(
-                            "Key \"{key}\" is not applicable for CSV format"
-                        )))
-                    }
-                    #[cfg(feature = "parquet")]
-                    FileType::PARQUET if prefix != "parquet" => {
-                        return Err(DataFusionError::Configuration(format!(
-                            "Key \"{key}\" is not applicable for PARQUET 
format"
-                        )))
-                    }
-                    FileType::JSON if prefix != "json" => {
-                        return Err(DataFusionError::Configuration(format!(
-                            "Key \"{key}\" is not applicable for JSON format"
-                        )))
-                    }
-                    _ => {}
-                }
-            }
+        if prefix == "format" {
             return ConfigField::set(self, key, value);
         }
 
@@ -1202,6 +1286,15 @@ impl TableOptions {
         e.0.set(key, value)
     }
 
+    /// Initializes a new `TableOptions` from a hash map of string settings.
+    ///
+    /// # Parameters
+    ///
+    /// * `settings`: A hash map where each key-value pair represents a 
configuration setting.
+    ///
+    /// # Returns
+    ///
+    /// A result containing the new `TableOptions` instance or an error if any 
setting could not be applied.
     pub fn from_string_hash_map(settings: &HashMap<String, String>) -> 
Result<Self> {
         let mut ret = Self::default();
         for (k, v) in settings {
@@ -1211,6 +1304,15 @@ impl TableOptions {
         Ok(ret)
     }
 
+    /// Modifies the current `TableOptions` instance with settings from a hash 
map.
+    ///
+    /// # Parameters
+    ///
+    /// * `settings`: A hash map where each key-value pair represents a 
configuration setting.
+    ///
+    /// # Returns
+    ///
+    /// A result indicating success or failure in applying the settings.
     pub fn alter_with_string_hash_map(
         &mut self,
         settings: &HashMap<String, String>,
@@ -1221,7 +1323,11 @@ impl TableOptions {
         Ok(())
     }
 
-    /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
+    /// Retrieves all configuration entries from this `TableOptions`.
+    ///
+    /// # Returns
+    ///
+    /// A vector of `ConfigEntry` instances, representing all the 
configuration options within this `TableOptions`.
     pub fn entries(&self) -> Vec<ConfigEntry> {
         struct Visitor(Vec<ConfigEntry>);
 
@@ -1249,9 +1355,7 @@ impl TableOptions {
         }
 
         let mut v = Visitor(vec![]);
-        self.visit(&mut v, "csv", "");
-        self.visit(&mut v, "json", "");
-        self.visit(&mut v, "parquet", "");
+        self.visit(&mut v, "format", "");
 
         v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
         v.0
@@ -1556,6 +1660,7 @@ mod tests {
     use crate::config::{
         ConfigEntry, ConfigExtension, ExtensionOptions, Extensions, 
TableOptions,
     };
+    use crate::FileType;
 
     #[derive(Default, Debug, Clone)]
     pub struct TestExtensionConfig {
@@ -1609,12 +1714,13 @@ mod tests {
     }
 
     #[test]
-    fn alter_kafka_config() {
+    fn alter_test_extension_config() {
         let mut extension = Extensions::new();
         extension.insert(TestExtensionConfig::default());
         let mut table_config = TableOptions::new().with_extensions(extension);
-        table_config.set("parquet.write_batch_size", "10").unwrap();
-        assert_eq!(table_config.parquet.global.write_batch_size, 10);
+        table_config.set_file_format(FileType::CSV);
+        table_config.set("format.delimiter", ";").unwrap();
+        assert_eq!(table_config.csv.delimiter, b';');
         table_config.set("test.bootstrap.servers", "asd").unwrap();
         let kafka_config = table_config
             .extensions
@@ -1626,11 +1732,25 @@ mod tests {
         );
     }
 
+    #[test]
+    fn csv_u8_table_options() {
+        let mut table_config = TableOptions::new();
+        table_config.set_file_format(FileType::CSV);
+        table_config.set("format.delimiter", ";").unwrap();
+        assert_eq!(table_config.csv.delimiter as char, ';');
+        table_config.set("format.escape", "\"").unwrap();
+        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
+        table_config.set("format.escape", "\'").unwrap();
+        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
+    }
+
+    #[cfg(feature = "parquet")]
     #[test]
     fn parquet_table_options() {
         let mut table_config = TableOptions::new();
+        table_config.set_file_format(FileType::PARQUET);
         table_config
-            .set("parquet.bloom_filter_enabled::col1", "true")
+            .set("format.bloom_filter_enabled::col1", "true")
             .unwrap();
         assert_eq!(
             
table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
@@ -1638,26 +1758,17 @@ mod tests {
         );
     }
 
-    #[test]
-    fn csv_u8_table_options() {
-        let mut table_config = TableOptions::new();
-        table_config.set("csv.delimiter", ";").unwrap();
-        assert_eq!(table_config.csv.delimiter as char, ';');
-        table_config.set("csv.escape", "\"").unwrap();
-        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
-        table_config.set("csv.escape", "\'").unwrap();
-        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
-    }
-
+    #[cfg(feature = "parquet")]
     #[test]
     fn parquet_table_options_config_entry() {
         let mut table_config = TableOptions::new();
+        table_config.set_file_format(FileType::PARQUET);
         table_config
-            .set("parquet.bloom_filter_enabled::col1", "true")
+            .set("format.bloom_filter_enabled::col1", "true")
             .unwrap();
         let entries = table_config.entries();
         assert!(entries
             .iter()
-            .any(|item| item.key == "parquet.bloom_filter_enabled::col1"))
+            .any(|item| item.key == "format.bloom_filter_enabled::col1"))
     }
 }
diff --git a/datafusion/common/src/file_options/mod.rs 
b/datafusion/common/src/file_options/mod.rs
index a72b812adc..eb1ce1b364 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -35,7 +35,7 @@ mod tests {
         config::TableOptions,
         file_options::{csv_writer::CsvWriterOptions, 
json_writer::JsonWriterOptions},
         parsers::CompressionTypeVariant,
-        Result,
+        FileType, Result,
     };
 
     use parquet::{
@@ -47,35 +47,36 @@ mod tests {
     #[test]
     fn test_writeroptions_parquet_from_statement_options() -> Result<()> {
         let mut option_map: HashMap<String, String> = HashMap::new();
-        option_map.insert("parquet.max_row_group_size".to_owned(), 
"123".to_owned());
-        option_map.insert("parquet.data_pagesize_limit".to_owned(), 
"123".to_owned());
-        option_map.insert("parquet.write_batch_size".to_owned(), 
"123".to_owned());
-        option_map.insert("parquet.writer_version".to_owned(), 
"2.0".to_owned());
+        option_map.insert("format.max_row_group_size".to_owned(), 
"123".to_owned());
+        option_map.insert("format.data_pagesize_limit".to_owned(), 
"123".to_owned());
+        option_map.insert("format.write_batch_size".to_owned(), 
"123".to_owned());
+        option_map.insert("format.writer_version".to_owned(), 
"2.0".to_owned());
         option_map.insert(
-            "parquet.dictionary_page_size_limit".to_owned(),
+            "format.dictionary_page_size_limit".to_owned(),
             "123".to_owned(),
         );
         option_map.insert(
-            "parquet.created_by".to_owned(),
+            "format.created_by".to_owned(),
             "df write unit test".to_owned(),
         );
         option_map.insert(
-            "parquet.column_index_truncate_length".to_owned(),
+            "format.column_index_truncate_length".to_owned(),
             "123".to_owned(),
         );
         option_map.insert(
-            "parquet.data_page_row_count_limit".to_owned(),
+            "format.data_page_row_count_limit".to_owned(),
             "123".to_owned(),
         );
-        option_map.insert("parquet.bloom_filter_enabled".to_owned(), 
"true".to_owned());
-        option_map.insert("parquet.encoding".to_owned(), "plain".to_owned());
-        option_map.insert("parquet.dictionary_enabled".to_owned(), 
"true".to_owned());
-        option_map.insert("parquet.compression".to_owned(), 
"zstd(4)".to_owned());
-        option_map.insert("parquet.statistics_enabled".to_owned(), 
"page".to_owned());
-        option_map.insert("parquet.bloom_filter_fpp".to_owned(), 
"0.123".to_owned());
-        option_map.insert("parquet.bloom_filter_ndv".to_owned(), 
"123".to_owned());
+        option_map.insert("format.bloom_filter_enabled".to_owned(), 
"true".to_owned());
+        option_map.insert("format.encoding".to_owned(), "plain".to_owned());
+        option_map.insert("format.dictionary_enabled".to_owned(), 
"true".to_owned());
+        option_map.insert("format.compression".to_owned(), 
"zstd(4)".to_owned());
+        option_map.insert("format.statistics_enabled".to_owned(), 
"page".to_owned());
+        option_map.insert("format.bloom_filter_fpp".to_owned(), 
"0.123".to_owned());
+        option_map.insert("format.bloom_filter_ndv".to_owned(), 
"123".to_owned());
 
         let mut table_config = TableOptions::new();
+        table_config.set_file_format(FileType::PARQUET);
         table_config.alter_with_string_hash_map(&option_map)?;
 
         let parquet_options = 
ParquetWriterOptions::try_from(&table_config.parquet)?;
@@ -131,54 +132,52 @@ mod tests {
         let mut option_map: HashMap<String, String> = HashMap::new();
 
         option_map.insert(
-            "parquet.bloom_filter_enabled::col1".to_owned(),
+            "format.bloom_filter_enabled::col1".to_owned(),
             "true".to_owned(),
         );
         option_map.insert(
-            "parquet.bloom_filter_enabled::col2.nested".to_owned(),
+            "format.bloom_filter_enabled::col2.nested".to_owned(),
             "true".to_owned(),
         );
-        option_map.insert("parquet.encoding::col1".to_owned(), 
"plain".to_owned());
-        option_map.insert("parquet.encoding::col2.nested".to_owned(), 
"rle".to_owned());
+        option_map.insert("format.encoding::col1".to_owned(), 
"plain".to_owned());
+        option_map.insert("format.encoding::col2.nested".to_owned(), 
"rle".to_owned());
         option_map.insert(
-            "parquet.dictionary_enabled::col1".to_owned(),
+            "format.dictionary_enabled::col1".to_owned(),
             "true".to_owned(),
         );
         option_map.insert(
-            "parquet.dictionary_enabled::col2.nested".to_owned(),
+            "format.dictionary_enabled::col2.nested".to_owned(),
             "true".to_owned(),
         );
-        option_map.insert("parquet.compression::col1".to_owned(), 
"zstd(4)".to_owned());
+        option_map.insert("format.compression::col1".to_owned(), 
"zstd(4)".to_owned());
         option_map.insert(
-            "parquet.compression::col2.nested".to_owned(),
+            "format.compression::col2.nested".to_owned(),
             "zstd(10)".to_owned(),
         );
         option_map.insert(
-            "parquet.statistics_enabled::col1".to_owned(),
+            "format.statistics_enabled::col1".to_owned(),
             "page".to_owned(),
         );
         option_map.insert(
-            "parquet.statistics_enabled::col2.nested".to_owned(),
+            "format.statistics_enabled::col2.nested".to_owned(),
             "none".to_owned(),
         );
         option_map.insert(
-            "parquet.bloom_filter_fpp::col1".to_owned(),
+            "format.bloom_filter_fpp::col1".to_owned(),
             "0.123".to_owned(),
         );
         option_map.insert(
-            "parquet.bloom_filter_fpp::col2.nested".to_owned(),
+            "format.bloom_filter_fpp::col2.nested".to_owned(),
             "0.456".to_owned(),
         );
+        option_map.insert("format.bloom_filter_ndv::col1".to_owned(), 
"123".to_owned());
         option_map.insert(
-            "parquet.bloom_filter_ndv::col1".to_owned(),
-            "123".to_owned(),
-        );
-        option_map.insert(
-            "parquet.bloom_filter_ndv::col2.nested".to_owned(),
+            "format.bloom_filter_ndv::col2.nested".to_owned(),
             "456".to_owned(),
         );
 
         let mut table_config = TableOptions::new();
+        table_config.set_file_format(FileType::PARQUET);
         table_config.alter_with_string_hash_map(&option_map)?;
 
         let parquet_options = 
ParquetWriterOptions::try_from(&table_config.parquet)?;
@@ -271,16 +270,17 @@ mod tests {
     // for StatementOptions
     fn test_writeroptions_csv_from_statement_options() -> Result<()> {
         let mut option_map: HashMap<String, String> = HashMap::new();
-        option_map.insert("csv.has_header".to_owned(), "true".to_owned());
-        option_map.insert("csv.date_format".to_owned(), "123".to_owned());
-        option_map.insert("csv.datetime_format".to_owned(), "123".to_owned());
-        option_map.insert("csv.timestamp_format".to_owned(), "2.0".to_owned());
-        option_map.insert("csv.time_format".to_owned(), "123".to_owned());
-        option_map.insert("csv.null_value".to_owned(), "123".to_owned());
-        option_map.insert("csv.compression".to_owned(), "gzip".to_owned());
-        option_map.insert("csv.delimiter".to_owned(), ";".to_owned());
+        option_map.insert("format.has_header".to_owned(), "true".to_owned());
+        option_map.insert("format.date_format".to_owned(), "123".to_owned());
+        option_map.insert("format.datetime_format".to_owned(), 
"123".to_owned());
+        option_map.insert("format.timestamp_format".to_owned(), 
"2.0".to_owned());
+        option_map.insert("format.time_format".to_owned(), "123".to_owned());
+        option_map.insert("format.null_value".to_owned(), "123".to_owned());
+        option_map.insert("format.compression".to_owned(), "gzip".to_owned());
+        option_map.insert("format.delimiter".to_owned(), ";".to_owned());
 
         let mut table_config = TableOptions::new();
+        table_config.set_file_format(FileType::CSV);
         table_config.alter_with_string_hash_map(&option_map)?;
 
         let csv_options = CsvWriterOptions::try_from(&table_config.csv)?;
@@ -299,9 +299,10 @@ mod tests {
     // for StatementOptions
     fn test_writeroptions_json_from_statement_options() -> Result<()> {
         let mut option_map: HashMap<String, String> = HashMap::new();
-        option_map.insert("json.compression".to_owned(), "gzip".to_owned());
+        option_map.insert("format.compression".to_owned(), "gzip".to_owned());
 
         let mut table_config = TableOptions::new();
+        table_config.set_file_format(FileType::JSON);
         table_config.alter_with_string_hash_map(&option_map)?;
 
         let json_options = JsonWriterOptions::try_from(&table_config.json)?;
diff --git a/datafusion/core/src/dataframe/mod.rs 
b/datafusion/core/src/dataframe/mod.rs
index 2583040157..eea5fc1127 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -1151,8 +1151,8 @@ impl DataFrame {
                 "Overwrites are not implemented for 
DataFrame::write_csv.".to_owned(),
             ));
         }
-        let table_options = self.session_state.default_table_options();
-        let props = writer_options.unwrap_or_else(|| 
table_options.csv.clone());
+        let props = writer_options
+            .unwrap_or_else(|| self.session_state.default_table_options().csv);
 
         let plan = LogicalPlanBuilder::copy_to(
             self.plan,
@@ -1200,9 +1200,8 @@ impl DataFrame {
             ));
         }
 
-        let table_options = self.session_state.default_table_options();
-
-        let props = writer_options.unwrap_or_else(|| 
table_options.json.clone());
+        let props = writer_options
+            .unwrap_or_else(|| 
self.session_state.default_table_options().json);
 
         let plan = LogicalPlanBuilder::copy_to(
             self.plan,
diff --git a/datafusion/core/src/dataframe/parquet.rs 
b/datafusion/core/src/dataframe/parquet.rs
index f4e8c9dfcd..e3f606e322 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -57,9 +57,8 @@ impl DataFrame {
             ));
         }
 
-        let table_options = self.session_state.default_table_options();
-
-        let props = writer_options.unwrap_or_else(|| 
table_options.parquet.clone());
+        let props = writer_options
+            .unwrap_or_else(|| 
self.session_state.default_table_options().parquet);
 
         let plan = LogicalPlanBuilder::copy_to(
             self.plan,
diff --git a/datafusion/core/src/datasource/file_format/options.rs 
b/datafusion/core/src/datasource/file_format/options.rs
index f66683c311..f5bd72495d 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -461,7 +461,7 @@ pub trait ReadOptions<'a> {
             return Ok(Arc::new(s.to_owned()));
         }
 
-        self.to_listing_options(config, state.default_table_options().clone())
+        self.to_listing_options(config, state.default_table_options())
             .infer_schema(&state, &table_path)
             .await
     }
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 2a2551236e..c1e337b5c4 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -118,7 +118,7 @@ impl ListingTableConfig {
         }
     }
 
-    fn infer_format(path: &str) -> Result<(Arc<dyn FileFormat>, String)> {
+    fn infer_file_type(path: &str) -> Result<(FileType, String)> {
         let err_msg = format!("Unable to infer file type from path: {path}");
 
         let mut exts = path.rsplit('.');
@@ -139,20 +139,7 @@ impl ListingTableConfig {
             .get_ext_with_compression(file_compression_type.to_owned())
             .map_err(|_| DataFusionError::Internal(err_msg))?;
 
-        let file_format: Arc<dyn FileFormat> = match file_type {
-            FileType::ARROW => Arc::new(ArrowFormat),
-            FileType::AVRO => Arc::new(AvroFormat),
-            FileType::CSV => Arc::new(
-                
CsvFormat::default().with_file_compression_type(file_compression_type),
-            ),
-            FileType::JSON => Arc::new(
-                
JsonFormat::default().with_file_compression_type(file_compression_type),
-            ),
-            #[cfg(feature = "parquet")]
-            FileType::PARQUET => Arc::new(ParquetFormat::default()),
-        };
-
-        Ok((file_format, ext))
+        Ok((file_type, ext))
     }
 
     /// Infer `ListingOptions` based on `table_path` suffix.
@@ -173,10 +160,27 @@ impl ListingTableConfig {
             .await
             .ok_or_else(|| DataFusionError::Internal("No files for 
table".into()))??;
 
-        let (format, file_extension) =
-            ListingTableConfig::infer_format(file.location.as_ref())?;
+        let (file_type, file_extension) =
+            ListingTableConfig::infer_file_type(file.location.as_ref())?;
+
+        let mut table_options = state.default_table_options();
+        table_options.set_file_format(file_type.clone());
+        let file_format: Arc<dyn FileFormat> = match file_type {
+            FileType::CSV => {
+                Arc::new(CsvFormat::default().with_options(table_options.csv))
+            }
+            #[cfg(feature = "parquet")]
+            FileType::PARQUET => {
+                
Arc::new(ParquetFormat::default().with_options(table_options.parquet))
+            }
+            FileType::AVRO => Arc::new(AvroFormat),
+            FileType::JSON => {
+                
Arc::new(JsonFormat::default().with_options(table_options.json))
+            }
+            FileType::ARROW => Arc::new(ArrowFormat),
+        };
 
-        let listing_options = ListingOptions::new(format)
+        let listing_options = ListingOptions::new(file_format)
             .with_file_extension(file_extension)
             .with_target_partitions(state.config().target_partitions());
 
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs 
b/datafusion/core/src/datasource/listing_table_factory.rs
index 4e126bbba9..b616e0181c 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -34,7 +34,6 @@ use crate::datasource::TableProvider;
 use crate::execution::context::SessionState;
 
 use arrow::datatypes::{DataType, SchemaRef};
-use datafusion_common::config::TableOptions;
 use datafusion_common::{arrow_datafusion_err, DataFusionError, FileType};
 use datafusion_expr::CreateExternalTable;
 
@@ -58,8 +57,7 @@ impl TableProviderFactory for ListingTableFactory {
         state: &SessionState,
         cmd: &CreateExternalTable,
     ) -> datafusion_common::Result<Arc<dyn TableProvider>> {
-        let mut table_options =
-            TableOptions::default_from_session_config(state.config_options());
+        let mut table_options = state.default_table_options();
         let file_type = FileType::from_str(cmd.file_type.as_str()).map_err(|_| 
{
             DataFusionError::Execution(format!("Unknown FileType {}", 
cmd.file_type))
         })?;
@@ -227,7 +225,7 @@ mod tests {
         let name = OwnedTableReference::bare("foo".to_string());
 
         let mut options = HashMap::new();
-        options.insert("csv.schema_infer_max_rec".to_owned(), 
"1000".to_owned());
+        options.insert("format.schema_infer_max_rec".to_owned(), 
"1000".to_owned());
         let cmd = CreateExternalTable {
             name,
             location: csv_file.path().to_str().unwrap().to_string(),
diff --git a/datafusion/core/src/execution/context/mod.rs 
b/datafusion/core/src/execution/context/mod.rs
index 1ac7da4652..116e45c8c1 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -384,9 +384,9 @@ impl SessionContext {
         self.state.read().config.clone()
     }
 
-    /// Return a copied version of config for this Session
+    /// Return a copied version of table options for this Session
     pub fn copied_table_options(&self) -> TableOptions {
-        self.state.read().default_table_options().clone()
+        self.state.read().default_table_options()
     }
 
     /// Creates a [`DataFrame`] from SQL query text.
@@ -1750,11 +1750,7 @@ impl SessionState {
                         .0
                         
.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
                 }
-                DFStatement::CopyTo(CopyToStatement {
-                    source,
-                    target: _,
-                    options: _,
-                }) => match source {
+                DFStatement::CopyTo(CopyToStatement { source, .. }) => match 
source {
                     CopyToSource::Relation(table_name) => {
                         visitor.insert(table_name);
                     }
@@ -1963,8 +1959,9 @@ impl SessionState {
     }
 
     /// return the TableOptions options with its extensions
-    pub fn default_table_options(&self) -> &TableOptions {
-        &self.table_option_namespace
+    pub fn default_table_options(&self) -> TableOptions {
+        self.table_option_namespace
+            .combine_with_session_config(self.config_options())
     }
 
     /// Get a new TaskContext to run in this session
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 96f5e1c3ff..ee581ca642 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -595,7 +595,7 @@ impl DefaultPhysicalPlanner {
                         table_partition_cols,
                         overwrite: false,
                     };
-                    let mut table_options = 
session_state.default_table_options().clone();
+                    let mut table_options = 
session_state.default_table_options();
                     let sink_format: Arc<dyn FileFormat> = match 
format_options {
                         FormatOptions::CSV(options) => {
                             table_options.csv = options.clone();
diff --git a/datafusion/core/src/test_util/parquet.rs 
b/datafusion/core/src/test_util/parquet.rs
index 7a466a666d..8113d799a1 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -165,7 +165,7 @@ impl TestParquetFile {
         // run coercion on the filters to coerce types etc.
         let props = ExecutionProps::new();
         let context = 
SimplifyContext::new(&props).with_schema(df_schema.clone());
-        let parquet_options = 
ctx.state().default_table_options().parquet.clone();
+        let parquet_options = ctx.copied_table_options().parquet;
         if let Some(filter) = maybe_filter {
             let simplifier = ExprSimplifier::new(context);
             let filter = simplifier.coerce(filter, df_schema.clone()).unwrap();
diff --git a/datafusion/core/tests/sql/sql_api.rs 
b/datafusion/core/tests/sql/sql_api.rs
index d7adc9611b..b3a819fbc3 100644
--- a/datafusion/core/tests/sql/sql_api.rs
+++ b/datafusion/core/tests/sql/sql_api.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use datafusion::prelude::*;
+
 use tempfile::TempDir;
 
 #[tokio::test]
@@ -27,7 +28,7 @@ async fn unsupported_ddl_returns_error() {
     // disallow ddl
     let options = SQLOptions::new().with_allow_ddl(false);
 
-    let sql = "create view test_view as select * from test";
+    let sql = "CREATE VIEW test_view AS SELECT * FROM test";
     let df = ctx.sql_with_options(sql, options).await;
     assert_eq!(
         df.unwrap_err().strip_backtrace(),
@@ -46,7 +47,7 @@ async fn unsupported_dml_returns_error() {
 
     let options = SQLOptions::new().with_allow_dml(false);
 
-    let sql = "insert into test values (1)";
+    let sql = "INSERT INTO test VALUES (1)";
     let df = ctx.sql_with_options(sql, options).await;
     assert_eq!(
         df.unwrap_err().strip_backtrace(),
@@ -67,7 +68,10 @@ async fn unsupported_copy_returns_error() {
 
     let options = SQLOptions::new().with_allow_dml(false);
 
-    let sql = format!("copy (values(1)) to '{}'", tmpfile.to_string_lossy());
+    let sql = format!(
+        "COPY (values(1)) TO '{}' STORED AS parquet",
+        tmpfile.to_string_lossy()
+    );
     let df = ctx.sql_with_options(&sql, options).await;
     assert_eq!(
         df.unwrap_err().strip_backtrace(),
@@ -106,7 +110,7 @@ async fn ddl_can_not_be_planned_by_session_state() {
     let state = ctx.state();
 
     // can not create a logical plan for catalog DDL
-    let sql = "drop table test";
+    let sql = "DROP TABLE test";
     let plan = state.create_logical_plan(sql).await.unwrap();
     let physical_plan = state.create_physical_plan(&plan).await;
     assert_eq!(
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 93de560dbe..3c43f10075 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -35,7 +35,7 @@ use datafusion_common::config::{FormatOptions, TableOptions};
 use datafusion_common::scalar::ScalarStructBuilder;
 use datafusion_common::{
     internal_err, not_impl_err, plan_err, DFField, DFSchema, DFSchemaRef,
-    DataFusionError, Result, ScalarValue,
+    DataFusionError, FileType, Result, ScalarValue,
 };
 use datafusion_expr::dml::CopyTo;
 use datafusion_expr::expr::{
@@ -314,10 +314,9 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> 
Result<()> {
     let ctx = SessionContext::new();
 
     let input = create_csv_scan(&ctx).await?;
-
-    let mut table_options =
-        
TableOptions::default_from_session_config(ctx.state().config_options());
-    table_options.set("csv.delimiter", ";")?;
+    let mut table_options = ctx.copied_table_options();
+    table_options.set_file_format(FileType::CSV);
+    table_options.set("format.delimiter", ";")?;
 
     let plan = LogicalPlan::Copy(CopyTo {
         input: Arc::new(input),
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index effc1d096c..a5d7970495 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -17,21 +17,20 @@
 
 //! [`DFParser`]: DataFusion SQL Parser based on [`sqlparser`]
 
+use std::collections::{HashMap, VecDeque};
+use std::fmt;
+use std::str::FromStr;
+
 use datafusion_common::parsers::CompressionTypeVariant;
-use sqlparser::ast::{OrderByExpr, Query, Value};
-use sqlparser::tokenizer::Word;
 use sqlparser::{
     ast::{
-        ColumnDef, ColumnOptionDef, ObjectName, Statement as SQLStatement,
-        TableConstraint,
+        ColumnDef, ColumnOptionDef, ObjectName, OrderByExpr, Query,
+        Statement as SQLStatement, TableConstraint, Value,
     },
     dialect::{keywords::Keyword, Dialect, GenericDialect},
     parser::{Parser, ParserError},
-    tokenizer::{Token, TokenWithLocation, Tokenizer},
+    tokenizer::{Token, TokenWithLocation, Tokenizer, Word},
 };
-use std::collections::VecDeque;
-use std::fmt;
-use std::{collections::HashMap, str::FromStr};
 
 // Use `Parser::expected` instead, if possible
 macro_rules! parser_err {
@@ -102,6 +101,12 @@ pub struct CopyToStatement {
     pub source: CopyToSource,
     /// The URL to where the data is heading
     pub target: String,
+    /// Partition keys
+    pub partitioned_by: Vec<String>,
+    /// Indicates whether there is a header row (e.g. CSV)
+    pub has_header: bool,
+    /// File type (Parquet, NDJSON, CSV etc.)
+    pub stored_as: Option<String>,
     /// Target specific options
     pub options: Vec<(String, Value)>,
 }
@@ -111,15 +116,27 @@ impl fmt::Display for CopyToStatement {
         let Self {
             source,
             target,
+            partitioned_by,
+            stored_as,
             options,
+            ..
         } = self;
 
         write!(f, "COPY {source} TO {target}")?;
+        if let Some(file_type) = stored_as {
+            write!(f, " STORED AS {}", file_type)?;
+        }
+        if !partitioned_by.is_empty() {
+            write!(f, " PARTITIONED BY ({})", partitioned_by.join(", "))?;
+        }
+
+        if self.has_header {
+            write!(f, " WITH HEADER ROW")?;
+        }
 
         if !options.is_empty() {
             let opts: Vec<_> = options.iter().map(|(k, v)| format!("{k} 
{v}")).collect();
-            // print them in sorted order
-            write!(f, " ({})", opts.join(", "))?;
+            write!(f, " OPTIONS ({})", opts.join(", "))?;
         }
 
         Ok(())
@@ -243,6 +260,15 @@ impl fmt::Display for Statement {
     }
 }
 
+fn ensure_not_set<T>(field: &Option<T>, name: &str) -> Result<(), ParserError> 
{
+    if field.is_some() {
+        return Err(ParserError::ParserError(format!(
+            "{name} specified more than once",
+        )));
+    }
+    Ok(())
+}
+
 /// Datafusion SQL Parser based on [`sqlparser`]
 ///
 /// Parses DataFusion's SQL dialect, often delegating to [`sqlparser`]'s 
[`Parser`].
@@ -370,21 +396,79 @@ impl<'a> DFParser<'a> {
             CopyToSource::Relation(table_name)
         };
 
-        self.parser.expect_keyword(Keyword::TO)?;
+        #[derive(Default)]
+        struct Builder {
+            stored_as: Option<String>,
+            target: Option<String>,
+            partitioned_by: Option<Vec<String>>,
+            has_header: Option<bool>,
+            options: Option<Vec<(String, Value)>>,
+        }
 
-        let target = self.parser.parse_literal_string()?;
+        let mut builder = Builder::default();
 
-        // check for options in parens
-        let options = if self.parser.peek_token().token == Token::LParen {
-            self.parse_value_options()?
-        } else {
-            vec![]
+        loop {
+            if let Some(keyword) = self.parser.parse_one_of_keywords(&[
+                Keyword::STORED,
+                Keyword::TO,
+                Keyword::PARTITIONED,
+                Keyword::OPTIONS,
+                Keyword::WITH,
+            ]) {
+                match keyword {
+                    Keyword::STORED => {
+                        self.parser.expect_keyword(Keyword::AS)?;
+                        ensure_not_set(&builder.stored_as, "STORED AS")?;
+                        builder.stored_as = Some(self.parse_file_format()?);
+                    }
+                    Keyword::TO => {
+                        ensure_not_set(&builder.target, "TO")?;
+                        builder.target = 
Some(self.parser.parse_literal_string()?);
+                    }
+                    Keyword::WITH => {
+                        self.parser.expect_keyword(Keyword::HEADER)?;
+                        self.parser.expect_keyword(Keyword::ROW)?;
+                        ensure_not_set(&builder.has_header, "WITH HEADER 
ROW")?;
+                        builder.has_header = Some(true);
+                    }
+                    Keyword::PARTITIONED => {
+                        self.parser.expect_keyword(Keyword::BY)?;
+                        ensure_not_set(&builder.partitioned_by, "PARTITIONED 
BY")?;
+                        builder.partitioned_by = 
Some(self.parse_partitions()?);
+                    }
+                    Keyword::OPTIONS => {
+                        ensure_not_set(&builder.options, "OPTIONS")?;
+                        builder.options = Some(self.parse_value_options()?);
+                    }
+                    _ => {
+                        unreachable!()
+                    }
+                }
+            } else {
+                let token = self.parser.next_token();
+                if token == Token::EOF || token == Token::SemiColon {
+                    break;
+                } else {
+                    return Err(ParserError::ParserError(format!(
+                        "Unexpected token {token}"
+                    )));
+                }
+            }
+        }
+
+        let Some(target) = builder.target else {
+            return Err(ParserError::ParserError(
+                "Missing TO clause in COPY statement".into(),
+            ));
         };
 
         Ok(Statement::CopyTo(CopyToStatement {
             source,
             target,
-            options,
+            partitioned_by: builder.partitioned_by.unwrap_or(vec![]),
+            has_header: builder.has_header.unwrap_or(false),
+            stored_as: builder.stored_as,
+            options: builder.options.unwrap_or(vec![]),
         }))
     }
 
@@ -624,15 +708,6 @@ impl<'a> DFParser<'a> {
         }
         let mut builder = Builder::default();
 
-        fn ensure_not_set<T>(field: &Option<T>, name: &str) -> Result<(), 
ParserError> {
-            if field.is_some() {
-                return Err(ParserError::ParserError(format!(
-                    "{name} specified more than once",
-                )));
-            }
-            Ok(())
-        }
-
         loop {
             if let Some(keyword) = self.parser.parse_one_of_keywords(&[
                 Keyword::STORED,
@@ -1321,10 +1396,13 @@ mod tests {
     #[test]
     fn copy_to_table_to_table() -> Result<(), ParserError> {
         // positive case
-        let sql = "COPY foo TO bar";
+        let sql = "COPY foo TO bar STORED AS CSV";
         let expected = Statement::CopyTo(CopyToStatement {
             source: object_name("foo"),
             target: "bar".to_string(),
+            partitioned_by: vec![],
+            has_header: false,
+            stored_as: Some("CSV".to_owned()),
             options: vec![],
         });
 
@@ -1335,10 +1413,22 @@ mod tests {
     #[test]
     fn explain_copy_to_table_to_table() -> Result<(), ParserError> {
         let cases = vec![
-            ("EXPLAIN COPY foo TO bar", false, false),
-            ("EXPLAIN ANALYZE COPY foo TO bar", true, false),
-            ("EXPLAIN VERBOSE COPY foo TO bar", false, true),
-            ("EXPLAIN ANALYZE VERBOSE COPY foo TO bar", true, true),
+            ("EXPLAIN COPY foo TO bar STORED AS PARQUET", false, false),
+            (
+                "EXPLAIN ANALYZE COPY foo TO bar STORED AS PARQUET",
+                true,
+                false,
+            ),
+            (
+                "EXPLAIN VERBOSE COPY foo TO bar STORED AS PARQUET",
+                false,
+                true,
+            ),
+            (
+                "EXPLAIN ANALYZE VERBOSE COPY foo TO bar STORED AS PARQUET",
+                true,
+                true,
+            ),
         ];
         for (sql, analyze, verbose) in cases {
             println!("sql: {sql}, analyze: {analyze}, verbose: {verbose}");
@@ -1346,6 +1436,9 @@ mod tests {
             let expected_copy = Statement::CopyTo(CopyToStatement {
                 source: object_name("foo"),
                 target: "bar".to_string(),
+                partitioned_by: vec![],
+                has_header: false,
+                stored_as: Some("PARQUET".to_owned()),
                 options: vec![],
             });
             let expected = Statement::Explain(ExplainStatement {
@@ -1375,10 +1468,13 @@ mod tests {
             panic!("Expected query, got {statement:?}");
         };
 
-        let sql = "COPY (SELECT 1) TO bar";
+        let sql = "COPY (SELECT 1) TO bar STORED AS CSV WITH HEADER ROW";
         let expected = Statement::CopyTo(CopyToStatement {
             source: CopyToSource::Query(query),
             target: "bar".to_string(),
+            partitioned_by: vec![],
+            has_header: true,
+            stored_as: Some("CSV".to_owned()),
             options: vec![],
         });
         assert_eq!(verified_stmt(sql), expected);
@@ -1387,10 +1483,31 @@ mod tests {
 
     #[test]
     fn copy_to_options() -> Result<(), ParserError> {
-        let sql = "COPY foo TO bar (row_group_size 55)";
+        let sql = "COPY foo TO bar STORED AS CSV OPTIONS (row_group_size 55)";
+        let expected = Statement::CopyTo(CopyToStatement {
+            source: object_name("foo"),
+            target: "bar".to_string(),
+            partitioned_by: vec![],
+            has_header: false,
+            stored_as: Some("CSV".to_owned()),
+            options: vec![(
+                "row_group_size".to_string(),
+                Value::Number("55".to_string(), false),
+            )],
+        });
+        assert_eq!(verified_stmt(sql), expected);
+        Ok(())
+    }
+
+    #[test]
+    fn copy_to_partitioned_by() -> Result<(), ParserError> {
+        let sql = "COPY foo TO bar STORED AS CSV PARTITIONED BY (a) OPTIONS 
(row_group_size 55)";
         let expected = Statement::CopyTo(CopyToStatement {
             source: object_name("foo"),
             target: "bar".to_string(),
+            partitioned_by: vec!["a".to_string()],
+            has_header: false,
+            stored_as: Some("CSV".to_owned()),
             options: vec![(
                 "row_group_size".to_string(),
                 Value::Number("55".to_string(), false),
@@ -1404,24 +1521,24 @@ mod tests {
     fn copy_to_multi_options() -> Result<(), ParserError> {
         // order of options is preserved
         let sql =
-            "COPY foo TO bar (format parquet, row_group_size 55, compression 
snappy)";
+            "COPY foo TO bar STORED AS parquet OPTIONS 
('format.row_group_size' 55, 'format.compression' snappy)";
 
         let expected_options = vec![
             (
-                "format".to_string(),
-                Value::UnQuotedString("parquet".to_string()),
-            ),
-            (
-                "row_group_size".to_string(),
+                "format.row_group_size".to_string(),
                 Value::Number("55".to_string(), false),
             ),
             (
-                "compression".to_string(),
+                "format.compression".to_string(),
                 Value::UnQuotedString("snappy".to_string()),
             ),
         ];
 
-        let options = if let Statement::CopyTo(copy_to) = verified_stmt(sql) {
+        let mut statements = DFParser::parse_sql(sql).unwrap();
+        assert_eq!(statements.len(), 1);
+        let only_statement = statements.pop_front().unwrap();
+
+        let options = if let Statement::CopyTo(copy_to) = only_statement {
             copy_to.options
         } else {
             panic!("Expected copy");
@@ -1460,7 +1577,10 @@ mod tests {
         }
 
         let only_statement = statements.pop_front().unwrap();
-        assert_eq!(canonical, only_statement.to_string());
+        assert_eq!(
+            canonical.to_uppercase(),
+            only_statement.to_string().to_uppercase()
+        );
         only_statement
     }
 
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 412c3b753e..e50aceb757 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -813,20 +813,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
         // determine if source is table or query and handle accordingly
         let copy_source = statement.source;
-        let input = match copy_source {
+        let (input, input_schema, table_ref) = match copy_source {
             CopyToSource::Relation(object_name) => {
-                let table_ref =
-                    self.object_name_to_table_reference(object_name.clone())?;
-                let table_source = 
self.context_provider.get_table_source(table_ref)?;
-                LogicalPlanBuilder::scan(
-                    object_name_to_string(&object_name),
-                    table_source,
-                    None,
-                )?
-                .build()?
+                let table_name = object_name_to_string(&object_name);
+                let table_ref = 
self.object_name_to_table_reference(object_name)?;
+                let table_source =
+                    self.context_provider.get_table_source(table_ref.clone())?;
+                let plan =
+                    LogicalPlanBuilder::scan(table_name, table_source, 
None)?.build()?;
+                let input_schema = plan.schema().clone();
+                (plan, input_schema, Some(table_ref))
             }
             CopyToSource::Query(query) => {
-                self.query_to_plan(query, &mut PlannerContext::new())?
+                let plan = self.query_to_plan(query, &mut 
PlannerContext::new())?;
+                let input_schema = plan.schema().clone();
+                (plan, input_schema, None)
             }
         };
 
@@ -852,8 +853,41 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             options.insert(key.to_lowercase(), value_string.to_lowercase());
         }
 
-        let file_type = try_infer_file_type(&mut options, &statement.target)?;
-        let partition_by = take_partition_by(&mut options);
+        let file_type = if let Some(file_type) = statement.stored_as {
+            FileType::from_str(&file_type).map_err(|_| {
+                DataFusionError::Configuration(format!("Unknown FileType {}", 
file_type))
+            })?
+        } else {
+            let e = || {
+                DataFusionError::Configuration(
+                "Format not explicitly set and unable to get file extension! 
Use STORED AS to define file format."
+                    .to_string(),
+            )
+            };
+            // try to infer file format from file extension
+            let extension: &str = &Path::new(&statement.target)
+                .extension()
+                .ok_or_else(e)?
+                .to_str()
+                .ok_or_else(e)?
+                .to_lowercase();
+
+            FileType::from_str(extension).map_err(|e| {
+                DataFusionError::Configuration(format!(
+                    "{}. Use STORED AS to define file format.",
+                    e
+                ))
+            })?
+        };
+
+        let partition_by = statement
+            .partitioned_by
+            .iter()
+            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .map(|f| f.name().to_owned())
+            .collect();
 
         Ok(LogicalPlan::Copy(CopyTo {
             input: Arc::new(input),
@@ -1469,82 +1503,3 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .is_ok()
     }
 }
-
-/// Infers the file type for a given target based on provided options or file 
extension.
-///
-/// This function tries to determine the file type based on the 'format' 
option present
-/// in the provided options hashmap. If 'format' is not explicitly set, the 
function attempts
-/// to infer the file type from the file extension of the target. It returns 
an error if neither
-/// the format option is set nor the file extension can be determined or 
parsed.
-///
-/// # Arguments
-///
-/// * `options` - A mutable reference to a HashMap containing options where 
the file format
-/// might be specified under the 'format' key.
-/// * `target` - A string slice representing the path to the file for which 
the file type needs to be inferred.
-///
-/// # Returns
-///
-/// Returns `Result<FileType>` which is Ok if the file type could be 
successfully inferred,
-/// otherwise returns an error in case of failure to determine or parse the 
file format or extension.
-///
-/// # Errors
-///
-/// This function returns an error in two cases:
-/// - If the 'format' option is not set and the file extension cannot be 
retrieved from `target`.
-/// - If the file extension is found but cannot be converted into a valid 
string.
-///
-pub fn try_infer_file_type(
-    options: &mut HashMap<String, String>,
-    target: &str,
-) -> Result<FileType> {
-    let explicit_format = options.remove("format");
-    let format = match explicit_format {
-        Some(s) => FileType::from_str(&s),
-        None => {
-            // try to infer file format from file extension
-            let extension: &str = &Path::new(target)
-                .extension()
-                .ok_or(DataFusionError::Configuration(
-                    "Format not explicitly set and unable to get file 
extension!"
-                        .to_string(),
-                ))?
-                .to_str()
-                .ok_or(DataFusionError::Configuration(
-                    "Format not explicitly set and failed to parse file 
extension!"
-                        .to_string(),
-                ))?
-                .to_lowercase();
-
-            FileType::from_str(extension)
-        }
-    }?;
-
-    Ok(format)
-}
-
-/// Extracts and parses the 'partition_by' option from a provided options 
hashmap.
-///
-/// This function looks for a 'partition_by' key in the options hashmap. If 
found,
-/// it splits the value by commas, trims each resulting string, and replaces 
double
-/// single quotes with a single quote. It returns a vector of partition column 
names.
-///
-/// # Arguments
-///
-/// * `options` - A mutable reference to a HashMap containing options where 
'partition_by'
-/// might be specified.
-///
-/// # Returns
-///
-/// Returns a `Vec<String>` containing partition column names. If the 
'partition_by' option
-/// is not present, returns an empty vector.
-pub fn take_partition_by(options: &mut HashMap<String, String>) -> Vec<String> 
{
-    let partition_by = options.remove("partition_by");
-    match partition_by {
-        Some(part_cols) => part_cols
-            .split(',')
-            .map(|s| s.trim().replace("''", "'"))
-            .collect::<Vec<_>>(),
-        None => vec![],
-    }
-}
diff --git a/datafusion/sql/tests/sql_integration.rs 
b/datafusion/sql/tests/sql_integration.rs
index b6077353e5..6d335f1f8f 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -22,25 +22,23 @@ use std::{sync::Arc, vec};
 
 use arrow_schema::TimeUnit::Nanosecond;
 use arrow_schema::*;
-use datafusion_sql::planner::PlannerContext;
-use datafusion_sql::unparser::{expr_to_sql, plan_to_sql};
-use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect};
-
+use datafusion_common::config::ConfigOptions;
 use datafusion_common::{
-    config::ConfigOptions, DataFusionError, Result, ScalarValue, 
TableReference,
+    plan_err, DFSchema, DataFusionError, ParamValues, Result, ScalarValue, 
TableReference,
 };
-use datafusion_common::{plan_err, DFSchema, ParamValues};
 use datafusion_expr::{
     logical_plan::{LogicalPlan, Prepare},
     AggregateUDF, ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, 
TableSource,
     Volatility, WindowUDF,
 };
+use datafusion_sql::unparser::{expr_to_sql, plan_to_sql};
 use datafusion_sql::{
     parser::DFParser,
-    planner::{ContextProvider, ParserOptions, SqlToRel},
+    planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel},
 };
 
 use rstest::rstest;
+use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect};
 use sqlparser::parser::Parser;
 
 #[test]
@@ -389,7 +387,7 @@ fn plan_rollback_transaction_chained() {
 
 #[test]
 fn plan_copy_to() {
-    let sql = "COPY test_decimal to 'output.csv'";
+    let sql = "COPY test_decimal to 'output.csv' STORED AS CSV";
     let plan = r#"
 CopyTo: format=csv output_url=output.csv options: ()
   TableScan: test_decimal
@@ -410,6 +408,18 @@ Explain
     quick_test(sql, plan);
 }
 
+#[test]
+fn plan_explain_copy_to_format() {
+    let sql = "EXPLAIN COPY test_decimal to 'output.tbl' STORED AS CSV";
+    let plan = r#"
+Explain
+  CopyTo: format=csv output_url=output.tbl options: ()
+    TableScan: test_decimal
+    "#
+    .trim();
+    quick_test(sql, plan);
+}
+
 #[test]
 fn plan_copy_to_query() {
     let sql = "COPY (select * from test_decimal limit 10) to 'output.csv'";
diff --git a/datafusion/sqllogictest/test_files/copy.slt 
b/datafusion/sqllogictest/test_files/copy.slt
index df23a993eb..4d4f596d0c 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -21,13 +21,13 @@ create table source_table(col1 integer, col2 varchar) as 
values (1, 'Foo'), (2,
 
 # Copy to directory as multiple files
 query IT
-COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, 
'parquet.compression' 'zstd(10)');
+COPY source_table TO 'test_files/scratch/copy/table/' STORED AS parquet 
OPTIONS ('format.compression' 'zstd(10)');
 ----
 2
 
 # Copy to directory as partitioned files
 query IT
-COPY source_table TO 'test_files/scratch/copy/partitioned_table1/' (format 
parquet, 'parquet.compression' 'zstd(10)', partition_by 'col2');
+COPY source_table TO 'test_files/scratch/copy/partitioned_table1/' STORED AS 
parquet PARTITIONED BY (col2) OPTIONS ('format.compression' 'zstd(10)');
 ----
 2
 
@@ -54,8 +54,8 @@ select * from validate_partitioned_parquet_bar order by col1;
 
 # Copy to directory as partitioned files
 query ITT
-COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 
'test_files/scratch/copy/partitioned_table2/' 
-(format parquet, partition_by 'column2, column3', 'parquet.compression' 
'zstd(10)');
+COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 
'test_files/scratch/copy/partitioned_table2/' STORED AS parquet PARTITIONED BY 
(column2, column3)
+OPTIONS ('format.compression' 'zstd(10)');
 ----
 3
 
@@ -82,8 +82,8 @@ select * from validate_partitioned_parquet_a_x order by 
column1;
 
 # Copy to directory as partitioned files
 query TTT
-COPY (values ('1', 'a', 'x'), ('2', 'b', 'y'), ('3', 'c', 'z')) TO 
'test_files/scratch/copy/partitioned_table3/' 
-(format parquet, 'parquet.compression' 'zstd(10)', partition_by 'column1, 
column3');
+COPY (values ('1', 'a', 'x'), ('2', 'b', 'y'), ('3', 'c', 'z')) TO 
'test_files/scratch/copy/partitioned_table3/' STORED AS parquet PARTITIONED BY 
(column1, column3)
+OPTIONS ('format.compression' 'zstd(10)');
 ----
 3
 
@@ -111,49 +111,52 @@ a
 statement ok
 create table test ("'test'" varchar, "'test2'" varchar, "'test3'" varchar); 
 
-query TTT
-insert into test VALUES ('a', 'x', 'aa'), ('b','y', 'bb'), ('c', 'z', 'cc')
-----
-3
-
-query T
-select "'test'" from test
-----
-a
-b
-c
-
-# Note to place a single ' inside of a literal string escape by putting two ''
-query TTT
-copy test to 'test_files/scratch/copy/escape_quote' (format csv, partition_by 
'''test2'',''test3''')
-----
-3
-
-statement ok
-CREATE EXTERNAL TABLE validate_partitioned_escape_quote STORED AS CSV 
-LOCATION 'test_files/scratch/copy/escape_quote/' PARTITIONED BY ("'test2'", 
"'test3'");
-
+## Until the partition by parsing uses ColumnDef, this test is meaningless 
since it becomes an overfit. Even in
+## CREATE EXTERNAL TABLE, there is a schema mismatch, this should be an issue.
+#
+#query TTT
+#insert into test VALUES ('a', 'x', 'aa'), ('b','y', 'bb'), ('c', 'z', 'cc')
+#----
+#3
+#
+#query T
+#select "'test'" from test
+#----
+#a
+#b
+#c
+#
+# # Note to place a single ' inside of a literal string escape by putting two 
''
+#query TTT
+#copy test to 'test_files/scratch/copy/escape_quote' STORED AS CSV;
+#----
+#3
+#
+#statement ok
+#CREATE EXTERNAL TABLE validate_partitioned_escape_quote STORED AS CSV
+#LOCATION 'test_files/scratch/copy/escape_quote/' PARTITIONED BY ("'test2'", 
"'test3'");
+#
 # This triggers a panic (index out of bounds)
 # https://github.com/apache/arrow-datafusion/issues/9269
 #query
 #select * from validate_partitioned_escape_quote;
 
 query TT
-EXPLAIN COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, 
'parquet.compression' 'zstd(10)');
+EXPLAIN COPY source_table TO 'test_files/scratch/copy/table/' STORED AS 
PARQUET OPTIONS ('format.compression' 'zstd(10)');
 ----
 logical_plan
-CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: 
(parquet.compression zstd(10))
+CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: 
(format.compression zstd(10))
 --TableScan: source_table projection=[col1, col2]
 physical_plan
 FileSinkExec: sink=ParquetSink(file_groups=[])
 --MemoryExec: partitions=1, partition_sizes=[1]
 
 # Error case
-query error DataFusion error: Invalid or Unsupported Configuration: Format not 
explicitly set and unable to get file extension!
+query error DataFusion error: Invalid or Unsupported Configuration: Format not 
explicitly set and unable to get file extension! Use STORED AS to define file 
format.
 EXPLAIN COPY source_table to 'test_files/scratch/copy/table/'
 
 query TT
-EXPLAIN COPY source_table to 'test_files/scratch/copy/table/' (format parquet)
+EXPLAIN COPY source_table to 'test_files/scratch/copy/table/' STORED AS PARQUET
 ----
 logical_plan
 CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: ()
@@ -164,7 +167,7 @@ FileSinkExec: sink=ParquetSink(file_groups=[])
 
 # Copy more files to directory via query
 query IT
-COPY (select * from source_table UNION ALL select * from source_table) to 
'test_files/scratch/copy/table/' (format parquet);
+COPY (select * from source_table UNION ALL select * from source_table) to 
'test_files/scratch/copy/table/' STORED AS PARQUET;
 ----
 4
 
@@ -185,7 +188,7 @@ select * from validate_parquet;
 query ?
 copy (values (struct(timestamp '2021-01-01 01:00:01', 1)), (struct(timestamp 
'2022-01-01 01:00:01', 2)), 
 (struct(timestamp '2023-01-03 01:00:01', 3)), (struct(timestamp '2024-01-01 
01:00:01', 4)))
-to 'test_files/scratch/copy/table_nested2/' (format parquet);
+to 'test_files/scratch/copy/table_nested2/' STORED AS PARQUET;
 ----
 4
 
@@ -204,7 +207,7 @@ query ??
 COPY 
 (values (struct ('foo', (struct ('foo', make_array(struct('a',1), 
struct('b',2))))), make_array(timestamp '2023-01-01 01:00:01',timestamp 
'2023-01-01 01:00:01')), 
 (struct('bar', (struct ('foo', make_array(struct('aa',10), 
struct('bb',20))))), make_array(timestamp '2024-01-01 01:00:01', timestamp 
'2024-01-01 01:00:01'))) 
-to 'test_files/scratch/copy/table_nested/' (format parquet);
+to 'test_files/scratch/copy/table_nested/' STORED AS PARQUET;
 ----
 2
 
@@ -221,7 +224,7 @@ select * from validate_parquet_nested;
 query ?
 copy (values ([struct('foo', 1), struct('bar', 2)])) 
 to 'test_files/scratch/copy/array_of_struct/'
-(format parquet);
+STORED AS PARQUET;
 ----
 1
 
@@ -236,8 +239,7 @@ select * from validate_array_of_struct;
 
 query ?
 copy (values (struct('foo', [1,2,3], struct('bar', [2,3,4])))) 
-to 'test_files/scratch/copy/struct_with_array/'
-(format parquet);
+to 'test_files/scratch/copy/struct_with_array/' STORED AS PARQUET;
 ----
 1
 
@@ -255,31 +257,32 @@ select * from validate_struct_with_array;
 query IT
 COPY source_table
 TO 'test_files/scratch/copy/table_with_options/'
-(format parquet,
-'parquet.compression' snappy,
-'parquet.compression::col1' 'zstd(5)',
-'parquet.compression::col2' snappy,
-'parquet.max_row_group_size' 12345,
-'parquet.data_pagesize_limit' 1234,
-'parquet.write_batch_size' 1234,
-'parquet.writer_version' 2.0,
-'parquet.dictionary_page_size_limit' 123,
-'parquet.created_by' 'DF copy.slt',
-'parquet.column_index_truncate_length' 123,
-'parquet.data_page_row_count_limit' 1234,
-'parquet.bloom_filter_enabled' true,
-'parquet.bloom_filter_enabled::col1' false,
-'parquet.bloom_filter_fpp::col2' 0.456,
-'parquet.bloom_filter_ndv::col2' 456,
-'parquet.encoding' plain,
-'parquet.encoding::col1' DELTA_BINARY_PACKED,
-'parquet.dictionary_enabled::col2' true,
-'parquet.dictionary_enabled' false,
-'parquet.statistics_enabled' page,
-'parquet.statistics_enabled::col2' none,
-'parquet.max_statistics_size' 123,
-'parquet.bloom_filter_fpp' 0.001,
-'parquet.bloom_filter_ndv' 100
+STORED AS PARQUET
+OPTIONS (
+'format.compression' snappy,
+'format.compression::col1' 'zstd(5)',
+'format.compression::col2' snappy,
+'format.max_row_group_size' 12345,
+'format.data_pagesize_limit' 1234,
+'format.write_batch_size' 1234,
+'format.writer_version' 2.0,
+'format.dictionary_page_size_limit' 123,
+'format.created_by' 'DF copy.slt',
+'format.column_index_truncate_length' 123,
+'format.data_page_row_count_limit' 1234,
+'format.bloom_filter_enabled' true,
+'format.bloom_filter_enabled::col1' false,
+'format.bloom_filter_fpp::col2' 0.456,
+'format.bloom_filter_ndv::col2' 456,
+'format.encoding' plain,
+'format.encoding::col1' DELTA_BINARY_PACKED,
+'format.dictionary_enabled::col2' true,
+'format.dictionary_enabled' false,
+'format.statistics_enabled' page,
+'format.statistics_enabled::col2' none,
+'format.max_statistics_size' 123,
+'format.bloom_filter_fpp' 0.001,
+'format.bloom_filter_ndv' 100
 )
 ----
 2
@@ -312,7 +315,7 @@ select * from validate_parquet_single;
 
 # copy from table to folder of compressed json files
 query IT
-COPY source_table  to 'test_files/scratch/copy/table_json_gz' (format json, 
'json.compression' gzip);
+COPY source_table  to 'test_files/scratch/copy/table_json_gz' STORED AS JSON 
OPTIONS ('format.compression' gzip);
 ----
 2
 
@@ -328,7 +331,7 @@ select * from validate_json_gz;
 
 # copy from table to folder of compressed csv files
 query IT
-COPY source_table  to 'test_files/scratch/copy/table_csv' (format csv, 
'csv.has_header' false, 'csv.compression' gzip);
+COPY source_table  to 'test_files/scratch/copy/table_csv' STORED AS CSV 
OPTIONS ('format.has_header' false, 'format.compression' gzip);
 ----
 2
 
@@ -360,7 +363,7 @@ select * from validate_single_csv;
 
 # Copy from table to folder of json
 query IT
-COPY source_table to 'test_files/scratch/copy/table_json' (format json);
+COPY source_table to 'test_files/scratch/copy/table_json' STORED AS JSON;
 ----
 2
 
@@ -376,7 +379,7 @@ select * from validate_json;
 
 # Copy from table to single json file
 query IT
-COPY source_table  to 'test_files/scratch/copy/table.json';
+COPY source_table  to 'test_files/scratch/copy/table.json' STORED AS JSON ;
 ----
 2
 
@@ -394,12 +397,12 @@ select * from validate_single_json;
 query IT
 COPY source_table
 to 'test_files/scratch/copy/table_csv_with_options'
-(format csv,
-'csv.has_header' false,
-'csv.compression' uncompressed,
-'csv.datetime_format' '%FT%H:%M:%S.%9f',
-'csv.delimiter' ';',
-'csv.null_value' 'NULLVAL');
+STORED AS CSV OPTIONS (
+'format.has_header' false,
+'format.compression' uncompressed,
+'format.datetime_format' '%FT%H:%M:%S.%9f',
+'format.delimiter' ';',
+'format.null_value' 'NULLVAL');
 ----
 2
 
@@ -417,7 +420,7 @@ select * from validate_csv_with_options;
 
 # Copy from table to single arrow file
 query IT
-COPY source_table to 'test_files/scratch/copy/table.arrow';
+COPY source_table to 'test_files/scratch/copy/table.arrow' STORED AS ARROW;
 ----
 2
 
@@ -437,7 +440,7 @@ select * from validate_arrow_file;
 query T?
 COPY (values 
 ('c', arrow_cast('foo', 'Dictionary(Int32, Utf8)')), ('d', arrow_cast('bar', 
'Dictionary(Int32, Utf8)'))) 
-to 'test_files/scratch/copy/table_dict.arrow';
+to 'test_files/scratch/copy/table_dict.arrow' STORED AS ARROW;
 ----
 2
 
@@ -456,7 +459,7 @@ d bar
 
 # Copy from table to folder of json
 query IT
-COPY source_table to 'test_files/scratch/copy/table_arrow' (format arrow);
+COPY source_table to 'test_files/scratch/copy/table_arrow' STORED AS ARROW;
 ----
 2
 
@@ -475,12 +478,12 @@ select * from validate_arrow;
 
 # Copy from table with options
 query error DataFusion error: Invalid or Unsupported Configuration: Config 
value "row_group_size" not found on JsonOptions
-COPY source_table  to 'test_files/scratch/copy/table.json' 
('json.row_group_size' 55);
+COPY source_table  to 'test_files/scratch/copy/table.json' STORED AS JSON 
OPTIONS ('format.row_group_size' 55);
 
 # Incomplete statement
 query error DataFusion error: SQL error: ParserError\("Expected \), found: 
EOF"\)
 COPY (select col2, sum(col1) from source_table
 
 # Copy from table with non literal
-query error DataFusion error: SQL error: ParserError\("Expected ',' or '\)' 
after option definition, found: \+"\)
+query error DataFusion error: SQL error: ParserError\("Unexpected token \("\)
 COPY source_table  to '/tmp/table.parquet' (row_group_size 55 + 102);
diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt 
b/datafusion/sqllogictest/test_files/create_external_table.slt
index 3b85dd9e98..c4a26a5e22 100644
--- a/datafusion/sqllogictest/test_files/create_external_table.slt
+++ b/datafusion/sqllogictest/test_files/create_external_table.slt
@@ -101,8 +101,8 @@ statement error DataFusion error: SQL error: 
ParserError\("Unexpected token FOOB
 CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV FOOBAR BARBAR BARFOO LOCATION 
'foo.csv';
 
 # Conflicting options
-statement error DataFusion error: Invalid or Unsupported Configuration: Key 
"parquet.column_index_truncate_length" is not applicable for CSV format
+statement error DataFusion error: Invalid or Unsupported Configuration: Config 
value "column_index_truncate_length" not found on CsvOptions
 CREATE EXTERNAL TABLE csv_table (column1 int)
 STORED AS CSV
 LOCATION 'foo.csv'
-OPTIONS ('csv.delimiter' ';', 'parquet.column_index_truncate_length' '123')
+OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')
diff --git a/datafusion/sqllogictest/test_files/csv_files.slt 
b/datafusion/sqllogictest/test_files/csv_files.slt
index 7b299c0cf1..ab6847afb6 100644
--- a/datafusion/sqllogictest/test_files/csv_files.slt
+++ b/datafusion/sqllogictest/test_files/csv_files.slt
@@ -23,7 +23,7 @@ c2 VARCHAR
 ) STORED AS CSV
 WITH HEADER ROW
 DELIMITER ','
-OPTIONS ('csv.quote' '~')
+OPTIONS ('format.quote' '~')
 LOCATION '../core/tests/data/quote.csv';
 
 statement ok
@@ -33,7 +33,7 @@ c2 VARCHAR
 ) STORED AS CSV
 WITH HEADER ROW
 DELIMITER ','
-OPTIONS ('csv.escape' '\')
+OPTIONS ('format.escape' '\')
 LOCATION '../core/tests/data/escape.csv';
 
 query TT
@@ -71,7 +71,7 @@ c2 VARCHAR
 ) STORED AS CSV
 WITH HEADER ROW
 DELIMITER ','
-OPTIONS ('csv.escape' '"')
+OPTIONS ('format.escape' '"')
 LOCATION '../core/tests/data/escape.csv';
 
 # TODO: Validate this with better data.
@@ -117,14 +117,14 @@ CREATE TABLE src_table_2 (
 
 query ITII
 COPY  src_table_1 TO 'test_files/scratch/csv_files/csv_partitions/1.csv'
-(FORMAT CSV);
+STORED AS CSV;
 ----
 4
 
 
 query ITII
 COPY  src_table_2 TO 'test_files/scratch/csv_files/csv_partitions/2.csv'
-(FORMAT CSV);
+STORED AS CSV;
 ----
 4
 
diff --git a/datafusion/sqllogictest/test_files/group_by.slt 
b/datafusion/sqllogictest/test_files/group_by.slt
index 3d9f8ff3ad..869462b472 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -4506,28 +4506,28 @@ CREATE TABLE src_table (
 query PI
 COPY (SELECT * FROM src_table)
 TO 'test_files/scratch/group_by/timestamp_table/0.csv'
-(FORMAT CSV);
+STORED AS CSV;
 ----
 10
 
 query PI
 COPY (SELECT * FROM src_table)
 TO 'test_files/scratch/group_by/timestamp_table/1.csv'
-(FORMAT CSV);
+STORED AS CSV;
 ----
 10
 
 query PI
 COPY (SELECT * FROM src_table)
 TO 'test_files/scratch/group_by/timestamp_table/2.csv'
-(FORMAT CSV);
+STORED AS CSV;
 ----
 10
 
 query PI
 COPY (SELECT * FROM src_table)
 TO 'test_files/scratch/group_by/timestamp_table/3.csv'
-(FORMAT CSV);
+STORED AS CSV;
 ----
 10
 
diff --git a/datafusion/sqllogictest/test_files/parquet.slt 
b/datafusion/sqllogictest/test_files/parquet.slt
index b7cd1243cb..3cc52666d5 100644
--- a/datafusion/sqllogictest/test_files/parquet.slt
+++ b/datafusion/sqllogictest/test_files/parquet.slt
@@ -45,7 +45,7 @@ CREATE TABLE src_table (
 query ITID
 COPY (SELECT * FROM src_table LIMIT 3)
 TO 'test_files/scratch/parquet/test_table/0.parquet'
-(FORMAT PARQUET);
+STORED AS PARQUET;
 ----
 3
 
@@ -53,7 +53,7 @@ TO 'test_files/scratch/parquet/test_table/0.parquet'
 query ITID
 COPY (SELECT * FROM src_table WHERE int_col > 3 LIMIT 3)
 TO 'test_files/scratch/parquet/test_table/1.parquet'
-(FORMAT PARQUET);
+STORED AS PARQUET;
 ----
 3
 
@@ -128,7 +128,7 @@ SortPreservingMergeExec: [string_col@1 ASC NULLS 
LAST,int_col@0 ASC NULLS LAST]
 query ITID
 COPY (SELECT * FROM src_table WHERE int_col > 6 LIMIT 3)
 TO 'test_files/scratch/parquet/test_table/2.parquet'
-(FORMAT PARQUET);
+STORED AS PARQUET;
 ----
 3
 
@@ -281,7 +281,7 @@ LIMIT 10;
 query ITID
 COPY (SELECT * FROM src_table WHERE int_col > 6 LIMIT 3)
 TO 'test_files/scratch/parquet/test_table/subdir/3.parquet'
-(FORMAT PARQUET);
+STORED AS PARQUET;
 ----
 3
 
diff --git a/datafusion/sqllogictest/test_files/repartition.slt 
b/datafusion/sqllogictest/test_files/repartition.slt
index 391a6739b0..594c52f12d 100644
--- a/datafusion/sqllogictest/test_files/repartition.slt
+++ b/datafusion/sqllogictest/test_files/repartition.slt
@@ -25,7 +25,7 @@ set datafusion.execution.target_partitions = 4;
 
 statement ok
 COPY  (VALUES (1, 2), (2, 5), (3, 2), (4, 5), (5, 0)) TO 
'test_files/scratch/repartition/parquet_table/2.parquet'
-(FORMAT PARQUET);
+STORED AS PARQUET;
 
 statement ok
 CREATE EXTERNAL TABLE parquet_table(column1 int, column2 int)
diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt 
b/datafusion/sqllogictest/test_files/repartition_scan.slt
index 15fe670a45..fe0f6c1e81 100644
--- a/datafusion/sqllogictest/test_files/repartition_scan.slt
+++ b/datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -35,7 +35,7 @@ set datafusion.optimizer.repartition_file_min_size = 1;
 # Note filename 2.parquet to test sorting (on local file systems it is often 
listed before 1.parquet)
 statement ok
 COPY  (VALUES (1), (2), (3), (4), (5)) TO 
'test_files/scratch/repartition_scan/parquet_table/2.parquet'
-(FORMAT PARQUET);
+STORED AS PARQUET;
 
 statement ok
 CREATE EXTERNAL TABLE parquet_table(column1 int)
@@ -86,7 +86,7 @@ set datafusion.optimizer.enable_round_robin_repartition = 
true;
 # create a second parquet file
 statement ok
 COPY  (VALUES (100), (200)) TO 
'test_files/scratch/repartition_scan/parquet_table/1.parquet'
-(FORMAT PARQUET);
+STORED AS PARQUET;
 
 ## Still expect to see the scan read the file as "4" groups with even sizes. 
One group should read
 ## parts of both files.
@@ -158,7 +158,7 @@ DROP TABLE parquet_table_with_order;
 # create a single csv file
 statement ok
 COPY  (VALUES (1), (2), (3), (4), (5)) TO 
'test_files/scratch/repartition_scan/csv_table/1.csv'
-(FORMAT csv, 'csv.has_header' true);
+STORED AS CSV WITH HEADER ROW;
 
 statement ok
 CREATE EXTERNAL TABLE csv_table(column1 int)
@@ -202,7 +202,7 @@ DROP TABLE csv_table;
 # create a single json file
 statement ok
 COPY  (VALUES (1), (2), (3), (4), (5)) TO 
'test_files/scratch/repartition_scan/json_table/1.json'
-(FORMAT json);
+STORED AS JSON;
 
 statement ok
 CREATE EXTERNAL TABLE json_table (column1 int)
diff --git a/datafusion/sqllogictest/test_files/schema_evolution.slt 
b/datafusion/sqllogictest/test_files/schema_evolution.slt
index aee0e97edc..5572c4a5ff 100644
--- a/datafusion/sqllogictest/test_files/schema_evolution.slt
+++ b/datafusion/sqllogictest/test_files/schema_evolution.slt
@@ -31,7 +31,7 @@ COPY  (
   SELECT column1 as a, column2 as b
   FROM ( VALUES ('foo', 1), ('foo', 2), ('foo', 3) )
  )  TO 'test_files/scratch/schema_evolution/parquet_table/1.parquet'
-(FORMAT PARQUET);
+STORED AS PARQUET;
 
 
 # File2 has only b
@@ -40,7 +40,7 @@ COPY  (
   SELECT column1 as b
   FROM ( VALUES (10) )
  )  TO 'test_files/scratch/schema_evolution/parquet_table/2.parquet'
-(FORMAT PARQUET);
+STORED AS PARQUET;
 
 # File3 has a column from 'z' which does not appear in the table
 # but also values from a which do appear in the table
@@ -49,7 +49,7 @@ COPY  (
   SELECT column1 as z, column2 as a
     FROM ( VALUES ('bar', 'foo'), ('blarg', 'foo') )
  )  TO 'test_files/scratch/schema_evolution/parquet_table/3.parquet'
-(FORMAT PARQUET);
+STORED AS PARQUET;
 
 # File4 has data for b and a (reversed) and d
 statement ok
@@ -57,7 +57,7 @@ COPY  (
   SELECT column1 as b, column2 as a, column3 as c
     FROM ( VALUES (100, 'foo', 10.5), (200, 'foo', 12.6), (300, 'bzz', 13.7) )
  )  TO 'test_files/scratch/schema_evolution/parquet_table/4.parquet'
-(FORMAT PARQUET);
+STORED AS PARQUET;
 
 # The logical distribution of `a`, `b` and `c` in the files is like this:
 #
diff --git a/docs/source/user-guide/sql/dml.md 
b/docs/source/user-guide/sql/dml.md
index 405e77a21b..b9614bb8f9 100644
--- a/docs/source/user-guide/sql/dml.md
+++ b/docs/source/user-guide/sql/dml.md
@@ -49,7 +49,7 @@ Copy the contents of `source_table` to one or more Parquet 
formatted
 files in the `dir_name` directory:
 
 ```sql
-> COPY source_table TO 'dir_name' (FORMAT parquet);
+> COPY source_table TO 'dir_name' STORED AS PARQUET;
 +-------+
 | count |
 +-------+

Reply via email to