sandynz commented on code in PR #20316:
URL: https://github.com/apache/shardingsphere/pull/20316#discussion_r950655347


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java:
##########
@@ -31,11 +30,7 @@
 @ToString(callSuper = true)
 public final class MigrationJobId extends AbstractPipelineJobId {
     
-    public static final String CURRENT_VERSION = "01";

Review Comment:
   Could we keep `CURRENT_VERSION` for now? It might still be used in the job id.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java:
##########
@@ -432,6 +472,69 @@ public void dropMigrationSourceResources(final 
Collection<String> resourceNames)
         pipelineResourceAPI.persistMetaDataDataSource(JobType.MIGRATION, 
metaDataDataSource);
     }
     
+    @Override
+    public void createJobAndStart(final CreateMigrationJobParameter parameter) 
{
+        YamlMigrationJobConfiguration result = new 
YamlMigrationJobConfiguration();
+        Map<String, DataSourceProperties> metaDataDataSource = 
pipelineResourceAPI.getMetaDataDataSource(JobType.MIGRATION);
+        Map<String, Object> sourceDataSourceProps = 
DATA_SOURCE_CONFIG_SWAPPER.swapToMap(metaDataDataSource.get(parameter.getSourceDataSourceName()));
+        YamlPipelineDataSourceConfiguration 
sourcePipelineDataSourceConfiguration = 
createYamlPipelineDataSourceConfiguration(StandardPipelineDataSourceConfiguration.TYPE,
+                YamlEngine.marshal(sourceDataSourceProps));
+        result.setSource(sourcePipelineDataSourceConfiguration);
+        result.setSourceDatabaseType(new 
StandardPipelineDataSourceConfiguration(sourceDataSourceProps).getDatabaseType().getType());
+        result.setSourceDataSourceName(parameter.getSourceDataSourceName());
+        result.setSourceTableName(parameter.getSourceTableName());
+        Map<String, Map<String, Object>> targetDataSourceProperties = new 
HashMap<>();
+        for (Entry<String, DataSource> entry : 
parameter.getTargetDataSources().entrySet()) {
+            Map<String, Object> dataSourceProps = 
DATA_SOURCE_CONFIG_SWAPPER.swapToMap(DataSourcePropertiesCreator.create(entry.getValue()));
+            targetDataSourceProperties.put(entry.getKey(), dataSourceProps);
+        }
+        String targetDatabaseName = parameter.getTargetDatabaseName();
+        YamlRootConfiguration targetRootConfig = 
getYamlRootConfiguration(targetDatabaseName, targetDataSourceProperties, 
parameter.getTargetShardingRuleConfig());
+        PipelineDataSourceConfiguration targetPipelineDataSource = new 
ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
+        
result.setTarget(createYamlPipelineDataSourceConfiguration(targetPipelineDataSource.getType(),
 YamlEngine.marshal(targetPipelineDataSource.getDataSourceConfiguration())));
+        
result.setTargetDatabaseType(targetPipelineDataSource.getDatabaseType().getType());
+        result.setTargetDatabaseName(targetDatabaseName);
+        result.setTargetTableName(parameter.getTargetTableName());
+        
result.setSchemaTablesMap(getSchemaNameMap(sourcePipelineDataSourceConfiguration,
 parameter.getSourceTableName()));
+        extendYamlJobConfiguration(result);
+        MigrationJobConfiguration jobConfiguration = new 
YamlMigrationJobConfigurationSwapper().swapToObject(result);
+        start(jobConfiguration);
+    }
+    
+    private YamlRootConfiguration getYamlRootConfiguration(final String 
databaseName, final Map<String, Map<String, Object>> yamlDataSources, final 
YamlRuleConfiguration shardingRule) {
+        YamlRootConfiguration result = new YamlRootConfiguration();
+        result.setDatabaseName(databaseName);
+        result.setDataSources(yamlDataSources);
+        Collection<YamlRuleConfiguration> yamlRuleConfigs = 
Collections.singletonList(shardingRule);
+        result.setRules(yamlRuleConfigs);
+        return result;
+    }
+    
+    private YamlPipelineDataSourceConfiguration 
createYamlPipelineDataSourceConfiguration(final String type, final String 
parameter) {
+        YamlPipelineDataSourceConfiguration result = new 
YamlPipelineDataSourceConfiguration();
+        result.setType(type);
+        result.setParameter(parameter);
+        return result;
+    }
+    
+    private Map<String, List<String>> getSchemaNameMap(final 
YamlPipelineDataSourceConfiguration pipelineDataSourceConfig, final String 
tableName) {
+        Map<String, List<String>> result = new HashMap<>();
+        try (PipelineDataSourceWrapper dataSource = 
PipelineDataSourceFactory.newInstance(PIPELINE_DATA_SOURCE_CONFIG_SWAPPER.swapToObject(pipelineDataSourceConfig)))
 {
+            try (Connection connection = dataSource.getConnection()) {
+                DatabaseMetaData metaData = connection.getMetaData();
+                ResultSet resultSet = metaData.getTables(null, null, 
tableName, new String[]{"TABLE"});
+                while (resultSet.next()) {
+                    String schemaName = resultSet.getString("TABLE_SCHEM");
+                    result.computeIfAbsent(schemaName, k -> new 
ArrayList<>()).add(resultSet.getString("TABLE_NAME"));
+                }
+            }
+        } catch (final SQLException ex) {
+            log.error("Get schema name map error", ex);
+            throw new AddMigrationSourceResourceException(ex.getMessage());
+        }
+        return result;
+    }

Review Comment:
   Could we move this `getSchemaNameMap` method into `SchemaTableUtil`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to