RocMarshal commented on a change in pull request #16962:
URL: https://github.com/apache/flink/pull/16962#discussion_r740036725
##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -308,47 +308,50 @@ As there is no standard syntax for upsert, the following
table describes the dat
</tbody>
</table>
-### Postgres Database as a Catalog
+JDBC Catalog
+------------
The `JdbcCatalog` enables users to connect Flink to relational databases over
JDBC protocol.
-Currently, `PostgresCatalog` is the only implementation of JDBC Catalog at the
moment, `PostgresCatalog` only supports limited `Catalog` methods include:
+Currently, there are two JDBC catalog implementations: `PostgresCatalog` and `MySQLCatalog`. They support the following catalog methods; other methods are currently not supported.
Review comment:
done.
##########
File path:
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalogITCase.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+import static org.junit.Assert.assertEquals;
+
+/** E2E test for {@link MySQLCatalog}. */
+public class MySQLCatalogITCase extends MySQLCatalogTestBase {
+
+ private static final List<Row> ALL_TYPES_ROWS =
+ Lists.newArrayList(
+ Row.ofKind(
+ RowKind.INSERT,
+ 1L,
+ -1L,
+ new BigDecimal(1),
+ null,
+ true,
+ null,
+ "hello",
+ Date.valueOf("2021-08-04").toLocalDate(),
+ Timestamp.valueOf("2021-08-04
01:54:16").toLocalDateTime(),
+ new BigDecimal(-1),
+ new BigDecimal(1),
+ -1.0d,
+ 1.0d,
+ "enum2",
+ -9.1f,
+ 9.1f,
+ null,
+ null,
+ -1,
+ 1L,
+ -1,
+ 1L,
+ "{\"k1\": \"v1\"}",
+ null,
+ null,
+ "col_longtext",
+ null,
+ -1,
+ 1,
+ "col_mediumtext",
+ null,
+ null,
+ null,
+ new BigDecimal(-99),
+ new BigDecimal(99),
+ null,
+ null,
+ -1.0d,
+ 1.0d,
+ "set_ele1",
+ Short.parseShort("-1"),
+ 1,
+ "col_text",
+ Time.valueOf("10:32:34").toLocalTime(),
+ Timestamp.valueOf("2021-08-04
01:54:16").toLocalDateTime(),
+ "col_tinytext",
+ Byte.parseByte("-1"),
+ Byte.parseByte("1"),
+ null,
+ "col_varchar",
+ Date.valueOf("1999-01-01").toLocalDate(),
+ Timestamp.valueOf("2021-08-04
01:54:16.463").toLocalDateTime(),
+ Time.valueOf("09:33:43").toLocalTime(),
+ Timestamp.valueOf("2021-08-04
01:54:16.463").toLocalDateTime(),
+ null),
+ Row.ofKind(
+ RowKind.INSERT,
+ 2L,
+ -1L,
+ new BigDecimal(1),
+ null,
+ true,
+ null,
+ "hello",
+ Date.valueOf("2021-08-04").toLocalDate(),
+ Timestamp.valueOf("2021-08-04
01:53:19").toLocalDateTime(),
+ new BigDecimal(-1),
+ new BigDecimal(1),
+ -1.0d,
+ 1.0d,
+ "enum2",
+ -9.1f,
+ 9.1f,
+ null,
+ null,
+ -1,
+ 1L,
+ -1,
+ 1L,
+ "{\"k1\": \"v1\"}",
+ null,
+ null,
+ "col_longtext",
+ null,
+ -1,
+ 1,
+ "col_mediumtext",
+ null,
+ null,
+ null,
+ new BigDecimal(-99),
+ new BigDecimal(99),
+ null,
+ null,
+ -1.0d,
+ 1.0d,
+ "set_ele1,set_ele12",
+ Short.parseShort("-1"),
+ 1,
+ "col_text",
+ Time.valueOf("10:32:34").toLocalTime(),
+ Timestamp.valueOf("2021-08-04
01:53:19").toLocalDateTime(),
+ "col_tinytext",
+ Byte.parseByte("-1"),
+ Byte.parseByte("1"),
+ null,
+ "col_varchar",
+ Date.valueOf("1999-01-01").toLocalDate(),
+ Timestamp.valueOf("2021-08-04
01:53:19.098").toLocalDateTime(),
+ Time.valueOf("09:33:43").toLocalTime(),
+ Timestamp.valueOf("2021-08-04
01:53:19.098").toLocalDateTime(),
+ null));
+
+ private TableEnvironment tEnv;
+
+ @Before
+ public void setup() {
+ this.tEnv =
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+ tEnv.getConfig()
+ .getConfiguration()
+ .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+
+ // Use mysql catalog.
+ tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
+ tEnv.useCatalog(TEST_CATALOG_NAME);
+ }
+
+ @Test
+ public void testSelectField() {
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(String.format("select pid from %s",
TEST_TABLE_ALL_TYPES))
+ .execute()
+ .collect());
+ assertEquals(
+ Lists.newArrayList(Row.ofKind(RowKind.INSERT, 1L),
Row.ofKind(RowKind.INSERT, 2L)),
+ results);
+ }
+
+ @Test
+ public void testWithoutCatalogDB() {
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(String.format("select * from %s",
TEST_TABLE_ALL_TYPES))
+ .execute()
+ .collect());
+
+ assertEquals(ALL_TYPES_ROWS, results);
+ }
+
+ @Test
+ public void testWithoutCatalog() {
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(
+ String.format(
+ "select * from `%s`.`%s`",
+ TEST_DB, TEST_TABLE_ALL_TYPES))
+ .execute()
+ .collect());
+ assertEquals(ALL_TYPES_ROWS, results);
+ }
+
+ @Test
+ public void testFullPath() throws ParseException {
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(
+ String.format(
+ "select * from %s.%s.`%s`",
+ TEST_CATALOG_NAME,
+ catalog.getDefaultDatabase(),
+ TEST_TABLE_ALL_TYPES))
+ .execute()
+ .collect());
+ assertEquals(ALL_TYPES_ROWS, results);
+ }
+
+ @Test
+ public void testSelectToInsertWithoutYearType() throws Exception {
+
+ String sql =
+ String.format(
+ "insert into `%s` select * from `%s`",
+ TEST_SINK_TABLE_ALL_TYPES_WITHOUT_YEAR_TYPE,
TEST_TABLE_ALL_TYPES);
+ tEnv.executeSql(sql).await();
+
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(
+ String.format(
+ "select * from %s",
+
TEST_SINK_TABLE_ALL_TYPES_WITHOUT_YEAR_TYPE))
+ .execute()
+ .collect());
+ assertEquals(ALL_TYPES_ROWS, results);
+ }
+
+ @Test
+ public void testSelectToInsertWithYearType() throws Exception {
+ exception.expect(ExecutionException.class);
+ exception.expectMessage("TableException: Failed to wait job finish");
Review comment:
@Airblader What about deleting this test case?
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MySQLCatalog.class);
+
+ private final String databaseVersion;
+ private final String driverVersion;
+
+ // ============================data types=====================
+
+ public static final String MYSQL_UNKNOWN = "UNKNOWN";
+ public static final String MYSQL_BIT = "BIT";
+
+ // -------------------------number----------------------------
+ public static final String MYSQL_TINYINT = "TINYINT";
+ public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+ public static final String MYSQL_SMALLINT = "SMALLINT";
+ public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+ public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+ public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+ public static final String MYSQL_INT = "INT";
+ public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+ public static final String MYSQL_INTEGER = "INTEGER";
+ public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+ public static final String MYSQL_BIGINT = "BIGINT";
+ public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+ public static final String MYSQL_DECIMAL = "DECIMAL";
+ public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+ public static final String MYSQL_FLOAT = "FLOAT";
+ public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+ public static final String MYSQL_DOUBLE = "DOUBLE";
+ public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+ // -------------------------string----------------------------
+ public static final String MYSQL_CHAR = "CHAR";
+ public static final String MYSQL_VARCHAR = "VARCHAR";
+ public static final String MYSQL_TINYTEXT = "TINYTEXT";
+ public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+ public static final String MYSQL_TEXT = "TEXT";
+ public static final String MYSQL_LONGTEXT = "LONGTEXT";
+ public static final String MYSQL_JSON = "JSON";
+
+ // ------------------------------time-------------------------
+ public static final String MYSQL_DATE = "DATE";
+ public static final String MYSQL_DATETIME = "DATETIME";
+ public static final String MYSQL_TIME = "TIME";
+ public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+ public static final String MYSQL_YEAR = "YEAR";
+
+ // ------------------------------blob-------------------------
+ public static final String MYSQL_TINYBLOB = "TINYBLOB";
+ public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+ public static final String MYSQL_BLOB = "BLOB";
+ public static final String MYSQL_LONGBLOB = "LONGBLOB";
+ public static final String MYSQL_BINARY = "BINARY";
+ public static final String MYSQL_VARBINARY = "VARBINARY";
+ public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+ // column class names
+ public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+ public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+ public static final String COLUMN_CLASS_BIG_INTEGER =
"java.math.BigInteger";
+ public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+ public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+ public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+ public static final String COLUMN_CLASS_BIG_DECIMAL =
"java.math.BigDecimal";
+ public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+ public static final String COLUMN_CLASS_STRING = "java.lang.String";
+ public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+ public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+ public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+ public static final int RAW_TIME_LENGTH = 10;
+ public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+ private static final Set<String> builtinDatabases =
+ new HashSet<String>() {
+ {
+ add("information_schema");
+ add("mysql");
+ add("performance_schema");
+ add("sys");
+ }
+ };
+
+ public MySQLCatalog(
+ String catalogName,
+ String defaultDatabase,
+ String username,
+ String pwd,
+ String baseUrl) {
+ super(catalogName, defaultDatabase, username, pwd, baseUrl);
+ this.driverVersion =
+ Preconditions.checkNotNull(getDriverVersion(), "driver version
must not be null.");
+ this.databaseVersion =
+ Preconditions.checkNotNull(
+ getDatabaseVersion(), "database version must not be
null.");
+ LOG.info("Driver version: {}, database version: {}", driverVersion,
databaseVersion);
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ String sql = "SELECT `SCHEMA_NAME` FROM
`INFORMATION_SCHEMA`.`SCHEMATA`;";
+ return extractColumnValuesBySQL(
+ defaultUrl,
+ sql,
+ 1,
+ (FilterFunction<String>) dbName ->
!builtinDatabases.contains(dbName));
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ Preconditions.checkState(
+ StringUtils.isNotBlank(databaseName), "Database name must not
be blank.");
+ if (listDatabases().contains(databaseName)) {
+ return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+ } else {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ Preconditions.checkState(
+ StringUtils.isNotBlank(databaseName), "Database name must not
be blank.");
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ String connUrl = baseUrl + databaseName;
+ return extractColumnValuesBySQL(
+ connUrl,
+ "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE
TABLE_SCHEMA = ?",
+ 1,
+ null,
+ databaseName);
+ }
+
+ // ------ retrieve PK constraint ------
+
+ private Optional<UniqueConstraint> getPrimaryKey(
+ DatabaseMetaData metaData, String schema, ObjectPath table) throws
SQLException {
+
+ // According to the Javadoc of
java.sql.DatabaseMetaData#getPrimaryKeys,
+ // the returned primary key columns are ordered by COLUMN_NAME, not by
KEY_SEQ.
+ // We need to sort them based on the KEY_SEQ value.
+ ResultSet rs =
+ metaData.getPrimaryKeys(table.getDatabaseName(), schema,
table.getObjectName());
+
+ Map<Integer, String> keySeqColumnName = new HashMap<>();
+ String pkName = null;
+ while (rs.next()) {
+ String columnName = rs.getString("COLUMN_NAME");
+ pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the
same
+ int keySeq = rs.getInt("KEY_SEQ");
+ keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is
1-based index
+ }
+ List<String> pkFields =
+ Arrays.asList(new String[keySeqColumnName.size()]); //
initialize size
+ keySeqColumnName.forEach(pkFields::set);
+ if (!pkFields.isEmpty()) {
+ // PK_NAME maybe null according to the javadoc, generate an unique
name in that case
+ pkName = pkName == null ? "pk_" + String.join("_", pkFields) :
pkName;
+ return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+ try (Connection conn =
+ DriverManager.getConnection(
+ baseUrl + tablePath.getDatabaseName(),
username, pwd);
+ PreparedStatement ps =
+ conn.prepareStatement("SELECT * FROM " +
tablePath.getObjectName())) {
+
+ DatabaseMetaData metaData = conn.getMetaData();
+ Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData,
null, tablePath);
+ ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+ String[] columnNames = new
String[resultSetMetaData.getColumnCount()];
+ DataType[] types = new
DataType[resultSetMetaData.getColumnCount()];
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+ types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+ if (resultSetMetaData.isNullable(i) ==
ResultSetMetaData.columnNoNulls) {
+ types[i - 1] = types[i - 1].notNull();
+ }
+ }
+ Schema.Builder schemaBuilder =
Schema.newBuilder().fromFields(columnNames, types);
+ primaryKey.ifPresent(
+ pk -> schemaBuilder.primaryKeyNamed(pk.getName(),
pk.getColumns()));
+ Schema tableSchema = schemaBuilder.build();
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR.key(), IDENTIFIER);
+ props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+ props.put(TABLE_NAME.key(), tablePath.getObjectName());
+ props.put(USERNAME.key(), username);
+ props.put(PASSWORD.key(), pwd);
+ return CatalogTable.of(tableSchema, null, Lists.newArrayList(),
props);
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed getting table %s",
tablePath.getFullName()), e);
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ return !extractColumnValuesBySQL(
+ baseUrl,
+ "SELECT TABLE_NAME FROM information_schema.`TABLES`
WHERE TABLE_SCHEMA=? and TABLE_NAME=?",
+ 1,
+ null,
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName())
+ .isEmpty();
+ }
+
+ private List<String> extractColumnValuesBySQL(
+ String connUrl,
+ String sql,
+ int columnIndex,
+ FilterFunction<String> filterFunc,
+ Object... params) {
+ List<String> columnValues = Lists.newArrayList();
+ try (Connection conn = DriverManager.getConnection(connUrl, username,
pwd);
+ PreparedStatement ps = conn.prepareStatement(sql)) {
+ if (Objects.nonNull(params) && params.length > 0) {
+ for (int i = 0; i < params.length; i++) {
+ ps.setObject(i + 1, params[i]);
+ }
+ }
+ ResultSet rs = ps.executeQuery();
+ while (rs.next()) {
+ String columnValue = rs.getString(columnIndex);
+ if (Objects.isNull(filterFunc) ||
filterFunc.filter(columnValue)) {
+ columnValues.add(columnValue);
+ }
+ }
+ return columnValues;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed in processing query sql %s, connUrl
%s", sql, connUrl),
+ e);
+ }
+ }
+
+ private String getDatabaseVersion() {
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ return conn.getMetaData().getDatabaseProductVersion();
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed in getting MySQL version by %s.",
defaultUrl), e);
+ }
+ }
+
+ private String getDriverVersion() {
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ String driverVersion = conn.getMetaData().getDriverVersion();
+ Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
+ Matcher matcher = regexp.matcher(driverVersion);
+ return matcher.find() ? matcher.group(0) : null;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed in getting mysql driver version by
%s.", defaultUrl), e);
+ }
+ }
+
+ /** Converts MySQL type to Flink {@link DataType}. */
+ private DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData
metadata, int colIndex)
+ throws SQLException {
+ String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
+ String columnName = metadata.getColumnName(colIndex);
+ int precision = metadata.getPrecision(colIndex);
+ int scale = metadata.getScale(colIndex);
+
+ switch (mysqlType) {
+ case MYSQL_BIT:
+ return DataTypes.BOOLEAN();
+ case MYSQL_TINYBLOB:
+ case MYSQL_MEDIUMBLOB:
+ case MYSQL_BLOB:
+ case MYSQL_LONGBLOB:
+ case MYSQL_VARBINARY:
+ case MYSQL_BINARY:
+ return DataTypes.BYTES();
+ case MYSQL_TINYINT:
+ return DataTypes.TINYINT();
+ case MYSQL_TINYINT_UNSIGNED:
+ case MYSQL_SMALLINT:
+ return DataTypes.SMALLINT();
+ case MYSQL_SMALLINT_UNSIGNED:
+ case MYSQL_MEDIUMINT:
+ case MYSQL_MEDIUMINT_UNSIGNED:
+ case MYSQL_INT:
+ case MYSQL_INTEGER:
+ return DataTypes.INT();
+ case MYSQL_INT_UNSIGNED:
+ case MYSQL_INTEGER_UNSIGNED:
+ case MYSQL_BIGINT:
+ return DataTypes.BIGINT();
+ case MYSQL_BIGINT_UNSIGNED:
+ return DataTypes.DECIMAL(20, 0);
+ case MYSQL_DECIMAL:
+ return DataTypes.DECIMAL(precision, scale);
+ case MYSQL_DECIMAL_UNSIGNED:
+ checkMaxPrecision(tablePath, columnName, precision);
+ return DataTypes.DECIMAL(precision + 1, scale);
+ case MYSQL_FLOAT:
+ return DataTypes.FLOAT();
+ case MYSQL_FLOAT_UNSIGNED:
+ LOG.warn("{} will probably cause value overflow.",
MYSQL_FLOAT_UNSIGNED);
+ return DataTypes.FLOAT();
+ case MYSQL_DOUBLE:
+ return DataTypes.DOUBLE();
+ case MYSQL_DOUBLE_UNSIGNED:
+ LOG.warn("{} will probably cause value overflow.",
MYSQL_DOUBLE_UNSIGNED);
+ return DataTypes.DOUBLE();
+ case MYSQL_CHAR:
+ case MYSQL_VARCHAR:
+ case MYSQL_TINYTEXT:
+ case MYSQL_MEDIUMTEXT:
+ case MYSQL_TEXT:
+ case MYSQL_JSON:
+ return DataTypes.STRING();
+ case MYSQL_LONGTEXT:
+ LOG.warn(
+ "The max precision of type '{}' in mysql is 536870911,
and the max "
+ + "precision here has to be set as 2147483647
due to the "
+ + "limitation of the flink sql types system.",
+ MYSQL_LONGTEXT);
+ return DataTypes.STRING();
+ case MYSQL_YEAR:
+ LOG.warn(
+ "The type {} in mysql catalog is supported in
read-mode, "
+ + "but not in write-mode.",
+ MYSQL_YEAR);
+ return DataTypes.DATE();
+ case MYSQL_DATE:
+ return DataTypes.DATE();
+ case MYSQL_TIME:
+ return isExplicitPrecision(precision, RAW_TIME_LENGTH)
+ ? DataTypes.TIME(precision - RAW_TIME_LENGTH - 1)
+ : DataTypes.TIME(0);
+ case MYSQL_DATETIME:
+ case MYSQL_TIMESTAMP:
+ return isExplicitPrecision(precision, RAW_TIMESTAMP_LENGTH)
+ ? DataTypes.TIMESTAMP(precision - RAW_TIMESTAMP_LENGTH
- 1)
+ : DataTypes.TIMESTAMP(0);
+ case MYSQL_GEOMETRY:
+ LOG.warn(
+ "{} type in mysql catalog is supported in read-mode by
the form of bytes,"
+ + " but not in write-mode.",
+ MYSQL_GEOMETRY);
+ return DataTypes.BYTES();
+ case MYSQL_UNKNOWN:
+ return fromJDBCClassType(tablePath, metadata, colIndex);
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Doesn't support mysql type '%s' in mysql
version %s, driver version %s yet.",
Review comment:
Good catch. I modified it.
##########
File path:
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalogTest.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Test for {@link MySQLCatalog}. */
+public class MySQLCatalogTest extends MySQLCatalogTestBase {
Review comment:
@Airblader, obviously you're right. I moved the test cases into
`MySQLCatalogITCase` and deleted `MySQLCatalogTest`.
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MySQLCatalog.class);
+
+ private final String databaseVersion;
+ private final String driverVersion;
+
+ // ============================data types=====================
+
+ public static final String MYSQL_UNKNOWN = "UNKNOWN";
+ public static final String MYSQL_BIT = "BIT";
+
+ // -------------------------number----------------------------
+ public static final String MYSQL_TINYINT = "TINYINT";
+ public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+ public static final String MYSQL_SMALLINT = "SMALLINT";
+ public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+ public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+ public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+ public static final String MYSQL_INT = "INT";
+ public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+ public static final String MYSQL_INTEGER = "INTEGER";
+ public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+ public static final String MYSQL_BIGINT = "BIGINT";
+ public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+ public static final String MYSQL_DECIMAL = "DECIMAL";
+ public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+ public static final String MYSQL_FLOAT = "FLOAT";
+ public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+ public static final String MYSQL_DOUBLE = "DOUBLE";
+ public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+ // -------------------------string----------------------------
+ public static final String MYSQL_CHAR = "CHAR";
+ public static final String MYSQL_VARCHAR = "VARCHAR";
+ public static final String MYSQL_TINYTEXT = "TINYTEXT";
+ public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+ public static final String MYSQL_TEXT = "TEXT";
+ public static final String MYSQL_LONGTEXT = "LONGTEXT";
+ public static final String MYSQL_JSON = "JSON";
+
+ // ------------------------------time-------------------------
+ public static final String MYSQL_DATE = "DATE";
+ public static final String MYSQL_DATETIME = "DATETIME";
+ public static final String MYSQL_TIME = "TIME";
+ public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+ public static final String MYSQL_YEAR = "YEAR";
+
+ // ------------------------------blob-------------------------
+ public static final String MYSQL_TINYBLOB = "TINYBLOB";
+ public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+ public static final String MYSQL_BLOB = "BLOB";
+ public static final String MYSQL_LONGBLOB = "LONGBLOB";
+ public static final String MYSQL_BINARY = "BINARY";
+ public static final String MYSQL_VARBINARY = "VARBINARY";
+ public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+ // column class names
+ public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+ public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+ public static final String COLUMN_CLASS_BIG_INTEGER =
"java.math.BigInteger";
+ public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+ public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+ public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+ public static final String COLUMN_CLASS_BIG_DECIMAL =
"java.math.BigDecimal";
+ public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+ public static final String COLUMN_CLASS_STRING = "java.lang.String";
+ public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+ public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+ public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+ public static final int RAW_TIME_LENGTH = 10;
+ public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+ private static final Set<String> builtinDatabases =
+ new HashSet<String>() {
+ {
+ add("information_schema");
+ add("mysql");
+ add("performance_schema");
+ add("sys");
+ }
+ };
+
+ public MySQLCatalog(
+ String catalogName,
+ String defaultDatabase,
+ String username,
+ String pwd,
+ String baseUrl) {
+ super(catalogName, defaultDatabase, username, pwd, baseUrl);
+ this.driverVersion =
+ Preconditions.checkNotNull(getDriverVersion(), "driver version
must not be null.");
+ this.databaseVersion =
+ Preconditions.checkNotNull(
+ getDatabaseVersion(), "database version must not be
null.");
+ LOG.info("Driver version: {}, database version: {}", driverVersion,
databaseVersion);
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ String sql = "SELECT `SCHEMA_NAME` FROM
`INFORMATION_SCHEMA`.`SCHEMATA`;";
+ return extractColumnValuesBySQL(
+ defaultUrl,
+ sql,
+ 1,
+ (FilterFunction<String>) dbName ->
!builtinDatabases.contains(dbName));
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ Preconditions.checkState(
+ StringUtils.isNotBlank(databaseName), "Database name must not
be blank.");
+ if (listDatabases().contains(databaseName)) {
+ return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+ } else {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ Preconditions.checkState(
+ StringUtils.isNotBlank(databaseName), "Database name must not
be blank.");
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ String connUrl = baseUrl + databaseName;
+ return extractColumnValuesBySQL(
+ connUrl,
+ "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE
TABLE_SCHEMA = ?",
+ 1,
+ null,
+ databaseName);
+ }
+
+ // ------ retrieve PK constraint ------
+
+ private Optional<UniqueConstraint> getPrimaryKey(
+ DatabaseMetaData metaData, String schema, ObjectPath table) throws
SQLException {
+
+ // According to the Javadoc of
java.sql.DatabaseMetaData#getPrimaryKeys,
+ // the returned primary key columns are ordered by COLUMN_NAME, not by
KEY_SEQ.
+ // We need to sort them based on the KEY_SEQ value.
+ ResultSet rs =
+ metaData.getPrimaryKeys(table.getDatabaseName(), schema,
table.getObjectName());
+
+ Map<Integer, String> keySeqColumnName = new HashMap<>();
+ String pkName = null;
+ while (rs.next()) {
+ String columnName = rs.getString("COLUMN_NAME");
+ pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the
same
+ int keySeq = rs.getInt("KEY_SEQ");
+ keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is
1-based index
+ }
+ List<String> pkFields =
+ Arrays.asList(new String[keySeqColumnName.size()]); //
initialize size
+ keySeqColumnName.forEach(pkFields::set);
+ if (!pkFields.isEmpty()) {
+ // PK_NAME maybe null according to the javadoc, generate an unique
name in that case
+ pkName = pkName == null ? "pk_" + String.join("_", pkFields) :
pkName;
+ return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+ try (Connection conn =
+ DriverManager.getConnection(
+ baseUrl + tablePath.getDatabaseName(),
username, pwd);
+ PreparedStatement ps =
+ conn.prepareStatement("SELECT * FROM " +
tablePath.getObjectName())) {
+
+ DatabaseMetaData metaData = conn.getMetaData();
+ Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData,
null, tablePath);
+ ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+ String[] columnNames = new
String[resultSetMetaData.getColumnCount()];
+ DataType[] types = new
DataType[resultSetMetaData.getColumnCount()];
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+ types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+ if (resultSetMetaData.isNullable(i) ==
ResultSetMetaData.columnNoNulls) {
+ types[i - 1] = types[i - 1].notNull();
+ }
+ }
+ Schema.Builder schemaBuilder =
Schema.newBuilder().fromFields(columnNames, types);
+ primaryKey.ifPresent(
+ pk -> schemaBuilder.primaryKeyNamed(pk.getName(),
pk.getColumns()));
+ Schema tableSchema = schemaBuilder.build();
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR.key(), IDENTIFIER);
+ props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+ props.put(TABLE_NAME.key(), tablePath.getObjectName());
+ props.put(USERNAME.key(), username);
+ props.put(PASSWORD.key(), pwd);
+ return CatalogTable.of(tableSchema, null, Lists.newArrayList(),
props);
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed getting table %s",
tablePath.getFullName()), e);
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ return !extractColumnValuesBySQL(
+ baseUrl,
+ "SELECT TABLE_NAME FROM information_schema.`TABLES`
WHERE TABLE_SCHEMA=? and TABLE_NAME=?",
+ 1,
+ null,
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName())
+ .isEmpty();
+ }
+
+ private List<String> extractColumnValuesBySQL(
Review comment:
@Airblader Good idea. I updated it.
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MySQLCatalog.class);
+
+ private final String databaseVersion;
+ private final String driverVersion;
+
+ // ============================data types=====================
+
+ public static final String MYSQL_UNKNOWN = "UNKNOWN";
+ public static final String MYSQL_BIT = "BIT";
+
+ // -------------------------number----------------------------
+ public static final String MYSQL_TINYINT = "TINYINT";
+ public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+ public static final String MYSQL_SMALLINT = "SMALLINT";
+ public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+ public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+ public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+ public static final String MYSQL_INT = "INT";
+ public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+ public static final String MYSQL_INTEGER = "INTEGER";
+ public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+ public static final String MYSQL_BIGINT = "BIGINT";
+ public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+ public static final String MYSQL_DECIMAL = "DECIMAL";
+ public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+ public static final String MYSQL_FLOAT = "FLOAT";
+ public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+ public static final String MYSQL_DOUBLE = "DOUBLE";
+ public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+ // -------------------------string----------------------------
+ public static final String MYSQL_CHAR = "CHAR";
+ public static final String MYSQL_VARCHAR = "VARCHAR";
+ public static final String MYSQL_TINYTEXT = "TINYTEXT";
+ public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+ public static final String MYSQL_TEXT = "TEXT";
+ public static final String MYSQL_LONGTEXT = "LONGTEXT";
+ public static final String MYSQL_JSON = "JSON";
+
+ // ------------------------------time-------------------------
+ public static final String MYSQL_DATE = "DATE";
+ public static final String MYSQL_DATETIME = "DATETIME";
+ public static final String MYSQL_TIME = "TIME";
+ public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+ public static final String MYSQL_YEAR = "YEAR";
+
+ // ------------------------------blob-------------------------
+ public static final String MYSQL_TINYBLOB = "TINYBLOB";
+ public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+ public static final String MYSQL_BLOB = "BLOB";
+ public static final String MYSQL_LONGBLOB = "LONGBLOB";
+ public static final String MYSQL_BINARY = "BINARY";
+ public static final String MYSQL_VARBINARY = "VARBINARY";
+ public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+ // column class names
+ public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+ public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+ public static final String COLUMN_CLASS_BIG_INTEGER =
"java.math.BigInteger";
+ public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+ public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+ public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+ public static final String COLUMN_CLASS_BIG_DECIMAL =
"java.math.BigDecimal";
+ public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+ public static final String COLUMN_CLASS_STRING = "java.lang.String";
+ public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+ public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+ public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+ public static final int RAW_TIME_LENGTH = 10;
+ public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+ private static final Set<String> builtinDatabases =
+ new HashSet<String>() {
+ {
+ add("information_schema");
+ add("mysql");
+ add("performance_schema");
+ add("sys");
+ }
+ };
+
+ public MySQLCatalog(
+ String catalogName,
+ String defaultDatabase,
+ String username,
+ String pwd,
+ String baseUrl) {
+ super(catalogName, defaultDatabase, username, pwd, baseUrl);
+ this.driverVersion =
+ Preconditions.checkNotNull(getDriverVersion(), "driver version
must not be null.");
+ this.databaseVersion =
+ Preconditions.checkNotNull(
+ getDatabaseVersion(), "database version must not be
null.");
+ LOG.info("Driver version: {}, database version: {}", driverVersion,
databaseVersion);
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ String sql = "SELECT `SCHEMA_NAME` FROM
`INFORMATION_SCHEMA`.`SCHEMATA`;";
+ return extractColumnValuesBySQL(
+ defaultUrl,
+ sql,
+ 1,
+ (FilterFunction<String>) dbName ->
!builtinDatabases.contains(dbName));
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ Preconditions.checkState(
+ StringUtils.isNotBlank(databaseName), "Database name must not
be blank.");
+ if (listDatabases().contains(databaseName)) {
+ return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+ } else {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ Preconditions.checkState(
+ StringUtils.isNotBlank(databaseName), "Database name must not
be blank.");
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ String connUrl = baseUrl + databaseName;
+ return extractColumnValuesBySQL(
+ connUrl,
+ "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE
TABLE_SCHEMA = ?",
+ 1,
+ null,
+ databaseName);
+ }
+
+ // ------ retrieve PK constraint ------
+
+ private Optional<UniqueConstraint> getPrimaryKey(
+ DatabaseMetaData metaData, String schema, ObjectPath table) throws
SQLException {
+
+ // According to the Javadoc of
java.sql.DatabaseMetaData#getPrimaryKeys,
+ // the returned primary key columns are ordered by COLUMN_NAME, not by
KEY_SEQ.
+ // We need to sort them based on the KEY_SEQ value.
+ ResultSet rs =
+ metaData.getPrimaryKeys(table.getDatabaseName(), schema,
table.getObjectName());
+
+ Map<Integer, String> keySeqColumnName = new HashMap<>();
+ String pkName = null;
+ while (rs.next()) {
+ String columnName = rs.getString("COLUMN_NAME");
+ pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the
same
+ int keySeq = rs.getInt("KEY_SEQ");
+ keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is
1-based index
+ }
+ List<String> pkFields =
+ Arrays.asList(new String[keySeqColumnName.size()]); //
initialize size
+ keySeqColumnName.forEach(pkFields::set);
+ if (!pkFields.isEmpty()) {
+ // PK_NAME maybe null according to the javadoc, generate an unique
name in that case
+ pkName = pkName == null ? "pk_" + String.join("_", pkFields) :
pkName;
+ return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+ try (Connection conn =
+ DriverManager.getConnection(
+ baseUrl + tablePath.getDatabaseName(),
username, pwd);
+ PreparedStatement ps =
+ conn.prepareStatement("SELECT * FROM " +
tablePath.getObjectName())) {
+
+ DatabaseMetaData metaData = conn.getMetaData();
+ Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData,
null, tablePath);
+ ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+ String[] columnNames = new
String[resultSetMetaData.getColumnCount()];
+ DataType[] types = new
DataType[resultSetMetaData.getColumnCount()];
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+ types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+ if (resultSetMetaData.isNullable(i) ==
ResultSetMetaData.columnNoNulls) {
+ types[i - 1] = types[i - 1].notNull();
+ }
+ }
+ Schema.Builder schemaBuilder =
Schema.newBuilder().fromFields(columnNames, types);
+ primaryKey.ifPresent(
+ pk -> schemaBuilder.primaryKeyNamed(pk.getName(),
pk.getColumns()));
+ Schema tableSchema = schemaBuilder.build();
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR.key(), IDENTIFIER);
+ props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+ props.put(TABLE_NAME.key(), tablePath.getObjectName());
+ props.put(USERNAME.key(), username);
+ props.put(PASSWORD.key(), pwd);
+ return CatalogTable.of(tableSchema, null, Lists.newArrayList(),
props);
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed getting table %s",
tablePath.getFullName()), e);
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ return !extractColumnValuesBySQL(
+ baseUrl,
+ "SELECT TABLE_NAME FROM information_schema.`TABLES`
WHERE TABLE_SCHEMA=? and TABLE_NAME=?",
+ 1,
+ null,
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName())
+ .isEmpty();
+ }
+
+ private List<String> extractColumnValuesBySQL(
+ String connUrl,
+ String sql,
+ int columnIndex,
+ FilterFunction<String> filterFunc,
+ Object... params) {
+ List<String> columnValues = Lists.newArrayList();
+ try (Connection conn = DriverManager.getConnection(connUrl, username,
pwd);
+ PreparedStatement ps = conn.prepareStatement(sql)) {
+ if (Objects.nonNull(params) && params.length > 0) {
+ for (int i = 0; i < params.length; i++) {
+ ps.setObject(i + 1, params[i]);
+ }
+ }
+ ResultSet rs = ps.executeQuery();
+ while (rs.next()) {
+ String columnValue = rs.getString(columnIndex);
+ if (Objects.isNull(filterFunc) ||
filterFunc.filter(columnValue)) {
+ columnValues.add(columnValue);
+ }
+ }
+ return columnValues;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed in processing query sql %s, connUrl
%s", sql, connUrl),
+ e);
+ }
+ }
+
+ private String getDatabaseVersion() {
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ return conn.getMetaData().getDatabaseProductVersion();
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed in getting MySQL version by %s.",
defaultUrl), e);
+ }
+ }
+
+ private String getDriverVersion() {
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ String driverVersion = conn.getMetaData().getDriverVersion();
+ Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
+ Matcher matcher = regexp.matcher(driverVersion);
+ return matcher.find() ? matcher.group(0) : null;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed in getting mysql driver version by
%s.", defaultUrl), e);
+ }
+ }
+
+ /** Converts MySQL type to Flink {@link DataType}. */
+ private DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData
metadata, int colIndex)
+ throws SQLException {
+ String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
+ String columnName = metadata.getColumnName(colIndex);
+ int precision = metadata.getPrecision(colIndex);
+ int scale = metadata.getScale(colIndex);
+
+ switch (mysqlType) {
+ case MYSQL_BIT:
+ return DataTypes.BOOLEAN();
+ case MYSQL_TINYBLOB:
+ case MYSQL_MEDIUMBLOB:
+ case MYSQL_BLOB:
+ case MYSQL_LONGBLOB:
+ case MYSQL_VARBINARY:
+ case MYSQL_BINARY:
+ return DataTypes.BYTES();
+ case MYSQL_TINYINT:
+ return DataTypes.TINYINT();
+ case MYSQL_TINYINT_UNSIGNED:
+ case MYSQL_SMALLINT:
+ return DataTypes.SMALLINT();
+ case MYSQL_SMALLINT_UNSIGNED:
+ case MYSQL_MEDIUMINT:
+ case MYSQL_MEDIUMINT_UNSIGNED:
+ case MYSQL_INT:
+ case MYSQL_INTEGER:
+ return DataTypes.INT();
+ case MYSQL_INT_UNSIGNED:
+ case MYSQL_INTEGER_UNSIGNED:
+ case MYSQL_BIGINT:
+ return DataTypes.BIGINT();
+ case MYSQL_BIGINT_UNSIGNED:
+ return DataTypes.DECIMAL(20, 0);
+ case MYSQL_DECIMAL:
+ return DataTypes.DECIMAL(precision, scale);
+ case MYSQL_DECIMAL_UNSIGNED:
+ checkMaxPrecision(tablePath, columnName, precision);
+ return DataTypes.DECIMAL(precision + 1, scale);
+ case MYSQL_FLOAT:
+ return DataTypes.FLOAT();
+ case MYSQL_FLOAT_UNSIGNED:
+ LOG.warn("{} will probably cause value overflow.",
MYSQL_FLOAT_UNSIGNED);
+ return DataTypes.FLOAT();
+ case MYSQL_DOUBLE:
+ return DataTypes.DOUBLE();
+ case MYSQL_DOUBLE_UNSIGNED:
+ LOG.warn("{} will probably cause value overflow.",
MYSQL_DOUBLE_UNSIGNED);
+ return DataTypes.DOUBLE();
+ case MYSQL_CHAR:
+ case MYSQL_VARCHAR:
+ case MYSQL_TINYTEXT:
+ case MYSQL_MEDIUMTEXT:
+ case MYSQL_TEXT:
+ case MYSQL_JSON:
+ return DataTypes.STRING();
+ case MYSQL_LONGTEXT:
+ LOG.warn(
+ "The max precision of type '{}' in mysql is 536870911,
and the max "
+ + "precision here has to be set as 2147483647
due to the "
+ + "limitation of the flink sql types system.",
+ MYSQL_LONGTEXT);
+ return DataTypes.STRING();
+ case MYSQL_YEAR:
+ LOG.warn(
+ "The type {} in mysql catalog is supported in
read-mode, "
+ + "but not in write-mode.",
+ MYSQL_YEAR);
Review comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]