sandynz commented on code in PR #20316:
URL: https://github.com/apache/shardingsphere/pull/20316#discussion_r950655347
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java:
##########
@@ -31,11 +30,7 @@
@ToString(callSuper = true)
public final class MigrationJobId extends AbstractPipelineJobId {
- public static final String CURRENT_VERSION = "01";
Review Comment:
Could we keep CURRENT_VERSION for now? It might still be used in the job id.
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java:
##########
@@ -432,6 +472,69 @@ public void dropMigrationSourceResources(final
Collection<String> resourceNames)
pipelineResourceAPI.persistMetaDataDataSource(JobType.MIGRATION,
metaDataDataSource);
}
+    /**
+     * Assemble a YAML migration job configuration from the given parameter and start the job.
+     *
+     * <p>The source side is read from the registered migration data sources, the target side is
+     * built from the target data sources and sharding rule carried by the parameter.</p>
+     *
+     * @param parameter create migration job parameter
+     */
+    @Override
+    public void createJobAndStart(final CreateMigrationJobParameter parameter) {
+        YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
+        // Source side: look up the registered data source by name and describe it as a standard pipeline data source.
+        Map<String, DataSourceProperties> registeredDataSources = pipelineResourceAPI.getMetaDataDataSource(JobType.MIGRATION);
+        Map<String, Object> sourceProps = DATA_SOURCE_CONFIG_SWAPPER.swapToMap(registeredDataSources.get(parameter.getSourceDataSourceName()));
+        YamlPipelineDataSourceConfiguration sourceConfig = createYamlPipelineDataSourceConfiguration(StandardPipelineDataSourceConfiguration.TYPE, YamlEngine.marshal(sourceProps));
+        result.setSource(sourceConfig);
+        result.setSourceDatabaseType(new StandardPipelineDataSourceConfiguration(sourceProps).getDatabaseType().getType());
+        result.setSourceDataSourceName(parameter.getSourceDataSourceName());
+        String sourceTableName = parameter.getSourceTableName();
+        result.setSourceTableName(sourceTableName);
+        // Target side: convert every target data source to its property map, keyed by data source name.
+        Map<String, Map<String, Object>> targetProps = new HashMap<>();
+        for (Entry<String, DataSource> each : parameter.getTargetDataSources().entrySet()) {
+            targetProps.put(each.getKey(), DATA_SOURCE_CONFIG_SWAPPER.swapToMap(DataSourcePropertiesCreator.create(each.getValue())));
+        }
+        String targetDatabaseName = parameter.getTargetDatabaseName();
+        PipelineDataSourceConfiguration targetConfig = new ShardingSpherePipelineDataSourceConfiguration(
+                getYamlRootConfiguration(targetDatabaseName, targetProps, parameter.getTargetShardingRuleConfig()));
+        String targetParameter = YamlEngine.marshal(targetConfig.getDataSourceConfiguration());
+        result.setTarget(createYamlPipelineDataSourceConfiguration(targetConfig.getType(), targetParameter));
+        result.setTargetDatabaseType(targetConfig.getDatabaseType().getType());
+        result.setTargetDatabaseName(targetDatabaseName);
+        result.setTargetTableName(parameter.getTargetTableName());
+        // The schema lookup connects to the source data source, so it stays after the target side has been built.
+        result.setSchemaTablesMap(getSchemaNameMap(sourceConfig, sourceTableName));
+        extendYamlJobConfiguration(result);
+        MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(result);
+        start(jobConfig);
+    }
+
+    /**
+     * Create a YAML root configuration holding the given data sources and a single rule.
+     *
+     * @param databaseName database name
+     * @param yamlDataSources data source properties keyed by data source name
+     * @param shardingRule the only rule configuration to register
+     * @return YAML root configuration
+     */
+    private YamlRootConfiguration getYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final YamlRuleConfiguration shardingRule) {
+        YamlRootConfiguration result = new YamlRootConfiguration();
+        result.setDatabaseName(databaseName);
+        result.setDataSources(yamlDataSources);
+        // Exactly one rule is registered, wrapped in an immutable single-element list.
+        result.setRules(Collections.singletonList(shardingRule));
+        return result;
+    }
+
+    /**
+     * Create a YAML pipeline data source configuration from its type and parameter.
+     *
+     * @param type pipeline data source configuration type
+     * @param parameter configuration parameter text
+     * @return YAML pipeline data source configuration
+     */
+    private YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final String type, final String parameter) {
+        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
+        result.setType(type);
+        result.setParameter(parameter);
+        return result;
+    }
+
+    /**
+     * Look up which schemas of the given data source contain the given table.
+     *
+     * @param pipelineDataSourceConfig YAML pipeline data source configuration used to open the connection
+     * @param tableName table name, passed to {@link DatabaseMetaData#getTables} as the table name pattern
+     * @return table names grouped by schema name, empty if no table matches
+     * @throws AddMigrationSourceResourceException if the table meta data cannot be read
+     */
+    private Map<String, List<String>> getSchemaNameMap(final YamlPipelineDataSourceConfiguration pipelineDataSourceConfig, final String tableName) {
+        Map<String, List<String>> result = new HashMap<>();
+        // Declare the data source, connection and result set together so that the result set is closed as well;
+        // resources are released in reverse order of declaration.
+        try (
+                PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PIPELINE_DATA_SOURCE_CONFIG_SWAPPER.swapToObject(pipelineDataSourceConfig));
+                Connection connection = dataSource.getConnection();
+                ResultSet resultSet = connection.getMetaData().getTables(null, null, tableName, new String[]{"TABLE"})) {
+            while (resultSet.next()) {
+                // JDBC allows TABLE_SCHEM to be null; HashMap accepts a null key, so such tables are grouped under null.
+                String schemaName = resultSet.getString("TABLE_SCHEM");
+                result.computeIfAbsent(schemaName, key -> new ArrayList<>()).add(resultSet.getString("TABLE_NAME"));
+            }
+        } catch (final SQLException ex) {
+            log.error("Get schema name map error", ex);
+            throw new AddMigrationSourceResourceException(ex.getMessage());
+        }
+        return result;
+    }
Review Comment:
Could we put it in SchemaTableUtil?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]