nicusX commented on code in PR #206:
URL: 
https://github.com/apache/flink-connector-aws/pull/206#discussion_r2109663034


##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalog.class);
+
+    private final GlueClient glueClient;
+    private final GlueTypeConverter glueTypeConverter;
+    private final GlueDatabaseOperator glueDatabaseOperations;
+    private final GlueTableOperator glueTableOperations;
+    private final GlueFunctionOperator glueFunctionsOperations;
+    private final GlueTableUtils glueTableUtils;
+
+    /**
+     * Constructs a GlueCatalog with a provided Glue client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     * @param glueClient      Glue Client so we can decide which one to use 
for testing
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region, 
GlueClient glueClient) {
+        super(name, defaultDatabase);
+
+        // Initialize GlueClient in the constructor
+        if (glueClient != null) {

Review Comment:
   `region` should be validated with `Preconditions` (I see `name` and 
`defaultDatabase` are validated in the superclass)



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {

Review Comment:
   General comment: I would make most public methods that expect database names, 
table names, etc. more robust by checking that the argument is not null and not just 
whitespace. This makes errors a bit more user-friendly than just throwing an NPE.
   Other parts of Flink do the same, for example:
   ```
   Preconditions.checkArgument(
                   !StringUtils.isNullOrWhitespaceOnly(databaseName),
                   "databaseName cannot be null or empty"); 
   ```                



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalog.class);
+
+    private final GlueClient glueClient;
+    private final GlueTypeConverter glueTypeConverter;
+    private final GlueDatabaseOperator glueDatabaseOperations;
+    private final GlueTableOperator glueTableOperations;
+    private final GlueFunctionOperator glueFunctionsOperations;
+    private final GlueTableUtils glueTableUtils;
+
+    /**
+     * Constructs a GlueCatalog, optionally with a caller-supplied Glue client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region used to build a client when {@code glueClient} is null
+     * @param glueClient      the Glue client to use (for example a test double); when null, a
+     *                        client for {@code region} is built instead
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region, GlueClient glueClient) {
+        super(name, defaultDatabase);
+
+        if (glueClient != null) {
+            this.glueClient = glueClient;
+        } else {
+            // No client was supplied, so build one for the requested region.
+            this.glueClient = GlueClient.builder()
+                    .region(Region.of(region))
+                    .build();
+        }
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        // Hand the operators the field, not the constructor parameter: the parameter shadows
+        // the field and is null on the fallback path, which would give every operator a null
+        // client even though a real one was just built.
+        this.glueDatabaseOperations = new GlueDatabaseOperator(this.glueClient, getName());
+        this.glueTableOperations = new GlueTableOperator(this.glueClient, getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(this.glueClient, getName());
+    }
+
+    /**
+     * Constructs a GlueCatalog with default client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region) {
+        super(name, defaultDatabase);
+
+        // Create a synchronized client builder to avoid concurrent 
modification exceptions
+        this.glueClient = GlueClient.builder()
+                .region(Region.of(region))
+                
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+                .build();
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Opens the catalog. The Glue client is already built by the constructor, so this only
+     * logs which client is in use.
+     *
+     * @throws CatalogException declared by the catalog contract; not raised directly here
+     */
+    @Override
+    public void open() throws CatalogException {
+        LOG.info("Opening GlueCatalog with client: {}", this.glueClient);
+    }
+
+    /**
+     * Closes the underlying Glue client, retrying a failed close with exponential backoff.
+     *
+     * <p>Up to three attempts are made; the wait between attempts starts at 200 ms and doubles
+     * after each wait.
+     *
+     * @throws CatalogException if the last attempt fails, or if the thread is interrupted while
+     *                          waiting to retry
+     */
+    @Override
+    public void close() throws CatalogException {
+        if (glueClient == null) {
+            return;
+        }
+
+        LOG.info("Closing GlueCatalog client");
+        final int maxRetries = 3;
+        long retryDelayMs = 200;
+        for (int attempt = 1; attempt <= maxRetries; attempt++) {
+            try {
+                glueClient.close();
+                LOG.info("Successfully closed GlueCatalog client");
+                return;
+            } catch (RuntimeException e) {
+                if (attempt >= maxRetries) {
+                    // Out of attempts: report and surface the last failure.
+                    LOG.warn("Failed to close GlueCatalog client after {} retries", maxRetries, e);
+                    throw new CatalogException("Failed to close GlueCatalog client", e);
+                }
+                LOG.warn("Failed to close GlueCatalog client (attempt {}/{}), retrying in {} ms",
+                        attempt, maxRetries, retryDelayMs, e);
+                try {
+                    Thread.sleep(retryDelayMs);
+                    // Exponential backoff: double the wait for the next attempt.
+                    retryDelayMs = retryDelayMs * 2;
+                } catch (InterruptedException ie) {
+                    // Restore the interrupt flag before giving up.
+                    Thread.currentThread().interrupt();
+                    throw new CatalogException("Interrupted while retrying to close GlueCatalog client", ie);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns the names of the databases in the Glue catalog, as reported by the database
+     * operator.
+     *
+     * @return a list of database names
+     * @throws CatalogException if listing the databases fails
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return this.glueDatabaseOperations.listDatabases();
+    }
+
+    /**
+     * Looks up a database by name.
+     *
+     * @param databaseName the name of the database to retrieve
+     * @return the database metadata
+     * @throws DatabaseNotExistException if no database with that name exists
+     * @throws CatalogException          if the lookup itself fails
+     */
+    @Override
+    public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+        // Check existence first so a missing database surfaces as DatabaseNotExistException.
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        return glueDatabaseOperations.getDatabase(databaseName);
+    }
+
+    /**
+     * Reports whether a database with the given name exists in Glue.
+     *
+     * @param databaseName the name of the database
+     * @return true if the database exists, false otherwise
+     * @throws CatalogException if the existence check fails
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        final boolean found = glueDatabaseOperations.glueDatabaseExists(databaseName);
+        return found;
+    }
+
+    /**
+     * Creates a database in Glue unless one with the same name already exists.
+     *
+     * @param databaseName    the name of the database to create
+     * @param catalogDatabase the database metadata
+     * @param ifNotExists     when true, an already-existing database is silently left as is
+     * @throws DatabaseAlreadyExistException if the database exists and ifNotExists is false
+     * @throws CatalogException              if creating the database fails
+     */
+    @Override
+    public void createDatabase(String databaseName, CatalogDatabase catalogDatabase, boolean ifNotExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            glueDatabaseOperations.createDatabase(databaseName, catalogDatabase);
+            return;
+        }
+
+        // The database is already there: only an error when the caller did not opt out.
+        if (!ifNotExists) {
+            throw new DatabaseAlreadyExistException(getName(), databaseName);
+        }
+    }
+
+    /**
+     * Drops an existing database in Glue.
+     *
+     * @param databaseName      the name of the database to drop
+     * @param ignoreIfNotExists when true, a missing database is silently ignored
+     * @param cascade           when false, the drop is refused if the database still contains
+     *                          tables; when true, the database is dropped regardless
+     * @throws DatabaseNotExistException if the database does not exist and ignoreIfNotExists is
+     *                                   false
+     * @throws DatabaseNotEmptyException if the database contains tables and cascade is false
+     * @throws CatalogException          if dropping the database fails
+     */
+    @Override
+    public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            if (!ignoreIfNotExists) {
+                throw new DatabaseNotExistException(getName(), databaseName);
+            }
+            return;
+        }
+
+        // Honour the cascade flag: without it, a database that still holds tables must not be
+        // dropped.
+        // NOTE(review): only tables are checked here; functions registered in the database are
+        // not - extend the check if the function operator exposes a listing call.
+        if (!cascade && !glueTableOperations.listTables(databaseName).isEmpty()) {
+            throw new DatabaseNotEmptyException(getName(), databaseName);
+        }
+
+        // NOTE(review): with cascade set, this presumably relies on Glue removing the contained
+        // objects together with the database - confirm against dropGlueDatabase.
+        glueDatabaseOperations.dropGlueDatabase(databaseName);
+    }
+
+    /**
+     * Returns the names of the tables in the given database.
+     *
+     * @param databaseName the name of the database
+     * @return a list of table names in the database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if listing the tables fails
+     */
+    @Override
+    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+        final boolean databasePresent = databaseExists(databaseName);
+        if (databasePresent) {
+            return glueTableOperations.listTables(databaseName);
+        }
+        throw new DatabaseNotExistException(getName(), databaseName);
+    }
+
+    /**
+     * Fetches the Glue table addressed by the object path and converts it to a Flink table.
+     *
+     * @param objectPath the database and table name of the table to retrieve
+     * @return the corresponding CatalogBaseTable
+     * @throws TableNotExistException declared by the catalog contract; existence is not checked
+     *                                in this method itself
+     * @throws CatalogException       if retrieving or converting the table fails
+     */
+    @Override
+    public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
+        final Table fetched = glueTableOperations.getGlueTable(
+                objectPath.getDatabaseName(), objectPath.getObjectName());
+        return getCatalogBaseTableFromGlueTable(fetched);
+    }
+
+    /**
+     * Reports whether the table addressed by the object path exists in Glue.
+     *
+     * @param objectPath the database and table name of the table to check
+     * @return true if the table exists, false otherwise
+     * @throws CatalogException if the existence check fails
+     */
+    @Override
+    public boolean tableExists(ObjectPath objectPath) throws CatalogException {
+        // The table operator performs the actual lookup.
+        return glueTableOperations.glueTableExists(
+                objectPath.getDatabaseName(), objectPath.getObjectName());
+    }
+
+    /**
+     * Drops the table addressed by the object path.
+     *
+     * @param objectPath the database and table name of the table to drop
+     * @param ifExists   when true, a missing table is silently ignored
+     * @throws TableNotExistException if the table does not exist and ifExists is false
+     * @throws CatalogException       if dropping the table fails
+     */
+    @Override
+    public void dropTable(ObjectPath objectPath, boolean ifExists) throws TableNotExistException, CatalogException {
+        final String database = objectPath.getDatabaseName();
+        final String table = objectPath.getObjectName();
+
+        if (glueTableOperations.glueTableExists(database, table)) {
+            glueTableOperations.dropTable(database, table);
+            return;
+        }
+
+        // Nothing to drop: only an error when the caller did not ask for IF EXISTS.
+        if (!ifExists) {
+            throw new TableNotExistException(getName(), objectPath);
+        }
+    }
+
+    /**
+     * Creates a table in the Glue catalog.
+     *
+     * @param objectPath       the object path of the table to create
+     * @param catalogBaseTable the table definition containing the schema and 
properties
+     * @param ifNotExists      flag indicating whether to ignore the exception 
if the table already exists
+     * @throws TableAlreadyExistException if the table already exists and 
ifNotExists is false
+     * @throws DatabaseNotExistException  if the database does not exist
+     * @throws CatalogException           if an error occurs while creating 
the table
+     */
+    @Override
+    public void createTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean ifNotExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+
+        // Validate that required parameters are not null
+        if (objectPath == null) {

Review Comment:
   You could use `Preconditions.checkNotNull(ref, message )` to be more 
Flink-proper :) 



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalog.class);
+
+    private final GlueClient glueClient;
+    private final GlueTypeConverter glueTypeConverter;
+    private final GlueDatabaseOperator glueDatabaseOperations;
+    private final GlueTableOperator glueTableOperations;
+    private final GlueFunctionOperator glueFunctionsOperations;
+    private final GlueTableUtils glueTableUtils;
+
+    /**
+     * Constructs a GlueCatalog with a provided Glue client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     * @param glueClient      Glue Client so we can decide which one to use 
for testing
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region, 
GlueClient glueClient) {
+        super(name, defaultDatabase);
+
+        // Initialize GlueClient in the constructor
+        if (glueClient != null) {
+            this.glueClient = glueClient;
+        } else {
+            // If no GlueClient is provided, initialize it using the default 
region
+            this.glueClient = GlueClient.builder()
+                    .region(Region.of(region))
+                    .build();
+        }
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Constructs a GlueCatalog with default client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region) {
+        super(name, defaultDatabase);
+
+        // Create a synchronized client builder to avoid concurrent 
modification exceptions
+        this.glueClient = GlueClient.builder()
+                .region(Region.of(region))
+                
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+                .build();
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Opens the GlueCatalog and initializes necessary resources.
+     *
+     * @throws CatalogException if an error occurs during the opening process
+     */
+    @Override
+    public void open() throws CatalogException {
+        LOG.info("Opening GlueCatalog with client: {}", glueClient);
+    }
+
+    /**
+     * Closes the GlueCatalog and releases resources.
+     *
+     * @throws CatalogException if an error occurs during the closing process
+     */
+    @Override
+    public void close() throws CatalogException {
+        if (glueClient != null) {
+            LOG.info("Closing GlueCatalog client");
+            int maxRetries = 3;
+            int retryCount = 0;
+            long retryDelayMs = 200;
+            while (retryCount < maxRetries) {
+                try {
+                    glueClient.close();
+                    LOG.info("Successfully closed GlueCatalog client");
+                    return;
+                } catch (RuntimeException e) {
+                    retryCount++;
+                    if (retryCount >= maxRetries) {
+                        LOG.warn("Failed to close GlueCatalog client after {} 
retries", maxRetries, e);
+                        throw new CatalogException("Failed to close 
GlueCatalog client", e);
+                    }
+                    LOG.warn("Failed to close GlueCatalog client (attempt 
{}/{}), retrying in {} ms",
+                            retryCount, maxRetries, retryDelayMs, e);
+                    try {
+                        Thread.sleep(retryDelayMs);
+                        // Exponential backoff
+                        retryDelayMs *= 2;
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        throw new CatalogException("Interrupted while retrying 
to close GlueCatalog client", ie);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Lists all the databases available in the Glue catalog.
+     *
+     * @return a list of database names
+     * @throws CatalogException if an error occurs while listing the databases
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return glueDatabaseOperations.listDatabases();
+    }
+
+    /**
+     * Retrieves a specific database by its name.
+     *
+     * @param databaseName the name of the database to retrieve
+     * @return the database if found
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while retrieving 
the database
+     */
+    @Override
+    public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        boolean databaseExists = databaseExists(databaseName);
+        if (!databaseExists) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return glueDatabaseOperations.getDatabase(databaseName);
+    }
+
+    /**
+     * Checks if a database exists in Glue.
+     *
+     * @param databaseName the name of the database
+     * @return true if the database exists, false otherwise
+     * @throws CatalogException if an error occurs while checking the database
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        return glueDatabaseOperations.glueDatabaseExists(databaseName);
+    }
+
+    /**
+     * Creates a new database in Glue.
+     *
+     * @param databaseName    the name of the database to create
+     * @param catalogDatabase the catalog database object containing database 
metadata
+     * @param ifNotExists     flag indicating whether to ignore the error if 
the database already exists
+     * @throws DatabaseAlreadyExistException if the database already exists 
and ifNotExists is false
+     * @throws CatalogException              if an error occurs while creating 
the database
+     */
+    @Override
+    public void createDatabase(String databaseName, CatalogDatabase 
catalogDatabase, boolean ifNotExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        boolean exists = databaseExists(databaseName);
+
+        if (exists && !ifNotExists) {
+            throw new DatabaseAlreadyExistException(getName(), databaseName);
+        }
+
+        if (!exists) {
+            glueDatabaseOperations.createDatabase(databaseName, 
catalogDatabase);
+        }
+    }
+
+    /**
+     * Drops an existing database in Glue.
+     *
+     * @param databaseName      the name of the database to drop
+     * @param ignoreIfNotExists flag to ignore the exception if the database 
doesn't exist
+     * @param cascade           flag indicating whether to cascade the 
operation to drop related objects
+     * @throws DatabaseNotExistException if the database does not exist and 
ignoreIfNotExists is false
+     * @throws DatabaseNotEmptyException if the database contains objects and 
cascade is false
+     * @throws CatalogException          if an error occurs while dropping the 
database
+     */
+    @Override
+    public void dropDatabase(String databaseName, boolean ignoreIfNotExists, 
boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {
+        if (databaseExists(databaseName)) {
+            glueDatabaseOperations.dropGlueDatabase(databaseName);
+        } else if (!ignoreIfNotExists) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    /**
+     * Lists all tables in a specified database.
+     *
+     * @param databaseName the name of the database
+     * @return a list of table names in the database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while listing the 
tables
+     */
+    @Override
+    public List<String> listTables(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return glueTableOperations.listTables(databaseName);
+    }
+
+    /**
+     * Retrieves a table from the catalog using its object path.
+     *
+     * @param objectPath the object path of the table to retrieve
+     * @return the corresponding CatalogBaseTable for the specified table
+     * @throws TableNotExistException if the table does not exist
+     * @throws CatalogException       if an error occurs while retrieving the 
table
+     */
+    @Override
+    public CatalogBaseTable getTable(ObjectPath objectPath) throws 
TableNotExistException, CatalogException {
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        Table glueTable = glueTableOperations.getGlueTable(databaseName, 
tableName);
+        return getCatalogBaseTableFromGlueTable(glueTable);
+    }
+
+    /**
+     * Checks if a table exists in the Glue catalog.
+     *
+     * @param objectPath the object path of the table to check
+     * @return true if the table exists, false otherwise
+     * @throws CatalogException if an error occurs while checking the table's 
existence
+     */
+    @Override
+    public boolean tableExists(ObjectPath objectPath) throws CatalogException {
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        // Delegate existence check to GlueTableOperations
+        return glueTableOperations.glueTableExists(databaseName, tableName);
+    }
+
+    /**
+     * Drops a table from the Glue catalog.
+     *
+     * @param objectPath the object path of the table to drop
+     * @param ifExists   flag indicating whether to ignore the exception if 
the table does not exist
+     * @throws TableNotExistException if the table does not exist and ifExists 
is false
+     * @throws CatalogException       if an error occurs while dropping the 
table
+     */
+    @Override
+    public void dropTable(ObjectPath objectPath, boolean ifExists) throws 
TableNotExistException, CatalogException {
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        if (!glueTableOperations.glueTableExists(databaseName, tableName)) {
+            if (!ifExists) {
+                throw new TableNotExistException(getName(), objectPath);
+            }
+            return; // Table doesn't exist, and IF EXISTS is true
+        }
+
+        glueTableOperations.dropTable(databaseName, tableName);
+    }
+
+    /**
+     * Creates a table in the Glue catalog.
+     *
+     * @param objectPath       the object path of the table to create
+     * @param catalogBaseTable the table definition containing the schema and 
properties
+     * @param ifNotExists      flag indicating whether to ignore the exception 
if the table already exists
+     * @throws TableAlreadyExistException if the table already exists and 
ifNotExists is false
+     * @throws DatabaseNotExistException  if the database does not exist
+     * @throws CatalogException           if an error occurs while creating 
the table
+     */
+    @Override
+    public void createTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean ifNotExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+
+        // Validate that required parameters are not null
+        if (objectPath == null) {
+            throw new NullPointerException("ObjectPath cannot be null");
+        }
+
+        if (catalogBaseTable == null) {
+            throw new NullPointerException("CatalogBaseTable cannot be null");
+        }
+
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        // Check if the database exists
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        // Check if the table already exists
+        if (glueTableOperations.glueTableExists(databaseName, tableName)) {
+            if (!ifNotExists) {
+                throw new TableAlreadyExistException(getName(), objectPath);
+            }
+            return; // Table exists, and IF NOT EXISTS is true
+        }
+
+        // Get common properties
+        Map<String, String> tableProperties = new 
HashMap<>(catalogBaseTable.getOptions());
+
+        try {
+            // Process based on table type
+            if (catalogBaseTable.getTableKind() == 
CatalogBaseTable.TableKind.TABLE) {
+                createRegularTable(objectPath, (CatalogTable) 
catalogBaseTable, tableProperties);
+            } else if (catalogBaseTable.getTableKind() == 
CatalogBaseTable.TableKind.VIEW) {
+                createView(objectPath, (CatalogView) catalogBaseTable, 
tableProperties);
+            } else {
+                throw new CatalogException("Unsupported table kind: " + 
catalogBaseTable.getTableKind());
+            }
+            LOG.info("Successfully created {}.{} of kind {}", databaseName, 
tableName, catalogBaseTable.getTableKind());
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed to create table %s.%s: %s", 
databaseName, tableName, e.getMessage()), e);
+        }
+    }
+
+    /**
+     * Lists all views in a specified database.
+     *
+     * @param databaseName the name of the database
+     * @return a list of view names in the database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while listing the 
views
+     */
+    @Override
+    public List<String> listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+
+        // Check if the database exists before listing views
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        try {
+            // Get all tables in the database
+            List<Table> allTables = glueClient.getTables(builder -> 
builder.databaseName(databaseName))
+                    .tableList();
+
+            // Filter tables to only include those that are of type VIEW
+            List<String> viewNames = allTables.stream()
+                    .filter(table -> {
+                        String tableType = table.tableType();
+                        return tableType != null && 
tableType.equalsIgnoreCase(CatalogBaseTable.TableKind.VIEW.name());
+                    })
+                    .map(Table::name)
+                    .collect(Collectors.toList());
+
+            return viewNames;
+        } catch (Exception e) {
+            LOG.error("Failed to list views in database {}: {}", databaseName, 
e.getMessage());
+            throw new CatalogException(
+                    String.format("Error listing views in database %s: %s", 
databaseName, e.getMessage()), e);
+        }
+    }
+
+    @Override
+    public void alterDatabase(String s, CatalogDatabase catalogDatabase, 
boolean b) throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException("Altering databases is not 
supported by the Glue Catalog.");
+    }
+
+    @Override
+    public void renameTable(ObjectPath objectPath, String s, boolean b) throws 
TableNotExistException, TableAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException("Renaming tables is not 
supported by the Glue Catalog.");
+    }
+
+    @Override
+    public void alterTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean b) throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException("Altering tables is not 
supported by the Glue Catalog.");
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath) 
throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath, 
CatalogPartitionSpec catalogPartitionSpec) throws TableNotExistException, 
TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath 
objectPath, List<Expression> list) throws TableNotExistException, 
TableNotPartitionedException, CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public CatalogPartition getPartition(ObjectPath objectPath, 
CatalogPartitionSpec catalogPartitionSpec) throws PartitionNotExistException, 
CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public boolean partitionExists(ObjectPath objectPath, CatalogPartitionSpec 
catalogPartitionSpec) throws CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public void createPartition(ObjectPath objectPath, CatalogPartitionSpec 
catalogPartitionSpec, CatalogPartition catalogPartition, boolean b) throws 
TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, PartitionAlreadyExistsException, 
CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public void dropPartition(ObjectPath objectPath, CatalogPartitionSpec 
catalogPartitionSpec, boolean b) throws PartitionNotExistException, 
CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    @Override
+    public void alterPartition(ObjectPath objectPath, CatalogPartitionSpec 
catalogPartitionSpec, CatalogPartition catalogPartition, boolean b) throws 
PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException("Table partitioning operations 
are not supported by the Glue Catalog.");
+    }
+
+    /**
+     * Normalizes an object path according to catalog-specific normalization 
rules.
+     * For functions, this ensures consistent case handling in function names.
+     *
+     * @param path the object path to normalize
+     * @return the normalized object path
+     */
+    public ObjectPath normalize(ObjectPath path) {

Review Comment:
   Is this useful for a user?
   It seems to me more like an internal method.
   If so, it would be better to make it private.



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalog.class);
+
+    private final GlueClient glueClient;
+    private final GlueTypeConverter glueTypeConverter;
+    private final GlueDatabaseOperator glueDatabaseOperations;
+    private final GlueTableOperator glueTableOperations;
+    private final GlueFunctionOperator glueFunctionsOperations;
+    private final GlueTableUtils glueTableUtils;
+
+    /**
+     * Constructs a GlueCatalog with a provided Glue client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     * @param glueClient      Glue Client so we can decide which one to use 
for testing
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region, 
GlueClient glueClient) {
+        super(name, defaultDatabase);
+
+        // Initialize GlueClient in the constructor
+        if (glueClient != null) {
+            this.glueClient = glueClient;
+        } else {
+            // If no GlueClient is provided, initialize it using the default 
region
+            this.glueClient = GlueClient.builder()
+                    .region(Region.of(region))
+                    .build();
+        }
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Constructs a GlueCatalog with default client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region) {
+        super(name, defaultDatabase);
+
+        // Create a synchronized client builder to avoid concurrent 
modification exceptions
+        this.glueClient = GlueClient.builder()
+                .region(Region.of(region))
+                
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+                .build();
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Opens the GlueCatalog and initializes necessary resources.
+     *
+     * @throws CatalogException if an error occurs during the opening process
+     */
+    @Override
+    public void open() throws CatalogException {
+        LOG.info("Opening GlueCatalog with client: {}", glueClient);
+    }
+
+    /**
+     * Closes the GlueCatalog and releases resources.
+     *
+     * @throws CatalogException if an error occurs during the closing process
+     */
+    @Override
+    public void close() throws CatalogException {
+        if (glueClient != null) {
+            LOG.info("Closing GlueCatalog client");
+            int maxRetries = 3;

Review Comment:
   These magic numbers (`maxRetries`, `retryDelayMs`) should be named constants.



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that 
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as 
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides 
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and 
operations related to databases and
+ * tables are delegated to respective helper classes like 
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalog.class);
+
+    private final GlueClient glueClient;
+    private final GlueTypeConverter glueTypeConverter;
+    private final GlueDatabaseOperator glueDatabaseOperations;
+    private final GlueTableOperator glueTableOperations;
+    private final GlueFunctionOperator glueFunctionsOperations;
+    private final GlueTableUtils glueTableUtils;
+
+    /**
+     * Constructs a GlueCatalog with a provided Glue client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     * @param glueClient      Glue Client so we can decide which one to use 
for testing
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region, 
GlueClient glueClient) {
+        super(name, defaultDatabase);
+
+        // Initialize GlueClient in the constructor
+        if (glueClient != null) {
+            this.glueClient = glueClient;
+        } else {
+            // If no GlueClient is provided, initialize it using the default region
+            this.glueClient = GlueClient.builder()
+                    .region(Region.of(region))
+                    .build();
+        }
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Constructs a GlueCatalog with default client.
+     *
+     * @param name            the name of the catalog
+     * @param defaultDatabase the default database for the catalog
+     * @param region          the AWS region to be used for Glue operations
+     */
+    public GlueCatalog(String name, String defaultDatabase, String region) {
+        super(name, defaultDatabase);
+
+        // Create a synchronized client builder to avoid concurrent modification exceptions
+        this.glueClient = GlueClient.builder()
+                .region(Region.of(region))
+                
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+                .build();
+        this.glueTypeConverter = new GlueTypeConverter();
+        this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+        this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, 
getName());
+        this.glueTableOperations = new GlueTableOperator(glueClient, 
getName());
+        this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, 
getName());
+    }
+
+    /**
+     * Opens the GlueCatalog and initializes necessary resources.
+     *
+     * @throws CatalogException if an error occurs during the opening process
+     */
+    @Override
+    public void open() throws CatalogException {
+        LOG.info("Opening GlueCatalog with client: {}", glueClient);
+    }
+
+    /**
+     * Closes the GlueCatalog and releases resources.
+     *
+     * @throws CatalogException if an error occurs during the closing process
+     */
+    @Override
+    public void close() throws CatalogException {
+        if (glueClient != null) {
+            LOG.info("Closing GlueCatalog client");
+            int maxRetries = 3;
+            int retryCount = 0;
+            long retryDelayMs = 200;
+            while (retryCount < maxRetries) {
+                try {
+                    glueClient.close();
+                    LOG.info("Successfully closed GlueCatalog client");
+                    return;
+                } catch (RuntimeException e) {
+                    retryCount++;
+                    if (retryCount >= maxRetries) {
+                        LOG.warn("Failed to close GlueCatalog client after {} retries", maxRetries, e);
+                        throw new CatalogException("Failed to close GlueCatalog client", e);
+                    }
+                    LOG.warn("Failed to close GlueCatalog client (attempt {}/{}), retrying in {} ms",
+                            retryCount, maxRetries, retryDelayMs, e);
+                    try {
+                        Thread.sleep(retryDelayMs);
+                        // Exponential backoff
+                        retryDelayMs *= 2;
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        throw new CatalogException("Interrupted while retrying to close GlueCatalog client", ie);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Lists all the databases available in the Glue catalog.
+     *
+     * @return a list of database names
+     * @throws CatalogException if an error occurs while listing the databases
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return glueDatabaseOperations.listDatabases();
+    }
+
+    /**
+     * Retrieves a specific database by its name.
+     *
+     * @param databaseName the name of the database to retrieve
+     * @return the database if found
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while retrieving 
the database
+     */
+    @Override
+    public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        boolean databaseExists = databaseExists(databaseName);
+        if (!databaseExists) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return glueDatabaseOperations.getDatabase(databaseName);
+    }
+
+    /**
+     * Checks if a database exists in Glue.
+     *
+     * @param databaseName the name of the database
+     * @return true if the database exists, false otherwise
+     * @throws CatalogException if an error occurs while checking the database
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        return glueDatabaseOperations.glueDatabaseExists(databaseName);
+    }
+
+    /**
+     * Creates a new database in Glue.
+     *
+     * @param databaseName    the name of the database to create
+     * @param catalogDatabase the catalog database object containing database 
metadata
+     * @param ifNotExists     flag indicating whether to ignore the error if 
the database already exists
+     * @throws DatabaseAlreadyExistException if the database already exists 
and ifNotExists is false
+     * @throws CatalogException              if an error occurs while creating 
the database
+     */
+    @Override
+    public void createDatabase(String databaseName, CatalogDatabase 
catalogDatabase, boolean ifNotExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        boolean exists = databaseExists(databaseName);
+
+        if (exists && !ifNotExists) {
+            throw new DatabaseAlreadyExistException(getName(), databaseName);
+        }
+
+        if (!exists) {
+            glueDatabaseOperations.createDatabase(databaseName, 
catalogDatabase);
+        }
+    }
+
+    /**
+     * Drops an existing database in Glue.
+     *
+     * @param databaseName      the name of the database to drop
+     * @param ignoreIfNotExists flag to ignore the exception if the database 
doesn't exist
+     * @param cascade           flag indicating whether to cascade the 
operation to drop related objects
+     * @throws DatabaseNotExistException if the database does not exist and 
ignoreIfNotExists is false
+     * @throws DatabaseNotEmptyException if the database contains objects and 
cascade is false
+     * @throws CatalogException          if an error occurs while dropping the 
database
+     */
+    @Override
+    public void dropDatabase(String databaseName, boolean ignoreIfNotExists, 
boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {
+        if (databaseExists(databaseName)) {
+            glueDatabaseOperations.dropGlueDatabase(databaseName);
+        } else if (!ignoreIfNotExists) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    /**
+     * Lists all tables in a specified database.
+     *
+     * @param databaseName the name of the database
+     * @return a list of table names in the database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while listing the 
tables
+     */
+    @Override
+    public List<String> listTables(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return glueTableOperations.listTables(databaseName);
+    }
+
+    /**
+     * Retrieves a table from the catalog using its object path.
+     *
+     * @param objectPath the object path of the table to retrieve
+     * @return the corresponding CatalogBaseTable for the specified table
+     * @throws TableNotExistException if the table does not exist
+     * @throws CatalogException       if an error occurs while retrieving the 
table
+     */
+    @Override
+    public CatalogBaseTable getTable(ObjectPath objectPath) throws 
TableNotExistException, CatalogException {
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        Table glueTable = glueTableOperations.getGlueTable(databaseName, 
tableName);
+        return getCatalogBaseTableFromGlueTable(glueTable);
+    }
+
+    /**
+     * Checks if a table exists in the Glue catalog.
+     *
+     * @param objectPath the object path of the table to check
+     * @return true if the table exists, false otherwise
+     * @throws CatalogException if an error occurs while checking the table's 
existence
+     */
+    @Override
+    public boolean tableExists(ObjectPath objectPath) throws CatalogException {
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        // Delegate existence check to GlueTableOperations
+        return glueTableOperations.glueTableExists(databaseName, tableName);
+    }
+
+    /**
+     * Drops a table from the Glue catalog.
+     *
+     * @param objectPath the object path of the table to drop
+     * @param ifExists   flag indicating whether to ignore the exception if 
the table does not exist
+     * @throws TableNotExistException if the table does not exist and ifExists 
is false
+     * @throws CatalogException       if an error occurs while dropping the 
table
+     */
+    @Override
+    public void dropTable(ObjectPath objectPath, boolean ifExists) throws 
TableNotExistException, CatalogException {
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        if (!glueTableOperations.glueTableExists(databaseName, tableName)) {
+            if (!ifExists) {
+                throw new TableNotExistException(getName(), objectPath);
+            }
+            return; // Table doesn't exist, and IF EXISTS is true
+        }
+
+        glueTableOperations.dropTable(databaseName, tableName);
+    }
+
+    /**
+     * Creates a table in the Glue catalog.
+     *
+     * @param objectPath       the object path of the table to create
+     * @param catalogBaseTable the table definition containing the schema and 
properties
+     * @param ifNotExists      flag indicating whether to ignore the exception 
if the table already exists
+     * @throws TableAlreadyExistException if the table already exists and 
ifNotExists is false
+     * @throws DatabaseNotExistException  if the database does not exist
+     * @throws CatalogException           if an error occurs while creating 
the table
+     */
+    @Override
+    public void createTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean ifNotExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+
+        // Validate that required parameters are not null
+        if (objectPath == null) {
+            throw new NullPointerException("ObjectPath cannot be null");
+        }
+
+        if (catalogBaseTable == null) {
+            throw new NullPointerException("CatalogBaseTable cannot be null");
+        }
+
+        String databaseName = objectPath.getDatabaseName();
+        String tableName = objectPath.getObjectName();
+
+        // Check if the database exists
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        // Check if the table already exists
+        if (glueTableOperations.glueTableExists(databaseName, tableName)) {
+            if (!ifNotExists) {
+                throw new TableAlreadyExistException(getName(), objectPath);
+            }
+            return; // Table exists, and IF NOT EXISTS is true
+        }
+
+        // Get common properties
+        Map<String, String> tableProperties = new 
HashMap<>(catalogBaseTable.getOptions());
+
+        try {
+            // Process based on table type
+            if (catalogBaseTable.getTableKind() == 
CatalogBaseTable.TableKind.TABLE) {
+                createRegularTable(objectPath, (CatalogTable) 
catalogBaseTable, tableProperties);
+            } else if (catalogBaseTable.getTableKind() == 
CatalogBaseTable.TableKind.VIEW) {
+                createView(objectPath, (CatalogView) catalogBaseTable, 
tableProperties);
+            } else {
+                throw new CatalogException("Unsupported table kind: " + 
catalogBaseTable.getTableKind());
+            }
+            LOG.info("Successfully created {}.{} of kind {}", databaseName, 
tableName, catalogBaseTable.getTableKind());
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed to create table %s.%s: %s", 
databaseName, tableName, e.getMessage()), e);
+        }
+    }
+
+    /**
+     * Lists all views in a specified database.
+     *
+     * @param databaseName the name of the database
+     * @return a list of view names in the database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException          if an error occurs while listing the 
views
+     */
+    @Override
+    public List<String> listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+
+        // Check if the database exists before listing views
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        try {
+            // Get all tables in the database
+            List<Table> allTables = glueClient.getTables(builder -> 
builder.databaseName(databaseName))
+                    .tableList();
+
+            // Filter tables to only include those that are of type VIEW
+            List<String> viewNames = allTables.stream()

Review Comment:
   You can return here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to