leekeiabstraction commented on code in PR #206:
URL:
https://github.com/apache/flink-connector-aws/pull/206#discussion_r2095889230
##########
flink-catalog-aws/flink-catalog-aws-glue/README.md:
##########
@@ -0,0 +1,358 @@
+# Flink AWS Glue Catalog Connector
+
+The Flink AWS Glue Catalog connector provides integration between Apache Flink and the AWS Glue Data Catalog. This connector enables Flink applications to use AWS Glue as a metadata catalog for tables, databases, and schemas, allowing seamless SQL queries against AWS resources.
+
+## Features
+
+- Register AWS Glue as a catalog in Flink applications
+- Access Glue databases and tables through Flink SQL
+- Support for various AWS data sources (S3, Kinesis, MSK)
+- Mapping between Flink and AWS Glue data types
+- Compatibility with Flink's Table API and SQL interface
+
+## Prerequisites
+
+Before getting started, ensure you have the following:
+
+- **AWS account** with appropriate permissions for AWS Glue and other required services
+- **AWS credentials** properly configured
+
+## Getting Started
+
+### 1. Add Dependency
+
+Add the AWS Glue Catalog connector to your Flink project:
+
+### 2. Configure AWS Credentials
+
+Ensure AWS credentials are configured using one of these methods:
+
+- Environment variables
+- AWS credentials file
+- IAM roles (for applications running on AWS)
+
+### 3. Register the Glue Catalog
+
+You can register the AWS Glue catalog using either the Table API or SQL:
+
+#### Using Table API (Java/Scala)
+
+```java
+// Java/Scala
+import org.apache.flink.table.catalog.glue.GlueCatalog;
+import org.apache.flink.table.catalog.Catalog;
+
+// Create Glue catalog instance
+Catalog glueCatalog = new GlueCatalog(
+ "glue_catalog", // Catalog name
+ "default", // Default database
+ "us-east-1"); // AWS region
+
+
+// Register with table environment
+tableEnv.registerCatalog("glue_catalog", glueCatalog);
+tableEnv.useCatalog("glue_catalog");
+```
+
+#### Using Table API (Python)
+
+```python
+# Python
+from pyflink.table.catalog import GlueCatalog
+
+# Create and register Glue catalog
+glue_catalog = GlueCatalog(
+ "glue_catalog", # Catalog name
+ "default", # Default database
+ "us-east-1") # AWS region
+
+t_env.register_catalog("glue_catalog", glue_catalog)
+t_env.use_catalog("glue_catalog")
+```
+
+#### Using SQL
+
+In the Flink SQL Client, create and use the Glue catalog:
+
+```sql
+-- Create a catalog using Glue
+CREATE CATALOG glue_catalog WITH (
+ 'type' = 'glue',
+ 'catalog-name' = 'glue_catalog',
+ 'default-database' = 'default',
+ 'region' = 'us-east-1'
+);
+
+-- Use the created catalog
+USE CATALOG glue_catalog;
+
+-- Use a specific database
+USE default;
+```
+
+### 4. Create or Reference Glue Tables
Review Comment:
In the example below, will users need `flink-sql-connector-kinesis` on Flink's
classpath (or in its library/module directory)? If so, we should mention it
here or earlier under Prerequisites.
##########
flink-catalog-aws/flink-catalog-aws-glue/README.md:
##########
@@ -0,0 +1,358 @@
+### 2. Configure AWS Credentials
Review Comment:
We should mention here or under Prerequisites that the role/user/session
associated with the credentials needs the appropriate permissions on the Glue
database.
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and operations related to databases and
+ * tables are delegated to respective helper classes like GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
+
+ private final GlueClient glueClient;
+ private final GlueTypeConverter glueTypeConverter;
+ private final GlueDatabaseOperator glueDatabaseOperations;
+ private final GlueTableOperator glueTableOperations;
+ private final GlueFunctionOperator glueFunctionsOperations;
+ private final GlueTableUtils glueTableUtils;
+
+ /**
+ * Constructs a GlueCatalog with a provided Glue client.
+ *
+ * @param name the name of the catalog
+ * @param defaultDatabase the default database for the catalog
+ * @param region the AWS region to be used for Glue operations
+ * @param glueClient Glue Client so we can decide which one to use for testing
+ */
+ public GlueCatalog(String name, String defaultDatabase, String region, GlueClient glueClient) {
+ super(name, defaultDatabase);
+
+ // Initialize GlueClient in the constructor
+ if (glueClient != null) {
+ this.glueClient = glueClient;
+ } else {
+ // If no GlueClient is provided, initialize it using the default region
+ this.glueClient = GlueClient.builder()
+ .region(Region.of(region))
+ .build();
+ }
+ this.glueTypeConverter = new GlueTypeConverter();
+ this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+ this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, getName());
+ this.glueTableOperations = new GlueTableOperator(glueClient, getName());
+ this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, getName());
+ }
+
+ /**
+ * Constructs a GlueCatalog with default client.
+ *
+ * @param name the name of the catalog
+ * @param defaultDatabase the default database for the catalog
+ * @param region the AWS region to be used for Glue operations
+ */
+ public GlueCatalog(String name, String defaultDatabase, String region) {
+ super(name, defaultDatabase);
+
+ // Create a synchronized client builder to avoid concurrent modification exceptions
+ this.glueClient = GlueClient.builder()
+ .region(Region.of(region))
+ .credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+ .build();
+ this.glueTypeConverter = new GlueTypeConverter();
+ this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+ this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, getName());
+ this.glueTableOperations = new GlueTableOperator(glueClient, getName());
+ this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, getName());
+ }
+
+ /**
+ * Opens the GlueCatalog and initializes necessary resources.
+ *
+ * @throws CatalogException if an error occurs during the opening process
+ */
+ @Override
+ public void open() throws CatalogException {
+ LOG.info("Opening GlueCatalog with client: {}", glueClient);
+ }
+
+ /**
+ * Closes the GlueCatalog and releases resources.
+ *
+ * @throws CatalogException if an error occurs during the closing process
+ */
+ @Override
+ public void close() throws CatalogException {
+ if (glueClient != null) {
+ LOG.info("Closing GlueCatalog client");
+ int maxRetries = 3;
+ int retryCount = 0;
+ long retryDelayMs = 200;
+ while (retryCount < maxRetries) {
+ try {
+ glueClient.close();
+ LOG.info("Successfully closed GlueCatalog client");
+ return;
+ } catch (RuntimeException e) {
+ retryCount++;
+ if (retryCount >= maxRetries) {
+ LOG.warn("Failed to close GlueCatalog client after {} retries", maxRetries, e);
+ throw new CatalogException("Failed to close GlueCatalog client", e);
+ }
+ LOG.warn("Failed to close GlueCatalog client (attempt {}/{}), retrying in {} ms",
+ retryCount, maxRetries, retryDelayMs, e);
+ try {
+ Thread.sleep(retryDelayMs);
+ // Exponential backoff
+ retryDelayMs *= 2;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new CatalogException("Interrupted while retrying to close GlueCatalog client", ie);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Lists all the databases available in the Glue catalog.
+ *
+ * @return a list of database names
+ * @throws CatalogException if an error occurs while listing the databases
+ */
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return glueDatabaseOperations.listDatabases();
+ }
+
+ /**
+ * Retrieves a specific database by its name.
+ *
+ * @param databaseName the name of the database to retrieve
+ * @return the database if found
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException if an error occurs while retrieving the database
+ */
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+ boolean databaseExists = databaseExists(databaseName);
+ if (!databaseExists) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ return glueDatabaseOperations.getDatabase(databaseName);
+ }
+
+ /**
+ * Checks if a database exists in Glue.
+ *
+ * @param databaseName the name of the database
+ * @return true if the database exists, false otherwise
+ * @throws CatalogException if an error occurs while checking the database
+ */
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ return glueDatabaseOperations.glueDatabaseExists(databaseName);
+ }
+
+ /**
+ * Creates a new database in Glue.
+ *
+ * @param databaseName the name of the database to create
+ * @param catalogDatabase the catalog database object containing database metadata
+ * @param ifNotExists flag indicating whether to ignore the error if the database already exists
+ * @throws DatabaseAlreadyExistException if the database already exists and ifNotExists is false
+ * @throws CatalogException if an error occurs while creating the database
+ */
+ @Override
+ public void createDatabase(String databaseName, CatalogDatabase catalogDatabase, boolean ifNotExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ boolean exists = databaseExists(databaseName);
+
+ if (exists && !ifNotExists) {
+ throw new DatabaseAlreadyExistException(getName(), databaseName);
+ }
+
+ if (!exists) {
+ glueDatabaseOperations.createDatabase(databaseName, catalogDatabase);
+ }
+ }
+
+ /**
+ * Drops an existing database in Glue.
+ *
+ * @param databaseName the name of the database to drop
+ * @param ignoreIfNotExists flag to ignore the exception if the database doesn't exist
+ * @param cascade flag indicating whether to cascade the operation to drop related objects
+ * @throws DatabaseNotExistException if the database does not exist and ignoreIfNotExists is false
+ * @throws DatabaseNotEmptyException if the database contains objects and cascade is false
+ * @throws CatalogException if an error occurs while dropping the database
+ */
+ @Override
+ public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
Review Comment:
Is DatabaseNotEmptyException thrown from within this method?
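For reference, a minimal sketch of where such a check could sit, assuming the intent is that `cascade = false` rejects a non-empty database. `InMemoryCatalogSketch`, `NotEmptyException`, and `NotExistException` are hypothetical stand-ins (for GlueCatalog and Flink's `DatabaseNotEmptyException` / `DatabaseNotExistException`) so the snippet compiles without Flink or the AWS SDK. It is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for Flink's DatabaseNotEmptyException. */
class NotEmptyException extends Exception {
    NotEmptyException(String catalog, String database) {
        super("Database " + database + " in catalog " + catalog + " is not empty.");
    }
}

/** Hypothetical stand-in for Flink's DatabaseNotExistException. */
class NotExistException extends Exception {
    NotExistException(String catalog, String database) {
        super("Database " + database + " does not exist in catalog " + catalog + ".");
    }
}

/** In-memory stand-in for a catalog; a real catalog would query Glue instead of a map. */
class InMemoryCatalogSketch {
    private final String name;
    private final Map<String, List<String>> tablesByDatabase = new HashMap<>();

    InMemoryCatalogSketch(String name) {
        this.name = name;
    }

    void createDatabase(String database) {
        tablesByDatabase.putIfAbsent(database, new ArrayList<>());
    }

    void createTable(String database, String table) {
        tablesByDatabase.computeIfAbsent(database, d -> new ArrayList<>()).add(table);
    }

    boolean databaseExists(String database) {
        return tablesByDatabase.containsKey(database);
    }

    void dropDatabase(String database, boolean ignoreIfNotExists, boolean cascade)
            throws NotExistException, NotEmptyException {
        if (!databaseExists(database)) {
            if (ignoreIfNotExists) {
                return; // Nothing to drop, and the caller asked not to fail.
            }
            throw new NotExistException(name, database);
        }
        // Without cascade, refuse to drop a database that still holds tables.
        if (!cascade && !tablesByDatabase.get(database).isEmpty()) {
            throw new NotEmptyException(name, database);
        }
        tablesByDatabase.remove(database);
    }
}
```

If the method never performs a check like the one before `remove`, the `throws DatabaseNotEmptyException` clause and its Javadoc entry describe behaviour that callers cannot actually observe.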
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+ /**
+ * Constructs a GlueCatalog with a provided Glue client.
+ *
+ * @param name the name of the catalog
+ * @param defaultDatabase the default database for the catalog
+ * @param region the AWS region to be used for Glue operations
+ * @param glueClient Glue Client so we can decide which one to use for testing
Review Comment:
The two constructors duplicate a fair amount of code, primarily to instantiate
the client wrappers. Would it make sense to do the following instead (argument
checks omitted)?
```
@VisibleForTesting
GlueCatalog(String name, String defaultDatabase, GlueClient glueClient) {
    super(name, defaultDatabase);
    setup(glueClient);
}

public GlueCatalog(String name, String defaultDatabase, String region) {
    super(name, defaultDatabase);
    GlueClient client = GlueClient.builder()
            .region(Region.of(region))
            .credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
            .build();
    setup(client);
}

// Note: fields assigned in setup() can no longer be declared final.
private void setup(GlueClient glueClient) {
    this.glueClient = glueClient;
    this.glueTypeConverter = new GlueTypeConverter();
    this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
    this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, getName());
    this.glueTableOperations = new GlueTableOperator(glueClient, getName());
    this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, getName());
}
```
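A variant that may be worth considering is constructor chaining, which removes the duplication while letting the fields stay `final`. The sketch below illustrates the pattern only: `ClientSketch`, `OperatorSketch`, `CatalogSketch`, and `buildDefaultClient` are hypothetical stand-ins (for `GlueClient`, the operator wrappers, `GlueCatalog`, and the builder call) so that it compiles without Flink or the AWS SDK.

```java
import java.util.Objects;

/** Hypothetical stand-in for the AWS SDK's GlueClient. */
interface ClientSketch extends AutoCloseable {
    @Override
    void close();
}

/** Hypothetical stand-in for client wrappers such as GlueDatabaseOperator. */
class OperatorSketch {
    final ClientSketch client;
    final String catalogName;

    OperatorSketch(ClientSketch client, String catalogName) {
        this.client = client;
        this.catalogName = catalogName;
    }
}

class CatalogSketch {
    private final ClientSketch client;
    private final OperatorSketch databaseOperations;
    private final OperatorSketch tableOperations;

    /** Public entry point: builds a default client, then delegates. */
    CatalogSketch(String name, String region) {
        this(name, buildDefaultClient(region));
    }

    /** Package-private entry point so tests can inject a fake client. */
    CatalogSketch(String name, ClientSketch client) {
        Objects.requireNonNull(name, "name");
        // Rejecting null here means later code never needs a null check on the client.
        this.client = Objects.requireNonNull(client, "client");
        this.databaseOperations = new OperatorSketch(this.client, name);
        this.tableOperations = new OperatorSketch(this.client, name);
    }

    /** Static so it can be called inside this(...) before the instance is initialized. */
    private static ClientSketch buildDefaultClient(String region) {
        Objects.requireNonNull(region, "region");
        return () -> { }; // The real code would call GlueClient.builder()...build().
    }

    OperatorSketch databaseOperations() {
        return databaseOperations;
    }

    OperatorSketch tableOperations() {
        return tableOperations;
    }
}
```

All wrappers are built from the single validated client in one place, so the injected and default paths cannot drift apart.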
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+ @Override
+ public void close() throws CatalogException {
+ if (glueClient != null) {
Review Comment:
Is there a scenario where glueClient would be null?
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+ /**
+ * Constructs a GlueCatalog with a provided Glue client.
+ *
+ * @param name the name of the catalog
+ * @param defaultDatabase the default database for the catalog
+ * @param region the AWS region to be used for Glue operations
+ * @param glueClient Glue Client so we can decide which one to use
for testing
+ */
+ public GlueCatalog(String name, String defaultDatabase, String region,
GlueClient glueClient) {
+ super(name, defaultDatabase);
+
+ // Initialize GlueClient in the constructor
+ if (glueClient != null) {
+ this.glueClient = glueClient;
+ } else {
+ // If no GlueClient is provided, initialize it using the default
region
+ this.glueClient = GlueClient.builder()
+ .region(Region.of(region))
+ .build();
+ }
+ this.glueTypeConverter = new GlueTypeConverter();
+ this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+ this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient,
getName());
+ this.glueTableOperations = new GlueTableOperator(glueClient,
getName());
+ this.glueFunctionsOperations = new GlueFunctionOperator(glueClient,
getName());
+ }
+
+ /**
+ * Constructs a GlueCatalog with default client.
+ *
+ * @param name the name of the catalog
+ * @param defaultDatabase the default database for the catalog
+ * @param region the AWS region to be used for Glue operations
+ */
+ public GlueCatalog(String name, String defaultDatabase, String region) {
+ super(name, defaultDatabase);
+
+ // Create a synchronized client builder to avoid concurrent
modification exceptions
+ this.glueClient = GlueClient.builder()
+ .region(Region.of(region))
+
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+ .build();
+ this.glueTypeConverter = new GlueTypeConverter();
+ this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+ this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient,
getName());
+ this.glueTableOperations = new GlueTableOperator(glueClient,
getName());
+ this.glueFunctionsOperations = new GlueFunctionOperator(glueClient,
getName());
+ }
+
+ /**
+ * Opens the GlueCatalog and initializes necessary resources.
+ *
+ * @throws CatalogException if an error occurs during the opening process
+ */
+ @Override
+ public void open() throws CatalogException {
+ LOG.info("Opening GlueCatalog with client: {}", glueClient);
+ }
+
+ /**
+ * Closes the GlueCatalog and releases resources.
+ *
+ * @throws CatalogException if an error occurs during the closing process
+ */
+ @Override
+ public void close() throws CatalogException {
+ if (glueClient != null) {
+ LOG.info("Closing GlueCatalog client");
+ int maxRetries = 3;
+ int retryCount = 0;
+ long retryDelayMs = 200;
+ while (retryCount < maxRetries) {
Review Comment:
What is the motivation for the retries here? Is there a specific
RuntimeException that we have in mind?
I had a look at the AWS SDK: its close is best-effort and only logs any
exceptions:
https://github.com/aws/aws-sdk-java-v2/blob/master/utils/src/main/java/software/amazon/awssdk/utils/IoUtils.java#L76-L85
We can simplify the code here by removing the retry logic, as the SDK does not
seem to surface exceptions during close().
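
For illustration, a minimal sketch of a single-attempt `close()`, assuming the SDK client's `close()` does not throw in practice. `SketchClient` and `CloseSketchCatalog` are hypothetical stand-ins for `GlueClient` and `GlueCatalog` so that the snippet compiles on its own; the real method would keep `@Override` and would presumably wrap failures in `CatalogException` rather than `IllegalStateException`.

```java
// Hypothetical stand-in for GlueClient: close() declares no checked exception.
interface SketchClient {
    void close();
}

// Hypothetical stand-in for GlueCatalog, reduced to the close() path.
class CloseSketchCatalog {
    private final SketchClient glueClient;

    CloseSketchCatalog(SketchClient glueClient) {
        this.glueClient = glueClient;
    }

    // Single close attempt: no retry loop, no sleep, no backoff.
    void close() {
        if (glueClient == null) {
            return;
        }
        try {
            glueClient.close();
        } catch (RuntimeException e) {
            // Assumption: the real catalog would throw CatalogException here.
            throw new IllegalStateException("Failed to close GlueCatalog client", e);
        }
    }
}
```

This also removes the `InterruptedException` handling, since nothing sleeps any more.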
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and
operations related to databases and
+ * tables are delegated to respective helper classes like
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GlueCatalog.class);
+
+ private final GlueClient glueClient;
+ private final GlueTypeConverter glueTypeConverter;
+ private final GlueDatabaseOperator glueDatabaseOperations;
+ private final GlueTableOperator glueTableOperations;
+ private final GlueFunctionOperator glueFunctionsOperations;
+ private final GlueTableUtils glueTableUtils;
+
+ /**
+ * Constructs a GlueCatalog with a provided Glue client.
+ *
+ * @param name the name of the catalog
+ * @param defaultDatabase the default database for the catalog
+ * @param region the AWS region to be used for Glue operations
+ * @param glueClient Glue Client so we can decide which one to use
for testing
+ */
+ public GlueCatalog(String name, String defaultDatabase, String region,
GlueClient glueClient) {
+ super(name, defaultDatabase);
+
+ // Initialize GlueClient in the constructor
+ if (glueClient != null) {
+ this.glueClient = glueClient;
+ } else {
+ // If no GlueClient is provided, initialize it using the default
region
+ this.glueClient = GlueClient.builder()
+ .region(Region.of(region))
+ .build();
+ }
+ this.glueTypeConverter = new GlueTypeConverter();
+ this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+ this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient,
getName());
+ this.glueTableOperations = new GlueTableOperator(glueClient,
getName());
+ this.glueFunctionsOperations = new GlueFunctionOperator(glueClient,
getName());
+ }
+
+ /**
+ * Constructs a GlueCatalog with default client.
+ *
+ * @param name the name of the catalog
+ * @param defaultDatabase the default database for the catalog
+ * @param region the AWS region to be used for Glue operations
+ */
+ public GlueCatalog(String name, String defaultDatabase, String region) {
+ super(name, defaultDatabase);
+
+ // Create a synchronized client builder to avoid concurrent
modification exceptions
+ this.glueClient = GlueClient.builder()
+ .region(Region.of(region))
+
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+ .build();
+ this.glueTypeConverter = new GlueTypeConverter();
+ this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+ this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient,
getName());
+ this.glueTableOperations = new GlueTableOperator(glueClient,
getName());
+ this.glueFunctionsOperations = new GlueFunctionOperator(glueClient,
getName());
+ }
+
+ /**
+ * Opens the GlueCatalog and initializes necessary resources.
+ *
+ * @throws CatalogException if an error occurs during the opening process
+ */
+ @Override
+ public void open() throws CatalogException {
+ LOG.info("Opening GlueCatalog with client: {}", glueClient);
+ }
+
+ /**
+ * Closes the GlueCatalog and releases resources.
+ *
+ * @throws CatalogException if an error occurs during the closing process
+ */
+ @Override
+ public void close() throws CatalogException {
+ if (glueClient != null) {
+ LOG.info("Closing GlueCatalog client");
+ int maxRetries = 3;
+ int retryCount = 0;
+ long retryDelayMs = 200;
+ while (retryCount < maxRetries) {
+ try {
+ glueClient.close();
+ LOG.info("Successfully closed GlueCatalog client");
+ return;
+ } catch (RuntimeException e) {
+ retryCount++;
+ if (retryCount >= maxRetries) {
+ LOG.warn("Failed to close GlueCatalog client after {}
retries", maxRetries, e);
+ throw new CatalogException("Failed to close
GlueCatalog client", e);
+ }
+ LOG.warn("Failed to close GlueCatalog client (attempt
{}/{}), retrying in {} ms",
+ retryCount, maxRetries, retryDelayMs, e);
+ try {
+ Thread.sleep(retryDelayMs);
+ // Exponential backoff
+ retryDelayMs *= 2;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new CatalogException("Interrupted while retrying
to close GlueCatalog client", ie);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Lists all the databases available in the Glue catalog.
+ *
+ * @return a list of database names
+ * @throws CatalogException if an error occurs while listing the databases
+ */
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return glueDatabaseOperations.listDatabases();
+ }
+
+ /**
+ * Retrieves a specific database by its name.
+ *
+ * @param databaseName the name of the database to retrieve
+ * @return the database if found
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException if an error occurs while retrieving
the database
+ */
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws
DatabaseNotExistException, CatalogException {
+ boolean databaseExists = databaseExists(databaseName);
+ if (!databaseExists) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ return glueDatabaseOperations.getDatabase(databaseName);
+ }
+
+ /**
+ * Checks if a database exists in Glue.
+ *
+ * @param databaseName the name of the database
+ * @return true if the database exists, false otherwise
+ * @throws CatalogException if an error occurs while checking the database
+ */
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ return glueDatabaseOperations.glueDatabaseExists(databaseName);
+ }
+
+ /**
+ * Creates a new database in Glue.
+ *
+ * @param databaseName the name of the database to create
+ * @param catalogDatabase the catalog database object containing database
metadata
+ * @param ifNotExists flag indicating whether to ignore the error if
the database already exists
+ * @throws DatabaseAlreadyExistException if the database already exists
and ifNotExists is false
+ * @throws CatalogException if an error occurs while creating
the database
+ */
+ @Override
+ public void createDatabase(String databaseName, CatalogDatabase
catalogDatabase, boolean ifNotExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ boolean exists = databaseExists(databaseName);
+
+ if (exists && !ifNotExists) {
+ throw new DatabaseAlreadyExistException(getName(), databaseName);
+ }
+
+ if (!exists) {
+ glueDatabaseOperations.createDatabase(databaseName,
catalogDatabase);
+ }
+ }
+
+ /**
+ * Drops an existing database in Glue.
+ *
+ * @param databaseName the name of the database to drop
+ * @param ignoreIfNotExists flag to ignore the exception if the database
doesn't exist
+ * @param cascade flag indicating whether to cascade the
operation to drop related objects
+ * @throws DatabaseNotExistException if the database does not exist and
ignoreIfNotExists is false
+ * @throws DatabaseNotEmptyException if the database contains objects and
cascade is false
+ * @throws CatalogException if an error occurs while dropping the
database
+ */
+ @Override
+ public void dropDatabase(String databaseName, boolean ignoreIfNotExists,
boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException,
CatalogException {
+ if (databaseExists(databaseName)) {
+ glueDatabaseOperations.dropGlueDatabase(databaseName);
+ } else if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ }
+
+ /**
+ * Lists all tables in a specified database.
+ *
+ * @param databaseName the name of the database
+ * @return a list of table names in the database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException if an error occurs while listing the
tables
+ */
+ @Override
+ public List<String> listTables(String databaseName) throws
DatabaseNotExistException, CatalogException {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
Review Comment:
These lines seem to be repeated in multiple methods. They could be refactored
into a helper such as `validateDatabaseExists(databaseName)`.
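
A rough sketch of the helper, under the assumption that it only wraps the existing `databaseExists` check. `ValidateSketchCatalog` and `SketchDatabaseNotExistException` are hypothetical stand-ins (backed by an in-memory set) so that the snippet compiles without Flink on the classpath; in the PR the helper would live in `GlueCatalog` and throw Flink's `DatabaseNotExistException`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Flink's DatabaseNotExistException.
class SketchDatabaseNotExistException extends Exception {
    SketchDatabaseNotExistException(String catalogName, String databaseName) {
        super("Database " + databaseName + " does not exist in catalog " + catalogName);
    }
}

// Hypothetical stand-in for GlueCatalog, backed by an in-memory set instead of Glue.
class ValidateSketchCatalog {
    private final String name;
    private final Set<String> databases;

    ValidateSketchCatalog(String name, Set<String> databases) {
        this.name = name;
        this.databases = databases;
    }

    boolean databaseExists(String databaseName) {
        return databases.contains(databaseName);
    }

    // The repeated check, extracted once.
    private void validateDatabaseExists(String databaseName)
            throws SketchDatabaseNotExistException {
        if (!databaseExists(databaseName)) {
            throw new SketchDatabaseNotExistException(name, databaseName);
        }
    }

    List<String> listTables(String databaseName) throws SketchDatabaseNotExistException {
        validateDatabaseExists(databaseName);
        // The real method delegates to glueTableOperations.listTables(databaseName).
        return Collections.emptyList();
    }
}
```

`getDatabase` and `listTables` would then each start with a single call to the helper.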
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and
operations related to databases and
+ * tables are delegated to respective helper classes like
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GlueCatalog.class);
+
+ private final GlueClient glueClient;
+ private final GlueTypeConverter glueTypeConverter;
+ private final GlueDatabaseOperator glueDatabaseOperations;
+ private final GlueTableOperator glueTableOperations;
+ private final GlueFunctionOperator glueFunctionsOperations;
+ private final GlueTableUtils glueTableUtils;
+
+ /**
+ * Constructs a GlueCatalog with a provided Glue client.
+ *
+ * @param name the name of the catalog
+ * @param defaultDatabase the default database for the catalog
+ * @param region the AWS region to be used for Glue operations
+ * @param glueClient Glue Client so we can decide which one to use
for testing
+ */
+ public GlueCatalog(String name, String defaultDatabase, String region,
GlueClient glueClient) {
+ super(name, defaultDatabase);
+
+ // Initialize GlueClient in the constructor
+ if (glueClient != null) {
+ this.glueClient = glueClient;
+ } else {
+ // If no GlueClient is provided, initialize it using the default
region
+ this.glueClient = GlueClient.builder()
+ .region(Region.of(region))
+ .build();
+ }
+ this.glueTypeConverter = new GlueTypeConverter();
+ this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+ this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient,
getName());
+ this.glueTableOperations = new GlueTableOperator(glueClient,
getName());
+ this.glueFunctionsOperations = new GlueFunctionOperator(glueClient,
getName());
+ }
+
+ /**
+ * Constructs a GlueCatalog with default client.
+ *
+ * @param name the name of the catalog
+ * @param defaultDatabase the default database for the catalog
+ * @param region the AWS region to be used for Glue operations
+ */
+ public GlueCatalog(String name, String defaultDatabase, String region) {
+ super(name, defaultDatabase);
+
+ // Create a synchronized client builder to avoid concurrent
modification exceptions
+ this.glueClient = GlueClient.builder()
+ .region(Region.of(region))
+
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+ .build();
+ this.glueTypeConverter = new GlueTypeConverter();
+ this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+ this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient,
getName());
+ this.glueTableOperations = new GlueTableOperator(glueClient,
getName());
+ this.glueFunctionsOperations = new GlueFunctionOperator(glueClient,
getName());
+ }
+
+ /**
+ * Opens the GlueCatalog and initializes necessary resources.
+ *
+ * @throws CatalogException if an error occurs during the opening process
+ */
+ @Override
+ public void open() throws CatalogException {
+ LOG.info("Opening GlueCatalog with client: {}", glueClient);
+ }
+
+ /**
+ * Closes the GlueCatalog and releases resources.
+ *
+ * @throws CatalogException if an error occurs during the closing process
+ */
+ @Override
+ public void close() throws CatalogException {
+ if (glueClient != null) {
+ LOG.info("Closing GlueCatalog client");
+ int maxRetries = 3;
+ int retryCount = 0;
+ long retryDelayMs = 200;
+ while (retryCount < maxRetries) {
+ try {
+ glueClient.close();
+ LOG.info("Successfully closed GlueCatalog client");
+ return;
+ } catch (RuntimeException e) {
+ retryCount++;
+ if (retryCount >= maxRetries) {
+ LOG.warn("Failed to close GlueCatalog client after {}
retries", maxRetries, e);
+ throw new CatalogException("Failed to close
GlueCatalog client", e);
+ }
+ LOG.warn("Failed to close GlueCatalog client (attempt
{}/{}), retrying in {} ms",
+ retryCount, maxRetries, retryDelayMs, e);
+ try {
+ Thread.sleep(retryDelayMs);
+ // Exponential backoff
+ retryDelayMs *= 2;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new CatalogException("Interrupted while retrying
to close GlueCatalog client", ie);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Lists all the databases available in the Glue catalog.
+ *
+ * @return a list of database names
+ * @throws CatalogException if an error occurs while listing the databases
+ */
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return glueDatabaseOperations.listDatabases();
+ }
+
+ /**
+ * Retrieves a specific database by its name.
+ *
+ * @param databaseName the name of the database to retrieve
+ * @return the database if found
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException if an error occurs while retrieving
the database
+ */
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws
DatabaseNotExistException, CatalogException {
+ boolean databaseExists = databaseExists(databaseName);
+ if (!databaseExists) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ return glueDatabaseOperations.getDatabase(databaseName);
+ }
+
+ /**
+ * Checks if a database exists in Glue.
+ *
+ * @param databaseName the name of the database
+ * @return true if the database exists, false otherwise
+ * @throws CatalogException if an error occurs while checking the database
+ */
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ return glueDatabaseOperations.glueDatabaseExists(databaseName);
+ }
+
+ /**
+ * Creates a new database in Glue.
+ *
+ * @param databaseName the name of the database to create
+ * @param catalogDatabase the catalog database object containing database
metadata
+ * @param ifNotExists flag indicating whether to ignore the error if
the database already exists
+ * @throws DatabaseAlreadyExistException if the database already exists
and ifNotExists is false
+ * @throws CatalogException if an error occurs while creating
the database
+ */
+ @Override
+ public void createDatabase(String databaseName, CatalogDatabase
catalogDatabase, boolean ifNotExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ boolean exists = databaseExists(databaseName);
+
+ if (exists && !ifNotExists) {
+ throw new DatabaseAlreadyExistException(getName(), databaseName);
+ }
+
+ if (!exists) {
+ glueDatabaseOperations.createDatabase(databaseName,
catalogDatabase);
+ }
+ }
+
+ /**
+ * Drops an existing database in Glue.
+ *
+ * @param databaseName the name of the database to drop
+ * @param ignoreIfNotExists flag to ignore the exception if the database
doesn't exist
+ * @param cascade flag indicating whether to cascade the
operation to drop related objects
+ * @throws DatabaseNotExistException if the database does not exist and
ignoreIfNotExists is false
+ * @throws DatabaseNotEmptyException if the database contains objects and
cascade is false
+ * @throws CatalogException if an error occurs while dropping the
database
+ */
+ @Override
+ public void dropDatabase(String databaseName, boolean ignoreIfNotExists,
boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException,
CatalogException {
+ if (databaseExists(databaseName)) {
+ glueDatabaseOperations.dropGlueDatabase(databaseName);
+ } else if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ }
+
+ /**
+ * Lists all tables in a specified database.
+ *
+ * @param databaseName the name of the database
+ * @return a list of table names in the database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException if an error occurs while listing the
tables
+ */
+ @Override
+ public List<String> listTables(String databaseName) throws
DatabaseNotExistException, CatalogException {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ return glueTableOperations.listTables(databaseName);
+ }
+
+ /**
+ * Retrieves a table from the catalog using its object path.
+ *
+ * @param objectPath the object path of the table to retrieve
+ * @return the corresponding CatalogBaseTable for the specified table
+ * @throws TableNotExistException if the table does not exist
+ * @throws CatalogException if an error occurs while retrieving the
table
+ */
+ @Override
+ public CatalogBaseTable getTable(ObjectPath objectPath) throws
TableNotExistException, CatalogException {
+ String databaseName = objectPath.getDatabaseName();
+ String tableName = objectPath.getObjectName();
+
+ Table glueTable = glueTableOperations.getGlueTable(databaseName,
tableName);
+ return getCatalogBaseTableFromGlueTable(glueTable);
+ }
+
+ /**
+ * Checks if a table exists in the Glue catalog.
+ *
+ * @param objectPath the object path of the table to check
+ * @return true if the table exists, false otherwise
+ * @throws CatalogException if an error occurs while checking the table's
existence
+ */
+ @Override
+ public boolean tableExists(ObjectPath objectPath) throws CatalogException {
+ String databaseName = objectPath.getDatabaseName();
+ String tableName = objectPath.getObjectName();
+
+ // Delegate existence check to GlueTableOperations
+ return glueTableOperations.glueTableExists(databaseName, tableName);
+ }
+
+ /**
+ * Drops a table from the Glue catalog.
+ *
+ * @param objectPath the object path of the table to drop
+ * @param ifExists flag indicating whether to ignore the exception if
the table does not exist
+ * @throws TableNotExistException if the table does not exist and ifExists
is false
+ * @throws CatalogException if an error occurs while dropping the
table
+ */
+ @Override
+ public void dropTable(ObjectPath objectPath, boolean ifExists) throws
TableNotExistException, CatalogException {
+ String databaseName = objectPath.getDatabaseName();
+ String tableName = objectPath.getObjectName();
+
+ if (!glueTableOperations.glueTableExists(databaseName, tableName)) {
+ if (!ifExists) {
+ throw new TableNotExistException(getName(), objectPath);
+ }
+ return; // Table doesn't exist, and IF EXISTS is true
+ }
+
+ glueTableOperations.dropTable(databaseName, tableName);
+ }
+
+ /**
+ * Creates a table in the Glue catalog.
+ *
+ * @param objectPath the object path of the table to create
+ * @param catalogBaseTable the table definition containing the schema and
properties
+ * @param ifNotExists flag indicating whether to ignore the exception
if the table already exists
+ * @throws TableAlreadyExistException if the table already exists and
ifNotExists is false
Review Comment:
The Javadoc should also document `NullPointerException` here, since the method
null-checks both `objectPath` and `catalogBaseTable` before doing anything else.
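
For illustration, a minimal sketch of how the documented contract could look.
`CreateTableContractSketch` and its simplified `createTable` signature are
hypothetical stand-ins, not the real `GlueCatalog` API, and
`Objects.requireNonNull` is only a suggested shorthand for the explicit `if`
checks in the PR:

```java
import java.util.Objects;

/** Hypothetical stand-in for the catalog class; only the null-check contract is shown. */
class CreateTableContractSketch {

    /**
     * Creates a table (simplified stand-in for the method under review).
     *
     * @param objectPath the object path of the table to create
     * @param catalogBaseTable the table definition
     * @return a short confirmation string, used here only to make the sketch observable
     * @throws NullPointerException if objectPath or catalogBaseTable is null
     */
    static String createTable(Object objectPath, Object catalogBaseTable) {
        // Same messages as the explicit checks in the PR, expressed in one line each.
        Objects.requireNonNull(objectPath, "ObjectPath cannot be null");
        Objects.requireNonNull(catalogBaseTable, "CatalogBaseTable cannot be null");
        return "created " + objectPath;
    }
}
```

The point is the extra `@throws NullPointerException` line in the Javadoc; the
explicit `if` checks can stay as they are if you prefer them.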
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueTableUtils;
+import org.apache.flink.table.catalog.glue.util.GlueTypeConverter;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * GlueCatalog is an implementation of the Flink AbstractCatalog that
interacts with AWS Glue.
+ * This class allows Flink to perform various catalog operations such as
creating, deleting, and retrieving
+ * databases and tables from Glue. It encapsulates AWS Glue's API and provides
a Flink-compatible interface.
+ *
+ * <p>This catalog uses GlueClient to interact with AWS Glue services, and
operations related to databases and
+ * tables are delegated to respective helper classes like
GlueDatabaseOperations and GlueTableOperations.</p>
+ */
+public class GlueCatalog extends AbstractCatalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GlueCatalog.class);
+
+ private final GlueClient glueClient;
+ private final GlueTypeConverter glueTypeConverter;
+ private final GlueDatabaseOperator glueDatabaseOperations;
+ private final GlueTableOperator glueTableOperations;
+ private final GlueFunctionOperator glueFunctionsOperations;
+ private final GlueTableUtils glueTableUtils;
+
+ /**
+ * Constructs a GlueCatalog with a provided Glue client.
+ *
+ * @param name the name of the catalog
+ * @param defaultDatabase the default database for the catalog
+ * @param region the AWS region to be used for Glue operations
+ * @param glueClient Glue Client so we can decide which one to use
for testing
+ */
+ public GlueCatalog(String name, String defaultDatabase, String region,
GlueClient glueClient) {
+ super(name, defaultDatabase);
+
+ // Initialize GlueClient in the constructor
+ if (glueClient != null) {
+ this.glueClient = glueClient;
+ } else {
+ // If no GlueClient is provided, initialize it using the default
region
+ this.glueClient = GlueClient.builder()
+ .region(Region.of(region))
+ .build();
+ }
+ this.glueTypeConverter = new GlueTypeConverter();
+ this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+ this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient,
getName());
+ this.glueTableOperations = new GlueTableOperator(glueClient,
getName());
+ this.glueFunctionsOperations = new GlueFunctionOperator(glueClient,
getName());
+ }
+
+ /**
+ * Constructs a GlueCatalog with default client.
+ *
+ * @param name the name of the catalog
+ * @param defaultDatabase the default database for the catalog
+ * @param region the AWS region to be used for Glue operations
+ */
+ public GlueCatalog(String name, String defaultDatabase, String region) {
+ super(name, defaultDatabase);
+
+ // Create a synchronized client builder to avoid concurrent
modification exceptions
+ this.glueClient = GlueClient.builder()
+ .region(Region.of(region))
+
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
+ .build();
+ this.glueTypeConverter = new GlueTypeConverter();
+ this.glueTableUtils = new GlueTableUtils(glueTypeConverter);
+ this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient,
getName());
+ this.glueTableOperations = new GlueTableOperator(glueClient,
getName());
+ this.glueFunctionsOperations = new GlueFunctionOperator(glueClient,
getName());
+ }
+
+ /**
+ * Opens the GlueCatalog and initializes necessary resources.
+ *
+ * @throws CatalogException if an error occurs during the opening process
+ */
+ @Override
+ public void open() throws CatalogException {
+ LOG.info("Opening GlueCatalog with client: {}", glueClient);
+ }
+
+ /**
+ * Closes the GlueCatalog and releases resources.
+ *
+ * @throws CatalogException if an error occurs during the closing process
+ */
+ @Override
+ public void close() throws CatalogException {
+ if (glueClient != null) {
+ LOG.info("Closing GlueCatalog client");
+ int maxRetries = 3;
+ int retryCount = 0;
+ long retryDelayMs = 200;
+ while (retryCount < maxRetries) {
+ try {
+ glueClient.close();
+ LOG.info("Successfully closed GlueCatalog client");
+ return;
+ } catch (RuntimeException e) {
+ retryCount++;
+ if (retryCount >= maxRetries) {
+ LOG.warn("Failed to close GlueCatalog client after {}
retries", maxRetries, e);
+ throw new CatalogException("Failed to close
GlueCatalog client", e);
+ }
+ LOG.warn("Failed to close GlueCatalog client (attempt
{}/{}), retrying in {} ms",
+ retryCount, maxRetries, retryDelayMs, e);
+ try {
+ Thread.sleep(retryDelayMs);
+ // Exponential backoff
+ retryDelayMs *= 2;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new CatalogException("Interrupted while retrying
to close GlueCatalog client", ie);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Lists all the databases available in the Glue catalog.
+ *
+ * @return a list of database names
+ * @throws CatalogException if an error occurs while listing the databases
+ */
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return glueDatabaseOperations.listDatabases();
+ }
+
+ /**
+ * Retrieves a specific database by its name.
+ *
+ * @param databaseName the name of the database to retrieve
+ * @return the database if found
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException if an error occurs while retrieving
the database
+ */
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws
DatabaseNotExistException, CatalogException {
+ boolean databaseExists = databaseExists(databaseName);
+ if (!databaseExists) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ return glueDatabaseOperations.getDatabase(databaseName);
+ }
+
+ /**
+ * Checks if a database exists in Glue.
+ *
+ * @param databaseName the name of the database
+ * @return true if the database exists, false otherwise
+ * @throws CatalogException if an error occurs while checking the database
+ */
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ return glueDatabaseOperations.glueDatabaseExists(databaseName);
+ }
+
+ /**
+ * Creates a new database in Glue.
+ *
+ * @param databaseName the name of the database to create
+ * @param catalogDatabase the catalog database object containing database
metadata
+ * @param ifNotExists flag indicating whether to ignore the error if
the database already exists
+ * @throws DatabaseAlreadyExistException if the database already exists
and ifNotExists is false
+ * @throws CatalogException if an error occurs while creating
the database
+ */
+ @Override
+ public void createDatabase(String databaseName, CatalogDatabase
catalogDatabase, boolean ifNotExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ boolean exists = databaseExists(databaseName);
+
+ if (exists && !ifNotExists) {
+ throw new DatabaseAlreadyExistException(getName(), databaseName);
+ }
+
+ if (!exists) {
+ glueDatabaseOperations.createDatabase(databaseName,
catalogDatabase);
+ }
+ }
+
+ /**
+ * Drops an existing database in Glue.
+ *
+ * @param databaseName the name of the database to drop
+ * @param ignoreIfNotExists flag to ignore the exception if the database
doesn't exist
+ * @param cascade flag indicating whether to cascade the
operation to drop related objects
+ * @throws DatabaseNotExistException if the database does not exist and
ignoreIfNotExists is false
+ * @throws DatabaseNotEmptyException if the database contains objects and
cascade is false
+ * @throws CatalogException if an error occurs while dropping the
database
+ */
+ @Override
+ public void dropDatabase(String databaseName, boolean ignoreIfNotExists,
boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException,
CatalogException {
+ if (databaseExists(databaseName)) {
+ glueDatabaseOperations.dropGlueDatabase(databaseName);
+ } else if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ }
+
+ /**
+ * Lists all tables in a specified database.
+ *
+ * @param databaseName the name of the database
+ * @return a list of table names in the database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException if an error occurs while listing the
tables
+ */
+ @Override
+ public List<String> listTables(String databaseName) throws
DatabaseNotExistException, CatalogException {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ return glueTableOperations.listTables(databaseName);
+ }
+
+ /**
+ * Retrieves a table from the catalog using its object path.
+ *
+ * @param objectPath the object path of the table to retrieve
+ * @return the corresponding CatalogBaseTable for the specified table
+ * @throws TableNotExistException if the table does not exist
+ * @throws CatalogException if an error occurs while retrieving the
table
+ */
+ @Override
+ public CatalogBaseTable getTable(ObjectPath objectPath) throws
TableNotExistException, CatalogException {
+ String databaseName = objectPath.getDatabaseName();
+ String tableName = objectPath.getObjectName();
+
+ Table glueTable = glueTableOperations.getGlueTable(databaseName,
tableName);
+ return getCatalogBaseTableFromGlueTable(glueTable);
+ }
+
+ /**
+ * Checks if a table exists in the Glue catalog.
+ *
+ * @param objectPath the object path of the table to check
+ * @return true if the table exists, false otherwise
+ * @throws CatalogException if an error occurs while checking the table's
existence
+ */
+ @Override
+ public boolean tableExists(ObjectPath objectPath) throws CatalogException {
+ String databaseName = objectPath.getDatabaseName();
+ String tableName = objectPath.getObjectName();
+
+ // Delegate existence check to GlueTableOperations
+ return glueTableOperations.glueTableExists(databaseName, tableName);
+ }
+
+ /**
+ * Drops a table from the Glue catalog.
+ *
+ * @param objectPath the object path of the table to drop
+ * @param ifExists flag indicating whether to ignore the exception if
the table does not exist
+ * @throws TableNotExistException if the table does not exist and ifExists
is false
+ * @throws CatalogException if an error occurs while dropping the
table
+ */
+ @Override
+ public void dropTable(ObjectPath objectPath, boolean ifExists) throws
TableNotExistException, CatalogException {
+ String databaseName = objectPath.getDatabaseName();
+ String tableName = objectPath.getObjectName();
+
+ if (!glueTableOperations.glueTableExists(databaseName, tableName)) {
+ if (!ifExists) {
+ throw new TableNotExistException(getName(), objectPath);
+ }
+ return; // Table doesn't exist, and IF EXISTS is true
+ }
+
+ glueTableOperations.dropTable(databaseName, tableName);
+ }
+
+ /**
+ * Creates a table in the Glue catalog.
+ *
+ * @param objectPath the object path of the table to create
+ * @param catalogBaseTable the table definition containing the schema and
properties
+ * @param ifNotExists flag indicating whether to ignore the exception
if the table already exists
+ * @throws TableAlreadyExistException if the table already exists and
ifNotExists is false
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException if an error occurs while creating
the table
+ */
+ @Override
+ public void createTable(ObjectPath objectPath, CatalogBaseTable
catalogBaseTable, boolean ifNotExists)
+ throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+
+ // Validate that required parameters are not null
+ if (objectPath == null) {
+ throw new NullPointerException("ObjectPath cannot be null");
+ }
+
+ if (catalogBaseTable == null) {
+ throw new NullPointerException("CatalogBaseTable cannot be null");
+ }
+
+ String databaseName = objectPath.getDatabaseName();
+ String tableName = objectPath.getObjectName();
+
+ // Check if the database exists
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ // Check if the table already exists
+ if (glueTableOperations.glueTableExists(databaseName, tableName)) {
+ if (!ifNotExists) {
+ throw new TableAlreadyExistException(getName(), objectPath);
+ }
+ return; // Table exists, and IF NOT EXISTS is true
+ }
+
+ // Get common properties
+ Map<String, String> tableProperties = new
HashMap<>(catalogBaseTable.getOptions());
+
+ try {
+ // Process based on table type
+ if (catalogBaseTable.getTableKind() ==
CatalogBaseTable.TableKind.TABLE) {
+ createRegularTable(objectPath, (CatalogTable)
catalogBaseTable, tableProperties);
+ } else if (catalogBaseTable.getTableKind() ==
CatalogBaseTable.TableKind.VIEW) {
+ createView(objectPath, (CatalogView) catalogBaseTable,
tableProperties);
+ } else {
+ throw new CatalogException("Unsupported table kind: " +
catalogBaseTable.getTableKind());
+ }
+ LOG.info("Successfully created {}.{} of kind {}", databaseName,
tableName, catalogBaseTable.getTableKind());
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed to create table %s.%s: %s",
databaseName, tableName, e.getMessage()), e);
+ }
+ }
+
+ /**
+ * Lists all views in a specified database.
+ *
+ * @param databaseName the name of the database
+ * @return a list of view names in the database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException if an error occurs while listing the
views
+ */
+ @Override
+ public List<String> listViews(String databaseName) throws
DatabaseNotExistException, CatalogException {
+
+ // Check if the database exists before listing views
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ try {
+ // Get all tables in the database
+ List<Table> allTables = glueClient.getTables(builder ->
builder.databaseName(databaseName))
+ .tableList();
+
+ // Filter tables to only include those that are of type VIEW
+ List<String> viewNames = allTables.stream()
+ .filter(table -> {
+ String tableType = table.tableType();
+ return tableType != null &&
tableType.equalsIgnoreCase(CatalogBaseTable.TableKind.VIEW.name());
+ })
+ .map(Table::name)
+ .collect(Collectors.toList());
+
+ return viewNames;
+ } catch (Exception e) {
+ LOG.error("Failed to list views in database {}: {}", databaseName,
e.getMessage());
+ throw new CatalogException(
+ String.format("Error listing views in database %s: %s",
databaseName, e.getMessage()), e);
+ }
+ }
+
+ @Override
+ public void alterDatabase(String s, CatalogDatabase catalogDatabase,
boolean b) throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException("Altering databases is not
supported by the Glue Catalog.");
+ }
+
+ @Override
+ public void renameTable(ObjectPath objectPath, String s, boolean b) throws
TableNotExistException, TableAlreadyExistException, CatalogException {
+ throw new UnsupportedOperationException("Renaming tables is not
supported by the Glue Catalog.");
+ }
+
+ @Override
+ public void alterTable(ObjectPath objectPath, CatalogBaseTable
catalogBaseTable, boolean b) throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException("Altering tables is not
supported by the Glue Catalog.");
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ throw new UnsupportedOperationException("Table partitioning operations
are not supported by the Glue Catalog.");
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath,
CatalogPartitionSpec catalogPartitionSpec) throws TableNotExistException,
TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+ throw new UnsupportedOperationException("Table partitioning operations
are not supported by the Glue Catalog.");
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath
objectPath, List<Expression> list) throws TableNotExistException,
TableNotPartitionedException, CatalogException {
+ throw new UnsupportedOperationException("Table partitioning operations
are not supported by the Glue Catalog.");
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath objectPath,
CatalogPartitionSpec catalogPartitionSpec) throws PartitionNotExistException,
CatalogException {
+ throw new UnsupportedOperationException("Table partitioning operations
are not supported by the Glue Catalog.");
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath objectPath, CatalogPartitionSpec
catalogPartitionSpec) throws CatalogException {
+ throw new UnsupportedOperationException("Table partitioning operations
are not supported by the Glue Catalog.");
+ }
+
+ @Override
+ public void createPartition(ObjectPath objectPath, CatalogPartitionSpec
catalogPartitionSpec, CatalogPartition catalogPartition, boolean b) throws
TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionAlreadyExistsException,
CatalogException {
+ throw new UnsupportedOperationException("Table partitioning operations
are not supported by the Glue Catalog.");
+ }
+
+ @Override
+ public void dropPartition(ObjectPath objectPath, CatalogPartitionSpec
catalogPartitionSpec, boolean b) throws PartitionNotExistException,
CatalogException {
+ throw new UnsupportedOperationException("Table partitioning operations
are not supported by the Glue Catalog.");
+ }
+
+ @Override
+ public void alterPartition(ObjectPath objectPath, CatalogPartitionSpec
catalogPartitionSpec, CatalogPartition catalogPartition, boolean b) throws
PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException("Table partitioning operations
are not supported by the Glue Catalog.");
+ }
+
+ /**
+ * Normalizes an object path according to catalog-specific normalization
rules.
+ * For functions, this ensures consistent case handling in function names.
+ *
+ * @param path the object path to normalize
+ * @return the normalized object path
+ */
+ public ObjectPath normalize(ObjectPath path) {
+ if (path == null) {
+ throw new NullPointerException("ObjectPath cannot be null");
+ }
+
+ return new ObjectPath(
+ path.getDatabaseName(),
+ FunctionIdentifier.normalizeName(path.getObjectName()));
+ }
+
+ /**
+ * Lists all functions in a specified database.
+ *
+ * @param databaseName the name of the database
+ * @return a list of function names in the database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException if an error occurs while listing the
functions
+ */
+ @Override
+ public List<String> listFunctions(String databaseName) throws
DatabaseNotExistException, CatalogException {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ try {
+ List<String> functions =
glueFunctionsOperations.listGlueFunctions(databaseName);
+ return functions;
+ } catch (CatalogException e) {
+ LOG.error("Failed to list functions in database {}: {}",
databaseName, e.getMessage());
+ throw new CatalogException(
+ String.format("Error listing functions in database %s:
%s", databaseName, e.getMessage()), e);
+ }
+ }
+
+ /**
+ * Retrieves a function from the catalog.
+ *
+ * @param functionPath the object path of the function to retrieve
+ * @return the corresponding CatalogFunction
+ * @throws FunctionNotExistException if the function does not exist
+ * @throws CatalogException if an error occurs while retrieving
the function
+ */
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath) throws
FunctionNotExistException, CatalogException {
+
+ // Normalize the path for case-insensitive handling
+ ObjectPath normalizedPath = normalize(functionPath);
+
+ if (!databaseExists(normalizedPath.getDatabaseName())) {
+ throw new CatalogException(getName());
+ }
+
+ try {
+ return glueFunctionsOperations.getGlueFunction(normalizedPath);
+ } catch (FunctionNotExistException e) {
+ throw e;
Review Comment:
Catching `FunctionNotExistException` only to rethrow it unchanged is redundant.
Did you mean to add context here by wrapping it in another exception? If not,
remove the catch entirely.
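
A rough sketch of the two options, under stated assumptions: every class and
method below (`SketchFunctionNotExistException`, `SketchCatalogException`,
`GetFunctionSketch`, `lookup`) is a hypothetical stand-in for the Flink and
Glue types, and I cannot see the rest of the method in this hunk, so treat it
as the shape of the change rather than a drop-in replacement:

```java
import java.util.Map;

/** Hypothetical stand-in for the checked "function not found" exception. */
class SketchFunctionNotExistException extends Exception {
    SketchFunctionNotExistException(String name) {
        super("Function does not exist: " + name);
    }
}

/** Hypothetical stand-in for the unchecked catalog exception. */
class SketchCatalogException extends RuntimeException {
    SketchCatalogException(String message, Throwable cause) {
        super(message, cause);
    }
}

class GetFunctionSketch {
    // Toy registry standing in for the Glue lookup.
    private static final Map<String, String> FUNCTIONS = Map.of("upper", "com.example.Upper");

    private static String lookup(String name) throws SketchFunctionNotExistException {
        String className = FUNCTIONS.get(name);
        if (className == null) {
            throw new SketchFunctionNotExistException(name);
        }
        return className;
    }

    /** Option A: no catch at all; the checked exception propagates unchanged. */
    static String getFunction(String name) throws SketchFunctionNotExistException {
        return lookup(name);
    }

    /** Option B: catch only what gains context; "not found" still propagates as-is. */
    static String getFunctionWithContext(String name) throws SketchFunctionNotExistException {
        try {
            return lookup(name);
        } catch (RuntimeException e) {
            throw new SketchCatalogException("Failed to get function " + name, e);
        }
    }
}
```

Either way the caller sees the same `FunctionNotExistException` it sees today,
so dropping the catch should not change behavior.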