This is an automated email from the ASF dual-hosted git repository.
totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5834943db47 Refactor PipelineDDLGenerator (#20314)
5834943db47 is described below
commit 5834943db4789942d4d9c4ca000b246f1a1d4d62
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 20 00:37:44 2022 +0800
Refactor PipelineDDLGenerator (#20314)
---
.../metadata/generator/PipelineDDLGenerator.java | 45 ++++++++++------------
.../core/prepare/PipelineJobPreparerUtils.java | 3 +-
.../datasource/AbstractDataSourcePreparer.java | 6 +--
.../prepare/datasource/DataSourcePreparer.java | 5 ++-
.../pipeline/scenario/migration/MigrationJob.java | 9 ++++-
.../scenario/migration/MigrationJobPreparer.java | 9 +++--
.../datasource/MySQLDataSourcePreparerTest.java | 6 +--
.../datasource/OpenGaussDataSourcePreparer.java | 2 +-
.../datasource/PostgreSQLDataSourcePreparer.java | 2 +-
9 files changed, 45 insertions(+), 42 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index b45548fdaf3..be81ce3c0f2 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -17,8 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.generator;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGeneratorFactory;
import org.apache.shardingsphere.infra.binder.LogicSQL;
@@ -39,7 +37,6 @@ import
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.constraint.Co
import
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.index.IndexSegment;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -52,9 +49,8 @@ import java.util.Optional;
import java.util.TreeMap;
/**
- * Pipeline ddl generator.
+ * Pipeline DDL generator.
*/
-@RequiredArgsConstructor
@Slf4j
public final class PipelineDDLGenerator {
@@ -64,29 +60,29 @@ public final class PipelineDDLGenerator {
/**
* Generate logic DDL.
- *
- * @param sourceDataSource source data source
+ *
* @param databaseType database type
+ * @param sourceDataSource source data source
* @param schemaName schema name
* @param logicTableName table name
* @param actualTableName actual table name
* @param parserEngine parser engine
* @return DDL
+ * @throws SQLException SQL exception
*/
- @SneakyThrows(SQLException.class)
- public String generateLogicDDL(final DataSource sourceDataSource, final
DatabaseType databaseType,
- final String schemaName, final String
logicTableName, final String actualTableName, final
ShardingSphereSQLParserEngine parserEngine) {
+ public String generateLogicDDL(final DatabaseType databaseType, final
DataSource sourceDataSource,
+ final String schemaName, final String
logicTableName, final String actualTableName, final
ShardingSphereSQLParserEngine parserEngine) throws SQLException {
log.info("generateLogicDDLSQL, databaseType={}, schemaName={},
tableName={}", databaseType.getType(), schemaName, logicTableName);
StringBuilder result = new StringBuilder();
for (String each :
CreateTableSQLGeneratorFactory.getInstance(databaseType).generate(sourceDataSource,
schemaName, actualTableName)) {
- Optional<String> logicSQL = decorate(databaseType, schemaName,
sourceDataSource, each, logicTableName, parserEngine);
+ Optional<String> logicSQL = decorate(databaseType,
sourceDataSource, schemaName, logicTableName, parserEngine, each);
logicSQL.ifPresent(ddlSQL ->
result.append(ddlSQL).append(DELIMITER).append(System.lineSeparator()));
}
return result.toString();
}
- private Optional<String> decorate(final DatabaseType databaseType, final
String schemaName, final DataSource dataSource, final String sql, final String
logicTableName,
- final ShardingSphereSQLParserEngine
parserEngine) throws SQLException {
+ private Optional<String> decorate(final DatabaseType databaseType, final
DataSource dataSource, final String schemaName, final String logicTableName,
+ final ShardingSphereSQLParserEngine
parserEngine, final String sql) throws SQLException {
if (sql.trim().isEmpty()) {
return Optional.empty();
}
@@ -94,7 +90,7 @@ public final class PipelineDDLGenerator {
try (Connection connection = dataSource.getConnection()) {
databaseName = connection.getCatalog();
}
- String result = decorateActualSQL(sql.trim(), logicTableName,
databaseName, parserEngine);
+ String result = decorateActualSQL(databaseName, logicTableName,
parserEngine, sql.trim());
// TODO remove it after set search_path is supported.
if ("openGauss".equals(databaseType.getType())) {
return decorateOpenGauss(databaseName, schemaName, result,
parserEngine);
@@ -102,8 +98,8 @@ public final class PipelineDDLGenerator {
return Optional.of(result);
}
- private String decorateActualSQL(final String sql, final String
logicTableName, final String databaseName, final ShardingSphereSQLParserEngine
parserEngine) {
- LogicSQL logicSQL = getLogicSQL(sql, databaseName, parserEngine);
+ private String decorateActualSQL(final String databaseName, final String
logicTableName, final ShardingSphereSQLParserEngine parserEngine, final String
sql) {
+ LogicSQL logicSQL = getLogicSQL(databaseName, parserEngine, sql);
SQLStatementContext<?> sqlStatementContext =
logicSQL.getSqlStatementContext();
Map<SQLSegment, String> replaceMap = new
TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex));
if (sqlStatementContext instanceof CreateTableStatementContext) {
@@ -124,6 +120,11 @@ public final class PipelineDDLGenerator {
return doDecorateActualTable(replaceMap, sql);
}
+ private LogicSQL getLogicSQL(final String databaseName, final
ShardingSphereSQLParserEngine parserEngine, final String sql) {
+ SQLStatementContext<?> sqlStatementContext =
SQLStatementContextFactory.newInstance(null, parserEngine.parse(sql, false),
databaseName);
+ return new LogicSQL(sqlStatementContext, sql, Collections.emptyList());
+ }
+
private void appendFromIndexAndConstraint(final Map<SQLSegment, String>
replaceMap, final String logicTableName, final SQLStatementContext<?>
sqlStatementContext) {
if (!(sqlStatementContext instanceof TableAvailable) ||
((TableAvailable)
sqlStatementContext).getTablesContext().getTables().isEmpty()) {
return;
@@ -167,12 +168,6 @@ public final class PipelineDDLGenerator {
return result.toString();
}
- private LogicSQL getLogicSQL(final String sql, final String databaseName,
final ShardingSphereSQLParserEngine parserEngine) {
- SQLStatement sqlStatement = parserEngine.parse(sql, false);
- SQLStatementContext<?> sqlStatementContext =
SQLStatementContextFactory.newInstance(null, sqlStatement, databaseName);
- return new LogicSQL(sqlStatementContext, sql, Collections.emptyList());
- }
-
// TODO remove it after set search_path is supported.
private Optional<String> decorateOpenGauss(final String databaseName,
final String schemaName, final String logicSQL,
final
ShardingSphereSQLParserEngine parserEngine) {
@@ -183,10 +178,10 @@ public final class PipelineDDLGenerator {
}
private String replaceTableNameWithPrefix(final String sql, final String
prefix, final String databaseName, final ShardingSphereSQLParserEngine
parserEngine) {
- LogicSQL logicSQL = getLogicSQL(sql, databaseName, parserEngine);
+ LogicSQL logicSQL = getLogicSQL(databaseName, parserEngine, sql);
SQLStatementContext<?> sqlStatementContext =
logicSQL.getSqlStatementContext();
- if (sqlStatementContext instanceof CreateTableStatementContext ||
sqlStatementContext instanceof CommentStatementContext || sqlStatementContext
instanceof CreateIndexStatementContext
- || sqlStatementContext instanceof AlterTableStatementContext) {
+ if (sqlStatementContext instanceof CreateTableStatementContext ||
sqlStatementContext instanceof CommentStatementContext
+ || sqlStatementContext instanceof CreateIndexStatementContext
|| sqlStatementContext instanceof AlterTableStatementContext) {
if (!sqlStatementContext.getTablesContext().getTables().isEmpty())
{
TableNameSegment tableNameSegment =
sqlStatementContext.getTablesContext().getTables().iterator().next().getTableName();
Map<SQLSegment, String> replaceMap = new
TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex));
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index fd4eb6b7012..3b364c51da6 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -71,8 +71,9 @@ public final class PipelineJobPreparerUtils {
*
* @param databaseType database type
* @param prepareTargetTablesParameter prepare target tables parameter
+ * @throws SQLException SQL exception
*/
- public static void prepareTargetTables(final String databaseType, final
PrepareTargetTablesParameter prepareTargetTablesParameter) {
+ public static void prepareTargetTables(final String databaseType, final
PrepareTargetTablesParameter prepareTargetTablesParameter) throws SQLException {
Optional<DataSourcePreparer> dataSourcePreparer =
DataSourcePreparerFactory.getInstance(databaseType);
if (!dataSourcePreparer.isPresent()) {
log.info("dataSourcePreparer null, ignore prepare target");
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index c4d196c2f01..9d19924716b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -119,16 +119,16 @@ public abstract class AbstractDataSourcePreparer
implements DataSourcePreparer {
return
PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT
EXISTS ");
}
- protected List<String> listCreateLogicalTableSQL(final
PrepareTargetTablesParameter parameter) {
+ protected final List<String> listCreateLogicalTableSQL(final
PrepareTargetTablesParameter parameter) throws SQLException {
PipelineDDLGenerator generator = new PipelineDDLGenerator();
List<String> result = new LinkedList<>();
for (JobDataNodeEntry each :
parameter.getTablesFirstDataNodes().getEntries()) {
- String schemaName =
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
String dataSourceName =
each.getDataNodes().get(0).getDataSourceName();
DataSource dataSource =
parameter.getSourceDataSourceMap().get(dataSourceName);
DatabaseType databaseType =
DatabaseTypeEngine.getDatabaseType(Collections.singletonList(dataSource));
+ String schemaName =
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
String actualTableName =
parameter.getTableNameMap().get(each.getLogicTableName());
- result.add(generator.generateLogicDDL(dataSource, databaseType,
schemaName, each.getLogicTableName(), actualTableName,
parameter.getSqlParserEngine()));
+ result.add(generator.generateLogicDDL(databaseType, dataSource,
schemaName, each.getLogicTableName(), actualTableName,
parameter.getSqlParserEngine()));
}
return result;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
index bd959499ab3..e4ef48ab384 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
@@ -20,6 +20,8 @@ package
org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import java.sql.SQLException;
+
/**
* Data source preparer.
*/
@@ -37,6 +39,7 @@ public interface DataSourcePreparer extends TypedSPI {
* Prepare target tables.
*
* @param parameter prepare target tables parameter
+ * @throws SQLException SQL exception
*/
- void prepareTargetTables(PrepareTargetTablesParameter parameter);
+ void prepareTargetTables(PrepareTargetTablesParameter parameter) throws
SQLException;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index b7bd288daac..bdb54293c0a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -37,6 +37,8 @@ import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarr
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import java.sql.SQLException;
+
/**
* Migration job.
*/
@@ -86,13 +88,16 @@ public final class MigrationJob extends AbstractPipelineJob
implements SimpleJob
log.info("pipeline ignore exception: {}", ex.getMessage());
PipelineJobCenter.stop(getJobId());
// CHECKSTYLE:OFF
- } catch (final RuntimeException ex) {
+ } catch (final SQLException | RuntimeException ex) {
// CHECKSTYLE:ON
log.error("job prepare failed, {}-{}", getJobId(),
jobItemContext.getShardingItem(), ex);
PipelineJobCenter.stop(getJobId());
jobItemContext.setStatus(JobStatus.PREPARING_FAILURE);
MigrationJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
- throw ex;
+ if (ex instanceof RuntimeException) {
+ throw (RuntimeException) ex;
+ }
+ throw new RuntimeException(ex);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index 384f9fdd1a0..eae08b224aa 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -70,8 +70,9 @@ public final class MigrationJobPreparer {
* Do prepare work for scaling job.
*
* @param jobItemContext job item context
+ * @throws SQLException SQL exception
*/
- public void prepare(final MigrationJobItemContext jobItemContext) {
+ public void prepare(final MigrationJobItemContext jobItemContext) throws
SQLException {
PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(),
Collections.singleton(jobItemContext.getSourceDataSource()));
if (jobItemContext.isStopping()) {
throw new PipelineIgnoredException("Job stopping, jobId=" +
jobItemContext.getJobId());
@@ -95,7 +96,7 @@ public final class MigrationJobPreparer {
}
}
- private void prepareAndCheckTargetWithLock(final MigrationJobItemContext
jobItemContext) {
+ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext
jobItemContext) throws SQLException {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
String lockName = "prepare-" + jobConfig.getJobId();
LockContext lockContext =
PipelineContext.getContextManager().getInstanceContext().getLockContext();
@@ -124,7 +125,7 @@ public final class MigrationJobPreparer {
}
}
- private void prepareAndCheckTarget(final MigrationJobItemContext
jobItemContext) {
+ private void prepareAndCheckTarget(final MigrationJobItemContext
jobItemContext) throws SQLException {
prepareTarget(jobItemContext);
InventoryIncrementalJobItemProgress initProgress =
jobItemContext.getInitProgress();
if (null == initProgress || initProgress.getStatus() ==
JobStatus.PREPARING_FAILURE) {
@@ -134,7 +135,7 @@ public final class MigrationJobPreparer {
}
}
- private void prepareTarget(final MigrationJobItemContext jobItemContext) {
+ private void prepareTarget(final MigrationJobItemContext jobItemContext)
throws SQLException {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
TableNameSchemaNameMapping tableNameSchemaNameMapping =
jobItemContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
String targetDatabaseType = jobConfig.getTargetDatabaseType();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 5d55e4990bf..98cc575d786 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -96,8 +96,7 @@ public final class MySQLDataSourcePreparerTest {
.thenReturn(sourceScalingDataSourceConfig);
mockedStaticPipelineDataSourceConfigurationFactory.when(() ->
PipelineDataSourceConfigurationFactory.newInstance(eq("ShardingSphereJDBC"),
eq("target")))
.thenReturn(targetScalingDataSourceConfig);
- MySQLDataSourcePreparer mySQLDataSourcePreparer = new
MySQLDataSourcePreparer();
-
mySQLDataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter);
+ new
MySQLDataSourcePreparer().prepareTargetTables(prepareTargetTablesParameter);
verify(sourceDataSourceWrapper).getConnection();
verify(targetDataSourceWrapper).getConnection();
}
@@ -111,8 +110,7 @@ public final class MySQLDataSourcePreparerTest {
mockedStaticPipelineDataSourceConfigurationFactory.when(() ->
PipelineDataSourceConfigurationFactory.newInstance(eq("ShardingSphereJDBC"),
eq("target")))
.thenReturn(targetScalingDataSourceConfig);
when(sourceDataSourceWrapper.getConnection()).thenThrow(SQLException.class);
- MySQLDataSourcePreparer mySQLDataSourcePreparer = new
MySQLDataSourcePreparer();
-
mySQLDataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter);
+ new
MySQLDataSourcePreparer().prepareTargetTables(prepareTargetTablesParameter);
}
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index f42546f4471..d58133902b0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
public final class OpenGaussDataSourcePreparer extends
AbstractDataSourcePreparer {
@Override
- public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) {
+ public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) throws SQLException {
List<String> createLogicalTableSQLs =
listCreateLogicalTableSQL(parameter);
try (Connection targetConnection =
getCachedDataSource(parameter.getTargetDataSourceConfig(),
parameter.getDataSourceManager()).getConnection()) {
for (String createLogicalTableSQL : createLogicalTableSQLs) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
index 589b1860065..8f75e968183 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
public final class PostgreSQLDataSourcePreparer extends
AbstractDataSourcePreparer {
@Override
- public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) {
+ public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) throws SQLException {
List<String> createLogicalTableSQLs =
listCreateLogicalTableSQL(parameter);
try (Connection targetConnection =
getCachedDataSource(parameter.getTargetDataSourceConfig(),
parameter.getDataSourceManager()).getConnection()) {
for (String createLogicalTableSQL : createLogicalTableSQLs) {