nicusX commented on code in PR #206: URL: https://github.com/apache/flink-connector-aws/pull/206#discussion_r2109663034
########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,935 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import 
org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator; +import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator; +import org.apache.flink.table.catalog.glue.operator.GlueTableOperator; +import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.util.GlueTableUtils; +import org.apache.flink.table.catalog.glue.util.GlueTypeConverter; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * GlueCatalog is an implementation of the Flink AbstractCatalog that interacts with AWS Glue. 
+ * This class allows Flink to perform various catalog operations such as creating, deleting, and retrieving + * databases and tables from Glue. It encapsulates AWS Glue's API and provides a Flink-compatible interface. + * + * <p>This catalog uses GlueClient to interact with AWS Glue services, and operations related to databases and + * tables are delegated to respective helper classes like GlueDatabaseOperations and GlueTableOperations.</p> + */ +public class GlueCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + private final GlueClient glueClient; + private final GlueTypeConverter glueTypeConverter; + private final GlueDatabaseOperator glueDatabaseOperations; + private final GlueTableOperator glueTableOperations; + private final GlueFunctionOperator glueFunctionsOperations; + private final GlueTableUtils glueTableUtils; + + /** + * Constructs a GlueCatalog with a provided Glue client. + * + * @param name the name of the catalog + * @param defaultDatabase the default database for the catalog + * @param region the AWS region to be used for Glue operations + * @param glueClient Glue Client so we can decide which one to use for testing + */ + public GlueCatalog(String name, String defaultDatabase, String region, GlueClient glueClient) { + super(name, defaultDatabase); + + // Initialize GlueClient in the constructor + if (glueClient != null) { Review Comment: `region` should be validated with `Preconditions` (I see `name` and `defaultDatabase` are validated in the superclass) ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,935 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import 
org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator; +import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator; +import org.apache.flink.table.catalog.glue.operator.GlueTableOperator; +import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.util.GlueTableUtils; +import org.apache.flink.table.catalog.glue.util.GlueTypeConverter; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * GlueCatalog is an implementation of the Flink AbstractCatalog that interacts with AWS Glue. + * This class allows Flink to perform various catalog operations such as creating, deleting, and retrieving + * databases and tables from Glue. It encapsulates AWS Glue's API and provides a Flink-compatible interface. 
+ * + * <p>This catalog uses GlueClient to interact with AWS Glue services, and operations related to databases and + * tables are delegated to respective helper classes like GlueDatabaseOperations and GlueTableOperations.</p> + */ +public class GlueCatalog extends AbstractCatalog { Review Comment: General comment: I would make most public methods that take database names, table names, etc. more robust by checking that the argument is not null and not just whitespace. This makes errors a bit more user-friendly than just throwing an NPE. For example, copy what other parts of Flink do: ``` Preconditions.checkArgument( !StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty"); ``` ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,935 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator; +import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator; +import org.apache.flink.table.catalog.glue.operator.GlueTableOperator; +import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants; +import 
org.apache.flink.table.catalog.glue.util.GlueTableUtils; +import org.apache.flink.table.catalog.glue.util.GlueTypeConverter; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * GlueCatalog is an implementation of the Flink AbstractCatalog that interacts with AWS Glue. + * This class allows Flink to perform various catalog operations such as creating, deleting, and retrieving + * databases and tables from Glue. It encapsulates AWS Glue's API and provides a Flink-compatible interface. + * + * <p>This catalog uses GlueClient to interact with AWS Glue services, and operations related to databases and + * tables are delegated to respective helper classes like GlueDatabaseOperations and GlueTableOperations.</p> + */ +public class GlueCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + private final GlueClient glueClient; + private final GlueTypeConverter glueTypeConverter; + private final GlueDatabaseOperator glueDatabaseOperations; + private final GlueTableOperator glueTableOperations; + private final GlueFunctionOperator glueFunctionsOperations; + private final GlueTableUtils glueTableUtils; + + /** + * Constructs a GlueCatalog with a provided Glue client. 
+ * + * @param name the name of the catalog + * @param defaultDatabase the default database for the catalog + * @param region the AWS region to be used for Glue operations + * @param glueClient Glue Client so we can decide which one to use for testing + */ + public GlueCatalog(String name, String defaultDatabase, String region, GlueClient glueClient) { + super(name, defaultDatabase); + + // Initialize GlueClient in the constructor + if (glueClient != null) { + this.glueClient = glueClient; + } else { + // If no GlueClient is provided, initialize it using the default region + this.glueClient = GlueClient.builder() + .region(Region.of(region)) + .build(); + } + this.glueTypeConverter = new GlueTypeConverter(); + this.glueTableUtils = new GlueTableUtils(glueTypeConverter); + this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, getName()); + this.glueTableOperations = new GlueTableOperator(glueClient, getName()); + this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, getName()); + } + + /** + * Constructs a GlueCatalog with default client. 
+ * + * @param name the name of the catalog + * @param defaultDatabase the default database for the catalog + * @param region the AWS region to be used for Glue operations + */ + public GlueCatalog(String name, String defaultDatabase, String region) { + super(name, defaultDatabase); + + // Create a synchronized client builder to avoid concurrent modification exceptions + this.glueClient = GlueClient.builder() + .region(Region.of(region)) + .credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create()) + .build(); + this.glueTypeConverter = new GlueTypeConverter(); + this.glueTableUtils = new GlueTableUtils(glueTypeConverter); + this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, getName()); + this.glueTableOperations = new GlueTableOperator(glueClient, getName()); + this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, getName()); + } + + /** + * Opens the GlueCatalog and initializes necessary resources. + * + * @throws CatalogException if an error occurs during the opening process + */ + @Override + public void open() throws CatalogException { + LOG.info("Opening GlueCatalog with client: {}", glueClient); + } + + /** + * Closes the GlueCatalog and releases resources. 
+ * + * @throws CatalogException if an error occurs during the closing process + */ + @Override + public void close() throws CatalogException { + if (glueClient != null) { + LOG.info("Closing GlueCatalog client"); + int maxRetries = 3; + int retryCount = 0; + long retryDelayMs = 200; + while (retryCount < maxRetries) { + try { + glueClient.close(); + LOG.info("Successfully closed GlueCatalog client"); + return; + } catch (RuntimeException e) { + retryCount++; + if (retryCount >= maxRetries) { + LOG.warn("Failed to close GlueCatalog client after {} retries", maxRetries, e); + throw new CatalogException("Failed to close GlueCatalog client", e); + } + LOG.warn("Failed to close GlueCatalog client (attempt {}/{}), retrying in {} ms", + retryCount, maxRetries, retryDelayMs, e); + try { + Thread.sleep(retryDelayMs); + // Exponential backoff + retryDelayMs *= 2; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new CatalogException("Interrupted while retrying to close GlueCatalog client", ie); + } + } + } + } + } + + /** + * Lists all the databases available in the Glue catalog. + * + * @return a list of database names + * @throws CatalogException if an error occurs while listing the databases + */ + @Override + public List<String> listDatabases() throws CatalogException { + return glueDatabaseOperations.listDatabases(); + } + + /** + * Retrieves a specific database by its name. 
+ * + * @param databaseName the name of the database to retrieve + * @return the database if found + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException if an error occurs while retrieving the database + */ + @Override + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { + boolean databaseExists = databaseExists(databaseName); + if (!databaseExists) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + return glueDatabaseOperations.getDatabase(databaseName); + } + + /** + * Checks if a database exists in Glue. + * + * @param databaseName the name of the database + * @return true if the database exists, false otherwise + * @throws CatalogException if an error occurs while checking the database + */ + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + return glueDatabaseOperations.glueDatabaseExists(databaseName); + } + + /** + * Creates a new database in Glue. + * + * @param databaseName the name of the database to create + * @param catalogDatabase the catalog database object containing database metadata + * @param ifNotExists flag indicating whether to ignore the error if the database already exists + * @throws DatabaseAlreadyExistException if the database already exists and ifNotExists is false + * @throws CatalogException if an error occurs while creating the database + */ + @Override + public void createDatabase(String databaseName, CatalogDatabase catalogDatabase, boolean ifNotExists) + throws DatabaseAlreadyExistException, CatalogException { + boolean exists = databaseExists(databaseName); + + if (exists && !ifNotExists) { + throw new DatabaseAlreadyExistException(getName(), databaseName); + } + + if (!exists) { + glueDatabaseOperations.createDatabase(databaseName, catalogDatabase); + } + } + + /** + * Drops an existing database in Glue. 
+ * + * @param databaseName the name of the database to drop + * @param ignoreIfNotExists flag to ignore the exception if the database doesn't exist + * @param cascade flag indicating whether to cascade the operation to drop related objects + * @throws DatabaseNotExistException if the database does not exist and ignoreIfNotExists is false + * @throws DatabaseNotEmptyException if the database contains objects and cascade is false + * @throws CatalogException if an error occurs while dropping the database + */ + @Override + public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + if (databaseExists(databaseName)) { + glueDatabaseOperations.dropGlueDatabase(databaseName); + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + /** + * Lists all tables in a specified database. + * + * @param databaseName the name of the database + * @return a list of table names in the database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException if an error occurs while listing the tables + */ + @Override + public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + return glueTableOperations.listTables(databaseName); + } + + /** + * Retrieves a table from the catalog using its object path. 
+ * + * @param objectPath the object path of the table to retrieve + * @return the corresponding CatalogBaseTable for the specified table + * @throws TableNotExistException if the table does not exist + * @throws CatalogException if an error occurs while retrieving the table + */ + @Override + public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException { + String databaseName = objectPath.getDatabaseName(); + String tableName = objectPath.getObjectName(); + + Table glueTable = glueTableOperations.getGlueTable(databaseName, tableName); + return getCatalogBaseTableFromGlueTable(glueTable); + } + + /** + * Checks if a table exists in the Glue catalog. + * + * @param objectPath the object path of the table to check + * @return true if the table exists, false otherwise + * @throws CatalogException if an error occurs while checking the table's existence + */ + @Override + public boolean tableExists(ObjectPath objectPath) throws CatalogException { + String databaseName = objectPath.getDatabaseName(); + String tableName = objectPath.getObjectName(); + + // Delegate existence check to GlueTableOperations + return glueTableOperations.glueTableExists(databaseName, tableName); + } + + /** + * Drops a table from the Glue catalog. 
+ * + * @param objectPath the object path of the table to drop + * @param ifExists flag indicating whether to ignore the exception if the table does not exist + * @throws TableNotExistException if the table does not exist and ifExists is false + * @throws CatalogException if an error occurs while dropping the table + */ + @Override + public void dropTable(ObjectPath objectPath, boolean ifExists) throws TableNotExistException, CatalogException { + String databaseName = objectPath.getDatabaseName(); + String tableName = objectPath.getObjectName(); + + if (!glueTableOperations.glueTableExists(databaseName, tableName)) { + if (!ifExists) { + throw new TableNotExistException(getName(), objectPath); + } + return; // Table doesn't exist, and IF EXISTS is true + } + + glueTableOperations.dropTable(databaseName, tableName); + } + + /** + * Creates a table in the Glue catalog. + * + * @param objectPath the object path of the table to create + * @param catalogBaseTable the table definition containing the schema and properties + * @param ifNotExists flag indicating whether to ignore the exception if the table already exists + * @throws TableAlreadyExistException if the table already exists and ifNotExists is false + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException if an error occurs while creating the table + */ + @Override + public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean ifNotExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + + // Validate that required parameters are not null + if (objectPath == null) { Review Comment: You could use `Preconditions.checkNotNull(ref, message)` to make this more idiomatic Flink :) ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,935 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; 
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator; +import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator; +import org.apache.flink.table.catalog.glue.operator.GlueTableOperator; +import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.util.GlueTableUtils; +import org.apache.flink.table.catalog.glue.util.GlueTypeConverter; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * GlueCatalog is an implementation of the Flink AbstractCatalog that interacts with AWS Glue. + * This class allows Flink to perform various catalog operations such as creating, deleting, and retrieving + * databases and tables from Glue. It encapsulates AWS Glue's API and provides a Flink-compatible interface. 
+ * + * <p>This catalog uses GlueClient to interact with AWS Glue services, and operations related to databases and + * tables are delegated to respective helper classes like GlueDatabaseOperations and GlueTableOperations.</p> + */ +public class GlueCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + private final GlueClient glueClient; + private final GlueTypeConverter glueTypeConverter; + private final GlueDatabaseOperator glueDatabaseOperations; + private final GlueTableOperator glueTableOperations; + private final GlueFunctionOperator glueFunctionsOperations; + private final GlueTableUtils glueTableUtils; + + /** + * Constructs a GlueCatalog with a provided Glue client. + * + * @param name the name of the catalog + * @param defaultDatabase the default database for the catalog + * @param region the AWS region to be used for Glue operations + * @param glueClient Glue Client so we can decide which one to use for testing + */ + public GlueCatalog(String name, String defaultDatabase, String region, GlueClient glueClient) { + super(name, defaultDatabase); + + // Initialize GlueClient in the constructor + if (glueClient != null) { + this.glueClient = glueClient; + } else { + // If no GlueClient is provided, initialize it using the default region + this.glueClient = GlueClient.builder() + .region(Region.of(region)) + .build(); + } + this.glueTypeConverter = new GlueTypeConverter(); + this.glueTableUtils = new GlueTableUtils(glueTypeConverter); + this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, getName()); + this.glueTableOperations = new GlueTableOperator(glueClient, getName()); + this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, getName()); + } + + /** + * Constructs a GlueCatalog with default client. 
+ * + * @param name the name of the catalog + * @param defaultDatabase the default database for the catalog + * @param region the AWS region to be used for Glue operations + */ + public GlueCatalog(String name, String defaultDatabase, String region) { + super(name, defaultDatabase); + + // Create a synchronized client builder to avoid concurrent modification exceptions + this.glueClient = GlueClient.builder() + .region(Region.of(region)) + .credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create()) + .build(); + this.glueTypeConverter = new GlueTypeConverter(); + this.glueTableUtils = new GlueTableUtils(glueTypeConverter); + this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, getName()); + this.glueTableOperations = new GlueTableOperator(glueClient, getName()); + this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, getName()); + } + + /** + * Opens the GlueCatalog and initializes necessary resources. + * + * @throws CatalogException if an error occurs during the opening process + */ + @Override + public void open() throws CatalogException { + LOG.info("Opening GlueCatalog with client: {}", glueClient); + } + + /** + * Closes the GlueCatalog and releases resources. 
+ * + * @throws CatalogException if an error occurs during the closing process + */ + @Override + public void close() throws CatalogException { + if (glueClient != null) { + LOG.info("Closing GlueCatalog client"); + int maxRetries = 3; + int retryCount = 0; + long retryDelayMs = 200; + while (retryCount < maxRetries) { + try { + glueClient.close(); + LOG.info("Successfully closed GlueCatalog client"); + return; + } catch (RuntimeException e) { + retryCount++; + if (retryCount >= maxRetries) { + LOG.warn("Failed to close GlueCatalog client after {} retries", maxRetries, e); + throw new CatalogException("Failed to close GlueCatalog client", e); + } + LOG.warn("Failed to close GlueCatalog client (attempt {}/{}), retrying in {} ms", + retryCount, maxRetries, retryDelayMs, e); + try { + Thread.sleep(retryDelayMs); + // Exponential backoff + retryDelayMs *= 2; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new CatalogException("Interrupted while retrying to close GlueCatalog client", ie); + } + } + } + } + } + + /** + * Lists all the databases available in the Glue catalog. + * + * @return a list of database names + * @throws CatalogException if an error occurs while listing the databases + */ + @Override + public List<String> listDatabases() throws CatalogException { + return glueDatabaseOperations.listDatabases(); + } + + /** + * Retrieves a specific database by its name. 
+ * + * @param databaseName the name of the database to retrieve + * @return the database if found + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException if an error occurs while retrieving the database + */ + @Override + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { + boolean databaseExists = databaseExists(databaseName); + if (!databaseExists) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + return glueDatabaseOperations.getDatabase(databaseName); + } + + /** + * Checks if a database exists in Glue. + * + * @param databaseName the name of the database + * @return true if the database exists, false otherwise + * @throws CatalogException if an error occurs while checking the database + */ + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + return glueDatabaseOperations.glueDatabaseExists(databaseName); + } + + /** + * Creates a new database in Glue. + * + * @param databaseName the name of the database to create + * @param catalogDatabase the catalog database object containing database metadata + * @param ifNotExists flag indicating whether to ignore the error if the database already exists + * @throws DatabaseAlreadyExistException if the database already exists and ifNotExists is false + * @throws CatalogException if an error occurs while creating the database + */ + @Override + public void createDatabase(String databaseName, CatalogDatabase catalogDatabase, boolean ifNotExists) + throws DatabaseAlreadyExistException, CatalogException { + boolean exists = databaseExists(databaseName); + + if (exists && !ifNotExists) { + throw new DatabaseAlreadyExistException(getName(), databaseName); + } + + if (!exists) { + glueDatabaseOperations.createDatabase(databaseName, catalogDatabase); + } + } + + /** + * Drops an existing database in Glue. 
+ * + * @param databaseName the name of the database to drop + * @param ignoreIfNotExists flag to ignore the exception if the database doesn't exist + * @param cascade flag indicating whether to cascade the operation to drop related objects + * @throws DatabaseNotExistException if the database does not exist and ignoreIfNotExists is false + * @throws DatabaseNotEmptyException if the database contains objects and cascade is false + * @throws CatalogException if an error occurs while dropping the database + */ + @Override + public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + if (databaseExists(databaseName)) { + glueDatabaseOperations.dropGlueDatabase(databaseName); + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + /** + * Lists all tables in a specified database. + * + * @param databaseName the name of the database + * @return a list of table names in the database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException if an error occurs while listing the tables + */ + @Override + public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + return glueTableOperations.listTables(databaseName); + } + + /** + * Retrieves a table from the catalog using its object path. 
+ * + * @param objectPath the object path of the table to retrieve + * @return the corresponding CatalogBaseTable for the specified table + * @throws TableNotExistException if the table does not exist + * @throws CatalogException if an error occurs while retrieving the table + */ + @Override + public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException { + String databaseName = objectPath.getDatabaseName(); + String tableName = objectPath.getObjectName(); + + Table glueTable = glueTableOperations.getGlueTable(databaseName, tableName); + return getCatalogBaseTableFromGlueTable(glueTable); + } + + /** + * Checks if a table exists in the Glue catalog. + * + * @param objectPath the object path of the table to check + * @return true if the table exists, false otherwise + * @throws CatalogException if an error occurs while checking the table's existence + */ + @Override + public boolean tableExists(ObjectPath objectPath) throws CatalogException { + String databaseName = objectPath.getDatabaseName(); + String tableName = objectPath.getObjectName(); + + // Delegate existence check to GlueTableOperations + return glueTableOperations.glueTableExists(databaseName, tableName); + } + + /** + * Drops a table from the Glue catalog. 
+ * + * @param objectPath the object path of the table to drop + * @param ifExists flag indicating whether to ignore the exception if the table does not exist + * @throws TableNotExistException if the table does not exist and ifExists is false + * @throws CatalogException if an error occurs while dropping the table + */ + @Override + public void dropTable(ObjectPath objectPath, boolean ifExists) throws TableNotExistException, CatalogException { + String databaseName = objectPath.getDatabaseName(); + String tableName = objectPath.getObjectName(); + + if (!glueTableOperations.glueTableExists(databaseName, tableName)) { + if (!ifExists) { + throw new TableNotExistException(getName(), objectPath); + } + return; // Table doesn't exist, and IF EXISTS is true + } + + glueTableOperations.dropTable(databaseName, tableName); + } + + /** + * Creates a table in the Glue catalog. + * + * @param objectPath the object path of the table to create + * @param catalogBaseTable the table definition containing the schema and properties + * @param ifNotExists flag indicating whether to ignore the exception if the table already exists + * @throws TableAlreadyExistException if the table already exists and ifNotExists is false + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException if an error occurs while creating the table + */ + @Override + public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean ifNotExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + + // Validate that required parameters are not null + if (objectPath == null) { + throw new NullPointerException("ObjectPath cannot be null"); + } + + if (catalogBaseTable == null) { + throw new NullPointerException("CatalogBaseTable cannot be null"); + } + + String databaseName = objectPath.getDatabaseName(); + String tableName = objectPath.getObjectName(); + + // Check if the database exists + if 
(!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + // Check if the table already exists + if (glueTableOperations.glueTableExists(databaseName, tableName)) { + if (!ifNotExists) { + throw new TableAlreadyExistException(getName(), objectPath); + } + return; // Table exists, and IF NOT EXISTS is true + } + + // Get common properties + Map<String, String> tableProperties = new HashMap<>(catalogBaseTable.getOptions()); + + try { + // Process based on table type + if (catalogBaseTable.getTableKind() == CatalogBaseTable.TableKind.TABLE) { + createRegularTable(objectPath, (CatalogTable) catalogBaseTable, tableProperties); + } else if (catalogBaseTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) { + createView(objectPath, (CatalogView) catalogBaseTable, tableProperties); + } else { + throw new CatalogException("Unsupported table kind: " + catalogBaseTable.getTableKind()); + } + LOG.info("Successfully created {}.{} of kind {}", databaseName, tableName, catalogBaseTable.getTableKind()); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to create table %s.%s: %s", databaseName, tableName, e.getMessage()), e); + } + } + + /** + * Lists all views in a specified database. 
+ * + * @param databaseName the name of the database + * @return a list of view names in the database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException if an error occurs while listing the views + */ + @Override + public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException { + + // Check if the database exists before listing views + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + try { + // Get all tables in the database + List<Table> allTables = glueClient.getTables(builder -> builder.databaseName(databaseName)) + .tableList(); + + // Filter tables to only include those that are of type VIEW + List<String> viewNames = allTables.stream() + .filter(table -> { + String tableType = table.tableType(); + return tableType != null && tableType.equalsIgnoreCase(CatalogBaseTable.TableKind.VIEW.name()); + }) + .map(Table::name) + .collect(Collectors.toList()); + + return viewNames; + } catch (Exception e) { + LOG.error("Failed to list views in database {}: {}", databaseName, e.getMessage()); + throw new CatalogException( + String.format("Error listing views in database %s: %s", databaseName, e.getMessage()), e); + } + } + + @Override + public void alterDatabase(String s, CatalogDatabase catalogDatabase, boolean b) throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException("Altering databases is not supported by the Glue Catalog."); + } + + @Override + public void renameTable(ObjectPath objectPath, String s, boolean b) throws TableNotExistException, TableAlreadyExistException, CatalogException { + throw new UnsupportedOperationException("Renaming tables is not supported by the Glue Catalog."); + } + + @Override + public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b) throws TableNotExistException, CatalogException { + throw new 
UnsupportedOperationException("Altering tables is not supported by the Glue Catalog."); + } + + @Override + public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath) throws TableNotExistException, TableNotPartitionedException, CatalogException { + throw new UnsupportedOperationException("Table partitioning operations are not supported by the Glue Catalog."); + } + + @Override + public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException { + throw new UnsupportedOperationException("Table partitioning operations are not supported by the Glue Catalog."); + } + + @Override + public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath objectPath, List<Expression> list) throws TableNotExistException, TableNotPartitionedException, CatalogException { + throw new UnsupportedOperationException("Table partitioning operations are not supported by the Glue Catalog."); + } + + @Override + public CatalogPartition getPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException("Table partitioning operations are not supported by the Glue Catalog."); + } + + @Override + public boolean partitionExists(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) throws CatalogException { + throw new UnsupportedOperationException("Table partitioning operations are not supported by the Glue Catalog."); + } + + @Override + public void createPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, CatalogPartition catalogPartition, boolean b) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { + throw new UnsupportedOperationException("Table partitioning operations are not supported 
by the Glue Catalog."); + } + + @Override + public void dropPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, boolean b) throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException("Table partitioning operations are not supported by the Glue Catalog."); + } + + @Override + public void alterPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, CatalogPartition catalogPartition, boolean b) throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException("Table partitioning operations are not supported by the Glue Catalog."); + } + + /** + * Normalizes an object path according to catalog-specific normalization rules. + * For functions, this ensures consistent case handling in function names. + * + * @param path the object path to normalize + * @return the normalized object path + */ + public ObjectPath normalize(ObjectPath path) { Review Comment: Is this useful for a user. Seems to me more an internal method. If so, better making it private ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,935 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator; +import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator; +import 
org.apache.flink.table.catalog.glue.operator.GlueTableOperator; +import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.util.GlueTableUtils; +import org.apache.flink.table.catalog.glue.util.GlueTypeConverter; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * GlueCatalog is an implementation of the Flink AbstractCatalog that interacts with AWS Glue. + * This class allows Flink to perform various catalog operations such as creating, deleting, and retrieving + * databases and tables from Glue. It encapsulates AWS Glue's API and provides a Flink-compatible interface. 
+ * + * <p>This catalog uses GlueClient to interact with AWS Glue services, and operations related to databases and + * tables are delegated to respective helper classes like GlueDatabaseOperations and GlueTableOperations.</p> + */ +public class GlueCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + private final GlueClient glueClient; + private final GlueTypeConverter glueTypeConverter; + private final GlueDatabaseOperator glueDatabaseOperations; + private final GlueTableOperator glueTableOperations; + private final GlueFunctionOperator glueFunctionsOperations; + private final GlueTableUtils glueTableUtils; + + /** + * Constructs a GlueCatalog with a provided Glue client. + * + * @param name the name of the catalog + * @param defaultDatabase the default database for the catalog + * @param region the AWS region to be used for Glue operations + * @param glueClient Glue Client so we can decide which one to use for testing + */ + public GlueCatalog(String name, String defaultDatabase, String region, GlueClient glueClient) { + super(name, defaultDatabase); + + // Initialize GlueClient in the constructor + if (glueClient != null) { + this.glueClient = glueClient; + } else { + // If no GlueClient is provided, initialize it using the default region + this.glueClient = GlueClient.builder() + .region(Region.of(region)) + .build(); + } + this.glueTypeConverter = new GlueTypeConverter(); + this.glueTableUtils = new GlueTableUtils(glueTypeConverter); + this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, getName()); + this.glueTableOperations = new GlueTableOperator(glueClient, getName()); + this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, getName()); + } + + /** + * Constructs a GlueCatalog with default client. 
+ * + * @param name the name of the catalog + * @param defaultDatabase the default database for the catalog + * @param region the AWS region to be used for Glue operations + */ + public GlueCatalog(String name, String defaultDatabase, String region) { + super(name, defaultDatabase); + + // Create a synchronized client builder to avoid concurrent modification exceptions + this.glueClient = GlueClient.builder() + .region(Region.of(region)) + .credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create()) + .build(); + this.glueTypeConverter = new GlueTypeConverter(); + this.glueTableUtils = new GlueTableUtils(glueTypeConverter); + this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, getName()); + this.glueTableOperations = new GlueTableOperator(glueClient, getName()); + this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, getName()); + } + + /** + * Opens the GlueCatalog and initializes necessary resources. + * + * @throws CatalogException if an error occurs during the opening process + */ + @Override + public void open() throws CatalogException { + LOG.info("Opening GlueCatalog with client: {}", glueClient); + } + + /** + * Closes the GlueCatalog and releases resources. + * + * @throws CatalogException if an error occurs during the closing process + */ + @Override + public void close() throws CatalogException { + if (glueClient != null) { + LOG.info("Closing GlueCatalog client"); + int maxRetries = 3; Review Comment: These magic numbers should be constants ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,935 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import 
org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator; +import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator; +import org.apache.flink.table.catalog.glue.operator.GlueTableOperator; +import org.apache.flink.table.catalog.glue.util.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.util.GlueTableUtils; +import org.apache.flink.table.catalog.glue.util.GlueTypeConverter; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * GlueCatalog is an implementation of the Flink AbstractCatalog that interacts with AWS Glue. + * This class allows Flink to perform various catalog operations such as creating, deleting, and retrieving + * databases and tables from Glue. It encapsulates AWS Glue's API and provides a Flink-compatible interface. 
+ * + * <p>This catalog uses GlueClient to interact with AWS Glue services, and operations related to databases and + * tables are delegated to respective helper classes like GlueDatabaseOperations and GlueTableOperations.</p> + */ +public class GlueCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + private final GlueClient glueClient; + private final GlueTypeConverter glueTypeConverter; + private final GlueDatabaseOperator glueDatabaseOperations; + private final GlueTableOperator glueTableOperations; + private final GlueFunctionOperator glueFunctionsOperations; + private final GlueTableUtils glueTableUtils; + + /** + * Constructs a GlueCatalog with a provided Glue client. + * + * @param name the name of the catalog + * @param defaultDatabase the default database for the catalog + * @param region the AWS region to be used for Glue operations + * @param glueClient Glue Client so we can decide which one to use for testing + */ + public GlueCatalog(String name, String defaultDatabase, String region, GlueClient glueClient) { + super(name, defaultDatabase); + + // Initialize GlueClient in the constructor + if (glueClient != null) { + this.glueClient = glueClient; + } else { + // If no GlueClient is provided, initialize it using the default region + this.glueClient = GlueClient.builder() + .region(Region.of(region)) + .build(); + } + this.glueTypeConverter = new GlueTypeConverter(); + this.glueTableUtils = new GlueTableUtils(glueTypeConverter); + this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, getName()); + this.glueTableOperations = new GlueTableOperator(glueClient, getName()); + this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, getName()); + } + + /** + * Constructs a GlueCatalog with default client. 
+ * + * @param name the name of the catalog + * @param defaultDatabase the default database for the catalog + * @param region the AWS region to be used for Glue operations + */ + public GlueCatalog(String name, String defaultDatabase, String region) { + super(name, defaultDatabase); + + // Create a synchronized client builder to avoid concurrent modification exceptions + this.glueClient = GlueClient.builder() + .region(Region.of(region)) + .credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create()) + .build(); + this.glueTypeConverter = new GlueTypeConverter(); + this.glueTableUtils = new GlueTableUtils(glueTypeConverter); + this.glueDatabaseOperations = new GlueDatabaseOperator(glueClient, getName()); + this.glueTableOperations = new GlueTableOperator(glueClient, getName()); + this.glueFunctionsOperations = new GlueFunctionOperator(glueClient, getName()); + } + + /** + * Opens the GlueCatalog and initializes necessary resources. + * + * @throws CatalogException if an error occurs during the opening process + */ + @Override + public void open() throws CatalogException { + LOG.info("Opening GlueCatalog with client: {}", glueClient); + } + + /** + * Closes the GlueCatalog and releases resources. 
+ * + * @throws CatalogException if an error occurs during the closing process + */ + @Override + public void close() throws CatalogException { + if (glueClient != null) { + LOG.info("Closing GlueCatalog client"); + int maxRetries = 3; + int retryCount = 0; + long retryDelayMs = 200; + while (retryCount < maxRetries) { + try { + glueClient.close(); + LOG.info("Successfully closed GlueCatalog client"); + return; + } catch (RuntimeException e) { + retryCount++; + if (retryCount >= maxRetries) { + LOG.warn("Failed to close GlueCatalog client after {} retries", maxRetries, e); + throw new CatalogException("Failed to close GlueCatalog client", e); + } + LOG.warn("Failed to close GlueCatalog client (attempt {}/{}), retrying in {} ms", + retryCount, maxRetries, retryDelayMs, e); + try { + Thread.sleep(retryDelayMs); + // Exponential backoff + retryDelayMs *= 2; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new CatalogException("Interrupted while retrying to close GlueCatalog client", ie); + } + } + } + } + } + + /** + * Lists all the databases available in the Glue catalog. + * + * @return a list of database names + * @throws CatalogException if an error occurs while listing the databases + */ + @Override + public List<String> listDatabases() throws CatalogException { + return glueDatabaseOperations.listDatabases(); + } + + /** + * Retrieves a specific database by its name. 
+ * + * @param databaseName the name of the database to retrieve + * @return the database if found + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException if an error occurs while retrieving the database + */ + @Override + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { + boolean databaseExists = databaseExists(databaseName); + if (!databaseExists) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + return glueDatabaseOperations.getDatabase(databaseName); + } + + /** + * Checks if a database exists in Glue. + * + * @param databaseName the name of the database + * @return true if the database exists, false otherwise + * @throws CatalogException if an error occurs while checking the database + */ + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + return glueDatabaseOperations.glueDatabaseExists(databaseName); + } + + /** + * Creates a new database in Glue. + * + * @param databaseName the name of the database to create + * @param catalogDatabase the catalog database object containing database metadata + * @param ifNotExists flag indicating whether to ignore the error if the database already exists + * @throws DatabaseAlreadyExistException if the database already exists and ifNotExists is false + * @throws CatalogException if an error occurs while creating the database + */ + @Override + public void createDatabase(String databaseName, CatalogDatabase catalogDatabase, boolean ifNotExists) + throws DatabaseAlreadyExistException, CatalogException { + boolean exists = databaseExists(databaseName); + + if (exists && !ifNotExists) { + throw new DatabaseAlreadyExistException(getName(), databaseName); + } + + if (!exists) { + glueDatabaseOperations.createDatabase(databaseName, catalogDatabase); + } + } + + /** + * Drops an existing database in Glue. 
+ * + * @param databaseName the name of the database to drop + * @param ignoreIfNotExists flag to ignore the exception if the database doesn't exist + * @param cascade flag indicating whether to cascade the operation to drop related objects + * @throws DatabaseNotExistException if the database does not exist and ignoreIfNotExists is false + * @throws DatabaseNotEmptyException if the database contains objects and cascade is false + * @throws CatalogException if an error occurs while dropping the database + */ + @Override + public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + if (databaseExists(databaseName)) { + glueDatabaseOperations.dropGlueDatabase(databaseName); + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + /** + * Lists all tables in a specified database. + * + * @param databaseName the name of the database + * @return a list of table names in the database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException if an error occurs while listing the tables + */ + @Override + public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + return glueTableOperations.listTables(databaseName); + } + + /** + * Retrieves a table from the catalog using its object path. 
+ * + * @param objectPath the object path of the table to retrieve + * @return the corresponding CatalogBaseTable for the specified table + * @throws TableNotExistException if the table does not exist + * @throws CatalogException if an error occurs while retrieving the table + */ + @Override + public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException { + String databaseName = objectPath.getDatabaseName(); + String tableName = objectPath.getObjectName(); + + Table glueTable = glueTableOperations.getGlueTable(databaseName, tableName); + return getCatalogBaseTableFromGlueTable(glueTable); + } + + /** + * Checks if a table exists in the Glue catalog. + * + * @param objectPath the object path of the table to check + * @return true if the table exists, false otherwise + * @throws CatalogException if an error occurs while checking the table's existence + */ + @Override + public boolean tableExists(ObjectPath objectPath) throws CatalogException { + String databaseName = objectPath.getDatabaseName(); + String tableName = objectPath.getObjectName(); + + // Delegate existence check to GlueTableOperations + return glueTableOperations.glueTableExists(databaseName, tableName); + } + + /** + * Drops a table from the Glue catalog. 
+ * + * @param objectPath the object path of the table to drop + * @param ifExists flag indicating whether to ignore the exception if the table does not exist + * @throws TableNotExistException if the table does not exist and ifExists is false + * @throws CatalogException if an error occurs while dropping the table + */ + @Override + public void dropTable(ObjectPath objectPath, boolean ifExists) throws TableNotExistException, CatalogException { + String databaseName = objectPath.getDatabaseName(); + String tableName = objectPath.getObjectName(); + + if (!glueTableOperations.glueTableExists(databaseName, tableName)) { + if (!ifExists) { + throw new TableNotExistException(getName(), objectPath); + } + return; // Table doesn't exist, and IF EXISTS is true + } + + glueTableOperations.dropTable(databaseName, tableName); + } + + /** + * Creates a table in the Glue catalog. + * + * @param objectPath the object path of the table to create + * @param catalogBaseTable the table definition containing the schema and properties + * @param ifNotExists flag indicating whether to ignore the exception if the table already exists + * @throws TableAlreadyExistException if the table already exists and ifNotExists is false + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException if an error occurs while creating the table + */ + @Override + public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean ifNotExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + + // Validate that required parameters are not null + if (objectPath == null) { + throw new NullPointerException("ObjectPath cannot be null"); + } + + if (catalogBaseTable == null) { + throw new NullPointerException("CatalogBaseTable cannot be null"); + } + + String databaseName = objectPath.getDatabaseName(); + String tableName = objectPath.getObjectName(); + + // Check if the database exists + if 
(!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + // Check if the table already exists + if (glueTableOperations.glueTableExists(databaseName, tableName)) { + if (!ifNotExists) { + throw new TableAlreadyExistException(getName(), objectPath); + } + return; // Table exists, and IF NOT EXISTS is true + } + + // Get common properties + Map<String, String> tableProperties = new HashMap<>(catalogBaseTable.getOptions()); + + try { + // Process based on table type + if (catalogBaseTable.getTableKind() == CatalogBaseTable.TableKind.TABLE) { + createRegularTable(objectPath, (CatalogTable) catalogBaseTable, tableProperties); + } else if (catalogBaseTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) { + createView(objectPath, (CatalogView) catalogBaseTable, tableProperties); + } else { + throw new CatalogException("Unsupported table kind: " + catalogBaseTable.getTableKind()); + } + LOG.info("Successfully created {}.{} of kind {}", databaseName, tableName, catalogBaseTable.getTableKind()); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to create table %s.%s: %s", databaseName, tableName, e.getMessage()), e); + } + } + + /** + * Lists all views in a specified database. 
+ * + * @param databaseName the name of the database + * @return a list of view names in the database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException if an error occurs while listing the views + */ + @Override + public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException { + + // Check if the database exists before listing views + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + try { + // Get all tables in the database + List<Table> allTables = glueClient.getTables(builder -> builder.databaseName(databaseName)) + .tableList(); + + // Filter tables to only include those that are of type VIEW + List<String> viewNames = allTables.stream() Review Comment: You can return here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
