leekeiabstraction commented on code in PR #206:
URL: 
https://github.com/apache/flink-connector-aws/pull/206#discussion_r2095889230


##########
flink-catalog-aws/flink-catalog-aws-glue/README.md:
##########
@@ -0,0 +1,358 @@
+# Flink AWS Glue Catalog Connector
+
+The Flink AWS Glue Catalog connector provides integration between Apache Flink 
and the AWS Glue Data Catalog. This connector enables Flink applications to use 
AWS Glue as a metadata catalog for tables, databases, and schemas, allowing 
seamless SQL queries against AWS resources.
+
+## Features
+
+- Register AWS Glue as a catalog in Flink applications
+- Access Glue databases and tables through Flink SQL
+- Support for various AWS data sources (S3, Kinesis, MSK)
+- Mapping between Flink and AWS Glue data types
+- Compatibility with Flink's Table API and SQL interface
+
+## Prerequisites
+
+Before getting started, ensure you have the following:
+
+- **AWS account** with appropriate permissions for AWS Glue and other required 
services
+- **AWS credentials** properly configured
+
+## Getting Started
+
+### 1. Add Dependency
+
+Add the AWS Glue Catalog connector to your Flink project:
+
+### 2. Configure AWS Credentials
+
+Ensure AWS credentials are configured using one of these methods:
+
+- Environment variables
+- AWS credentials file
+- IAM roles (for applications running on AWS)
+
+### 3. Register the Glue Catalog
+
+You can register the AWS Glue catalog using either the Table API or SQL:
+
+#### Using Table API (Java/Scala)
+
+```java
+// Java/Scala
+import org.apache.flink.table.catalog.glue.GlueCatalog;
+import org.apache.flink.table.catalog.Catalog;
+
+// Create Glue catalog instance
+Catalog glueCatalog = new GlueCatalog(
+    "glue_catalog",      // Catalog name
+    "default",           // Default database
+    "us-east-1");         // AWS region
+
+
+// Register with table environment
+tableEnv.registerCatalog("glue_catalog", glueCatalog);
+tableEnv.useCatalog("glue_catalog");
+```
+
+#### Using Table API (Python)
+
+```python
+# Python
+from pyflink.table.catalog import GlueCatalog
+
+# Create and register Glue catalog
+glue_catalog = GlueCatalog(
+    "glue_catalog",      # Catalog name
+    "default",           # Default database
+    "us-east-1")         # AWS region
+
+t_env.register_catalog("glue_catalog", glue_catalog)
+t_env.use_catalog("glue_catalog")
+```
+
+#### Using SQL
+
+In the Flink SQL Client, create and use the Glue catalog:
+
+```sql
+-- Create a catalog using Glue
+CREATE CATALOG glue_catalog WITH (
+    'type' = 'glue',
+    'catalog-name' = 'glue_catalog',
+    'default-database' = 'default',
+    'region' = 'us-east-1'
+);
+
+-- Use the created catalog
+USE CATALOG glue_catalog;
+
+-- Use a specific database
+USE default;
+```
+
+### 4. Create or Reference Glue Tables

Review Comment:
   In the example below, will user need to have the 
`flink-sql-connector-kinesis` within classpath/library/module of Flink? If so, 
we should mention it here or earlier in prerequisite.



##########
flink-catalog-aws/flink-catalog-aws-glue/README.md:
##########
@@ -0,0 +1,358 @@
+# Flink AWS Glue Catalog Connector
+
+The Flink AWS Glue Catalog connector provides integration between Apache Flink 
and the AWS Glue Data Catalog. This connector enables Flink applications to use 
AWS Glue as a metadata catalog for tables, databases, and schemas, allowing 
seamless SQL queries against AWS resources.
+
+## Features
+
+- Register AWS Glue as a catalog in Flink applications
+- Access Glue databases and tables through Flink SQL
+- Support for various AWS data sources (S3, Kinesis, MSK)
+- Mapping between Flink and AWS Glue data types
+- Compatibility with Flink's Table API and SQL interface
+
+## Prerequisites
+
+Before getting started, ensure you have the following:
+
+- **AWS account** with appropriate permissions for AWS Glue and other required 
services
+- **AWS credentials** properly configured
+
+## Getting Started
+
+### 1. Add Dependency
+
+Add the AWS Glue Catalog connector to your Flink project:
+
+### 2. Configure AWS Credentials

Review Comment:
   We should mention here or under pre-requisite that the role/user/session 
associated with the credentials should have the appropriate permission to Glue 
database. 



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalog.class);
+
+    private final GlueClient glueClient;
+    private final GlueTypeConverter glueTypeConverter;
+    private final GlueDatabaseOperator glueDatabaseOperations;
+    private final GlueTableOperator glueTableOperations;
+    private final GlueFunctionOperator glueFunctionsOperations;
+    private final GlueTableUtils glueTableUtils;
+
+    /**
+     * Constructs a GlueCatalog with a provided Glue client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     * @param glueClient      Glue Client so we can decide which one to use 
for testing
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region, 
GlueClient glueClient) {
+        super(name, defaultDatabase);
+
+        // Initialize GlueClient in the constructor
+        if (glueClient != null) {
+            this.glueClient = glueClient;
+        } else {
+            // If no GlueClient is provided, initialize it using the default 
region
+            this.glueClient = GlueClient.builder()
+                    .region(Region.of(region))
+                    .build();
+        }
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Constructs a GlueCatalog with default client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region) {
+        super(name, defaultDatabase);
+
+        // Create a synchronized client builder to avoid concurrent 
modification exceptions
+        this.glueClient = GlueClient.builder()
+                .region(Region.of(region))
+                
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+                .build();
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Opens the GlueCatalog and initializes necessary resources.
+     *
+     * @throws CatalogException if an error occurs during the opening process
+     */
+    @Override
+    public void open() throws CatalogException {
+        LOG.info("Opening GlueCatalog with client: {}", glueClient);
+    }
+
+    /**
+     * Closes the GlueCatalog and releases resources.
+     *
+     * @throws CatalogException if an error occurs during the closing process
+     */
+    @Override
+    public void close() throws CatalogException {
+        if (glueClient != null) {
+            LOG.info("Closing GlueCatalog client");
+            int maxRetries = 3;
+            int retryCount = 0;
+            long retryDelayMs = 200;
+            while (retryCount < maxRetries) {
+                try {
+                    glueClient.close();
+                    LOG.info("Successfully closed GlueCatalog client");
+                    return;
+                } catch (RuntimeException e) {
+                    retryCount++;
+                    if (retryCount >= maxRetries) {
+                        LOG.warn("Failed to close GlueCatalog client after {} 
retries", maxRetries, e);
+                        throw new CatalogException("Failed to close 
GlueCatalog client", e);
+                    }
+                    LOG.warn("Failed to close GlueCatalog client (attempt 
{}/{}), retrying in {} ms",
+                            retryCount, maxRetries, retryDelayMs, e);
+                    try {
+                        Thread.sleep(retryDelayMs);
+                        // Exponential backoff
+                        retryDelayMs *= 2;
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        throw new CatalogException("Interrupted while retrying 
to close GlueCatalog client", ie);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Lists all the databases available in the Glue catalog.
+     *
+     * @return a list of database names
+     * @throws CatalogException if an error occurs while listing the databases
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return glueDatabaseOperations.listDatabases();
+    }
+
+    /**
+     * Retrieves a specific database by its name.
+     *
+     * @param databaseName the name of the database to retrieve
+     * @return the database if found
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while retrieving 
the database
+     */
+    @Override
+    public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        boolean databaseExists = databaseExists(databaseName);
+        if (!databaseExists) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return glueDatabaseOperations.getDatabase(databaseName);
+    }
+
+    /**
+     * Checks if a database exists in Glue.
+     *
+     * @param databaseName the name of the database
+     * @return true if the database exists, false otherwise
+     * @throws CatalogException if an error occurs while checking the database
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        return glueDatabaseOperations.glueDatabaseExists(databaseName);
+    }
+
+    /**
+     * Creates a new database in Glue.
+     *
+     * @param databaseName    the name of the database to create
+     * @param catalogDatabase the catalog database object containing database 
metadata
+     * @param ifNotExists     flag indicating whether to ignore the error if 
the database already exists
+     * @throws DatabaseAlreadyExistException if the database already exists 
and ifNotExists is false
+     * @throws CatalogException              if an error occurs while creating 
the database
+     */
+    @Override
+    public void createDatabase(String databaseName, CatalogDatabase 
catalogDatabase, boolean ifNotExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        boolean exists = databaseExists(databaseName);
+
+        if (exists && !ifNotExists) {
+            throw new DatabaseAlreadyExistException(getName(), databaseName);
+        }
+
+        if (!exists) {
+            glueDatabaseOperations.createDatabase(databaseName, 
catalogDatabase);
+        }
+    }
+
+    /**
+     * Drops an existing database in Glue.
+     *
+     * @param databaseName      the name of the database to drop
+     * @param ignoreIfNotExists flag to ignore the exception if the database 
doesn't exist
+     * @param cascade           flag indicating whether to cascade the 
operation to drop related objects
+     * @throws DatabaseNotExistException if the database does not exist and 
ignoreIfNotExists is false
+     * @throws DatabaseNotEmptyException if the database contains objects and 
cascade is false
+     * @throws CatalogException          if an error occurs while dropping the 
database
+     */
+    @Override
+    public void dropDatabase(String databaseName, boolean ignoreIfNotExists, 
boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {

Review Comment:
   Is DatabaseNotEmptyException thrown from within this method?



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalog.class);
+
+    private final GlueClient glueClient;
+    private final GlueTypeConverter glueTypeConverter;
+    private final GlueDatabaseOperator glueDatabaseOperations;
+    private final GlueTableOperator glueTableOperations;
+    private final GlueFunctionOperator glueFunctionsOperations;
+    private final GlueTableUtils glueTableUtils;
+
+    /**
+     * Constructs a GlueCatalog with a provided Glue client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     * @param glueClient      Glue Client so we can decide which one to use 
for testing

Review Comment:
   The 2 constructors have a number of duplicated code.
   
   It looks like the duplicated code is primarily to instantiate client 
wrappers. Would it make sense to do the following instead (argument checks 
omitted)?
   
   ```
       @VisibleForTesting
       GlueCatalog(String name, String defaultDatabase, GlueClient glueClient) {
           super(name, defaultDatabase);
           setup(glueClient);
       }
       
       public GlueCatalog(String name, String defaultDatabase, String region) {
           super(name,  defaultDatabase);
           
           GlueClient client = GlueClient.builder()
                   .region(Region.of(region))
                   
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
                   .build());
            setup(client);
       }
       
       private void setup(GlueClient glueClient) {
           this.glueClient = glueClient;
   
           this.glueTypeConverter = new GlueTypeConverter();
           this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
           this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
           this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
           this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
       }
       
   ```



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalog.class);
+
+    private final GlueClient glueClient;
+    private final GlueTypeConverter glueTypeConverter;
+    private final GlueDatabaseOperator glueDatabaseOperations;
+    private final GlueTableOperator glueTableOperations;
+    private final GlueFunctionOperator glueFunctionsOperations;
+    private final GlueTableUtils glueTableUtils;
+
+    /**
+     * Constructs a GlueCatalog with a provided Glue client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     * @param glueClient      Glue Client so we can decide which one to use 
for testing
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region, 
GlueClient glueClient) {
+        super(name, defaultDatabase);
+
+        // Initialize GlueClient in the constructor
+        if (glueClient != null) {
+            this.glueClient = glueClient;
+        } else {
+            // If no GlueClient is provided, initialize it using the default 
region
+            this.glueClient = GlueClient.builder()
+                    .region(Region.of(region))
+                    .build();
+        }
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Constructs a GlueCatalog with default client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region) {
+        super(name, defaultDatabase);
+
+        // Create a synchronized client builder to avoid concurrent 
modification exceptions
+        this.glueClient = GlueClient.builder()
+                .region(Region.of(region))
+                
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+                .build();
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Opens the GlueCatalog and initializes necessary resources.
+     *
+     * @throws CatalogException if an error occurs during the opening process
+     */
+    @Override
+    public void open() throws CatalogException {
+        LOG.info("Opening GlueCatalog with client: {}", glueClient);
+    }
+
+    /**
+     * Closes the GlueCatalog and releases resources.
+     *
+     * @throws CatalogException if an error occurs during the closing process
+     */
+    @Override
+    public void close() throws CatalogException {
+        if (glueClient != null) {

Review Comment:
   Is there a scenario where glueClient would be null?



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalog.class);
+
+    private final GlueClient glueClient;
+    private final GlueTypeConverter glueTypeConverter;
+    private final GlueDatabaseOperator glueDatabaseOperations;
+    private final GlueTableOperator glueTableOperations;
+    private final GlueFunctionOperator glueFunctionsOperations;
+    private final GlueTableUtils glueTableUtils;
+
+    /**
+     * Constructs a GlueCatalog with a provided Glue client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     * @param glueClient      Glue Client so we can decide which one to use 
for testing
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region, 
GlueClient glueClient) {
+        super(name, defaultDatabase);
+
+        // Initialize GlueClient in the constructor
+        if (glueClient != null) {
+            this.glueClient = glueClient;
+        } else {
+            // If no GlueClient is provided, initialize it using the default 
region
+            this.glueClient = GlueClient.builder()
+                    .region(Region.of(region))
+                    .build();
+        }
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Constructs a GlueCatalog with default client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region) {
+        super(name, defaultDatabase);
+
+        // Create a synchronized client builder to avoid concurrent 
modification exceptions
+        this.glueClient = GlueClient.builder()
+                .region(Region.of(region))
+                
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+                .build();
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Opens the GlueCatalog and initializes necessary resources.
+     *
+     * @throws CatalogException if an error occurs during the opening process
+     */
+    @Override
+    public void open() throws CatalogException {
+        LOG.info("Opening GlueCatalog with client: {}", glueClient);
+    }
+
+    /**
+     * Closes the GlueCatalog and releases resources.
+     *
+     * @throws CatalogException if an error occurs during the closing process
+     */
+    @Override
+    public void close() throws CatalogException {
+        if (glueClient != null) {
+            LOG.info("Closing GlueCatalog client");
+            int maxRetries = 3;
+            int retryCount = 0;
+            long retryDelayMs = 200;
+            while (retryCount < maxRetries) {

Review Comment:
   Curious on what is the motivation for retries here? Is there a specific 
RuntimeException that we have in mind? 
   
   I had a look at AWS sdk, the close is best-effort and logs in case of any 
exceptions: 
https://github.com/aws/aws-sdk-java-v2/blob/master/utils/src/main/java/software/amazon/awssdk/utils/IoUtils.java#L76-L85
   
   We can simplify code here by removing retry logic as sdk does not seem to be 
surfacing exceptions during close().



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalog.class);
+
+    private final GlueClient glueClient;
+    private final GlueTypeConverter glueTypeConverter;
+    private final GlueDatabaseOperator glueDatabaseOperations;
+    private final GlueTableOperator glueTableOperations;
+    private final GlueFunctionOperator glueFunctionsOperations;
+    private final GlueTableUtils glueTableUtils;
+
+    /**
+     * Constructs a GlueCatalog with a provided Glue client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     * @param glueClient      Glue Client so we can decide which one to use 
for testing
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region, 
GlueClient glueClient) {
+        super(name, defaultDatabase);
+
+        // Initialize GlueClient in the constructor
+        if (glueClient != null) {
+            this.glueClient = glueClient;
+        } else {
+            // If no GlueClient is provided, initialize it using the default 
region
+            this.glueClient = GlueClient.builder()
+                    .region(Region.of(region))
+                    .build();
+        }
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Constructs a GlueCatalog with default client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region) {
+        super(name, defaultDatabase);
+
+        // Create a synchronized client builder to avoid concurrent 
modification exceptions
+        this.glueClient = GlueClient.builder()
+                .region(Region.of(region))
+                
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+                .build();
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Opens the GlueCatalog and initializes necessary resources.
+     *
+     * @throws CatalogException if an error occurs during the opening process
+     */
+    @Override
+    public void open() throws CatalogException {
+        LOG.info("Opening GlueCatalog with client: {}", glueClient);
+    }
+
+    /**
+     * Closes the GlueCatalog and releases resources.
+     *
+     * @throws CatalogException if an error occurs during the closing process
+     */
+    @Override
+    public void close() throws CatalogException {
+        if (glueClient != null) {
+            LOG.info("Closing GlueCatalog client");
+            int maxRetries = 3;
+            int retryCount = 0;
+            long retryDelayMs = 200;
+            while (retryCount < maxRetries) {
+                try {
+                    glueClient.close();
+                    LOG.info("Successfully closed GlueCatalog client");
+                    return;
+                } catch (RuntimeException e) {
+                    retryCount++;
+                    if (retryCount >= maxRetries) {
+                        LOG.warn("Failed to close GlueCatalog client after {} 
retries", maxRetries, e);
+                        throw new CatalogException("Failed to close 
GlueCatalog client", e);
+                    }
+                    LOG.warn("Failed to close GlueCatalog client (attempt 
{}/{}), retrying in {} ms",
+                            retryCount, maxRetries, retryDelayMs, e);
+                    try {
+                        Thread.sleep(retryDelayMs);
+                        // Exponential backoff
+                        retryDelayMs *= 2;
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        throw new CatalogException("Interrupted while retrying 
to close GlueCatalog client", ie);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Lists all the databases available in the Glue catalog.
+     *
+     * @return a list of database names
+     * @throws CatalogException if an error occurs while listing the databases
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return glueDatabaseOperations.listDatabases();
+    }
+
+    /**
+     * Retrieves a specific database by its name.
+     *
+     * @param databaseName the name of the database to retrieve
+     * @return the database if found
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while retrieving 
the database
+     */
+    @Override
+    public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        boolean databaseExists = databaseExists(databaseName);
+        if (!databaseExists) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return glueDatabaseOperations.getDatabase(databaseName);
+    }
+
+    /**
+     * Checks if a database exists in Glue.
+     *
+     * @param databaseName the name of the database
+     * @return true if the database exists, false otherwise
+     * @throws CatalogException if an error occurs while checking the database
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        return glueDatabaseOperations.glueDatabaseExists(databaseName);
+    }
+
+    /**
+     * Creates a new database in Glue.
+     *
+     * @param databaseName    the name of the database to create
+     * @param catalogDatabase the catalog database object containing database 
metadata
+     * @param ifNotExists     flag indicating whether to ignore the error if 
the database already exists
+     * @throws DatabaseAlreadyExistException if the database already exists 
and ifNotExists is false
+     * @throws CatalogException              if an error occurs while creating 
the database
+     */
+    @Override
+    public void createDatabase(String databaseName, CatalogDatabase 
catalogDatabase, boolean ifNotExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        boolean exists = databaseExists(databaseName);
+
+        if (exists && !ifNotExists) {
+            throw new DatabaseAlreadyExistException(getName(), databaseName);
+        }
+
+        if (!exists) {
+            glueDatabaseOperations.createDatabase(databaseName, 
catalogDatabase);
+        }
+    }
+
+    /**
+     * Drops an existing database in Glue.
+     *
+     * @param databaseName      the name of the database to drop
+     * @param ignoreIfNotExists flag to ignore the exception if the database 
doesn't exist
+     * @param cascade           flag indicating whether to cascade the 
operation to drop related objects
+     * @throws DatabaseNotExistException if the database does not exist and 
ignoreIfNotExists is false
+     * @throws DatabaseNotEmptyException if the database contains objects and 
cascade is false
+     * @throws CatalogException          if an error occurs while dropping the 
database
+     */
+    @Override
+    public void dropDatabase(String databaseName, boolean ignoreIfNotExists, 
boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {
+        if (databaseExists(databaseName)) {
+            glueDatabaseOperations.dropGlueDatabase(databaseName);
+        } else if (!ignoreIfNotExists) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    /**
+     * Lists all tables in a specified database.
+     *
+     * @param databaseName the name of the database
+     * @return a list of table names in the database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while listing the 
tables
+     */
+    @Override
+    public List<String> listTables(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }

Review Comment:
   These lines seems to be repeated in multiple methods. They can be refactored 
into `validateDatabaseExists(databaseName)`.



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalog.class);
+
+    private final GlueClient glueClient;
+    private final GlueTypeConverter glueTypeConverter;
+    private final GlueDatabaseOperator glueDatabaseOperations;
+    private final GlueTableOperator glueTableOperations;
+    private final GlueFunctionOperator glueFunctionsOperations;
+    private final GlueTableUtils glueTableUtils;
+
+    /**
+     * Constructs a GlueCatalog with a provided Glue client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     * @param glueClient      Glue Client so we can decide which one to use 
for testing
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region, 
GlueClient glueClient) {
+        super(name, defaultDatabase);
+
+        // Initialize GlueClient in the constructor
+        if (glueClient != null) {
+            this.glueClient = glueClient;
+        } else {
+            // If no GlueClient is provided, initialize it using the default 
region
+            this.glueClient = GlueClient.builder()
+                    .region(Region.of(region))
+                    .build();
+        }
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Constructs a GlueCatalog with default client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region) {
+        super(name, defaultDatabase);
+
+        // Create a synchronized client builder to avoid concurrent 
modification exceptions
+        this.glueClient = GlueClient.builder()
+                .region(Region.of(region))
+                
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+                .build();
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Opens the GlueCatalog and initializes necessary resources.
+     *
+     * @throws CatalogException if an error occurs during the opening process
+     */
+    @Override
+    public void open() throws CatalogException {
+        LOG.info("Opening GlueCatalog with client: {}", glueClient);
+    }
+
+    /**
+     * Closes the GlueCatalog and releases resources.
+     *
+     * @throws CatalogException if an error occurs during the closing process
+     */
+    @Override
+    public void close() throws CatalogException {
+        if (glueClient != null) {
+            LOG.info("Closing GlueCatalog client");
+            int maxRetries = 3;
+            int retryCount = 0;
+            long retryDelayMs = 200;
+            while (retryCount < maxRetries) {
+                try {
+                    glueClient.close();
+                    LOG.info("Successfully closed GlueCatalog client");
+                    return;
+                } catch (RuntimeException e) {
+                    retryCount++;
+                    if (retryCount >= maxRetries) {
+                        LOG.warn("Failed to close GlueCatalog client after {} 
retries", maxRetries, e);
+                        throw new CatalogException("Failed to close 
GlueCatalog client", e);
+                    }
+                    LOG.warn("Failed to close GlueCatalog client (attempt 
{}/{}), retrying in {} ms",
+                            retryCount, maxRetries, retryDelayMs, e);
+                    try {
+                        Thread.sleep(retryDelayMs);
+                        // Exponential backoff
+                        retryDelayMs *= 2;
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        throw new CatalogException("Interrupted while retrying 
to close GlueCatalog client", ie);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Lists all the databases available in the Glue catalog.
+     *
+     * @return a list of database names
+     * @throws CatalogException if an error occurs while listing the databases
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return glueDatabaseOperations.listDatabases();
+    }
+
+    /**
+     * Retrieves a specific database by its name.
+     *
+     * @param databaseName the name of the database to retrieve
+     * @return the database if found
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while retrieving 
the database
+     */
+    @Override
+    public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        boolean databaseExists = databaseExists(databaseName);
+        if (!databaseExists) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return glueDatabaseOperations.getDatabase(databaseName);
+    }
+
+    /**
+     * Checks if a database exists in Glue.
+     *
+     * @param databaseName the name of the database
+     * @return true if the database exists, false otherwise
+     * @throws CatalogException if an error occurs while checking the database
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        return glueDatabaseOperations.glueDatabaseExists(databaseName);
+    }
+
+    /**
+     * Creates a new database in Glue.
+     *
+     * @param databaseName    the name of the database to create
+     * @param catalogDatabase the catalog database object containing database 
metadata
+     * @param ifNotExists     flag indicating whether to ignore the error if 
the database already exists
+     * @throws DatabaseAlreadyExistException if the database already exists 
and ifNotExists is false
+     * @throws CatalogException              if an error occurs while creating 
the database
+     */
+    @Override
+    public void createDatabase(String databaseName, CatalogDatabase 
catalogDatabase, boolean ifNotExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        boolean exists = databaseExists(databaseName);
+
+        if (exists && !ifNotExists) {
+            throw new DatabaseAlreadyExistException(getName(), databaseName);
+        }
+
+        if (!exists) {
+            glueDatabaseOperations.createDatabase(databaseName, 
catalogDatabase);
+        }
+    }
+
+    /**
+     * Drops an existing database in Glue.
+     *
+     * @param databaseName      the name of the database to drop
+     * @param ignoreIfNotExists flag to ignore the exception if the database 
doesn't exist
+     * @param cascade           flag indicating whether to cascade the 
operation to drop related objects
+     * @throws DatabaseNotExistException if the database does not exist and 
ignoreIfNotExists is false
+     * @throws DatabaseNotEmptyException if the database contains objects and 
cascade is false
+     * @throws CatalogException          if an error occurs while dropping the 
database
+     */
+    @Override
+    public void dropDatabase(String databaseName, boolean ignoreIfNotExists, 
boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {
+        if (databaseExists(databaseName)) {
+            glueDatabaseOperations.dropGlueDatabase(databaseName);
+        } else if (!ignoreIfNotExists) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    /**
+     * Lists all tables in a specified database.
+     *
+     * @param databaseName the name of the database
+     * @return a list of table names in the database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while listing the 
tables
+     */
+    @Override
+    public List<String> listTables(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return glueTableOperations.listTables(databaseName);
+    }
+
+    /**
+     * Retrieves a table from the catalog using its object path.
+     *
+     * @param objectPath the object path of the table to retrieve
+     * @return the corresponding CatalogBaseTable for the specified table
+     * @throws TableNotExistException if the table does not exist
+     * @throws CatalogException       if an error occurs while retrieving the 
table
+     */
+    @Override
+    public CatalogBaseTable getTable(ObjectPath objectPath) throws 
TableNotExistException, CatalogException {
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        Table glueTable = glueTableOperations.getGlueTable(databaseName, 
tableName);
+        return getCatalogBaseTableFromGlueTable(glueTable);
+    }
+
+    /**
+     * Checks if a table exists in the Glue catalog.
+     *
+     * @param objectPath the object path of the table to check
+     * @return true if the table exists, false otherwise
+     * @throws CatalogException if an error occurs while checking the table's 
existence
+     */
+    @Override
+    public boolean tableExists(ObjectPath objectPath) throws CatalogException {
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        // Delegate existence check to GlueTableOperations
+        return glueTableOperations.glueTableExists(databaseName, tableName);
+    }
+
+    /**
+     * Drops a table from the Glue catalog.
+     *
+     * @param objectPath the object path of the table to drop
+     * @param ifExists   flag indicating whether to ignore the exception if 
the table does not exist
+     * @throws TableNotExistException if the table does not exist and ifExists 
is false
+     * @throws CatalogException       if an error occurs while dropping the 
table
+     */
+    @Override
+    public void dropTable(ObjectPath objectPath, boolean ifExists) throws 
TableNotExistException, CatalogException {
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        if (!glueTableOperations.glueTableExists(databaseName, tableName)) {
+            if (!ifExists) {
+                throw new TableNotExistException(getName(), objectPath);
+            }
+            return; // Table doesn't exist, and IF EXISTS is true
+        }
+
+        glueTableOperations.dropTable(databaseName, tableName);
+    }
+
+    /**
+     * Creates a table in the Glue catalog.
+     *
+     * @param objectPath       the object path of the table to create
+     * @param catalogBaseTable the table definition containing the schema and 
properties
+     * @param ifNotExists      flag indicating whether to ignore the exception 
if the table already exists
+     * @throws TableAlreadyExistException if the table already exists and 
ifNotExists is false

Review Comment:
   Javadoc should mention NPE here as null check is done on objectPath and 
catalogBaseTable



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalog.class);
+
+    private final GlueClient glueClient;
+    private final GlueTypeConverter glueTypeConverter;
+    private final GlueDatabaseOperator glueDatabaseOperations;
+    private final GlueTableOperator glueTableOperations;
+    private final GlueFunctionOperator glueFunctionsOperations;
+    private final GlueTableUtils glueTableUtils;
+
+    /**
+     * Constructs a GlueCatalog with a provided Glue client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     * @param glueClient      Glue Client so we can decide which one to use 
for testing
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region, 
GlueClient glueClient) {
+        super(name, defaultDatabase);
+
+        // Initialize GlueClient in the constructor
+        if (glueClient != null) {
+            this.glueClient = glueClient;
+        } else {
+            // If no GlueClient is provided, initialize it using the default 
region
+            this.glueClient = GlueClient.builder()
+                    .region(Region.of(region))
+                    .build();
+        }
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Constructs a GlueCatalog with default client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region) {
+        super(name, defaultDatabase);
+
+        // Create a synchronized client builder to avoid concurrent 
modification exceptions
+        this.glueClient = GlueClient.builder()
+                .region(Region.of(region))
+                
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+                .build();
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Opens the GlueCatalog and initializes necessary resources.
+     *
+     * @throws CatalogException if an error occurs during the opening process
+     */
+    @Override
+    public void open() throws CatalogException {
+        LOG.info("Opening GlueCatalog with client: {}", glueClient);
+    }
+
+    /**
+     * Closes the GlueCatalog and releases resources.
+     *
+     * @throws CatalogException if an error occurs during the closing process
+     */
+    @Override
+    public void close() throws CatalogException {
+        if (glueClient != null) {
+            LOG.info("Closing GlueCatalog client");
+            int maxRetries = 3;
+            int retryCount = 0;
+            long retryDelayMs = 200;
+            while (retryCount < maxRetries) {
+                try {
+                    glueClient.close();
+                    LOG.info("Successfully closed GlueCatalog client");
+                    return;
+                } catch (RuntimeException e) {
+                    retryCount++;
+                    if (retryCount >= maxRetries) {
+                        LOG.warn("Failed to close GlueCatalog client after {} 
retries", maxRetries, e);
+                        throw new CatalogException("Failed to close 
GlueCatalog client", e);
+                    }
+                    LOG.warn("Failed to close GlueCatalog client (attempt 
{}/{}), retrying in {} ms",
+                            retryCount, maxRetries, retryDelayMs, e);
+                    try {
+                        Thread.sleep(retryDelayMs);
+                        // Exponential backoff
+                        retryDelayMs *= 2;
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        throw new CatalogException("Interrupted while retrying 
to close GlueCatalog client", ie);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Lists all the databases available in the Glue catalog.
+     *
+     * @return a list of database names
+     * @throws CatalogException if an error occurs while listing the databases
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return glueDatabaseOperations.listDatabases();
+    }
+
+    /**
+     * Retrieves a specific database by its name.
+     *
+     * @param databaseName the name of the database to retrieve
+     * @return the database if found
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while retrieving 
the database
+     */
+    @Override
+    public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        boolean databaseExists = databaseExists(databaseName);
+        if (!databaseExists) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return glueDatabaseOperations.getDatabase(databaseName);
+    }
+
+    /**
+     * Checks if a database exists in Glue.
+     *
+     * @param databaseName the name of the database
+     * @return true if the database exists, false otherwise
+     * @throws CatalogException if an error occurs while checking the database
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        return glueDatabaseOperations.glueDatabaseExists(databaseName);
+    }
+
+    /**
+     * Creates a new database in Glue.
+     *
+     * @param databaseName    the name of the database to create
+     * @param catalogDatabase the catalog database object containing database 
metadata
+     * @param ifNotExists     flag indicating whether to ignore the error if 
the database already exists
+     * @throws DatabaseAlreadyExistException if the database already exists 
and ifNotExists is false
+     * @throws CatalogException              if an error occurs while creating 
the database
+     */
+    @Override
+    public void createDatabase(String databaseName, CatalogDatabase 
catalogDatabase, boolean ifNotExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        boolean exists = databaseExists(databaseName);
+
+        if (exists && !ifNotExists) {
+            throw new DatabaseAlreadyExistException(getName(), databaseName);
+        }
+
+        if (!exists) {
+            glueDatabaseOperations.createDatabase(databaseName, 
catalogDatabase);
+        }
+    }
+
+    /**
+     * Drops an existing database in Glue.
+     *
+     * @param databaseName      the name of the database to drop
+     * @param ignoreIfNotExists flag to ignore the exception if the database 
doesn't exist
+     * @param cascade           flag indicating whether to cascade the 
operation to drop related objects
+     * @throws DatabaseNotExistException if the database does not exist and 
ignoreIfNotExists is false
+     * @throws DatabaseNotEmptyException if the database contains objects and 
cascade is false
+     * @throws CatalogException          if an error occurs while dropping the 
database
+     */
+    @Override
+    public void dropDatabase(String databaseName, boolean ignoreIfNotExists, 
boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {
+        if (databaseExists(databaseName)) {
+            glueDatabaseOperations.dropGlueDatabase(databaseName);
+        } else if (!ignoreIfNotExists) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    /**
+     * Lists all tables in a specified database.
+     *
+     * @param databaseName the name of the database
+     * @return a list of table names in the database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while listing the 
tables
+     */
+    @Override
+    public List<String> listTables(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return glueTableOperations.listTables(databaseName);
+    }
+
+    /**
+     * Retrieves a table from the catalog using its object path.
+     *
+     * @param objectPath the object path of the table to retrieve
+     * @return the corresponding CatalogBaseTable for the specified table
+     * @throws TableNotExistException if the table does not exist
+     * @throws CatalogException       if an error occurs while retrieving the 
table
+     */
+    @Override
+    public CatalogBaseTable getTable(ObjectPath objectPath) throws 
TableNotExistException, CatalogException {
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        Table glueTable = glueTableOperations.getGlueTable(databaseName, 
tableName);
+        return getCatalogBaseTableFromGlueTable(glueTable);
+    }
+
+    /**
+     * Checks if a table exists in the Glue catalog.
+     *
+     * @param objectPath the object path of the table to check
+     * @return true if the table exists, false otherwise
+     * @throws CatalogException if an error occurs while checking the table's 
existence
+     */
+    @Override
+    public boolean tableExists(ObjectPath objectPath) throws CatalogException {
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        // Delegate existence check to GlueTableOperations
+        return glueTableOperations.glueTableExists(databaseName, tableName);
+    }
+
+    /**
+     * Drops a table from the Glue catalog.
+     *
+     * @param objectPath the object path of the table to drop
+     * @param ifExists   flag indicating whether to ignore the exception if 
the table does not exist
+     * @throws TableNotExistException if the table does not exist and ifExists 
is false
+     * @throws CatalogException       if an error occurs while dropping the 
table
+     */
+    @Override
+    public void dropTable(ObjectPath objectPath, boolean ifExists) throws 
TableNotExistException, CatalogException {
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        if (!glueTableOperations.glueTableExists(databaseName, tableName)) {
+            if (!ifExists) {
+                throw new TableNotExistException(getName(), objectPath);
+            }
+            return; // Table doesn't exist, and IF EXISTS is true
+        }
+
+        glueTableOperations.dropTable(databaseName, tableName);
+    }
+
+    /**
+     * Creates a table in the Glue catalog.
+     *
+     * @param objectPath       the object path of the table to create
+     * @param catalogBaseTable the table definition containing the schema and 
properties
+     * @param ifNotExists      flag indicating whether to ignore the exception 
if the table already exists
+     * @throws TableAlreadyExistException if the table already exists and 
ifNotExists is false
+     * @throws DatabaseNotExistException  if the database does not exist
+     * @throws CatalogException           if an error occurs while creating 
the table
+     */
+    @Override
+    public void createTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean ifNotExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+
+        // Validate that required parameters are not null
+        if (objectPath == null) {
+            throw new NullPointerException("ObjectPath cannot be null");
+        }
+
+        if (catalogBaseTable == null) {
+            throw new NullPointerException("CatalogBaseTable cannot be null");
+        }
+
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        // Check if the database exists
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        // Check if the table already exists
+        if (glueTableOperations.glueTableExists(databaseName, tableName)) {
+            if (!ifNotExists) {
+                throw new TableAlreadyExistException(getName(), objectPath);
+            }
+            return; // Table exists, and IF NOT EXISTS is true
+        }
+
+        // Get common properties
+        Map<String, String> tableProperties = new 
HashMap<>(catalogBaseTable.getOptions());
+
+        try {
+            // Process based on table type
+            if (catalogBaseTable.getTableKind() == 
CatalogBaseTable.TableKind.TABLE) {
+                createRegularTable(objectPath, (CatalogTable) 
catalogBaseTable, tableProperties);
+            } else if (catalogBaseTable.getTableKind() == 
CatalogBaseTable.TableKind.VIEW) {
+                createView(objectPath, (CatalogView) catalogBaseTable, 
tableProperties);
+            } else {
+                throw new CatalogException("Unsupported table kind: " + 
catalogBaseTable.getTableKind());
+            }
+            LOG.info("Successfully created {}.{} of kind {}", databaseName, 
tableName, catalogBaseTable.getTableKind());
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed to create table %s.%s: %s", 
databaseName, tableName, e.getMessage()), e);
+        }
+    }
+
+    /**
+     * Lists all views in a specified database.
+     *
+     * @param databaseName the name of the database
+     * @return a list of view names in the database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while listing the 
views
+     */
+    @Override
+    public List<String> listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+
+        // Check if the database exists before listing views
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        try {
+            // Get all tables in the database
+            List<Table> allTables = glueClient.getTables(builder -> 
builder.databaseName(databaseName))
+                    .tableList();
+
+            // Filter tables to only include those that are of type VIEW
+            List<String> viewNames = allTables.stream()
+                    .filter(table -> {
+                        String tableType = table.tableType();
+                        return tableType != null && 
tableType.equalsIgnoreCase(CatalogBaseTable.TableKind.VIEW.name());
+                    })
+                    .map(Table::name)
+                    .collect(Collectors.toList());
+
+            return viewNames;
+        } catch (Exception e) {
+            LOG.error("Failed to list views in database {}: {}", databaseName, 
e.getMessage());
+            throw new CatalogException(
+                    String.format("Error listing views in database %s: %s", 
databaseName, e.getMessage()), e);
+        }
+    }
+
+    @Override
+    public void alterDatabase(String s, CatalogDatabase catalogDatabase, 
boolean b) throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException("Altering databases is not 
supported by the Glue Catalog.");
+    }
+
+    @Override
+    public void renameTable(ObjectPath objectPath, String s, boolean b) throws 
TableNotExistException, TableAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException("Renaming tables is not 
supported by the Glue Catalog.");
+    }
+
+    @Override
+    public void alterTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean b) throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException("Altering tables is not 
supported by the Glue Catalog.");
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath) 
throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath, 
CatalogPartitionSpec catalogPartitionSpec) throws TableNotExistException, 
TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath 
objectPath, List<Expression> list) throws TableNotExistException, 
TableNotPartitionedException, CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public CatalogPartition getPartition(ObjectPath objectPath, 
CatalogPartitionSpec catalogPartitionSpec) throws PartitionNotExistException, 
CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public boolean partitionExists(ObjectPath objectPath, CatalogPartitionSpec 
catalogPartitionSpec) throws CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public void createPartition(ObjectPath objectPath, CatalogPartitionSpec 
catalogPartitionSpec, CatalogPartition catalogPartition, boolean b) throws 
TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, PartitionAlreadyExistsException, 
CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public void dropPartition(ObjectPath objectPath, CatalogPartitionSpec 
catalogPartitionSpec, boolean b) throws PartitionNotExistException, 
CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public void alterPartition(ObjectPath objectPath, CatalogPartitionSpec 
catalogPartitionSpec, CatalogPartition catalogPartition, boolean b) throws 
PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    /**
+     * Normalizes an object path according to catalog-specific normalization 
rules.
+     * For functions, this ensures consistent case handling in function names.
+     *
+     * @param path the object path to normalize
+     * @return the normalized object path
+     */
+    public ObjectPath normalize(ObjectPath path) {
+        if (path == null) {
+            throw new NullPointerException("ObjectPath cannot be null");
+        }
+
+        return new ObjectPath(
+                path.getDatabaseName(),
+                FunctionIdentifier.normalizeName(path.getObjectName()));
+    }
+
+    /**
+     * Lists all functions in a specified database.
+     *
+     * @param databaseName the name of the database
+     * @return a list of function names in the database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while listing the 
functions
+     */
+    @Override
+    public List<String> listFunctions(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        try {
+            List<String> functions = 
glueFunctionsOperations.listGlueFunctions(databaseName);
+            return functions;
+        } catch (CatalogException e) {
+            LOG.error("Failed to list functions in database {}: {}", 
databaseName, e.getMessage());
+            throw new CatalogException(
+                    String.format("Error listing functions in database %s: 
%s", databaseName, e.getMessage()), e);
+        }
+    }
+
+    /**
+     * Retrieves a function from the catalog.
+     *
+     * @param functionPath the object path of the function to retrieve
+     * @return the corresponding CatalogFunction
+     * @throws FunctionNotExistException if the function does not exist
+     * @throws CatalogException          if an error occurs while retrieving 
the function
+     */
+    @Override
+    public CatalogFunction getFunction(ObjectPath functionPath) throws 
FunctionNotExistException, CatalogException {
+
+        // Normalize the path for case-insensitive handling
+        ObjectPath normalizedPath = normalize(functionPath);
+
+        if (!databaseExists(normalizedPath.getDatabaseName())) {
+            throw new CatalogException(getName());
+        }
+
+        try {
+            return glueFunctionsOperations.getGlueFunction(normalizedPath);
+        } catch (FunctionNotExistException e) {
+            throw e;

Review Comment:
   The catch and rethrow as-is is redundant. 
   
   Do you mean to add additional information here by wrapping with another 
exception? Otherwise, remove the catch entirely.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to