This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d9fd0d3e82 Improve openGauss pg_get_tabledef SQLs append schema for migration (#37820)
3d9fd0d3e82 is described below

commit 3d9fd0d3e825de0305cad2bc140c79dd459b95c3
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 23 15:22:25 2026 +0800

    Improve openGauss pg_get_tabledef SQLs append schema for migration (#37820)
    
    * Improve TableExtractor.extractTablesFromSQLStatement: add CREATE TABLE, ALTER TABLE, COMMENT
    
    * Improve OpenGaussPipelineSQLBuilder.buildCreateTableSQLs: do not append schema for ALTER TABLE; do it in PipelineDDLDecorator via SQL AST replacement
    
    * Tiny refactoring
    
    * Update test
---
 .../datasource/PipelineJobDataSourcePreparer.java  | 12 +++---
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    | 12 ++----
 .../OpenGaussPipelineSQLBuilderTest.java           |  2 +-
 .../statement/core/extractor/TableExtractor.java   | 32 +++++-----------
 .../core/extractor/TableExtractorTest.java         | 44 ++++++++++++++++++----
 .../pipeline/util/PipelineE2EDistSQLFacade.java    |  4 +-
 6 files changed, 59 insertions(+), 47 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
index b8a132e589e..a5638f53a61 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
@@ -29,11 +29,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.Cr
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
-import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import 
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.DialectDatabaseMetaData;
 import 
org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
 import 
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
+import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 
@@ -115,13 +115,15 @@ public final class PipelineJobDataSourcePreparer {
         final long startTimeMillis = System.currentTimeMillis();
         PipelineDataSourceManager dataSourceManager = 
param.getDataSourceManager();
         for (CreateTableConfiguration each : 
param.getCreateTableConfigurations()) {
-            try (Connection targetConnection = 
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
 {
+            try (
+                    Connection targetConnection = 
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection();
+                    Statement statement = targetConnection.createStatement()) {
                 List<String> createTargetTableSQL = 
getCreateTargetTableSQL(each, dataSourceManager);
                 for (String sql : createTargetTableSQL) {
                     ShardingSphereMetaData metaData = 
((ShardingSphereConnection) 
targetConnection).getContextManager().getMetaDataContexts().getMetaData();
                     Optional<String> decoratedSQL = 
decorateTargetTableSQL(each, param.getSqlParserEngine(), metaData, 
param.getTargetDatabaseName(), sql);
                     if (decoratedSQL.isPresent()) {
-                        executeTargetTableSQL(targetConnection, 
addIfNotExistsForCreateTableSQL(decoratedSQL.get()));
+                        executeTargetTableSQL(statement, 
addIfNotExistsForCreateTableSQL(decoratedSQL.get()));
                     }
                 }
             }
@@ -146,9 +148,9 @@ public final class PipelineJobDataSourcePreparer {
         return decoratedSQL.map(String::trim).filter(trimmedSql -> 
!Strings.isNullOrEmpty(trimmedSql));
     }
     
-    private void executeTargetTableSQL(final Connection targetConnection, 
final String sql) throws SQLException {
+    private void executeTargetTableSQL(final Statement statement, final String 
sql) throws SQLException {
         log.info("Execute target table SQL: {}", sql);
-        try (Statement statement = targetConnection.createStatement()) {
+        try {
             statement.execute(sql);
         } catch (final SQLException ex) {
             for (String each : 
DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
 databaseType)
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 7bc4e68337a..c06d11f5d95 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -19,8 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
 
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -77,14 +77,8 @@ public final class OpenGaussPipelineSQLBuilder implements 
DialectPipelineSQLBuil
                 ResultSet resultSet = 
statement.executeQuery(String.format("SELECT * FROM pg_get_tabledef('%s.%s')", 
schemaName, tableName))) {
             if (resultSet.next()) {
                 // TODO use ";" to split is not always correct if return 
value's comments contains ";"
-                Collection<String> defSQLs = 
Arrays.asList(resultSet.getString("pg_get_tabledef").split(";"));
-                return defSQLs.stream().map(sql -> {
-                    String targetPrefix = "ALTER TABLE " + tableName;
-                    if (sql.trim().startsWith(targetPrefix)) {
-                        return sql.replaceFirst(targetPrefix, "ALTER TABLE " + 
schemaName + "." + tableName);
-                    }
-                    return sql;
-                }).collect(Collectors.toList());
+                String tableDefinition = 
resultSet.getString("pg_get_tabledef");
+                return Arrays.asList(tableDefinition.split(";"));
             }
         }
         throw new CreateTableSQLGenerateException(tableName);
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
 
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index a9e933ed704..d908c03382c 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -98,7 +98,7 @@ class OpenGaussPipelineSQLBuilderTest {
         when(resultSet.getString("pg_get_tabledef")).thenReturn("CREATE TABLE 
foo_tbl (id INT PRIMARY KEY);ALTER TABLE foo_tbl OWNER TO root");
         when(connection.createStatement().executeQuery("SELECT * FROM 
pg_get_tabledef('foo_schema.foo_tbl')")).thenReturn(resultSet);
         assertThat(sqlBuilder.buildCreateTableSQLs(new 
MockedDataSource(connection), "foo_schema", "foo_tbl"),
-                is(Arrays.asList("CREATE TABLE foo_tbl (id INT PRIMARY KEY)", 
"ALTER TABLE foo_schema.foo_tbl OWNER TO root")));
+                is(Arrays.asList("CREATE TABLE foo_tbl (id INT PRIMARY KEY)", 
"ALTER TABLE foo_tbl OWNER TO root")));
     }
     
     @Test
diff --git 
a/parser/sql/statement/core/src/main/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractor.java
 
b/parser/sql/statement/core/src/main/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractor.java
index 63127caef1c..308ac86d705 100644
--- 
a/parser/sql/statement/core/src/main/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractor.java
+++ 
b/parser/sql/statement/core/src/main/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractor.java
@@ -59,8 +59,10 @@ import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
+import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.CommentStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.index.CreateIndexStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.index.DropIndexStatement;
+import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.table.AlterTableStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.table.CreateTableStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.view.CreateViewStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.DeleteStatement;
@@ -402,12 +404,18 @@ public final class TableExtractor {
             extractTablesFromUpdate((UpdateStatement) sqlStatement);
         } else if (sqlStatement instanceof DeleteStatement) {
             extractTablesFromDelete((DeleteStatement) sqlStatement);
+        } else if (sqlStatement instanceof CreateTableStatement) {
+            extractTablesFromTableSegment(((CreateTableStatement) 
sqlStatement).getTable());
+        } else if (sqlStatement instanceof AlterTableStatement) {
+            extractTablesFromTableSegment(((AlterTableStatement) 
sqlStatement).getTable());
+        } else if (sqlStatement instanceof CommentStatement) {
+            extractTablesFromTableSegment(((CommentStatement) 
sqlStatement).getTable());
         } else if (sqlStatement instanceof CreateViewStatement) {
             extractTablesFromCreateViewStatement((CreateViewStatement) 
sqlStatement);
         } else if (sqlStatement instanceof CreateIndexStatement) {
-            extractTablesFromCreateIndexStatement((CreateIndexStatement) 
sqlStatement);
+            extractTablesFromTableSegment(((CreateIndexStatement) 
sqlStatement).getTable());
         } else if (sqlStatement instanceof DropIndexStatement) {
-            extractTablesFromDropIndexStatement((DropIndexStatement) 
sqlStatement);
+            ((DropIndexStatement) 
sqlStatement).getSimpleTable().ifPresent(this::extractTablesFromTableSegment);
         }
     }
     
@@ -421,24 +429,4 @@ public final class TableExtractor {
         rewriteTables.add(createViewStatement.getView());
         extractTablesFromSelect(createViewStatement.getSelect());
     }
-    
-    /**
-     * Extract table that should be rewritten from create index statement.
-     *
-     * @param createIndexStatement create index statement
-     */
-    public void extractTablesFromCreateIndexStatement(final 
CreateIndexStatement createIndexStatement) {
-        if (null != createIndexStatement.getTable()) {
-            extractTablesFromTableSegment(createIndexStatement.getTable());
-        }
-    }
-    
-    /**
-     * Extract table that should be rewritten from drop index statement.
-     *
-     * @param dropIndexStatement drop index statement
-     */
-    public void extractTablesFromDropIndexStatement(final DropIndexStatement 
dropIndexStatement) {
-        
dropIndexStatement.getSimpleTable().ifPresent(this::extractTablesFromTableSegment);
-    }
 }
diff --git 
a/parser/sql/statement/core/src/test/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractorTest.java
 
b/parser/sql/statement/core/src/test/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractorTest.java
index 97bafbe9e25..5ad458530e6 100644
--- 
a/parser/sql/statement/core/src/test/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractorTest.java
+++ 
b/parser/sql/statement/core/src/test/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractorTest.java
@@ -43,8 +43,11 @@ import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
+import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.CommentStatement;
+import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.DDLStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.index.CreateIndexStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.index.DropIndexStatement;
+import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.table.AlterTableStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.table.CreateTableStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.SelectStatement;
@@ -238,22 +241,47 @@ class TableExtractorTest {
         assertThat(tableExtractor.getJoinTables().iterator().next(), 
is(joinTableSegment));
     }
     
+    @Test
+    void assertExtractTablesFromCreateTableStatement() {
+        CreateTableStatement createTableStatement = 
mock(CreateTableStatement.class);
+        
when(createTableStatement.getTable()).thenReturn(createSimpleTableSegmentForDDLStatement());
+        assertSimpleTableSegmentFromDDLStatement(createTableStatement);
+    }
+    
+    @Test
+    void assertExtractTablesFromAlterTableStatement() {
+        AlterTableStatement alterTableStatement = 
mock(AlterTableStatement.class);
+        
when(alterTableStatement.getTable()).thenReturn(createSimpleTableSegmentForDDLStatement());
+        assertSimpleTableSegmentFromDDLStatement(alterTableStatement);
+    }
+    
+    @Test
+    void assertExtractTablesFromCommentStatement() {
+        CommentStatement commentStatement = mock(CommentStatement.class);
+        
when(commentStatement.getTable()).thenReturn(createSimpleTableSegmentForDDLStatement());
+        assertSimpleTableSegmentFromDDLStatement(commentStatement);
+    }
+    
     @Test
     void assertExtractTablesFromCreateIndexStatement() {
         CreateIndexStatement createIndexStatement = 
mock(CreateIndexStatement.class);
-        SimpleTableSegment tableSegment = new SimpleTableSegment(new 
TableNameSegment(30, 40, new IdentifierValue("t_order")));
-        when(createIndexStatement.getTable()).thenReturn(tableSegment);
-        
tableExtractor.extractTablesFromCreateIndexStatement(createIndexStatement);
-        assertThat(tableExtractor.getRewriteTables().size(), is(1));
-        
assertTableSegment(tableExtractor.getRewriteTables().iterator().next(), 30, 40, 
"t_order");
+        
when(createIndexStatement.getTable()).thenReturn(createSimpleTableSegmentForDDLStatement());
+        assertSimpleTableSegmentFromDDLStatement(createIndexStatement);
     }
     
     @Test
     void assertExtractTablesFromDropIndexStatement() {
         DropIndexStatement dropIndexStatement = mock(DropIndexStatement.class);
-        SimpleTableSegment tableSegment = new SimpleTableSegment(new 
TableNameSegment(30, 40, new IdentifierValue("t_order")));
-        
when(dropIndexStatement.getSimpleTable()).thenReturn(Optional.of(tableSegment));
-        tableExtractor.extractTablesFromDropIndexStatement(dropIndexStatement);
+        
when(dropIndexStatement.getSimpleTable()).thenReturn(Optional.of(createSimpleTableSegmentForDDLStatement()));
+        assertSimpleTableSegmentFromDDLStatement(dropIndexStatement);
+    }
+    
+    private SimpleTableSegment createSimpleTableSegmentForDDLStatement() {
+        return new SimpleTableSegment(new TableNameSegment(30, 40, new 
IdentifierValue("t_order")));
+    }
+    
+    private void assertSimpleTableSegmentFromDDLStatement(final DDLStatement 
statement) {
+        tableExtractor.extractTablesFromSQLStatement(statement);
         assertThat(tableExtractor.getRewriteTables().size(), is(1));
         
assertTableSegment(tableExtractor.getRewriteTables().iterator().next(), 30, 40, 
"t_order");
     }
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index 404d0a529b3..b748f3236ab 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -24,7 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.database.connector.opengauss.type.OpenGaussDatabaseType;
 import org.apache.shardingsphere.test.e2e.env.runtime.E2ETestEnvironment;
-import org.apache.shardingsphere.test.e2e.env.runtime.type.RunEnvironment.Type;
+import org.apache.shardingsphere.test.e2e.env.runtime.type.RunEnvironment;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
 import org.awaitility.Awaitility;
 
@@ -72,7 +72,7 @@ public final class PipelineE2EDistSQLFacade {
         String registerStorageUnitSQL = "REGISTER STORAGE UNIT ${ds} ( 
URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}", 
storageUnitName)
                 .replace("${user}", username)
                 .replace("${password}", containerComposer.getPassword())
-                .replace("${url}", 
containerComposer.getActualJdbcUrlTemplate(storageUnitName, Type.DOCKER == 
E2ETestEnvironment.getInstance().getRunEnvironment().getType()));
+                .replace("${url}", 
containerComposer.getActualJdbcUrlTemplate(storageUnitName, 
RunEnvironment.Type.DOCKER == 
E2ETestEnvironment.getInstance().getRunEnvironment().getType()));
         containerComposer.proxyExecuteWithLog(registerStorageUnitSQL, 0);
         int timeout = containerComposer.getDatabaseType() instanceof 
OpenGaussDatabaseType ? 60 : 10;
         Awaitility.waitAtMost(timeout, 
TimeUnit.SECONDS).ignoreExceptions().pollInterval(3L, 
TimeUnit.SECONDS).until(() -> 
containerComposer.showStorageUnitsName().contains(storageUnitName));

Reply via email to