foxus commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1690278314


##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/** Utilities related glue Operation. */
+@Internal
+public class GlueUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class);
+
+    /**
+     * Glue supports lowercase naming convention.
+     *
+     * @param name fully qualified name.
+     * @return modified name according to glue convention.
+     */
+    public static String getGlueConventionalName(String name) {
+
+        return name.toLowerCase(Locale.ROOT);
+    }
+
+    /**
+     * Extract location from database properties if present and remove location from properties.
+     * fallback to create default location if not present
+     *
+     * @param databaseProperties database properties.
+     * @param databaseName fully qualified name for database.
+     * @return location for database.
+     */
+    public static String extractDatabaseLocation(
+            final Map<String, String> databaseProperties,
+            final String databaseName,
+            final String catalogPath) {
+        if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) 
{
+            return 
databaseProperties.remove(GlueCatalogConstants.LOCATION_URI);
+        } else {
+            LOG.info("No location URI Set. Using Catalog Path as default");
+            return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + 
databaseName;
+        }
+    }
+
+    /**
+     * Extract location from database properties if present and remove location from properties.
+     * fallback to create default location if not present
+     *
+     * @param tableProperties table properties.
+     * @param tablePath fully qualified object for table.
+     * @return location for table.
+     */
+    public static String extractTableLocation(
+            final Map<String, String> tableProperties,
+            final ObjectPath tablePath,
+            final String catalogPath) {
+        if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+            return tableProperties.remove(GlueCatalogConstants.LOCATION_URI);
+        } else {
+            return catalogPath
+                    + GlueCatalogConstants.LOCATION_SEPARATOR
+                    + tablePath.getDatabaseName()
+                    + GlueCatalogConstants.LOCATION_SEPARATOR
+                    + tablePath.getObjectName();
+        }
+    }
+
+    /**
+     * Build CatalogDatabase instance using information from glue Database instance.
+     *
+     * @param glueDatabase {@link Database }
+     * @return {@link CatalogDatabase } instance.
+     */
+    public static CatalogDatabase getCatalogDatabase(final Database 
glueDatabase) {
+        Map<String, String> properties = new 
HashMap<>(glueDatabase.parameters());
+        return new CatalogDatabaseImpl(properties, glueDatabase.description());
+    }
+
+    /**
+     * A Glue database name cannot be longer than 252 characters. The only acceptable characters are
+     * lowercase letters, numbers, and the underscore character. More details: <a
+     * href="https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html">...</a>
+     *
+     * @param name name
+     */
+    public static void validate(String name) {
+        checkArgument(
+                name != null && 
GlueCatalogConstants.GLUE_DB_PATTERN.matcher(name).find(),
+                "Database name is not according to Glue Norms, "
+                        + "check here 
https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html";);
+    }
+
+    /** validate response from client call. */
+    public static void validateGlueResponse(GlueResponse response) {
+        if (response != null && !response.sdkHttpResponse().isSuccessful()) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER);
+        }
+    }
+
+    /**
+     * @param udf Instance of UserDefinedFunction
+     * @return ClassName for function
+     */
+    public static String getCatalogFunctionClassName(final UserDefinedFunction 
udf) {
+        validateUDFClassName(udf.className());
+        String[] splitName = 
udf.className().split(GlueCatalogConstants.DEFAULT_SEPARATOR);
+        return splitName[splitName.length - 1];
+    }
+
+    /**
+     * Validates UDF class name from glue.
+     *
+     * @param name name of UDF.
+     */
+    private static void validateUDFClassName(final String name) {
+        checkArgument(!isNullOrWhitespaceOnly(name));
+
+        if (name.split(GlueCatalogConstants.DEFAULT_SEPARATOR).length
+                != GlueCatalogConstants.UDF_CLASS_NAME_SIZE) {
+            throw new ValidationException("Improper Classname");
+        }
+    }
+
+    /**
+     * Derive functionalLanguage from glue function name. Glue doesn't have any attribute to save
+     * the functionalLanguage Name. Thus, storing FunctionalLanguage in the name itself.
+     *
+     * @param glueFunction Function name from glue.
+     * @return Identifier for FunctionalLanguage.
+     */
+    public static FunctionLanguage getFunctionalLanguage(final 
UserDefinedFunction glueFunction) {
+        if 
(glueFunction.className().startsWith(GlueCatalogConstants.FLINK_JAVA_FUNCTION_PREFIX))
 {
+            return FunctionLanguage.JAVA;
+        } else if (glueFunction
+                .className()
+                
.startsWith(GlueCatalogConstants.FLINK_PYTHON_FUNCTION_PREFIX)) {
+            return FunctionLanguage.PYTHON;
+        } else if (glueFunction
+                .className()
+                .startsWith(GlueCatalogConstants.FLINK_SCALA_FUNCTION_PREFIX)) 
{
+            return FunctionLanguage.SCALA;
+        } else {
+            throw new CatalogException("Invalid Functional Language");
+        }
+    }
+
+    /**
+     * Get expanded Query from CatalogBaseTable.
+     *
+     * @param table Instance of catalogBaseTable.
+     * @return expandedQuery for Glue Table.
+     */
+    public static String getExpandedQuery(CatalogBaseTable table) {
+        // https://issues.apache.org/jira/browse/FLINK-31961
+        return "";
+    }
+
+    /**
+     * Get Original Query from CatalogBaseTable.
+     *
+     * @param table Instance of CatalogBaseTable.
+     * @return OriginalQuery for Glue Table.
+     */
+    public static String getOriginalQuery(CatalogBaseTable table) {
+        // https://issues.apache.org/jira/browse/FLINK-31961
+        return "";
+    }
+
+    /**
+     * Extract table owner name and remove from properties.
+     *
+     * @param properties Map of properties.
+     * @return fully qualified owner name.
+     */
+    public static String extractTableOwner(Map<String, String> properties) {
+        return properties.containsKey(GlueCatalogConstants.TABLE_OWNER)
+                ? properties.remove(GlueCatalogConstants.TABLE_OWNER)
+                : null;
+    }
+
+    /**
+     * Derive Instance of Glue Column from {@link CatalogBaseTable}.
+     *
+     * @param column Instance of {@link org.apache.flink.table.catalog.Column}.
+     * @throws CatalogException Throws exception in case of failure.
+     */
+    public static Column getGlueColumn(final Schema.UnresolvedColumn column)

Review Comment:
   This whole method looks incomplete - did you mean to remove it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to