This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3d9fd0d3e82 Improve openGauss pg_get_tabledef SQLs append schema for
migration (#37820)
3d9fd0d3e82 is described below
commit 3d9fd0d3e825de0305cad2bc140c79dd459b95c3
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 23 15:22:25 2026 +0800
Improve openGauss pg_get_tabledef SQLs append schema for migration (#37820)
* Improve TableExtractor.extractTablesFromSQLStatement: add CREATE TABLE,
ALTER TABLE, COMMENT
* Improve OpenGaussPipelineSQLBuilder.buildCreateTableSQLs: do not append
schema for ALTER TABLE; do it in PipelineDDLDecorator via SQL AST replacement
* Tiny refactoring
* Update test
---
.../datasource/PipelineJobDataSourcePreparer.java | 12 +++---
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 12 ++----
.../OpenGaussPipelineSQLBuilderTest.java | 2 +-
.../statement/core/extractor/TableExtractor.java | 32 +++++-----------
.../core/extractor/TableExtractorTest.java | 44 ++++++++++++++++++----
.../pipeline/util/PipelineE2EDistSQLFacade.java | 4 +-
6 files changed, 59 insertions(+), 47 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
index b8a132e589e..a5638f53a61 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
@@ -29,11 +29,11 @@ import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.Cr
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
-import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.DialectDatabaseMetaData;
import
org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
+import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
@@ -115,13 +115,15 @@ public final class PipelineJobDataSourcePreparer {
final long startTimeMillis = System.currentTimeMillis();
PipelineDataSourceManager dataSourceManager =
param.getDataSourceManager();
for (CreateTableConfiguration each :
param.getCreateTableConfigurations()) {
- try (Connection targetConnection =
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
{
+ try (
+ Connection targetConnection =
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection();
+ Statement statement = targetConnection.createStatement()) {
List<String> createTargetTableSQL =
getCreateTargetTableSQL(each, dataSourceManager);
for (String sql : createTargetTableSQL) {
ShardingSphereMetaData metaData =
((ShardingSphereConnection)
targetConnection).getContextManager().getMetaDataContexts().getMetaData();
Optional<String> decoratedSQL =
decorateTargetTableSQL(each, param.getSqlParserEngine(), metaData,
param.getTargetDatabaseName(), sql);
if (decoratedSQL.isPresent()) {
- executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(decoratedSQL.get()));
+ executeTargetTableSQL(statement,
addIfNotExistsForCreateTableSQL(decoratedSQL.get()));
}
}
}
@@ -146,9 +148,9 @@ public final class PipelineJobDataSourcePreparer {
return decoratedSQL.map(String::trim).filter(trimmedSql ->
!Strings.isNullOrEmpty(trimmedSql));
}
- private void executeTargetTableSQL(final Connection targetConnection,
final String sql) throws SQLException {
+ private void executeTargetTableSQL(final Statement statement, final String
sql) throws SQLException {
log.info("Execute target table SQL: {}", sql);
- try (Statement statement = targetConnection.createStatement()) {
+ try {
statement.execute(sql);
} catch (final SQLException ex) {
for (String each :
DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
databaseType)
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 7bc4e68337a..c06d11f5d95 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -19,8 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -77,14 +77,8 @@ public final class OpenGaussPipelineSQLBuilder implements
DialectPipelineSQLBuil
ResultSet resultSet =
statement.executeQuery(String.format("SELECT * FROM pg_get_tabledef('%s.%s')",
schemaName, tableName))) {
if (resultSet.next()) {
// TODO use ";" to split is not always correct if return
value's comments contains ";"
- Collection<String> defSQLs =
Arrays.asList(resultSet.getString("pg_get_tabledef").split(";"));
- return defSQLs.stream().map(sql -> {
- String targetPrefix = "ALTER TABLE " + tableName;
- if (sql.trim().startsWith(targetPrefix)) {
- return sql.replaceFirst(targetPrefix, "ALTER TABLE " +
schemaName + "." + tableName);
- }
- return sql;
- }).collect(Collectors.toList());
+ String tableDefinition =
resultSet.getString("pg_get_tabledef");
+ return Arrays.asList(tableDefinition.split(";"));
}
}
throw new CreateTableSQLGenerateException(tableName);
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index a9e933ed704..d908c03382c 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -98,7 +98,7 @@ class OpenGaussPipelineSQLBuilderTest {
when(resultSet.getString("pg_get_tabledef")).thenReturn("CREATE TABLE
foo_tbl (id INT PRIMARY KEY);ALTER TABLE foo_tbl OWNER TO root");
when(connection.createStatement().executeQuery("SELECT * FROM
pg_get_tabledef('foo_schema.foo_tbl')")).thenReturn(resultSet);
assertThat(sqlBuilder.buildCreateTableSQLs(new
MockedDataSource(connection), "foo_schema", "foo_tbl"),
- is(Arrays.asList("CREATE TABLE foo_tbl (id INT PRIMARY KEY)",
"ALTER TABLE foo_schema.foo_tbl OWNER TO root")));
+ is(Arrays.asList("CREATE TABLE foo_tbl (id INT PRIMARY KEY)",
"ALTER TABLE foo_tbl OWNER TO root")));
}
@Test
diff --git
a/parser/sql/statement/core/src/main/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractor.java
b/parser/sql/statement/core/src/main/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractor.java
index 63127caef1c..308ac86d705 100644
---
a/parser/sql/statement/core/src/main/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractor.java
+++
b/parser/sql/statement/core/src/main/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractor.java
@@ -59,8 +59,10 @@ import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table
import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment;
import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableSegment;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.CommentStatement;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.index.CreateIndexStatement;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.index.DropIndexStatement;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.table.AlterTableStatement;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.table.CreateTableStatement;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.view.CreateViewStatement;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.DeleteStatement;
@@ -402,12 +404,18 @@ public final class TableExtractor {
extractTablesFromUpdate((UpdateStatement) sqlStatement);
} else if (sqlStatement instanceof DeleteStatement) {
extractTablesFromDelete((DeleteStatement) sqlStatement);
+ } else if (sqlStatement instanceof CreateTableStatement) {
+ extractTablesFromTableSegment(((CreateTableStatement)
sqlStatement).getTable());
+ } else if (sqlStatement instanceof AlterTableStatement) {
+ extractTablesFromTableSegment(((AlterTableStatement)
sqlStatement).getTable());
+ } else if (sqlStatement instanceof CommentStatement) {
+ extractTablesFromTableSegment(((CommentStatement)
sqlStatement).getTable());
} else if (sqlStatement instanceof CreateViewStatement) {
extractTablesFromCreateViewStatement((CreateViewStatement)
sqlStatement);
} else if (sqlStatement instanceof CreateIndexStatement) {
- extractTablesFromCreateIndexStatement((CreateIndexStatement)
sqlStatement);
+ extractTablesFromTableSegment(((CreateIndexStatement)
sqlStatement).getTable());
} else if (sqlStatement instanceof DropIndexStatement) {
- extractTablesFromDropIndexStatement((DropIndexStatement)
sqlStatement);
+ ((DropIndexStatement)
sqlStatement).getSimpleTable().ifPresent(this::extractTablesFromTableSegment);
}
}
@@ -421,24 +429,4 @@ public final class TableExtractor {
rewriteTables.add(createViewStatement.getView());
extractTablesFromSelect(createViewStatement.getSelect());
}
-
- /**
- * Extract table that should be rewritten from create index statement.
- *
- * @param createIndexStatement create index statement
- */
- public void extractTablesFromCreateIndexStatement(final
CreateIndexStatement createIndexStatement) {
- if (null != createIndexStatement.getTable()) {
- extractTablesFromTableSegment(createIndexStatement.getTable());
- }
- }
-
- /**
- * Extract table that should be rewritten from drop index statement.
- *
- * @param dropIndexStatement drop index statement
- */
- public void extractTablesFromDropIndexStatement(final DropIndexStatement
dropIndexStatement) {
-
dropIndexStatement.getSimpleTable().ifPresent(this::extractTablesFromTableSegment);
- }
}
diff --git
a/parser/sql/statement/core/src/test/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractorTest.java
b/parser/sql/statement/core/src/test/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractorTest.java
index 97bafbe9e25..5ad458530e6 100644
---
a/parser/sql/statement/core/src/test/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractorTest.java
+++
b/parser/sql/statement/core/src/test/java/org/apache/shardingsphere/sql/parser/statement/core/extractor/TableExtractorTest.java
@@ -43,8 +43,11 @@ import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table
import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.CommentStatement;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.DDLStatement;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.index.CreateIndexStatement;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.index.DropIndexStatement;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.table.AlterTableStatement;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.table.CreateTableStatement;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.SelectStatement;
@@ -238,22 +241,47 @@ class TableExtractorTest {
assertThat(tableExtractor.getJoinTables().iterator().next(),
is(joinTableSegment));
}
+ @Test
+ void assertExtractTablesFromCreateTableStatement() {
+ CreateTableStatement createTableStatement =
mock(CreateTableStatement.class);
+
when(createTableStatement.getTable()).thenReturn(createSimpleTableSegmentForDDLStatement());
+ assertSimpleTableSegmentFromDDLStatement(createTableStatement);
+ }
+
+ @Test
+ void assertExtractTablesFromAlterTableStatement() {
+ AlterTableStatement alterTableStatement =
mock(AlterTableStatement.class);
+
when(alterTableStatement.getTable()).thenReturn(createSimpleTableSegmentForDDLStatement());
+ assertSimpleTableSegmentFromDDLStatement(alterTableStatement);
+ }
+
+ @Test
+ void assertExtractTablesFromCommentStatement() {
+ CommentStatement commentStatement = mock(CommentStatement.class);
+
when(commentStatement.getTable()).thenReturn(createSimpleTableSegmentForDDLStatement());
+ assertSimpleTableSegmentFromDDLStatement(commentStatement);
+ }
+
@Test
void assertExtractTablesFromCreateIndexStatement() {
CreateIndexStatement createIndexStatement =
mock(CreateIndexStatement.class);
- SimpleTableSegment tableSegment = new SimpleTableSegment(new
TableNameSegment(30, 40, new IdentifierValue("t_order")));
- when(createIndexStatement.getTable()).thenReturn(tableSegment);
-
tableExtractor.extractTablesFromCreateIndexStatement(createIndexStatement);
- assertThat(tableExtractor.getRewriteTables().size(), is(1));
-
assertTableSegment(tableExtractor.getRewriteTables().iterator().next(), 30, 40,
"t_order");
+
when(createIndexStatement.getTable()).thenReturn(createSimpleTableSegmentForDDLStatement());
+ assertSimpleTableSegmentFromDDLStatement(createIndexStatement);
}
@Test
void assertExtractTablesFromDropIndexStatement() {
DropIndexStatement dropIndexStatement = mock(DropIndexStatement.class);
- SimpleTableSegment tableSegment = new SimpleTableSegment(new
TableNameSegment(30, 40, new IdentifierValue("t_order")));
-
when(dropIndexStatement.getSimpleTable()).thenReturn(Optional.of(tableSegment));
- tableExtractor.extractTablesFromDropIndexStatement(dropIndexStatement);
+
when(dropIndexStatement.getSimpleTable()).thenReturn(Optional.of(createSimpleTableSegmentForDDLStatement()));
+ assertSimpleTableSegmentFromDDLStatement(dropIndexStatement);
+ }
+
+ private SimpleTableSegment createSimpleTableSegmentForDDLStatement() {
+ return new SimpleTableSegment(new TableNameSegment(30, 40, new
IdentifierValue("t_order")));
+ }
+
+ private void assertSimpleTableSegmentFromDDLStatement(final DDLStatement
statement) {
+ tableExtractor.extractTablesFromSQLStatement(statement);
assertThat(tableExtractor.getRewriteTables().size(), is(1));
assertTableSegment(tableExtractor.getRewriteTables().iterator().next(), 30, 40,
"t_order");
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index 404d0a529b3..b748f3236ab 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -24,7 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.database.connector.opengauss.type.OpenGaussDatabaseType;
import org.apache.shardingsphere.test.e2e.env.runtime.E2ETestEnvironment;
-import org.apache.shardingsphere.test.e2e.env.runtime.type.RunEnvironment.Type;
+import org.apache.shardingsphere.test.e2e.env.runtime.type.RunEnvironment;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
import org.awaitility.Awaitility;
@@ -72,7 +72,7 @@ public final class PipelineE2EDistSQLFacade {
String registerStorageUnitSQL = "REGISTER STORAGE UNIT ${ds} (
URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}",
storageUnitName)
.replace("${user}", username)
.replace("${password}", containerComposer.getPassword())
- .replace("${url}",
containerComposer.getActualJdbcUrlTemplate(storageUnitName, Type.DOCKER ==
E2ETestEnvironment.getInstance().getRunEnvironment().getType()));
+ .replace("${url}",
containerComposer.getActualJdbcUrlTemplate(storageUnitName,
RunEnvironment.Type.DOCKER ==
E2ETestEnvironment.getInstance().getRunEnvironment().getType()));
containerComposer.proxyExecuteWithLog(registerStorageUnitSQL, 0);
int timeout = containerComposer.getDatabaseType() instanceof
OpenGaussDatabaseType ? 60 : 10;
Awaitility.waitAtMost(timeout,
TimeUnit.SECONDS).ignoreExceptions().pollInterval(3L,
TimeUnit.SECONDS).until(() ->
containerComposer.showStorageUnitsName().contains(storageUnitName));