This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 35771549 [improve] support modify column type/comment, support auto create database if not exists (#408) 35771549 is described below commit 35771549180bed4981689071de000fd6e2b9e080 Author: North Lin <37775475+qg-...@users.noreply.github.com> AuthorDate: Wed Jun 26 14:19:51 2024 +0800 [improve] support modify column type/comment, support auto create database if not exists (#408) --- .../flink/sink/schema/SchemaChangeHelper.java | 67 ++++++++++-- .../flink/sink/schema/SchemaChangeManager.java | 75 ++++++++++++- .../flink/sink/schema/SchemaManagerITCase.java | 121 +++++++++++++++++---- 3 files changed, 230 insertions(+), 33 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java index 4c29c348..8d365ffc 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.Map.Entry; public class SchemaChangeHelper { + public static final String DEFAULT_DATABASE = "information_schema"; + private static final List<String> dropFieldSchemas = Lists.newArrayList(); private static final List<FieldSchema> addFieldSchemas = Lists.newArrayList(); // Used to determine whether the doris table supports ddl @@ -38,6 +40,11 @@ public class SchemaChangeHelper { private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s %s"; private static final String CHECK_COLUMN_EXISTS = "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = '%s'"; + private static final String CHECK_DATABASE_EXISTS = + "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA` WHERE SCHEMA_NAME = '%s'"; + private static final String CREATE_DATABASE_DDL = "CREATE DATABASE IF NOT EXISTS %s"; + private static final String MODIFY_TYPE_DDL = "ALTER TABLE %s MODIFY COLUMN %s %s"; + private static final String MODIFY_COMMENT_DDL = "ALTER TABLE %s MODIFY COLUMN %s COMMENT '%s'"; public static void compareSchema( Map<String, FieldSchema> updateFiledSchemaMap, @@ -104,19 +111,18 @@ public class SchemaChangeHelper { String type = fieldSchema.getTypeString(); String defaultValue = fieldSchema.getDefaultValue(); String comment = fieldSchema.getComment(); - String addDDL = - String.format( - ADD_DDL, - DorisSystem.quoteTableIdentifier(tableIdentifier), - DorisSystem.identifier(name), - type); + StringBuilder addDDL = + new StringBuilder( + String.format( + ADD_DDL, + DorisSystem.quoteTableIdentifier(tableIdentifier), + DorisSystem.identifier(name), + type)); if (defaultValue != null) { - addDDL = addDDL + " DEFAULT " + DorisSystem.quoteDefaultValue(defaultValue); - } - if (!StringUtils.isNullOrWhitespaceOnly(comment)) { - addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment) + "'"; + addDDL.append(" DEFAULT ").append(DorisSystem.quoteDefaultValue(defaultValue)); } - return addDDL; + commentColumn(addDDL, comment); + return addDDL.toString(); } public static String buildDropColumnDDL(String tableIdentifier, String columName) { @@ -139,6 +145,45 @@ public class SchemaChangeHelper { return String.format(CHECK_COLUMN_EXISTS, database, table, column); } + public static String buildDatabaseExistsQuery(String database) { + return String.format(CHECK_DATABASE_EXISTS, database); + } + + public static String buildCreateDatabaseDDL(String database) { + return String.format(CREATE_DATABASE_DDL, database); + } + + public static String buildModifyColumnCommentDDL( + String tableIdentifier, String columnName, String newComment) { + return String.format( + MODIFY_COMMENT_DDL, + DorisSystem.quoteTableIdentifier(tableIdentifier), + DorisSystem.identifier(columnName), + DorisSystem.quoteComment(newComment)); + } + + public static String buildModifyColumnDataTypeDDL( + String tableIdentifier, FieldSchema fieldSchema) { + String columnName = fieldSchema.getName(); + String dataType = fieldSchema.getTypeString(); + String comment = fieldSchema.getComment(); + StringBuilder modifyDDL = + new StringBuilder( + String.format( + MODIFY_TYPE_DDL, + DorisSystem.quoteTableIdentifier(tableIdentifier), + DorisSystem.identifier(columnName), + dataType)); + commentColumn(modifyDDL, comment); + return modifyDDL.toString(); + } + + private static void commentColumn(StringBuilder ddl, String comment) { + if (!StringUtils.isNullOrWhitespaceOnly(comment)) { + ddl.append(" COMMENT '").append(DorisSystem.quoteComment(comment)).append("'"); + } + } + public static List<DDLSchema> getDdlSchemas() { return ddlSchemas; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index d2bacf26..27f2aece 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -30,6 +30,8 @@ import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.exception.DorisSchemaChangeException; import org.apache.doris.flink.exception.IllegalArgumentException; import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.rest.models.Field; +import org.apache.doris.flink.rest.models.Schema; import org.apache.doris.flink.sink.HttpGetWithEntity; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; @@ -47,6 +49,7 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import java.util.Optional; public class SchemaChangeManager implements Serializable { private static final long serialVersionUID = 1L; @@ -68,6 +71,12 @@ public class SchemaChangeManager implements Serializable { } public boolean createTable(TableSchema table) throws IOException, IllegalArgumentException { + // auto create database if not exists + if (!checkDatabaseExists(table.getDatabase())) { + execute( + SchemaChangeHelper.buildCreateDatabaseDDL(table.getDatabase()), + SchemaChangeHelper.DEFAULT_DATABASE); + } String createTableDDL = DorisSystem.buildCreateTableDDL(table); return execute(createTableDDL, table.getDatabase()); } @@ -109,6 +118,54 @@ public class SchemaChangeManager implements Serializable { database, table, buildRequestParam(true, oldColumnName), renameColumnDDL); } + public boolean modifyColumnDataType(String database, String table, FieldSchema field) + throws IOException, IllegalArgumentException { + if (!checkColumnExists(database, table, field.getName())) { + LOG.warn( + "The column {} is not exists in table {}, can not modify it type", + field.getName(), + table); + return false; + } + // If user does not give a comment, need fill it from + // original table schema to avoid miss comment + if (StringUtils.isNullOrWhitespaceOnly(field.getComment())) { + Schema tableSchema = getTableSchema(database, table); + Optional<Field> originalField = + tableSchema.getProperties().stream() + .filter(column -> column.getName().equals(field.getName())) + .findAny(); + originalField.ifPresent(oldField -> field.setComment(oldField.getComment())); + } + String tableIdentifier = getTableIdentifier(database, table); + String modifyColumnDDL = + SchemaChangeHelper.buildModifyColumnDataTypeDDL(tableIdentifier, field); + return schemaChange( + database, table, buildRequestParam(false, field.getName()), modifyColumnDDL); + } + + public boolean modifyColumnComment( + String database, String table, String columnName, String newComment) + throws IOException, IllegalArgumentException { + if (!checkColumnExists(database, table, columnName)) { + LOG.warn( + "The column {} is not exists in table {}, can not modify it's comment", + columnName, + table); + return false; + } + String tableIdentifier = getTableIdentifier(database, table); + String modifyColumnCommentDDL = + SchemaChangeHelper.buildModifyColumnCommentDDL( + tableIdentifier, columnName, newComment); + return schemaChange( + database, table, buildRequestParam(false, columnName), modifyColumnCommentDDL); + } + + public Schema getTableSchema(String database, String table) { + return RestService.getSchema(dorisOptions, database, table, LOG); + } + public boolean schemaChange( String database, String table, Map<String, Object> params, String sql) throws IOException, IllegalArgumentException { @@ -215,7 +272,18 @@ public class SchemaChangeManager implements Serializable { public boolean checkColumnExists(String database, String table, String columnName) throws IllegalArgumentException, IOException { String existsQuery = SchemaChangeHelper.buildColumnExistsQuery(database, table, columnName); - HttpPost httpPost = buildHttpPost(existsQuery, database); + return sendHttpPostRequest(existsQuery, database); + } + + private boolean checkDatabaseExists(String database) + throws IllegalArgumentException, IOException { + String existsQuery = SchemaChangeHelper.buildDatabaseExistsQuery(database); + return sendHttpPostRequest(existsQuery, SchemaChangeHelper.DEFAULT_DATABASE); + } + + private boolean sendHttpPostRequest(String sql, String database) + throws IOException, IllegalArgumentException { + HttpPost httpPost = buildHttpPost(sql, database); try (CloseableHttpClient httpclient = HttpClients.createDefault()) { CloseableHttpResponse response = httpclient.execute(httpPost); final int statusCode = response.getStatusLine().getStatusCode(); @@ -231,7 +299,10 @@ public class SchemaChangeManager implements Serializable { } } } catch (Exception e) { - LOG.error("check column exist request error {}, default return false", e.getMessage()); + LOG.error( + "send http post request error {}, default return false, SQL:{}", + e.getMessage(), + sql); } return false; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java index 8d2a9b0d..3dde08d5 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java @@ -18,9 +18,13 @@ package org.apache.doris.flink.sink.schema; import org.apache.doris.flink.DorisTestBase; +import org.apache.doris.flink.catalog.doris.DataModel; import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.rest.models.Field; +import org.apache.doris.flink.rest.models.Schema; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -28,11 +32,13 @@ import org.junit.Test; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.Random; public class SchemaManagerITCase extends DorisTestBase { @@ -87,7 +93,7 @@ public class SchemaManagerITCase extends DorisTestBase { @Test public void testAddColumnWithChineseComment() - throws SQLException, IOException, IllegalArgumentException { + throws SQLException, IOException, IllegalArgumentException, InterruptedException { String addColumnTbls = "add_column"; initDorisSchemaChangeTable(addColumnTbls); @@ -105,7 +111,7 @@ public class SchemaManagerITCase extends DorisTestBase { private void addColumnWithChineseCommentAndAssert( String tableName, String addColumnName, String chineseComment, boolean assertFlag) - throws SQLException, IOException, IllegalArgumentException { + throws IOException, IllegalArgumentException, InterruptedException { FieldSchema field = new FieldSchema(addColumnName, "string", chineseComment); schemaChangeManager.addColumn(DATABASE, tableName, field); boolean exists = schemaChangeManager.addColumn(DATABASE, tableName, field); @@ -115,28 +121,31 @@ public class SchemaManagerITCase extends DorisTestBase { Assert.assertTrue(exists); // check Chinese comment - Map<String, String> columnComments = getColumnComments(tableName); + Thread.sleep(3_000); + String comment = getColumnComment(tableName, addColumnName); if (assertFlag) { - Assert.assertEquals(columnComments.get(addColumnName), chineseComment); + Assert.assertEquals(comment, chineseComment); } else { - Assert.assertNotEquals(columnComments.get(addColumnName), chineseComment); + Assert.assertNotEquals(comment, chineseComment); } } - private Map<String, String> getColumnComments(String table) throws SQLException { - Map<String, String> columnCommentsMap = new HashMap<>(); - try (Connection connection = - DriverManager.getConnection( - String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD)) { - ResultSet columns = connection.getMetaData().getColumns(null, DATABASE, table, null); - - while (columns.next()) { - String columnName = columns.getString("COLUMN_NAME"); - String comment = columns.getString("REMARKS"); - columnCommentsMap.put(columnName, comment); - } - } - return columnCommentsMap; + private String getColumnComment(String table, String columnName) { + Schema schema = schemaChangeManager.getTableSchema(DATABASE, table); + Optional<Field> first = + schema.getProperties().stream() + .filter(col -> col.getName().equals(columnName)) + .findFirst(); + return first.map(Field::getComment).orElse(null); + } + + private String getColumnType(String table, String columnName) { + Schema schema = schemaChangeManager.getTableSchema(DATABASE, table); + Optional<Field> first = + schema.getProperties().stream() + .filter(col -> col.getName().equals(columnName)) + .findFirst(); + return first.map(Field::getType).orElse(null); } @Test @@ -162,4 +171,76 @@ public class SchemaManagerITCase extends DorisTestBase { exists = schemaChangeManager.checkColumnExists(DATABASE, renameColumnTbls, "age"); Assert.assertFalse(exists); } + + @Test + public void testModifyColumnComment() + throws SQLException, IOException, IllegalArgumentException { + String modifyColumnCommentTbls = "modify_column_comment"; + initDorisSchemaChangeTable(modifyColumnCommentTbls); + String columnName = "age"; + String newComment = "new comment of age"; + schemaChangeManager.modifyColumnComment( + DATABASE, modifyColumnCommentTbls, columnName, newComment); + + String comment = getColumnComment(modifyColumnCommentTbls, columnName); + Assert.assertEquals(newComment, comment); + } + + @Test + public void testOnlyModifyColumnType() + throws SQLException, IOException, IllegalArgumentException, InterruptedException { + String modifyColumnTbls = "modify_column_type"; + String columnName = "age"; + String newColumnType = "bigint"; + initDorisSchemaChangeTable(modifyColumnTbls); + FieldSchema field = new FieldSchema(columnName, newColumnType, ""); + schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field); + + Thread.sleep(3_000); + String columnType = getColumnType(modifyColumnTbls, columnName); + Assert.assertEquals(newColumnType, columnType.toLowerCase()); + } + + @Test + public void testModifyColumnTypeAndComment() + throws SQLException, IOException, IllegalArgumentException, InterruptedException { + String modifyColumnTbls = "modify_column_type_and_comment"; + initDorisSchemaChangeTable(modifyColumnTbls); + String columnName = "age"; + String newColumnType = "bigint"; + String newComment = "new comment of age"; + FieldSchema field = new FieldSchema(columnName, newColumnType, newComment); + schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field); + + Thread.sleep(3_000); + String comment = getColumnComment(modifyColumnTbls, columnName); + Assert.assertEquals(newComment, comment); + + String columnType = getColumnType(modifyColumnTbls, columnName); + Assert.assertEquals(newColumnType, columnType.toLowerCase()); + } + + @Test + public void testCreateTableWhenDatabaseNotExists() + throws IOException, IllegalArgumentException, InterruptedException { + String databaseName = DATABASE + "_" + Integer.toUnsignedString(new Random().nextInt(), 36); + String tableName = "auto_create_database"; + + TableSchema tableSchema = new TableSchema(); + tableSchema.setDatabase(databaseName); + tableSchema.setTable(tableName); + Map<String, FieldSchema> fields = new HashMap<>(); + fields.put("id", new FieldSchema("id", "varchar(32)", "")); + fields.put("age", new FieldSchema("age", "int", "")); + tableSchema.setFields(fields); + tableSchema.setDistributeKeys(Collections.singletonList("id")); + tableSchema.setModel(DataModel.DUPLICATE); + Map<String, String> tableProperties = new HashMap<>(); + tableProperties.put("replication_num", "1"); + tableSchema.setProperties(tableProperties); + schemaChangeManager.createTable(tableSchema); + + Thread.sleep(3_000); + Assert.assertNotNull(schemaChangeManager.getTableSchema(databaseName, tableName)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org