Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1175579217


##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java:
##########
@@ -0,0 +1,1359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.connector.aws.config.AWSConfig;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConstants;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.resource.ResourceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import 
software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import 
software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+import 
software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * Utilities for Glue catalog operations. Important Note :
+ * 
https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/
+ */
+public class GlueOperator {
+
+    /**
+     * Instance of AwsProperties which holds the configs related to configure 
glue and aws setup.
+     */
+    private final AWSConfig awsConfig;
+
+    /** http client for glue client. Current implementation for client is sync 
type. */
+    private final GlueClient glueClient;
+
+    private final String catalogName;
+
+    public final String catalogPath;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueOperator.class);
+
+    public GlueOperator(
+            String catalogName, String catalogPath, AWSConfig awsConfig, 
GlueClient glueClient) {
+        this.awsConfig = awsConfig;
+        this.glueClient = glueClient;
+        this.catalogPath = catalogPath;
+        this.catalogName = catalogName;
+    }
+
+    public void closeClient() {
+        glueClient.close();
+    }
+
+    // -------------- Database related operations.
+
+    /**
+     * List all databases present.
+     *
+     * @return List of fully qualified database names
+     */
+    public List<String> listGlueDatabases() throws CatalogException {
+        try {
+            GetDatabasesRequest.Builder databasesRequestBuilder =
+                    
GetDatabasesRequest.builder().catalogId(getGlueCatalogId());
+
+            GetDatabasesResponse response =
+                    glueClient.getDatabases(databasesRequestBuilder.build());
+            List<String> glueDatabases =
+                    response.databaseList().stream()
+                            .map(Database::name)
+                            .collect(Collectors.toList());
+            String dbResultNextToken = response.nextToken();
+            if (Optional.ofNullable(dbResultNextToken).isPresent()) {
+                do {
+                    databasesRequestBuilder.nextToken(dbResultNextToken);
+                    response = 
glueClient.getDatabases(databasesRequestBuilder.build());
+                    glueDatabases.addAll(
+                            response.databaseList().stream()
+                                    .map(Database::name)
+                                    .collect(Collectors.toList()));
+                    dbResultNextToken = response.nextToken();
+                } while (Optional.ofNullable(dbResultNextToken).isPresent());
+            }
+            return glueDatabases;
+        } catch (GlueException e) {
+            throw new CatalogException(
+                    GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, 
e.getCause());
+        }
+    }
+
+    /**
+     * Create database in glue data catalog.
+     *
+     * @param databaseName fully qualified name of database.
+     * @param database Instance of {@link CatalogDatabase}.
+     * @throws CatalogException when unknown error from glue servers.
+     * @throws DatabaseAlreadyExistException when database exists already in 
glue data catalog.
+     */
+    public void createGlueDatabase(String databaseName, CatalogDatabase 
database)
+            throws CatalogException, DatabaseAlreadyExistException {
+
+        GlueUtils.validate(databaseName);
+        Map<String, String> properties = new 
HashMap<>(database.getProperties());
+        String str =
+                properties.entrySet().stream()
+                        .map(e -> e.getKey() + ":" + e.getValue())
+                        .collect(Collectors.joining("|"));
+        LOG.info("Create DB MYLOG:- " + str);
+        DatabaseInput.Builder databaseInputBuilder =
+                DatabaseInput.builder()
+                        .name(databaseName)
+                        .description(database.getComment())
+                        .locationUri(
+                                GlueUtils.extractDatabaseLocation(
+                                        properties, databaseName, catalogPath))
+                        .parameters(properties);
+        CreateDatabaseRequest.Builder requestBuilder =
+                CreateDatabaseRequest.builder()
+                        .databaseInput(databaseInputBuilder.build())
+                        .catalogId(getGlueCatalogId());
+        LOG.info(
+                String.format(
+                        "Database Properties Listing :- %s",
+                        properties.entrySet().stream()
+                                .map(e -> e.getKey() + e.getValue())
+                                .collect(Collectors.joining(","))));
+        try {
+            CreateDatabaseResponse response = 
glueClient.createDatabase(requestBuilder.build());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(GlueUtils.getDebugLog(response));
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("%s Database created.", databaseName));
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseAlreadyExistException(catalogName, databaseName, 
e);
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Delete a database from Glue data catalog only when database is empty.
+     *
+     * @param databaseName fully qualified name of database
+     * @throws CatalogException Any Exception thrown due to glue error
+     * @throws DatabaseNotExistException when database doesn't exists in glue 
catalog.
+     */
+    public void dropGlueDatabase(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+
+        GlueUtils.validate(databaseName);
+        DeleteDatabaseRequest deleteDatabaseRequest =
+                DeleteDatabaseRequest.builder()
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            DeleteDatabaseResponse response = 
glueClient.deleteDatabase(deleteDatabaseRequest);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(GlueUtils.getDebugLog(response));
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Database Dropped %s", databaseName));
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        } catch (GlueException e) {
+            throw new CatalogException(catalogName, e);
+        }
+    }
+
+    /**
+     * Drops list of table in database from glue data catalog.
+     *
+     * @param databaseName fully qualified name of database
+     * @param tables List of tables to remove from database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void deleteTablesFromDatabase(String databaseName, 
Collection<String> tables)
+            throws CatalogException {
+        GlueUtils.validate(databaseName);
+        BatchDeleteTableRequest batchTableRequest =
+                BatchDeleteTableRequest.builder()
+                        .databaseName(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .tablesToDelete(tables)
+                        .build();
+
+        try {
+            BatchDeleteTableResponse response = 
glueClient.batchDeleteTable(batchTableRequest);
+            if (response.hasErrors()) {
+                String errorMsg =
+                        String.format(
+                                "Glue Table errors:- %s",
+                                response.errors().stream()
+                                        .map(
+                                                e ->
+                                                        "Table: "
+                                                                + e.tableName()
+                                                                + 
"\nErrorDetail:  "
+                                                                + 
e.errorDetail().errorMessage())
+                                        .collect(Collectors.joining("\n")));
+                LOG.error(errorMsg);
+                throw new CatalogException(errorMsg);
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Tables deleted.");
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Drops list of user defined function in database from glue data catalog.
+     *
+     * @param databaseName fully qualified name of database
+     * @param functions List of tables to remove from database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void deleteFunctionsFromDatabase(String databaseName, 
Collection<String> functions)
+            throws CatalogException {
+        GlueUtils.validate(databaseName);
+        try {
+            DeleteUserDefinedFunctionRequest.Builder requestBuilder =
+                    DeleteUserDefinedFunctionRequest.builder()
+                            .databaseName(databaseName)
+                            .catalogId(getGlueCatalogId());
+            for (String functionName : functions) {
+                requestBuilder.functionName(functionName);
+                DeleteUserDefinedFunctionResponse response =
+                        
glueClient.deleteUserDefinedFunction(requestBuilder.build());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(GlueUtils.getDebugLog(response));
+                }
+                GlueUtils.validateGlueResponse(response);
+                LOG.info(String.format("Dropped Function %s", functionName));
+            }
+
+        } catch (GlueException e) {
+            LOG.error(String.format("Error deleting functions in database: 
%s", databaseName));
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Check if database is empty. i.e. it should not contain 1. table 2. 
functions
+     *
+     * @param databaseName name of database.
+     * @return boolean True/False based on the content of database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public boolean isDatabaseEmpty(String databaseName) throws 
CatalogException {
+        checkArgument(!isNullOrWhitespaceOnly(databaseName));
+        GlueUtils.validate(databaseName);
+        GetTablesRequest tablesRequest =
+                GetTablesRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(databaseName)
+                        .maxResults(1)
+                        .build();
+        try {
+            GetTablesResponse response = glueClient.getTables(tablesRequest);
+            return response.sdkHttpResponse().isSuccessful()
+                    && response.tableList().size() == 0
+                    && listGlueFunctions(databaseName).size() == 0;
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Get a database from this glue data catalog.
+     *
+     * @param databaseName fully qualified name of database.
+     * @return Instance of {@link CatalogDatabase } .
+     * @throws DatabaseNotExistException when database doesn't exists in Glue 
data catalog.
+     * @throws CatalogException when any unknown error occurs in glue.
+     */
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        GlueUtils.validate(databaseName);
+        GetDatabaseRequest getDatabaseRequest =
+                GetDatabaseRequest.builder()
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            GetDatabaseResponse response = 
glueClient.getDatabase(getDatabaseRequest);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER
+                                + ": existing database. Client call response 
:- "
+                                + response.sdkHttpResponse().statusText());
+            }
+            GlueUtils.validateGlueResponse(response);
+            return GlueUtils.getCatalogDatabase(response.database());
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    public void updateGlueDatabase(String databaseName, CatalogDatabase 
newDatabase)
+            throws CatalogException {
+
+        GlueUtils.validate(databaseName);
+        Map<String, String> properties = new 
HashMap<>(newDatabase.getProperties());
+        DatabaseInput.Builder databaseInputBuilder =
+                DatabaseInput.builder()
+                        .parameters(properties)
+                        .description(newDatabase.getComment())
+                        .name(databaseName)
+                        .locationUri(
+                                GlueUtils.extractDatabaseLocation(
+                                        properties, databaseName, 
catalogPath));
+
+        UpdateDatabaseRequest updateRequest =
+                UpdateDatabaseRequest.builder()
+                        .databaseInput(databaseInputBuilder.build())
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        UpdateDatabaseResponse response = 
glueClient.updateDatabase(updateRequest);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(GlueUtils.getDebugLog(response));
+        }
+        GlueUtils.validateGlueResponse(response);
+        LOG.info(String.format("Database Updated. %s", databaseName));
+    }
+
+    // -------------- Table related operations.
+
+    /**
+     * Create table in glue data catalog.
+     *
+     * @param tablePath Fully qualified name of table. {@link ObjectPath}
+     * @param table instance of {@link CatalogBaseTable} containing table 
related information.
+     * @param managedTable identifier if managed table.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void createGlueTable(
+            final ObjectPath tablePath, final CatalogBaseTable table, final 
boolean managedTable)
+            throws CatalogException {
+
+        checkNotNull(table, "Table cannot be Null");
+        checkNotNull(tablePath, "Table Path cannot be Null");
+        final Map<String, String> properties = new 
HashMap<>(table.getOptions());
+        String tableOwner = GlueUtils.extractTableOwner(properties);
+        if (managedTable) {
+            properties.put(CONNECTOR.key(), 
ManagedTableFactory.DEFAULT_IDENTIFIER);
+        }
+        Set<Column> glueColumns = 
GlueUtils.getGlueColumnsFromCatalogTable(table);
+        String tableLocation = "";
+        if (properties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+            tableLocation = 
properties.remove(GlueCatalogConstants.LOCATION_URI);
+        } else {
+            try {
+                tableLocation =
+                        getDatabase(tablePath.getDatabaseName())
+                                        .getProperties()
+                                        .get(GlueCatalogConstants.LOCATION_URI)
+                                + GlueCatalogConstants.LOCATION_SEPARATOR
+                                + tablePath.getObjectName();
+            } catch (DatabaseNotExistException e) {
+                LOG.warn("Database doesn't Exists");
+            }
+        }
+
+        StorageDescriptor.Builder storageDescriptorBuilder =
+                StorageDescriptor.builder()
+                        .inputFormat(GlueUtils.extractInputFormat(properties))
+                        
.outputFormat(GlueUtils.extractOutputFormat(properties))
+                        .location(tableLocation);
+
+        TableInput.Builder tableInputBuilder =
+                TableInput.builder()
+                        .name(tablePath.getObjectName())
+                        .description(table.getComment())
+                        .tableType(table.getTableKind().name())
+                        .lastAccessTime(Instant.now())
+                        .owner(tableOwner);
+
+        if 
(table.getTableKind().name().equalsIgnoreCase(CatalogBaseTable.TableKind.VIEW.name()))
 {
+            
tableInputBuilder.viewExpandedText(GlueUtils.getExpandedQuery(table));
+            
tableInputBuilder.viewOriginalText(GlueUtils.getOriginalQuery(table));
+        }
+
+        CreateTableRequest.Builder requestBuilder =
+                CreateTableRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName());
+
+        if (table instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) table;
+            if (catalogTable.isPartitioned()) {
+                LOG.info("Catalog table is partitioned");
+                Collection<Column> partitionKeys =
+                        GlueUtils.getPartitionKeys(catalogTable, glueColumns);
+                LOG.info(
+                        "Partition columns are -> "
+                                + partitionKeys.stream()
+                                        .map(Column::name)
+                                        .collect(Collectors.joining(",")));
+                tableInputBuilder.partitionKeys(partitionKeys);
+            }
+        }
+
+        try {
+            storageDescriptorBuilder.columns(glueColumns);
+            
tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build());
+            tableInputBuilder.parameters(properties);
+            requestBuilder.tableInput(tableInputBuilder.build());
+            CreateTableResponse response = 
glueClient.createTable(requestBuilder.build());
+            LOG.debug(GlueUtils.getDebugLog(response));
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Table created. %s", 
tablePath.getFullName()));
+
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * @param tablePath fully Qualified table path.
+     * @param newTable instance of {@link CatalogBaseTable} containing 
information for table.
+     * @param managedTable identifier for managed table.
+     * @throws CatalogException Glue related exception.
+     */
+    public void alterGlueTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean 
managedTable)
+            throws CatalogException {
+
+        Map<String, String> properties = new HashMap<>(newTable.getOptions());
+        String tableOwner = GlueUtils.extractTableOwner(properties);
+
+        if (managedTable) {
+            properties.put(CONNECTOR.key(), 
ManagedTableFactory.DEFAULT_IDENTIFIER);
+        }
+
+        Set<Column> glueColumns = 
GlueUtils.getGlueColumnsFromCatalogTable(newTable);
+
+        // create StorageDescriptor for table
+        StorageDescriptor.Builder storageDescriptorBuilder =
+                StorageDescriptor.builder()
+                        .inputFormat(GlueUtils.extractInputFormat(properties))
+                        
.outputFormat(GlueUtils.extractOutputFormat(properties))
+                        .location(
+                                GlueUtils.extractTableLocation(properties, 
tablePath, catalogPath))
+                        .parameters(properties)
+                        .columns(glueColumns);
+
+        // create TableInput Builder with available information.
+        TableInput.Builder tableInputBuilder =
+                TableInput.builder()
+                        .name(tablePath.getObjectName())
+                        .description(newTable.getComment())
+                        .tableType(newTable.getTableKind().name())
+                        .lastAccessTime(Instant.now())
+                        .owner(tableOwner);
+
+        UpdateTableRequest.Builder requestBuilder =
+                UpdateTableRequest.builder()
+                        .tableInput(tableInputBuilder.build())
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName());
+
+        if (newTable instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) newTable;
+            if (catalogTable.isPartitioned()) {
+                tableInputBuilder.partitionKeys(
+                        GlueUtils.getPartitionKeys(catalogTable, glueColumns));
+            }
+        }
+
+        // apply storage descriptor and tableInput for request
+        tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build());
+        requestBuilder.tableInput(tableInputBuilder.build());
+
+        try {
+            UpdateTableResponse response = 
glueClient.updateTable(requestBuilder.build());
+            LOG.debug(GlueUtils.getDebugLog(response));
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Table updated. %s", 
tablePath.getFullName()));
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Get names of all tables or views under this database based on type 
identifier. An empty list
+     * is returned if none exists.
+     *
+     * @param databaseName fully qualified database name.
+     * @return a list of the names of all tables or views in this database 
based on type identifier.
+     * @throws CatalogException in case of any runtime exception
+     */
+    public List<String> getGlueTableList(String databaseName, String type) 
throws CatalogException {
+        GetTablesRequest.Builder tablesRequestBuilder =
+                
GetTablesRequest.builder().databaseName(databaseName).catalogId(getGlueCatalogId());
+        GetTablesResponse response = 
glueClient.getTables(tablesRequestBuilder.build());
+        GlueUtils.validateGlueResponse(response);
+        List<String> finalTableList =
+                response.tableList().stream()
+                        .filter(table -> 
table.tableType().equalsIgnoreCase(type))
+                        .map(Table::name)
+                        .collect(Collectors.toList());
+        String tableResultNextToken = response.nextToken();
+
+        if (Optional.ofNullable(tableResultNextToken).isPresent()) {
+            do {
+                // update token in requestBuilder to fetch next batch
+                tablesRequestBuilder.nextToken(tableResultNextToken);
+                response = glueClient.getTables(tablesRequestBuilder.build());
+                GlueUtils.validateGlueResponse(response);
+                finalTableList.addAll(
+                        response.tableList().stream()
+                                .filter(table -> 
table.tableType().equalsIgnoreCase(type))
+                                .map(Table::name)
+                                .collect(Collectors.toList()));
+                tableResultNextToken = response.nextToken();
+            } while (Optional.ofNullable(tableResultNextToken).isPresent());
+        }
+        return finalTableList;
+    }
+
+    /**
+     * Returns a {@link Table} identified by the given table Path. {@link 
ObjectPath}.
+     *
+     * @param tablePath Path of the table or view
+     * @return The requested table. Glue encapsulates whether table or view in 
its attribute called
+     *     type.
+     * @throws TableNotExistException if the target does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    public Table getGlueTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        GetTableRequest tablesRequest =
+                GetTableRequest.builder()
+                        .databaseName(tablePath.getDatabaseName())
+                        .name(tablePath.getObjectName())
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            GetTableResponse response = glueClient.getTable(tablesRequest);
+            LOG.info(String.format("Glue table Found %s", 
response.table().name()));
+            GlueUtils.validateGlueResponse(response);
+            return response.table();
+        } catch (EntityNotFoundException e) {
+            throw new TableNotExistException(catalogName, tablePath, e);
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Check if a table or view exists in glue data catalog.
+     *
+     * @param tablePath Path of the table or view
+     * @return true if the given table exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    public boolean glueTableExists(ObjectPath tablePath) throws 
CatalogException {
+        try {
+            Table glueTable = getGlueTable(tablePath);
+            return glueTable != null && 
glueTable.name().equals(tablePath.getObjectName());
+        } catch (TableNotExistException e) {
+            LOG.warn(
+                    String.format(
+                            "%s\nDatabase: %s Table: %s",
+                            GlueCatalogConstants.TABLE_NOT_EXISTS_IDENTIFIER,
+                            tablePath.getDatabaseName(),
+                            tablePath.getObjectName()));
+            return false;
+        } catch (CatalogException e) {
+            LOG.error(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, 
e.getCause());
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Drop a table or view from glue data catalog.
+     *
+     * @param tablePath fully qualified table path
+     * @throws CatalogException on runtime errors.
+     */
+    public void dropGlueTable(ObjectPath tablePath) throws CatalogException {
+        DeleteTableRequest.Builder tableRequestBuilder =
+                DeleteTableRequest.builder()
+                        .databaseName(tablePath.getDatabaseName())
+                        .name(tablePath.getObjectName())
+                        .catalogId(getGlueCatalogId());
+        try {
+            DeleteTableResponse response = 
glueClient.deleteTable(tableRequestBuilder.build());
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Dropped Table %s.", 
tablePath.getObjectName()));
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    // -------------- Function related operations.
+
+    /**
+     * Create a function. Function name should be handled in a 
case-insensitive way.
+     *
+     * @param functionPath path of the function
+     * @param function Flink function to be created
+     * @throws CatalogException in case of any runtime exception
+     */
+    public void createGlueFunction(ObjectPath functionPath, CatalogFunction 
function)
+            throws CatalogException, FunctionAlreadyExistException {
+
+        UserDefinedFunctionInput functionInput =
+                GlueUtils.createFunctionInput(functionPath, function);
+        CreateUserDefinedFunctionRequest.Builder requestBuilder =
+                CreateUserDefinedFunctionRequest.builder()
+                        .databaseName(functionPath.getDatabaseName())
+                        .catalogId(getGlueCatalogId())
+                        .functionInput(functionInput);
+        try {
+            CreateUserDefinedFunctionResponse response =
+                    
glueClient.createUserDefinedFunction(requestBuilder.build());
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Function created. %s", 
functionPath.getFullName()));
+        } catch (AlreadyExistsException e) {
+            LOG.error(
+                    String.format(
+                            "%s.%s already Exists. Function language of type: 
%s",
+                            functionPath.getDatabaseName(),
+                            functionPath.getObjectName(),
+                            function.getFunctionLanguage()));
+            throw new FunctionAlreadyExistException(catalogName, functionPath);
+        } catch (GlueException e) {
+            LOG.error("Error creating glue function.", e.getCause());
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Get the user defined function from glue Catalog. Function name should 
be handled in a
+     * case-insensitive way.
+     *
+     * @param functionPath path of the function
+     * @return the requested function
+     * @throws CatalogException in case of any runtime exception
+     */
+    public CatalogFunction getGlueFunction(ObjectPath functionPath) {
+        GetUserDefinedFunctionRequest request =
+                GetUserDefinedFunctionRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(functionPath.getDatabaseName())
+                        .functionName(functionPath.getObjectName())
+                        .build();
+        GetUserDefinedFunctionResponse response = 
glueClient.getUserDefinedFunction(request);
+        GlueUtils.validateGlueResponse(response);
+        UserDefinedFunction udf = response.userDefinedFunction();
+
+        List<org.apache.flink.table.resource.ResourceUri> resourceUris = new 
LinkedList<>();
+        for (ResourceUri resourceUri : udf.resourceUris()) {
+            resourceUris.add(
+                    new org.apache.flink.table.resource.ResourceUri(
+                            
ResourceType.valueOf(resourceUri.resourceType().name()),
+                            resourceUri.uri()));
+        }
+
+        return new CatalogFunctionImpl(
+                GlueUtils.getCatalogFunctionClassName(udf),
+                GlueUtils.getFunctionalLanguage(udf),
+                resourceUris);
+    }
+
+    public List<String> listGlueFunctions(String databaseName) {
+        GetUserDefinedFunctionsRequest.Builder functionsRequest =
+                GetUserDefinedFunctionsRequest.builder()
+                        .databaseName(databaseName)
+                        .catalogId(getGlueCatalogId());
+
+        List<String> glueFunctions;
+        try {
+            GetUserDefinedFunctionsResponse functionsResponse =
+                    
glueClient.getUserDefinedFunctions(functionsRequest.build());
+            String token = functionsResponse.nextToken();
+            glueFunctions =
+                    functionsResponse.userDefinedFunctions().stream()
+                            .map(UserDefinedFunction::functionName)
+                            .collect(Collectors.toCollection(LinkedList::new));
+            if (Optional.ofNullable(token).isPresent()) {
+                do {
+                    functionsRequest.nextToken(token);
+                    functionsResponse =
+                            
glueClient.getUserDefinedFunctions(functionsRequest.build());
+                    glueFunctions.addAll(
+                            functionsResponse.userDefinedFunctions().stream()
+                                    .map(UserDefinedFunction::functionName)
+                                    
.collect(Collectors.toCollection(LinkedList::new)));
+                    token = functionsResponse.nextToken();
+                } while (Optional.ofNullable(token).isPresent());
+            }
+
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+        return glueFunctions;
+    }
+
+    public boolean glueFunctionExists(ObjectPath functionPath) {
+        GetUserDefinedFunctionRequest request =
+                GetUserDefinedFunctionRequest.builder()
+                        .functionName(functionPath.getObjectName())
+                        .databaseName(functionPath.getDatabaseName())
+                        .catalogId(getGlueCatalogId())
+                        .build();
+
+        try {
+            GetUserDefinedFunctionResponse response = 
glueClient.getUserDefinedFunction(request);
+            GlueUtils.validateGlueResponse(response);
+            return response.userDefinedFunction()
+                    .functionName()
+                    .equalsIgnoreCase(functionPath.getObjectName());
+        } catch (EntityNotFoundException e) {
+            LOG.warn(
+                    String.format(
+                            "Entry not found for function %s.%s",
+                            functionPath.getObjectName(), 
functionPath.getDatabaseName()));
+            return false;
+        } catch (GlueException e) {
+            LOG.error(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Modify an existing function. Function name should be handled in a 
case-insensitive way.
+     *
+     * @param functionPath path of function.
+     * @param newFunction modified function.
+     * @throws CatalogException on runtime errors.
+     */
+    public void alterGlueFunction(ObjectPath functionPath, CatalogFunction 
newFunction)
+            throws CatalogException {
+        UserDefinedFunctionInput functionInput =
+                GlueUtils.createFunctionInput(functionPath, newFunction);
+        UpdateUserDefinedFunctionRequest updateUserDefinedFunctionRequest =
+                UpdateUserDefinedFunctionRequest.builder()
+                        .functionName(functionPath.getObjectName())
+                        .databaseName(functionPath.getDatabaseName())
+                        .catalogId(getGlueCatalogId())
+                        .functionInput(functionInput)
+                        .build();
+        UpdateUserDefinedFunctionResponse response =
+                
glueClient.updateUserDefinedFunction(updateUserDefinedFunctionRequest);
+        GlueUtils.validateGlueResponse(response);
+        LOG.info(String.format("Function altered. %s", 
functionPath.getFullName()));
+    }
+
+    /**
+     * @param functionPath fully qualified function path
+     * @throws CatalogException
+     */
+    public void dropGlueFunction(ObjectPath functionPath) throws 
CatalogException {
+        DeleteUserDefinedFunctionRequest request =
+                DeleteUserDefinedFunctionRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .functionName(functionPath.getObjectName())
+                        .databaseName(functionPath.getDatabaseName())
+                        .build();
+        DeleteUserDefinedFunctionResponse response = 
glueClient.deleteUserDefinedFunction(request);
+        GlueUtils.validateGlueResponse(response);
+        LOG.info(String.format("Dropped Function. %s", 
functionPath.getFullName()));
+    }
+
+    // -------------- Partition related operations.
+
+    public void ensurePartitionedTable(ObjectPath tablePath, Table glueTable)
+            throws TableNotPartitionedException {
+        if (!glueTable.hasPartitionKeys()) {
+            throw new TableNotPartitionedException(catalogName, tablePath);
+        }
+    }
+
+    /**
+     * create partition in glue data catalog.
+     *
+     * @param glueTable glue table
+     * @param partitionSpec partition spec
+     * @param catalogPartition partition to add
+     */
+    public void createGluePartition(
+            Table glueTable, CatalogPartitionSpec partitionSpec, 
CatalogPartition catalogPartition)
+            throws CatalogException, PartitionSpecInvalidException {
+
+        List<String> partCols = 
GlueUtils.getColumnNames(glueTable.partitionKeys());
+        LOG.info(String.format("Partition Columns are : %s", String.join(", ", 
partCols)));
+        List<String> partitionValues =
+                getOrderedFullPartitionValues(
+                        partitionSpec,
+                        partCols,
+                        new ObjectPath(glueTable.databaseName(), 
glueTable.name()));
+
+        // validate partition values
+        for (int i = 0; i < partCols.size(); i++) {
+            if (isNullOrWhitespaceOnly(partitionValues.get(i))) {
+                throw new PartitionSpecInvalidException(
+                        catalogName,
+                        partCols,
+                        new ObjectPath(glueTable.databaseName(), 
glueTable.name()),
+                        partitionSpec);
+            }
+        }
+        StorageDescriptor.Builder sdBuilder = 
glueTable.storageDescriptor().toBuilder();
+        Map<String, String> partitionProperties = 
catalogPartition.getProperties();
+
+        sdBuilder.location(extractPartitionLocation(partitionProperties));
+        sdBuilder.parameters(partitionSpec.getPartitionSpec());
+        String comment = catalogPartition.getComment();
+        if (comment != null) {
+            partitionProperties.put(GlueCatalogConstants.COMMENT, comment);
+        }
+        PartitionInput.Builder partitionInput =
+                PartitionInput.builder()
+                        .parameters(partitionProperties)
+                        .lastAccessTime(Instant.now())
+                        .storageDescriptor(sdBuilder.build())
+                        .values(partitionValues);
+        CreatePartitionRequest createPartitionRequest =
+                CreatePartitionRequest.builder()
+                        .partitionInput(partitionInput.build())
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(glueTable.databaseName())
+                        .tableName(glueTable.name())
+                        .build();
+
+        try {
+            CreatePartitionResponse response = 
glueClient.createPartition(createPartitionRequest);
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Partition Created.");
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    private String extractPartitionLocation(Map<String, String> properties) {
+        return properties.containsKey(GlueCatalogConstants.LOCATION_URI)
+                ? properties.remove(GlueCatalogConstants.LOCATION_URI)
+                : null;
+    }
+
+    /**
+     * Get a list of ordered partition values by re-arranging them based on 
the given list of
+     * partition keys. If the partition value is null, it'll be converted into 
default partition
+     * name.
+     *
+     * @param partitionSpec a partition spec.
+     * @param partitionKeys a list of partition keys.
+     * @param tablePath path of the table to which the partition belongs.
+     * @return A list of partition values ordered according to partitionKeys.
+     * @throws PartitionSpecInvalidException thrown if partitionSpec and 
partitionKeys have
+     *     different sizes, or any key in partitionKeys doesn't exist in 
partitionSpec.
+     */
+    private List<String> getOrderedFullPartitionValues(
+            CatalogPartitionSpec partitionSpec, List<String> partitionKeys, 
ObjectPath tablePath)
+            throws PartitionSpecInvalidException {
+        Map<String, String> spec = partitionSpec.getPartitionSpec();
+        if (spec.size() != partitionKeys.size()) {
+            throw new PartitionSpecInvalidException(
+                    catalogName, partitionKeys, tablePath, partitionSpec);
+        }
+
+        List<String> values = new ArrayList<>(spec.size());
+        for (String key : partitionKeys) {
+            if (!spec.containsKey(key)) {
+                throw new PartitionSpecInvalidException(
+                        catalogName, partitionKeys, tablePath, partitionSpec);
+            } else {
+                String value = spec.get(key);
+                if (value == null) {
+                    value = GlueCatalogConstants.DEFAULT_PARTITION_NAME;
+                }
+                values.add(value);
+            }
+        }
+
+        return values;
+    }
+
+    /**
+     * Update glue table.
+     *
+     * @param tablePath contains database name and table name.
+     * @param partitionSpec Existing partition information.
+     * @param newPartition Partition information with new changes.
+     * @throws CatalogException Exception in failure.
+     */
+    public void alterGluePartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, 
CatalogPartition newPartition)
+            throws CatalogException {
+        // todo has to implement
+    }
+
+    /**
+     * Get CatalogPartitionSpec of all partitions from glue data catalog.
+     *
+     * @param tablePath fully qualified table path.
+     * @return List of PartitionSpec
+     */
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) {
+
+        GetPartitionsRequest.Builder request =
+                GetPartitionsRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName())
+                        .tableName(tablePath.getObjectName());
+        try {
+            GetPartitionsResponse response = 
glueClient.getPartitions(request.build());
+            GlueUtils.validateGlueResponse(response);
+            List<CatalogPartitionSpec> finalPartitionsList =
+                    response.partitions().stream()
+                            .map(this::getCatalogPartitionSpec)
+                            .collect(Collectors.toList());
+            String partitionsResultNextToken = response.nextToken();
+            if (Optional.ofNullable(partitionsResultNextToken).isPresent()) {
+                do {
+                    // creating a new GetPartitionsResult using next token.
+                    request.nextToken(partitionsResultNextToken);
+                    response = glueClient.getPartitions(request.build());
+                    finalPartitionsList.addAll(
+                            response.partitions().stream()
+                                    .map(this::getCatalogPartitionSpec)
+                                    .collect(Collectors.toList()));
+                    partitionsResultNextToken = response.nextToken();
+                } while 
(Optional.ofNullable(partitionsResultNextToken).isPresent());
+            }
+
+            return finalPartitionsList;
+
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Drop partition in Glue data catalog.
+     *
+     * @param tablePath fully qualified table path
+     * @param partitionSpec partition spec details
+     * @throws CatalogException unknown errors
+     * @throws PartitionNotExistException partition doesnt exists
+     */
+    public void dropGluePartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec)
+            throws CatalogException, PartitionNotExistException {
+
+        try {
+            Table glueTable = getGlueTable(tablePath);
+            List<String> partCols = 
GlueUtils.getColumnNames(glueTable.partitionKeys());
+            DeletePartitionRequest deletePartitionRequest =
+                    DeletePartitionRequest.builder()
+                            .catalogId(getGlueCatalogId())
+                            .databaseName(tablePath.getDatabaseName())
+                            .tableName(tablePath.getObjectName())
+                            .partitionValues(
+                                    getOrderedFullPartitionValues(
+                                            partitionSpec, partCols, 
tablePath))
+                            .build();
+            DeletePartitionResponse response = 
glueClient.deletePartition(deletePartitionRequest);
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Partition Dropped.");
+        } catch (TableNotExistException | PartitionSpecInvalidException e) {
+            e.printStackTrace();
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * @param tablePath
+     * @param filters
+     * @return List of Partition Spec
+     */
+    public List<CatalogPartitionSpec> listGluePartitionsByFilter(
+            ObjectPath tablePath, List<Expression> filters) {
+        String expressionString =
+                filters.stream()
+                        .map(x -> getExpressionString(x, new StringBuilder()))
+                        .collect(
+                                Collectors.joining(
+                                        GlueCatalogConstants.SPACE
+                                                + GlueCatalogConstants.AND
+                                                + GlueCatalogConstants.SPACE));
+        try {
+            GetPartitionsRequest request =
+                    GetPartitionsRequest.builder()
+                            .databaseName(tablePath.getDatabaseName())
+                            .tableName(tablePath.getObjectName())
+                            .catalogId(getGlueCatalogId())
+                            .expression(expressionString)
+                            .build();
+            GetPartitionsResponse response = glueClient.getPartitions(request);
+            List<CatalogPartitionSpec> catalogPartitionSpecList =
+                    response.partitions().stream()
+                            .map(this::getCatalogPartitionSpec)
+                            .collect(Collectors.toList());
+            GlueUtils.validateGlueResponse(response);
+            String nextToken = response.nextToken();
+            if (Optional.ofNullable(nextToken).isPresent()) {
+                do {
+                    // creating a new GetPartitionsResult using next token.
+                    request =
+                            GetPartitionsRequest.builder()
+                                    .databaseName(tablePath.getDatabaseName())
+                                    .tableName(tablePath.getObjectName())
+                                    .catalogId(getGlueCatalogId())
+                                    .expression(expressionString)
+                                    .nextToken(nextToken)
+                                    .build();
+                    response = glueClient.getPartitions(request);
+                    catalogPartitionSpecList.addAll(
+                            response.partitions().stream()
+                                    .map(this::getCatalogPartitionSpec)
+                                    .collect(Collectors.toList()));
+                    nextToken = response.nextToken();
+                } while (Optional.ofNullable(nextToken).isPresent());
+            }
+            return catalogPartitionSpecList;
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    private String getExpressionString(Expression expression, StringBuilder 
sb) {
+
+        for (Expression childExpression : expression.getChildren()) {
+            if (childExpression.getChildren() != null && 
childExpression.getChildren().size() > 0) {
+                getExpressionString(childExpression, sb);
+            }
+        }
+        return sb.insert(
+                        0,
+                        expression.asSummaryString()
+                                + GlueCatalogConstants.SPACE
+                                + GlueCatalogConstants.AND)
+                .toString();
+    }
+
+    public Partition getGluePartition(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException {
+        try {
+            GetPartitionRequest request =
+                    GetPartitionRequest.builder()
+                            .catalogId(getGlueCatalogId())
+                            .databaseName(tablePath.getDatabaseName())
+                            .tableName(tablePath.getObjectName())
+                            .build();
+            GetPartitionResponse response = glueClient.getPartition(request);
+            GlueUtils.validateGlueResponse(response);
+            Partition partition = response.partition();
+            if (partition.hasValues()
+                    && GlueUtils.specSubset(
+                            partitionSpec.getPartitionSpec(),
+                            partition.storageDescriptor().parameters())) {
+                return partition;
+            }
+        } catch (EntityNotFoundException e) {
+            throw new PartitionNotExistException(catalogName, tablePath, 
partitionSpec);
+        }
+        return null;
+    }
+
+    public boolean gluePartitionExists(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        GetPartitionRequest request =
+                GetPartitionRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName())
+                        .tableName(tablePath.getObjectName())
+                        .build();
+        try {
+            GetPartitionResponse response = glueClient.getPartition(request);
+            GlueUtils.validateGlueResponse(response);
+            return response.partition()
+                    .parameters()
+                    .keySet()
+                    .containsAll(partitionSpec.getPartitionSpec().keySet());
+        } catch (EntityNotFoundException e) {
+            LOG.warn(String.format("%s is not found", 
partitionSpec.getPartitionSpec()));
+        } catch (GlueException e) {
+            throw new CatalogException(catalogName, e);
+        }
+        return false;
+    }
+
+    private String getGlueCatalogId() {
+        return awsConfig.getGlueCatalogId();
+    }
+
+    /**
+     * Rename glue table. Glue catalog don't support renaming table. For 
renaming in Flink, it has

Review Comment:
   https://issues.apache.org/jira/browse/FLINK-31926



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to