This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 35771549 [improve] support modify column type/comment, support auto create database if not exists (#408)
35771549 is described below

commit 35771549180bed4981689071de000fd6e2b9e080
Author: North Lin <37775475+qg-...@users.noreply.github.com>
AuthorDate: Wed Jun 26 14:19:51 2024 +0800

    [improve] support modify column type/comment, support auto create database if not exists (#408)
---
 .../flink/sink/schema/SchemaChangeHelper.java      |  67 ++++++++++--
 .../flink/sink/schema/SchemaChangeManager.java     |  75 ++++++++++++-
 .../flink/sink/schema/SchemaManagerITCase.java     | 121 +++++++++++++++++----
 3 files changed, 230 insertions(+), 33 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index 4c29c348..8d365ffc 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 public class SchemaChangeHelper {
+    public static final String DEFAULT_DATABASE = "information_schema";
+
     private static final List<String> dropFieldSchemas = Lists.newArrayList();
     private static final List<FieldSchema> addFieldSchemas = 
Lists.newArrayList();
     // Used to determine whether the doris table supports ddl
@@ -38,6 +40,11 @@ public class SchemaChangeHelper {
     private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s 
%s";
     private static final String CHECK_COLUMN_EXISTS =
             "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE 
TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = '%s'";
+    private static final String CHECK_DATABASE_EXISTS =
+            "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA` WHERE 
SCHEMA_NAME = '%s'";
+    private static final String CREATE_DATABASE_DDL = "CREATE DATABASE IF NOT 
EXISTS %s";
+    private static final String MODIFY_TYPE_DDL = "ALTER TABLE %s MODIFY 
COLUMN %s %s";
+    private static final String MODIFY_COMMENT_DDL = "ALTER TABLE %s MODIFY 
COLUMN %s COMMENT '%s'";
 
     public static void compareSchema(
             Map<String, FieldSchema> updateFiledSchemaMap,
@@ -104,19 +111,18 @@ public class SchemaChangeHelper {
         String type = fieldSchema.getTypeString();
         String defaultValue = fieldSchema.getDefaultValue();
         String comment = fieldSchema.getComment();
-        String addDDL =
-                String.format(
-                        ADD_DDL,
-                        DorisSystem.quoteTableIdentifier(tableIdentifier),
-                        DorisSystem.identifier(name),
-                        type);
+        StringBuilder addDDL =
+                new StringBuilder(
+                        String.format(
+                                ADD_DDL,
+                                
DorisSystem.quoteTableIdentifier(tableIdentifier),
+                                DorisSystem.identifier(name),
+                                type));
         if (defaultValue != null) {
-            addDDL = addDDL + " DEFAULT " + 
DorisSystem.quoteDefaultValue(defaultValue);
-        }
-        if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
-            addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment) 
+ "'";
+            addDDL.append(" DEFAULT 
").append(DorisSystem.quoteDefaultValue(defaultValue));
         }
-        return addDDL;
+        commentColumn(addDDL, comment);
+        return addDDL.toString();
     }
 
     public static String buildDropColumnDDL(String tableIdentifier, String 
columName) {
@@ -139,6 +145,45 @@ public class SchemaChangeHelper {
         return String.format(CHECK_COLUMN_EXISTS, database, table, column);
     }
 
+    public static String buildDatabaseExistsQuery(String database) {
+        return String.format(CHECK_DATABASE_EXISTS, database);
+    }
+
+    public static String buildCreateDatabaseDDL(String database) {
+        return String.format(CREATE_DATABASE_DDL, database);
+    }
+
+    public static String buildModifyColumnCommentDDL(
+            String tableIdentifier, String columnName, String newComment) {
+        return String.format(
+                MODIFY_COMMENT_DDL,
+                DorisSystem.quoteTableIdentifier(tableIdentifier),
+                DorisSystem.identifier(columnName),
+                DorisSystem.quoteComment(newComment));
+    }
+
+    public static String buildModifyColumnDataTypeDDL(
+            String tableIdentifier, FieldSchema fieldSchema) {
+        String columnName = fieldSchema.getName();
+        String dataType = fieldSchema.getTypeString();
+        String comment = fieldSchema.getComment();
+        StringBuilder modifyDDL =
+                new StringBuilder(
+                        String.format(
+                                MODIFY_TYPE_DDL,
+                                
DorisSystem.quoteTableIdentifier(tableIdentifier),
+                                DorisSystem.identifier(columnName),
+                                dataType));
+        commentColumn(modifyDDL, comment);
+        return modifyDDL.toString();
+    }
+
+    private static void commentColumn(StringBuilder ddl, String comment) {
+        if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+            ddl.append(" COMMENT 
'").append(DorisSystem.quoteComment(comment)).append("'");
+        }
+    }
+
     public static List<DDLSchema> getDdlSchemas() {
         return ddlSchemas;
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
index d2bacf26..27f2aece 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -30,6 +30,8 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.exception.DorisSchemaChangeException;
 import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.Field;
+import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.flink.sink.HttpGetWithEntity;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -47,6 +49,7 @@ import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class SchemaChangeManager implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -68,6 +71,12 @@ public class SchemaChangeManager implements Serializable {
     }
 
     public boolean createTable(TableSchema table) throws IOException, 
IllegalArgumentException {
+        // auto create database if not exists
+        if (!checkDatabaseExists(table.getDatabase())) {
+            execute(
+                    
SchemaChangeHelper.buildCreateDatabaseDDL(table.getDatabase()),
+                    SchemaChangeHelper.DEFAULT_DATABASE);
+        }
         String createTableDDL = DorisSystem.buildCreateTableDDL(table);
         return execute(createTableDDL, table.getDatabase());
     }
@@ -109,6 +118,54 @@ public class SchemaChangeManager implements Serializable {
                 database, table, buildRequestParam(true, oldColumnName), 
renameColumnDDL);
     }
 
+    public boolean modifyColumnDataType(String database, String table, 
FieldSchema field)
+            throws IOException, IllegalArgumentException {
+        if (!checkColumnExists(database, table, field.getName())) {
+            LOG.warn(
+                    "The column {} is not exists in table {}, can not modify 
it type",
+                    field.getName(),
+                    table);
+            return false;
+        }
+        // If user does not give a comment, need fill it from
+        // original table schema to avoid miss comment
+        if (StringUtils.isNullOrWhitespaceOnly(field.getComment())) {
+            Schema tableSchema = getTableSchema(database, table);
+            Optional<Field> originalField =
+                    tableSchema.getProperties().stream()
+                            .filter(column -> 
column.getName().equals(field.getName()))
+                            .findAny();
+            originalField.ifPresent(oldField -> 
field.setComment(oldField.getComment()));
+        }
+        String tableIdentifier = getTableIdentifier(database, table);
+        String modifyColumnDDL =
+                
SchemaChangeHelper.buildModifyColumnDataTypeDDL(tableIdentifier, field);
+        return schemaChange(
+                database, table, buildRequestParam(false, field.getName()), 
modifyColumnDDL);
+    }
+
+    public boolean modifyColumnComment(
+            String database, String table, String columnName, String 
newComment)
+            throws IOException, IllegalArgumentException {
+        if (!checkColumnExists(database, table, columnName)) {
+            LOG.warn(
+                    "The column {} is not exists in table {}, can not modify 
it's comment",
+                    columnName,
+                    table);
+            return false;
+        }
+        String tableIdentifier = getTableIdentifier(database, table);
+        String modifyColumnCommentDDL =
+                SchemaChangeHelper.buildModifyColumnCommentDDL(
+                        tableIdentifier, columnName, newComment);
+        return schemaChange(
+                database, table, buildRequestParam(false, columnName), 
modifyColumnCommentDDL);
+    }
+
+    public Schema getTableSchema(String database, String table) {
+        return RestService.getSchema(dorisOptions, database, table, LOG);
+    }
+
     public boolean schemaChange(
             String database, String table, Map<String, Object> params, String 
sql)
             throws IOException, IllegalArgumentException {
@@ -215,7 +272,18 @@ public class SchemaChangeManager implements Serializable {
     public boolean checkColumnExists(String database, String table, String 
columnName)
             throws IllegalArgumentException, IOException {
         String existsQuery = 
SchemaChangeHelper.buildColumnExistsQuery(database, table, columnName);
-        HttpPost httpPost = buildHttpPost(existsQuery, database);
+        return sendHttpPostRequest(existsQuery, database);
+    }
+
+    private boolean checkDatabaseExists(String database)
+            throws IllegalArgumentException, IOException {
+        String existsQuery = 
SchemaChangeHelper.buildDatabaseExistsQuery(database);
+        return sendHttpPostRequest(existsQuery, 
SchemaChangeHelper.DEFAULT_DATABASE);
+    }
+
+    private boolean sendHttpPostRequest(String sql, String database)
+            throws IOException, IllegalArgumentException {
+        HttpPost httpPost = buildHttpPost(sql, database);
         try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
             CloseableHttpResponse response = httpclient.execute(httpPost);
             final int statusCode = response.getStatusLine().getStatusCode();
@@ -231,7 +299,10 @@ public class SchemaChangeManager implements Serializable {
                 }
             }
         } catch (Exception e) {
-            LOG.error("check column exist request error {}, default return 
false", e.getMessage());
+            LOG.error(
+                    "send http post request error {}, default return false, 
SQL:{}",
+                    e.getMessage(),
+                    sql);
         }
         return false;
     }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
index 8d2a9b0d..3dde08d5 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
@@ -18,9 +18,13 @@
 package org.apache.doris.flink.sink.schema;
 
 import org.apache.doris.flink.DorisTestBase;
+import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.rest.models.Field;
+import org.apache.doris.flink.rest.models.Schema;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -28,11 +32,13 @@ import org.junit.Test;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
 
 public class SchemaManagerITCase extends DorisTestBase {
 
@@ -87,7 +93,7 @@ public class SchemaManagerITCase extends DorisTestBase {
 
     @Test
     public void testAddColumnWithChineseComment()
-            throws SQLException, IOException, IllegalArgumentException {
+            throws SQLException, IOException, IllegalArgumentException, 
InterruptedException {
         String addColumnTbls = "add_column";
         initDorisSchemaChangeTable(addColumnTbls);
 
@@ -105,7 +111,7 @@ public class SchemaManagerITCase extends DorisTestBase {
 
     private void addColumnWithChineseCommentAndAssert(
             String tableName, String addColumnName, String chineseComment, 
boolean assertFlag)
-            throws SQLException, IOException, IllegalArgumentException {
+            throws IOException, IllegalArgumentException, InterruptedException 
{
         FieldSchema field = new FieldSchema(addColumnName, "string", 
chineseComment);
         schemaChangeManager.addColumn(DATABASE, tableName, field);
         boolean exists = schemaChangeManager.addColumn(DATABASE, tableName, 
field);
@@ -115,28 +121,31 @@ public class SchemaManagerITCase extends DorisTestBase {
         Assert.assertTrue(exists);
 
         // check Chinese comment
-        Map<String, String> columnComments = getColumnComments(tableName);
+        Thread.sleep(3_000);
+        String comment = getColumnComment(tableName, addColumnName);
         if (assertFlag) {
-            Assert.assertEquals(columnComments.get(addColumnName), 
chineseComment);
+            Assert.assertEquals(comment, chineseComment);
         } else {
-            Assert.assertNotEquals(columnComments.get(addColumnName), 
chineseComment);
+            Assert.assertNotEquals(comment, chineseComment);
         }
     }
 
-    private Map<String, String> getColumnComments(String table) throws 
SQLException {
-        Map<String, String> columnCommentsMap = new HashMap<>();
-        try (Connection connection =
-                DriverManager.getConnection(
-                        String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD)) {
-            ResultSet columns = connection.getMetaData().getColumns(null, 
DATABASE, table, null);
-
-            while (columns.next()) {
-                String columnName = columns.getString("COLUMN_NAME");
-                String comment = columns.getString("REMARKS");
-                columnCommentsMap.put(columnName, comment);
-            }
-        }
-        return columnCommentsMap;
+    private String getColumnComment(String table, String columnName) {
+        Schema schema = schemaChangeManager.getTableSchema(DATABASE, table);
+        Optional<Field> first =
+                schema.getProperties().stream()
+                        .filter(col -> col.getName().equals(columnName))
+                        .findFirst();
+        return first.map(Field::getComment).orElse(null);
+    }
+
+    private String getColumnType(String table, String columnName) {
+        Schema schema = schemaChangeManager.getTableSchema(DATABASE, table);
+        Optional<Field> first =
+                schema.getProperties().stream()
+                        .filter(col -> col.getName().equals(columnName))
+                        .findFirst();
+        return first.map(Field::getType).orElse(null);
     }
 
     @Test
@@ -162,4 +171,76 @@ public class SchemaManagerITCase extends DorisTestBase {
         exists = schemaChangeManager.checkColumnExists(DATABASE, 
renameColumnTbls, "age");
         Assert.assertFalse(exists);
     }
+
+    @Test
+    public void testModifyColumnComment()
+            throws SQLException, IOException, IllegalArgumentException {
+        String modifyColumnCommentTbls = "modify_column_comment";
+        initDorisSchemaChangeTable(modifyColumnCommentTbls);
+        String columnName = "age";
+        String newComment = "new comment of age";
+        schemaChangeManager.modifyColumnComment(
+                DATABASE, modifyColumnCommentTbls, columnName, newComment);
+
+        String comment = getColumnComment(modifyColumnCommentTbls, columnName);
+        Assert.assertEquals(newComment, comment);
+    }
+
+    @Test
+    public void testOnlyModifyColumnType()
+            throws SQLException, IOException, IllegalArgumentException, 
InterruptedException {
+        String modifyColumnTbls = "modify_column_type";
+        String columnName = "age";
+        String newColumnType = "bigint";
+        initDorisSchemaChangeTable(modifyColumnTbls);
+        FieldSchema field = new FieldSchema(columnName, newColumnType, "");
+        schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, 
field);
+
+        Thread.sleep(3_000);
+        String columnType = getColumnType(modifyColumnTbls, columnName);
+        Assert.assertEquals(newColumnType, columnType.toLowerCase());
+    }
+
+    @Test
+    public void testModifyColumnTypeAndComment()
+            throws SQLException, IOException, IllegalArgumentException, 
InterruptedException {
+        String modifyColumnTbls = "modify_column_type_and_comment";
+        initDorisSchemaChangeTable(modifyColumnTbls);
+        String columnName = "age";
+        String newColumnType = "bigint";
+        String newComment = "new comment of age";
+        FieldSchema field = new FieldSchema(columnName, newColumnType, 
newComment);
+        schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, 
field);
+
+        Thread.sleep(3_000);
+        String comment = getColumnComment(modifyColumnTbls, columnName);
+        Assert.assertEquals(newComment, comment);
+
+        String columnType = getColumnType(modifyColumnTbls, columnName);
+        Assert.assertEquals(newColumnType, columnType.toLowerCase());
+    }
+
+    @Test
+    public void testCreateTableWhenDatabaseNotExists()
+            throws IOException, IllegalArgumentException, InterruptedException 
{
+        String databaseName = DATABASE + "_" + Integer.toUnsignedString(new 
Random().nextInt(), 36);
+        String tableName = "auto_create_database";
+
+        TableSchema tableSchema = new TableSchema();
+        tableSchema.setDatabase(databaseName);
+        tableSchema.setTable(tableName);
+        Map<String, FieldSchema> fields = new HashMap<>();
+        fields.put("id", new FieldSchema("id", "varchar(32)", ""));
+        fields.put("age", new FieldSchema("age", "int", ""));
+        tableSchema.setFields(fields);
+        tableSchema.setDistributeKeys(Collections.singletonList("id"));
+        tableSchema.setModel(DataModel.DUPLICATE);
+        Map<String, String> tableProperties = new HashMap<>();
+        tableProperties.put("replication_num", "1");
+        tableSchema.setProperties(tableProperties);
+        schemaChangeManager.createTable(tableSchema);
+
+        Thread.sleep(3_000);
+        Assert.assertNotNull(schemaChangeManager.getTableSchema(databaseName, 
tableName));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to