martin-g commented on code in PR #19383:
URL: https://github.com/apache/datafusion/pull/19383#discussion_r2632624444


##########
datafusion-examples/examples/sql_ops/custom_sql_parser.rs:
##########
@@ -0,0 +1,410 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates extending the DataFusion SQL parser to support
+//! custom DDL statements, specifically `CREATE EXTERNAL CATALOG`.
+//!
+//! ### Custom Syntax
+//! ```sql
+//! CREATE EXTERNAL CATALOG my_catalog
+//! STORED AS ICEBERG
+//! LOCATION 's3://my-bucket/warehouse/'
+//! OPTIONS (
+//!   'region' = 'us-west-2'
+//! );
+//! ```
+//!
+//! Note: For the purpose of this example, we use `local://workspace/` to
+//! automatically discover and register files from the project's test data.
+
+use std::collections::HashMap;
+use std::fmt::Display;
+use std::sync::Arc;
+
+use datafusion::catalog::{
+    CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
+    TableProviderFactory,
+};
+use datafusion::datasource::listing_table_factory::ListingTableFactory;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::prelude::SessionContext;
+use datafusion::sql::{
+    parser::{DFParser, DFParserBuilder, Statement},
+    sqlparser::{
+        ast::{ObjectName, Value},
+        keywords::Keyword,
+        tokenizer::Token,
+    },
+};
+use datafusion_common::{DFSchema, TableReference};
+use datafusion_expr::CreateExternalTable;
+use futures::StreamExt;
+use insta::assert_snapshot;
+use object_store::ObjectStore;
+use object_store::local::LocalFileSystem;
+
+/// Entry point for the example.
+pub async fn custom_sql_parser() -> Result<()> {
+    // Use standard Parquet testing data as our "external" source.
+    let base_path = datafusion::common::test_util::parquet_test_data();
+    let base_path = std::path::Path::new(&base_path).canonicalize()?;
+
+    // Make the path relative to the workspace root
+    let workspace_root = workspace_root();
+    let location = base_path
+        .strip_prefix(&workspace_root)
+        .map(|p| p.to_string_lossy().to_string())
+        .unwrap_or_else(|_| base_path.to_string_lossy().to_string());
+
+    let create_catalog_sql = format!(
+        "CREATE EXTERNAL CATALOG parquet_testing
+         STORED AS parquet
+         LOCATION 'local://workspace/{location}'
+         OPTIONS (
+           'schema_name' = 'staged_data',
+           'format.pruning' = 'true'
+         )"
+    );
+
+    // =========================================================================
+    // Part 1: Standard DataFusion parser rejects the custom DDL
+    // =========================================================================
+    println!("=== Part 1: Standard DataFusion Parser ===\n");
+    println!("Parsing: {}\n", create_catalog_sql.trim());
+
+    let ctx_standard = SessionContext::new();
+    let err = ctx_standard
+        .sql(&create_catalog_sql)
+        .await
+        .expect_err("Standard parser should reject CREATE EXTERNAL CATALOG");

Review Comment:
   ```suggestion
           .expect_err("Expected the standard parser to reject CREATE EXTERNAL 
CATALOG (custom DDL syntax)")
   ```
   slightly better



##########
datafusion-examples/examples/sql_ops/custom_sql_parser.rs:
##########
@@ -0,0 +1,410 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates extending the DataFusion SQL parser to support
+//! custom DDL statements, specifically `CREATE EXTERNAL CATALOG`.
+//!
+//! ### Custom Syntax
+//! ```sql
+//! CREATE EXTERNAL CATALOG my_catalog
+//! STORED AS ICEBERG
+//! LOCATION 's3://my-bucket/warehouse/'
+//! OPTIONS (
+//!   'region' = 'us-west-2'
+//! );
+//! ```
+//!
+//! Note: For the purpose of this example, we use `local://workspace/` to
+//! automatically discover and register files from the project's test data.
+
+use std::collections::HashMap;
+use std::fmt::Display;
+use std::sync::Arc;
+
+use datafusion::catalog::{
+    CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
+    TableProviderFactory,
+};
+use datafusion::datasource::listing_table_factory::ListingTableFactory;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::prelude::SessionContext;
+use datafusion::sql::{
+    parser::{DFParser, DFParserBuilder, Statement},
+    sqlparser::{
+        ast::{ObjectName, Value},
+        keywords::Keyword,
+        tokenizer::Token,
+    },
+};
+use datafusion_common::{DFSchema, TableReference};
+use datafusion_expr::CreateExternalTable;
+use futures::StreamExt;
+use insta::assert_snapshot;
+use object_store::ObjectStore;
+use object_store::local::LocalFileSystem;
+
+/// Entry point for the example.
+pub async fn custom_sql_parser() -> Result<()> {
+    // Use standard Parquet testing data as our "external" source.
+    let base_path = datafusion::common::test_util::parquet_test_data();
+    let base_path = std::path::Path::new(&base_path).canonicalize()?;
+
+    // Make the path relative to the workspace root
+    let workspace_root = workspace_root();
+    let location = base_path
+        .strip_prefix(&workspace_root)
+        .map(|p| p.to_string_lossy().to_string())
+        .unwrap_or_else(|_| base_path.to_string_lossy().to_string());
+
+    let create_catalog_sql = format!(
+        "CREATE EXTERNAL CATALOG parquet_testing
+         STORED AS parquet
+         LOCATION 'local://workspace/{location}'
+         OPTIONS (
+           'schema_name' = 'staged_data',
+           'format.pruning' = 'true'
+         )"
+    );
+
+    // =========================================================================
+    // Part 1: Standard DataFusion parser rejects the custom DDL
+    // =========================================================================
+    println!("=== Part 1: Standard DataFusion Parser ===\n");
+    println!("Parsing: {}\n", create_catalog_sql.trim());
+
+    let ctx_standard = SessionContext::new();
+    let err = ctx_standard
+        .sql(&create_catalog_sql)
+        .await
+        .expect_err("Standard parser should reject CREATE EXTERNAL CATALOG");
+
+    println!("Error: {err}\n");
+    assert_snapshot!(err.to_string(), @r#"SQL error: ParserError("Expected: TABLE, found: CATALOG at Line: 1, Column: 17")"#);
+
+    // =========================================================================
+    // Part 2: Custom parser handles the statement
+    // =========================================================================
+    println!("=== Part 2: Custom Parser ===\n");
+    println!("Parsing: {}\n", create_catalog_sql.trim());
+
+    let ctx = SessionContext::new();
+
+    let mut parser = CustomParser::new(&create_catalog_sql)?;
+    let statement = parser.parse_statement()?;
+    match statement {
+        CustomStatement::CreateExternalCatalog(stmt) => {
+            handle_create_external_catalog(&ctx, stmt).await?;
+        }
+        CustomStatement::DFStatement(_) => {
+            panic!("Expected CreateExternalCatalog statement");
+        }
+    }
+
+    // Query a table from the registered catalog
+    let query_sql = "SELECT id, bool_col, tinyint_col FROM 
parquet_testing.staged_data.alltypes_plain LIMIT 5";
+    println!("Executing: {query_sql}\n");
+
+    let results = execute_sql(&ctx, query_sql).await?;
+    println!("{results}");
+    assert_snapshot!(results, @r"
+    +----+----------+-------------+
+    | id | bool_col | tinyint_col |
+    +----+----------+-------------+
+    | 4  | true     | 0           |
+    | 5  | false    | 1           |
+    | 6  | true     | 0           |
+    | 7  | false    | 1           |
+    | 2  | true     | 0           |
+    +----+----------+-------------+
+    ");
+
+    Ok(())
+}
+
+/// Execute SQL and return formatted results.
+async fn execute_sql(ctx: &SessionContext, sql: &str) -> Result<String> {
+    let batches = ctx.sql(sql).await?.collect().await?;
+    Ok(arrow::util::pretty::pretty_format_batches(&batches)?.to_string())
+}
+
+/// Custom handler for the `CREATE EXTERNAL CATALOG` statement.
+async fn handle_create_external_catalog(
+    ctx: &SessionContext,
+    stmt: CreateExternalCatalog,
+) -> Result<()> {
+    let factory = ListingTableFactory::new();
+    let catalog = Arc::new(MemoryCatalogProvider::new());
+    let schema = Arc::new(MemorySchemaProvider::new());
+
+    // Extract options
+    let mut schema_name = "public".to_string();
+    let mut table_options = HashMap::new();
+
+    for (k, v) in stmt.options {
+        let val_str = match v {
+            Value::SingleQuotedString(ref s) | Value::DoubleQuotedString(ref s) => {
+                s.to_string()
+            }
+            Value::Number(ref n, _) => n.to_string(),
+            Value::Boolean(b) => b.to_string(),
+            _ => v.to_string(),
+        };
+
+        if k == "schema_name" {
+            schema_name = val_str;
+        } else {
+            table_options.insert(k, val_str);
+        }
+    }
+
+    println!("  Target Catalog: {}", stmt.name);
+    println!("  Data Location: {}", stmt.location);
+    println!("  Resolved Schema: {schema_name}");
+
+    // Register a local object store rooted at the workspace root.
+    // We use a specific authority 'workspace' to ensure consistent resolution.
+    let store = Arc::new(LocalFileSystem::new_with_prefix(workspace_root())?);
+    let store_url = url::Url::parse("local://workspace").unwrap();
+    ctx.register_object_store(&store_url, Arc::clone(&store) as _);
+
+    let target_ext = format!(".{}", stmt.catalog_type.to_lowercase());
+
+    // For 'local://workspace/parquet-testing/data', the path is 'parquet-testing/data'.
+    let path_str = stmt
+        .location
+        .strip_prefix("local://workspace/")
+        .unwrap_or(&stmt.location);
+    let prefix = object_store::path::Path::from(path_str);
+
+    // Discover data files using the ObjectStore API
+    let mut table_count = 0;
+    let mut list_stream = store.list(Some(&prefix));
+
+    while let Some(meta) = list_stream.next().await {
+        let meta = meta?;
+        let path = &meta.location;
+
+        if path.as_ref().ends_with(&target_ext) {
+            let name = std::path::Path::new(path.as_ref())
+                .file_stem()
+                .unwrap()
+                .to_string_lossy()
+                .to_string();
+
+            let table_url = format!("local://workspace/{path}");
+
+            let cmd = CreateExternalTable::builder(
+                TableReference::bare(name.clone()),
+                table_url,
+                stmt.catalog_type.clone(),
+                Arc::new(DFSchema::empty()),
+            )
+            .with_options(table_options.clone())
+            .build();
+
+            if let Ok(table) = factory.create(&ctx.state(), &cmd).await {
+                schema.register_table(name, table)?;
+                table_count += 1;
+            }

Review Comment:
   Maybe `eprintln!()` a warning if the creation fails, instead of silently skipping the file?
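   For example, something like this (just a sketch; the message wording is illustrative):
   ```rust
   match factory.create(&ctx.state(), &cmd).await {
       Ok(table) => {
           schema.register_table(name, table)?;
           table_count += 1;
       }
       // Surface the failure instead of silently skipping the file
       Err(e) => eprintln!("Skipping table '{name}': {e}"),
   }
   ```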



##########
datafusion-examples/examples/sql_ops/custom_sql_parser.rs:
##########
@@ -0,0 +1,410 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates extending the DataFusion SQL parser to support
+//! custom DDL statements, specifically `CREATE EXTERNAL CATALOG`.
+//!
+//! ### Custom Syntax
+//! ```sql
+//! CREATE EXTERNAL CATALOG my_catalog
+//! STORED AS ICEBERG
+//! LOCATION 's3://my-bucket/warehouse/'
+//! OPTIONS (
+//!   'region' = 'us-west-2'
+//! );
+//! ```
+//!
+//! Note: For the purpose of this example, we use `local://workspace/` to
+//! automatically discover and register files from the project's test data.
+
+use std::collections::HashMap;
+use std::fmt::Display;
+use std::sync::Arc;
+
+use datafusion::catalog::{
+    CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
+    TableProviderFactory,
+};
+use datafusion::datasource::listing_table_factory::ListingTableFactory;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::prelude::SessionContext;
+use datafusion::sql::{
+    parser::{DFParser, DFParserBuilder, Statement},
+    sqlparser::{
+        ast::{ObjectName, Value},
+        keywords::Keyword,
+        tokenizer::Token,
+    },
+};
+use datafusion_common::{DFSchema, TableReference};
+use datafusion_expr::CreateExternalTable;
+use futures::StreamExt;
+use insta::assert_snapshot;
+use object_store::ObjectStore;
+use object_store::local::LocalFileSystem;
+
+/// Entry point for the example.
+pub async fn custom_sql_parser() -> Result<()> {
+    // Use standard Parquet testing data as our "external" source.
+    let base_path = datafusion::common::test_util::parquet_test_data();
+    let base_path = std::path::Path::new(&base_path).canonicalize()?;
+
+    // Make the path relative to the workspace root
+    let workspace_root = workspace_root();
+    let location = base_path
+        .strip_prefix(&workspace_root)
+        .map(|p| p.to_string_lossy().to_string())
+        .unwrap_or_else(|_| base_path.to_string_lossy().to_string());
+
+    let create_catalog_sql = format!(
+        "CREATE EXTERNAL CATALOG parquet_testing
+         STORED AS parquet
+         LOCATION 'local://workspace/{location}'
+         OPTIONS (
+           'schema_name' = 'staged_data',
+           'format.pruning' = 'true'
+         )"
+    );
+
+    // =========================================================================
+    // Part 1: Standard DataFusion parser rejects the custom DDL
+    // =========================================================================
+    println!("=== Part 1: Standard DataFusion Parser ===\n");
+    println!("Parsing: {}\n", create_catalog_sql.trim());
+
+    let ctx_standard = SessionContext::new();
+    let err = ctx_standard
+        .sql(&create_catalog_sql)
+        .await
+        .expect_err("Standard parser should reject CREATE EXTERNAL CATALOG");
+
+    println!("Error: {err}\n");
+    assert_snapshot!(err.to_string(), @r#"SQL error: ParserError("Expected: TABLE, found: CATALOG at Line: 1, Column: 17")"#);
+
+    // =========================================================================
+    // Part 2: Custom parser handles the statement
+    // =========================================================================
+    println!("=== Part 2: Custom Parser ===\n");
+    println!("Parsing: {}\n", create_catalog_sql.trim());
+
+    let ctx = SessionContext::new();
+
+    let mut parser = CustomParser::new(&create_catalog_sql)?;
+    let statement = parser.parse_statement()?;
+    match statement {
+        CustomStatement::CreateExternalCatalog(stmt) => {
+            handle_create_external_catalog(&ctx, stmt).await?;
+        }
+        CustomStatement::DFStatement(_) => {
+            panic!("Expected CreateExternalCatalog statement");
+        }
+    }
+
+    // Query a table from the registered catalog
+    let query_sql = "SELECT id, bool_col, tinyint_col FROM 
parquet_testing.staged_data.alltypes_plain LIMIT 5";
+    println!("Executing: {query_sql}\n");
+
+    let results = execute_sql(&ctx, query_sql).await?;
+    println!("{results}");
+    assert_snapshot!(results, @r"
+    +----+----------+-------------+
+    | id | bool_col | tinyint_col |
+    +----+----------+-------------+
+    | 4  | true     | 0           |
+    | 5  | false    | 1           |
+    | 6  | true     | 0           |
+    | 7  | false    | 1           |
+    | 2  | true     | 0           |
+    +----+----------+-------------+
+    ");
+
+    Ok(())
+}
+
+/// Execute SQL and return formatted results.
+async fn execute_sql(ctx: &SessionContext, sql: &str) -> Result<String> {
+    let batches = ctx.sql(sql).await?.collect().await?;
+    Ok(arrow::util::pretty::pretty_format_batches(&batches)?.to_string())
+}
+
+/// Custom handler for the `CREATE EXTERNAL CATALOG` statement.
+async fn handle_create_external_catalog(
+    ctx: &SessionContext,
+    stmt: CreateExternalCatalog,
+) -> Result<()> {
+    let factory = ListingTableFactory::new();
+    let catalog = Arc::new(MemoryCatalogProvider::new());
+    let schema = Arc::new(MemorySchemaProvider::new());
+
+    // Extract options
+    let mut schema_name = "public".to_string();
+    let mut table_options = HashMap::new();
+
+    for (k, v) in stmt.options {
+        let val_str = match v {
+            Value::SingleQuotedString(ref s) | Value::DoubleQuotedString(ref s) => {
+                s.to_string()
+            }
+            Value::Number(ref n, _) => n.to_string(),
+            Value::Boolean(b) => b.to_string(),
+            _ => v.to_string(),
+        };
+
+        if k == "schema_name" {
+            schema_name = val_str;
+        } else {
+            table_options.insert(k, val_str);
+        }
+    }
+
+    println!("  Target Catalog: {}", stmt.name);
+    println!("  Data Location: {}", stmt.location);
+    println!("  Resolved Schema: {schema_name}");
+
+    // Register a local object store rooted at the workspace root.
+    // We use a specific authority 'workspace' to ensure consistent resolution.
+    let store = Arc::new(LocalFileSystem::new_with_prefix(workspace_root())?);
+    let store_url = url::Url::parse("local://workspace").unwrap();
+    ctx.register_object_store(&store_url, Arc::clone(&store) as _);
+
+    let target_ext = format!(".{}", stmt.catalog_type.to_lowercase());
+
+    // For 'local://workspace/parquet-testing/data', the path is 'parquet-testing/data'.
+    let path_str = stmt
+        .location
+        .strip_prefix("local://workspace/")
+        .unwrap_or(&stmt.location);
+    let prefix = object_store::path::Path::from(path_str);
+
+    // Discover data files using the ObjectStore API
+    let mut table_count = 0;
+    let mut list_stream = store.list(Some(&prefix));
+
+    while let Some(meta) = list_stream.next().await {
+        let meta = meta?;
+        let path = &meta.location;
+
+        if path.as_ref().ends_with(&target_ext) {
+            let name = std::path::Path::new(path.as_ref())
+                .file_stem()
+                .unwrap()
+                .to_string_lossy()
+                .to_string();
+
+            let table_url = format!("local://workspace/{path}");
+
+            let cmd = CreateExternalTable::builder(
+                TableReference::bare(name.clone()),
+                table_url,
+                stmt.catalog_type.clone(),
+                Arc::new(DFSchema::empty()),
+            )
+            .with_options(table_options.clone())
+            .build();
+
+            if let Ok(table) = factory.create(&ctx.state(), &cmd).await {
+                schema.register_table(name, table)?;
+                table_count += 1;
+            }
+        }
+    }
+    println!("  Registered {table_count} tables into schema: {schema_name}");
+
+    catalog.register_schema(&schema_name, schema)?;
+    ctx.register_catalog(stmt.name.to_string(), catalog);
+
+    Ok(())
+}
+
+/// Possible statements returned by our custom parser.
+#[derive(Debug, Clone)]
+pub enum CustomStatement {
+    /// Standard DataFusion statement
+    DFStatement(Box<Statement>),
+    /// Custom `CREATE EXTERNAL CATALOG` statement
+    CreateExternalCatalog(CreateExternalCatalog),
+}
+
+/// Data structure for `CREATE EXTERNAL CATALOG`.
+#[derive(Debug, Clone)]
+pub struct CreateExternalCatalog {
+    pub name: ObjectName,
+    pub catalog_type: String,
+    pub location: String,
+    pub options: Vec<(String, Value)>,
+}
+
+impl Display for CustomStatement {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::DFStatement(s) => write!(f, "{s}"),
+            Self::CreateExternalCatalog(s) => write!(f, "{s}"),
+        }
+    }
+}
+
+impl Display for CreateExternalCatalog {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "CREATE EXTERNAL CATALOG {} STORED AS {} LOCATION '{}'",
+            self.name, self.catalog_type, self.location
+        )?;
+        if !self.options.is_empty() {
+            write!(f, " OPTIONS (")?;
+            for (i, (k, v)) in self.options.iter().enumerate() {
+                if i > 0 {
+                    write!(f, ", ")?;
+                }
+                write!(f, "'{k}' = '{v}'")?;
+            }
+            write!(f, ")")?;
+        }
+        Ok(())
+    }
+}
+
+/// A parser that extends `DFParser` with custom syntax.
+struct CustomParser<'a> {
+    df_parser: DFParser<'a>,
+}
+
+impl<'a> CustomParser<'a> {
+    fn new(sql: &'a str) -> Result<Self> {
+        Ok(Self {
+            df_parser: DFParserBuilder::new(sql).build()?,
+        })
+    }
+
+    pub fn parse_statement(&mut self) -> Result<CustomStatement> {
+        if self.is_create_external_catalog() {
+            return self.parse_create_external_catalog();
+        }
+        Ok(CustomStatement::DFStatement(Box::new(
+            self.df_parser.parse_statement()?,
+        )))
+    }
+
+    fn is_create_external_catalog(&self) -> bool {
+        let t1 = &self.df_parser.parser.peek_nth_token(0).token;
+        let t2 = &self.df_parser.parser.peek_nth_token(1).token;
+        let t3 = &self.df_parser.parser.peek_nth_token(2).token;
+
+        matches!(t1, Token::Word(w) if w.keyword == Keyword::CREATE)
+            && matches!(t2, Token::Word(w) if w.keyword == Keyword::EXTERNAL)
+            && matches!(t3, Token::Word(w) if w.value.to_uppercase() == 
"CATALOG")
+    }
+
+    fn parse_create_external_catalog(&mut self) -> Result<CustomStatement> {
+        // Consume prefix tokens: CREATE EXTERNAL CATALOG
+        for _ in 0..3 {
+            self.df_parser.parser.next_token();
+        }
+
+        let name = self
+            .df_parser
+            .parser
+            .parse_object_name(false)
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let mut catalog_type = None;
+        let mut location = None;
+        let mut options = vec![];
+
+        while let Some(keyword) = self.df_parser.parser.parse_one_of_keywords(&[
+            Keyword::STORED,
+            Keyword::LOCATION,
+            Keyword::OPTIONS,
+        ]) {
+            match keyword {
+                Keyword::STORED => {
+                    self.df_parser
+                        .parser
+                        .expect_keyword(Keyword::AS)
+                        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                    catalog_type = Some(
+                        self.df_parser
+                            .parser
+                            .parse_identifier()
+                            .map_err(|e| DataFusionError::External(Box::new(e)))?
+                            .value,
+                    );
+                }
+                Keyword::LOCATION => {
+                    location = Some(

Review Comment:
   ```suggestion
                       if location.is_some() {
                           return Err(DataFusionError::Plan("Duplicate LOCATION 
clause".to_string()));
                       }
                       location = Some(
   ```



##########
datafusion-examples/examples/sql_ops/custom_sql_parser.rs:
##########
@@ -0,0 +1,410 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates extending the DataFusion SQL parser to support
+//! custom DDL statements, specifically `CREATE EXTERNAL CATALOG`.
+//!
+//! ### Custom Syntax
+//! ```sql
+//! CREATE EXTERNAL CATALOG my_catalog
+//! STORED AS ICEBERG
+//! LOCATION 's3://my-bucket/warehouse/'
+//! OPTIONS (
+//!   'region' = 'us-west-2'
+//! );
+//! ```
+//!
+//! Note: For the purpose of this example, we use `local://workspace/` to
+//! automatically discover and register files from the project's test data.
+
+use std::collections::HashMap;
+use std::fmt::Display;
+use std::sync::Arc;
+
+use datafusion::catalog::{
+    CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
+    TableProviderFactory,
+};
+use datafusion::datasource::listing_table_factory::ListingTableFactory;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::prelude::SessionContext;
+use datafusion::sql::{
+    parser::{DFParser, DFParserBuilder, Statement},
+    sqlparser::{
+        ast::{ObjectName, Value},
+        keywords::Keyword,
+        tokenizer::Token,
+    },
+};
+use datafusion_common::{DFSchema, TableReference};
+use datafusion_expr::CreateExternalTable;
+use futures::StreamExt;
+use insta::assert_snapshot;
+use object_store::ObjectStore;
+use object_store::local::LocalFileSystem;
+
+/// Entry point for the example.
+pub async fn custom_sql_parser() -> Result<()> {
+    // Use standard Parquet testing data as our "external" source.
+    let base_path = datafusion::common::test_util::parquet_test_data();
+    let base_path = std::path::Path::new(&base_path).canonicalize()?;
+
+    // Make the path relative to the workspace root
+    let workspace_root = workspace_root();
+    let location = base_path
+        .strip_prefix(&workspace_root)
+        .map(|p| p.to_string_lossy().to_string())
+        .unwrap_or_else(|_| base_path.to_string_lossy().to_string());
+
+    let create_catalog_sql = format!(
+        "CREATE EXTERNAL CATALOG parquet_testing
+         STORED AS parquet
+         LOCATION 'local://workspace/{location}'
+         OPTIONS (
+           'schema_name' = 'staged_data',
+           'format.pruning' = 'true'
+         )"
+    );
+
+    // =========================================================================
+    // Part 1: Standard DataFusion parser rejects the custom DDL
+    // =========================================================================
+    println!("=== Part 1: Standard DataFusion Parser ===\n");
+    println!("Parsing: {}\n", create_catalog_sql.trim());
+
+    let ctx_standard = SessionContext::new();
+    let err = ctx_standard
+        .sql(&create_catalog_sql)
+        .await
+        .expect_err("Standard parser should reject CREATE EXTERNAL CATALOG");
+
+    println!("Error: {err}\n");
+    assert_snapshot!(err.to_string(), @r#"SQL error: ParserError("Expected: TABLE, found: CATALOG at Line: 1, Column: 17")"#);
+
+    // =========================================================================
+    // Part 2: Custom parser handles the statement
+    // =========================================================================
+    println!("=== Part 2: Custom Parser ===\n");
+    println!("Parsing: {}\n", create_catalog_sql.trim());
+
+    let ctx = SessionContext::new();
+
+    let mut parser = CustomParser::new(&create_catalog_sql)?;
+    let statement = parser.parse_statement()?;
+    match statement {
+        CustomStatement::CreateExternalCatalog(stmt) => {
+            handle_create_external_catalog(&ctx, stmt).await?;
+        }
+        CustomStatement::DFStatement(_) => {
+            panic!("Expected CreateExternalCatalog statement");
+        }
+    }
+
+    // Query a table from the registered catalog
+    let query_sql = "SELECT id, bool_col, tinyint_col FROM 
parquet_testing.staged_data.alltypes_plain LIMIT 5";
+    println!("Executing: {query_sql}\n");
+
+    let results = execute_sql(&ctx, query_sql).await?;
+    println!("{results}");
+    assert_snapshot!(results, @r"
+    +----+----------+-------------+
+    | id | bool_col | tinyint_col |
+    +----+----------+-------------+
+    | 4  | true     | 0           |
+    | 5  | false    | 1           |
+    | 6  | true     | 0           |
+    | 7  | false    | 1           |
+    | 2  | true     | 0           |
+    +----+----------+-------------+
+    ");
+
+    Ok(())
+}
+
+/// Execute SQL and return formatted results.
+async fn execute_sql(ctx: &SessionContext, sql: &str) -> Result<String> {
+    let batches = ctx.sql(sql).await?.collect().await?;
+    Ok(arrow::util::pretty::pretty_format_batches(&batches)?.to_string())
+}
+
+/// Custom handler for the `CREATE EXTERNAL CATALOG` statement.
+async fn handle_create_external_catalog(
+    ctx: &SessionContext,
+    stmt: CreateExternalCatalog,
+) -> Result<()> {
+    let factory = ListingTableFactory::new();
+    let catalog = Arc::new(MemoryCatalogProvider::new());
+    let schema = Arc::new(MemorySchemaProvider::new());
+
+    // Extract options
+    let mut schema_name = "public".to_string();
+    let mut table_options = HashMap::new();
+
+    for (k, v) in stmt.options {
+        let val_str = match v {
+            Value::SingleQuotedString(ref s) | Value::DoubleQuotedString(ref s) => {
+                s.to_string()
+            }
+            Value::Number(ref n, _) => n.to_string(),
+            Value::Boolean(b) => b.to_string(),
+            _ => v.to_string(),
+        };
+
+        if k == "schema_name" {
+            schema_name = val_str;
+        } else {
+            table_options.insert(k, val_str);
+        }
+    }
+
+    println!("  Target Catalog: {}", stmt.name);
+    println!("  Data Location: {}", stmt.location);
+    println!("  Resolved Schema: {schema_name}");
+
+    // Register a local object store rooted at the workspace root.
+    // We use a specific authority 'workspace' to ensure consistent resolution.
+    let store = Arc::new(LocalFileSystem::new_with_prefix(workspace_root())?);
+    let store_url = url::Url::parse("local://workspace").unwrap();
+    ctx.register_object_store(&store_url, Arc::clone(&store) as _);
+
+    let target_ext = format!(".{}", stmt.catalog_type.to_lowercase());
+
+    // For 'local://workspace/parquet-testing/data', the path is 'parquet-testing/data'.
+    let path_str = stmt
+        .location
+        .strip_prefix("local://workspace/")
+        .unwrap_or(&stmt.location);
+    let prefix = object_store::path::Path::from(path_str);
+
+    // Discover data files using the ObjectStore API
+    let mut table_count = 0;
+    let mut list_stream = store.list(Some(&prefix));
+
+    while let Some(meta) = list_stream.next().await {
+        let meta = meta?;
+        let path = &meta.location;
+
+        if path.as_ref().ends_with(&target_ext) {
+            let name = std::path::Path::new(path.as_ref())
+                .file_stem()
+                .unwrap()
+                .to_string_lossy()
+                .to_string();
+
+            let table_url = format!("local://workspace/{path}");
+
+            let cmd = CreateExternalTable::builder(
+                TableReference::bare(name.clone()),
+                table_url,
+                stmt.catalog_type.clone(),
+                Arc::new(DFSchema::empty()),
+            )
+            .with_options(table_options.clone())
+            .build();
+
+            if let Ok(table) = factory.create(&ctx.state(), &cmd).await {
+                schema.register_table(name, table)?;
+                table_count += 1;
+            }
+        }
+    }
+    println!("  Registered {table_count} tables into schema: {schema_name}");
+
+    catalog.register_schema(&schema_name, schema)?;
+    ctx.register_catalog(stmt.name.to_string(), catalog);
+
+    Ok(())
+}
+
+/// Possible statements returned by our custom parser.
+#[derive(Debug, Clone)]
+pub enum CustomStatement {
+    /// Standard DataFusion statement
+    DFStatement(Box<Statement>),
+    /// Custom `CREATE EXTERNAL CATALOG` statement
+    CreateExternalCatalog(CreateExternalCatalog),
+}
+
+/// Data structure for `CREATE EXTERNAL CATALOG`.
+#[derive(Debug, Clone)]
+pub struct CreateExternalCatalog {
+    pub name: ObjectName,
+    pub catalog_type: String,
+    pub location: String,
+    pub options: Vec<(String, Value)>,
+}
+
+impl Display for CustomStatement {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::DFStatement(s) => write!(f, "{s}"),
+            Self::CreateExternalCatalog(s) => write!(f, "{s}"),
+        }
+    }
+}
+
+impl Display for CreateExternalCatalog {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "CREATE EXTERNAL CATALOG {} STORED AS {} LOCATION '{}'",
+            self.name, self.catalog_type, self.location
+        )?;
+        if !self.options.is_empty() {
+            write!(f, " OPTIONS (")?;
+            for (i, (k, v)) in self.options.iter().enumerate() {
+                if i > 0 {
+                    write!(f, ", ")?;
+                }
+                write!(f, "'{k}' = '{v}'")?;
+            }
+            write!(f, ")")?;
+        }
+        Ok(())
+    }
+}
+
+/// A parser that extends `DFParser` with custom syntax.
+struct CustomParser<'a> {
+    df_parser: DFParser<'a>,
+}
+
+impl<'a> CustomParser<'a> {
+    fn new(sql: &'a str) -> Result<Self> {
+        Ok(Self {
+            df_parser: DFParserBuilder::new(sql).build()?,
+        })
+    }
+
+    pub fn parse_statement(&mut self) -> Result<CustomStatement> {
+        if self.is_create_external_catalog() {
+            return self.parse_create_external_catalog();
+        }
+        Ok(CustomStatement::DFStatement(Box::new(
+            self.df_parser.parse_statement()?,
+        )))
+    }
+
+    fn is_create_external_catalog(&self) -> bool {
+        let t1 = &self.df_parser.parser.peek_nth_token(0).token;
+        let t2 = &self.df_parser.parser.peek_nth_token(1).token;
+        let t3 = &self.df_parser.parser.peek_nth_token(2).token;
+
+        matches!(t1, Token::Word(w) if w.keyword == Keyword::CREATE)
+            && matches!(t2, Token::Word(w) if w.keyword == Keyword::EXTERNAL)
+            && matches!(t3, Token::Word(w) if w.value.to_uppercase() == 
"CATALOG")
+    }
+
+    fn parse_create_external_catalog(&mut self) -> Result<CustomStatement> {
+        // Consume prefix tokens: CREATE EXTERNAL CATALOG
+        for _ in 0..3 {
+            self.df_parser.parser.next_token();
+        }
+
+        let name = self
+            .df_parser
+            .parser
+            .parse_object_name(false)
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let mut catalog_type = None;
+        let mut location = None;
+        let mut options = vec![];
+
+        while let Some(keyword) = self.df_parser.parser.parse_one_of_keywords(&[
+            Keyword::STORED,
+            Keyword::LOCATION,
+            Keyword::OPTIONS,
+        ]) {
+            match keyword {
+                Keyword::STORED => {
+                    self.df_parser

Review Comment:
   ```suggestion
                       if catalog_type.is_some() {
                           return Err(DataFusionError::Plan("Duplicate STORED 
AS clause".to_string()));
                       }
                       self.df_parser
   ```
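   If these duplicate-clause checks get repetitive, a tiny helper could keep them uniform (sketch only; `ensure_unset` is a hypothetical name, not an existing API):
   ```rust
   // Hypothetical helper: reject a clause that has already been seen.
   fn ensure_unset<T>(slot: &Option<T>, clause: &str) -> Result<()> {
       if slot.is_some() {
           return Err(DataFusionError::Plan(format!("Duplicate {clause} clause")));
       }
       Ok(())
   }
   ```
   Usage inside the keyword loop would then be `ensure_unset(&catalog_type, "STORED AS")?;` before each assignment.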



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

