RocMarshal commented on a change in pull request #16962: URL: https://github.com/apache/flink/pull/16962#discussion_r712277089
########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java ########## @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.catalog; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.constraints.UniqueConstraint; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.compress.utils.Lists; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME; +import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; + +/** Catalog for MySQL. */ +public class MySQLCatalog extends AbstractJdbcCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(MySQLCatalog.class); + + private final String databaseVersion; + private final String driverVersion; + + // ============================data types===================== + + public static final String MYSQL_UNKNOWN = "UNKNOWN"; + public static final String MYSQL_BIT = "BIT"; + + // -------------------------number---------------------------- + public static final String MYSQL_TINYINT = "TINYINT"; + public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED"; + public static final String MYSQL_SMALLINT = "SMALLINT"; + public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED"; + public static final String MYSQL_MEDIUMINT = "MEDIUMINT"; + public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED"; + public static final String MYSQL_INT = "INT"; + public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED"; + public static final String MYSQL_INTEGER = "INTEGER"; + public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED"; + public static final String MYSQL_BIGINT = "BIGINT"; + public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED"; + public static final String MYSQL_DECIMAL = "DECIMAL"; + public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED"; + public static final String MYSQL_FLOAT = "FLOAT"; + public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED"; + public static final String MYSQL_DOUBLE = "DOUBLE"; + public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED"; + + // -------------------------string---------------------------- + public static final String MYSQL_CHAR = "CHAR"; + public static final String MYSQL_VARCHAR = "VARCHAR"; + public static final String MYSQL_TINYTEXT = "TINYTEXT"; + public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT"; + public static final String MYSQL_TEXT = "TEXT"; + public static final String MYSQL_LONGTEXT = "LONGTEXT"; + public static final String MYSQL_JSON = "JSON"; + + // ------------------------------time------------------------- + public static final String MYSQL_DATE = "DATE"; + public static final String MYSQL_DATETIME = "DATETIME"; + public static final String MYSQL_TIME = "TIME"; + public static final String MYSQL_TIMESTAMP = "TIMESTAMP"; + public static final String MYSQL_YEAR = "YEAR"; + + // ------------------------------blob------------------------- + public static final String MYSQL_TINYBLOB = "TINYBLOB"; + public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB"; + public static final String MYSQL_BLOB = "BLOB"; + public static final String MYSQL_LONGBLOB = "LONGBLOB"; + public static final String MYSQL_BINARY = "BINARY"; + public static final String MYSQL_VARBINARY = "VARBINARY"; + public static final String MYSQL_GEOMETRY = "GEOMETRY"; + + // column class names + public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean"; + public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer"; + public static final String COLUMN_CLASS_BIG_INTEGER = "java.math.BigInteger"; + public static final String COLUMN_CLASS_LONG = "java.lang.Long"; + public static final String COLUMN_CLASS_FLOAT = "java.lang.Float"; + public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double"; + public static final String COLUMN_CLASS_BIG_DECIMAL = "java.math.BigDecimal"; + public static final String COLUMN_CLASS_BYTE_ARRAY = "[B"; + public static final String COLUMN_CLASS_STRING = "java.lang.String"; + public static final String COLUMN_CLASS_DATE = "java.sql.Date"; + public static final String COLUMN_CLASS_TIME = "java.sql.Time"; + public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp"; + + public static final int RAW_TIME_LENGTH = 10; + public static final int RAW_TIMESTAMP_LENGTH = 19; + + private static final Set<String> builtinDatabases = + new HashSet<String>() { + { + add("information_schema"); + add("mysql"); + add("performance_schema"); + add("sys"); + } + }; + + public MySQLCatalog( + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl) { + super(catalogName, defaultDatabase, username, pwd, baseUrl); + this.driverVersion = + Preconditions.checkNotNull(getDriverVersion(), "driver version must not be null."); + this.databaseVersion = + Preconditions.checkNotNull( + getDatabaseVersion(), "database version must not be null."); + LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion); + } + + @Override + public List<String> listDatabases() throws CatalogException { + String sql = "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;"; + return extractColumnValuesBySQL( + defaultUrl, + sql, + 1, + (FilterFunction<String>) dbName -> !builtinDatabases.contains(dbName)); + } + + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + Preconditions.checkState( + StringUtils.isNotBlank(databaseName), "Database name must not be blank."); + if (listDatabases().contains(databaseName)) { + return new CatalogDatabaseImpl(Collections.emptyMap(), null); + } else { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + @Override + public List<String> listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + Preconditions.checkState( + StringUtils.isNotBlank(databaseName), "Database name must not be blank."); + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + String connUrl = baseUrl + databaseName; + String sql = + String.format( + "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '%s'", + databaseName); + return extractColumnValuesBySQL(connUrl, sql, 1, null); + } + + // ------ retrieve PK constraint ------ + + private Optional<UniqueConstraint> getPrimaryKey( + DatabaseMetaData metaData, String schema, ObjectPath table) throws SQLException { + + // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys, + // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ. + // We need to sort them based on the KEY_SEQ value. + ResultSet rs = + metaData.getPrimaryKeys(table.getDatabaseName(), schema, table.getObjectName()); + + Map<Integer, String> keySeqColumnName = new HashMap<>(); + String pkName = null; + while (rs.next()) { + String columnName = rs.getString("COLUMN_NAME"); + pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same + int keySeq = rs.getInt("KEY_SEQ"); + keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index + } + List<String> pkFields = + Arrays.asList(new String[keySeqColumnName.size()]); // initialize size + keySeqColumnName.forEach(pkFields::set); + if (!pkFields.isEmpty()) { + // PK_NAME maybe null according to the javadoc, generate an unique name in that case + pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName; + return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields)); + } + return Optional.empty(); + } + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + try (Connection conn = + DriverManager.getConnection( + baseUrl + tablePath.getDatabaseName(), username, pwd); + PreparedStatement ps = + conn.prepareStatement( + String.format("SELECT * FROM %s;", tablePath.getObjectName()))) { Review comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org