sandynz commented on code in PR #20316:
URL: https://github.com/apache/shardingsphere/pull/20316#discussion_r950807785


##########
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java:
##########
@@ -17,22 +17,43 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
+import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
+import org.apache.shardingsphere.sharding.yaml.swapper.YamlShardingRuleConfigurationSwapper;
+
+import java.util.Objects;
 
 /**
  * Migrate table updater.
  */
+@Slf4j
 public final class MigrateTableUpdater implements RALUpdater<MigrateTableStatement> {
     
     private static final MigrationJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
+    private static final YamlShardingRuleConfigurationSwapper SHARDING_RULE_CONFIG_SWAPPER = new YamlShardingRuleConfigurationSwapper();
+    
     @Override
     public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
-        // TODO implement migrate table
-        JOB_API.getType();
+        log.info("start migrate job by {}", sqlStatement);
+        ShardingSphereDatabase targetDatabase = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(sqlStatement.getTargetDatabaseName());
+        ShardingRuleConfiguration targetShardingRule = targetDatabase.getRuleMetaData().getConfigurations().stream().filter(each -> each instanceof ShardingRuleConfiguration)
+                .map(each -> (ShardingRuleConfiguration) each).findFirst().orElseThrow(() -> new PipelineJobCreationException("No sharding rule found."));
+        targetShardingRule.getTables().removeIf(each -> !Objects.equals(each.getLogicTable(), sqlStatement.getTargetTableName()));
+        targetShardingRule.getAutoTables().removeIf(each -> !Objects.equals(each.getLogicTable(), sqlStatement.getTargetTableName()));

Review Comment:
   `targetShardingRule` comes from the current context metadata, so some of its tables and autoTables will be removed here.
   It's better to clone the ShardingRuleConfiguration and change the clone instead.
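   
   A minimal sketch of the cloning idea, assuming the existing YAML swapper round-trip yields an independent deep copy (identifiers taken from the diff):
   ```java
   // Deep-copy the rule config via the YAML swapper so the in-memory metadata stays untouched.
   ShardingRuleConfiguration clonedShardingRule = SHARDING_RULE_CONFIG_SWAPPER.swapToObject(
           SHARDING_RULE_CONFIG_SWAPPER.swapToYamlConfiguration(targetShardingRule));
   clonedShardingRule.getTables().removeIf(each -> !Objects.equals(each.getLogicTable(), sqlStatement.getTargetTableName()));
   clonedShardingRule.getAutoTables().removeIf(each -> !Objects.equals(each.getLogicTable(), sqlStatement.getTargetTableName()));
   ```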



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java:
##########
@@ -138,24 +135,25 @@ private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext)
     private void prepareTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
         TableNameSchemaNameMapping tableNameSchemaNameMapping = jobItemContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
+        // TODO fill schema
         String targetDatabaseType = jobConfig.getTargetDatabaseType();

Review Comment:
   Is `TODO fill schema` done?



##########
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java:
##########
@@ -67,35 +66,39 @@ public static Collection<ScalingParameterized> getParameters() {
     }
     
     @Test
-    public void assertManualScalingSuccess() throws InterruptedException {
-        addSourceResource();
-        initShardingAlgorithm();
-        assertTrue(waitShardingAlgorithmEffect(15));
+    public void assertMigrationSuccess() {
         createScalingRule();
-        createOrderTableRule();
-        createOrderItemTableRule();
-        createNoUseTable();
-        createOrderTable();
-        createOrderItemTable();
+        createSourceOrderTable();
+        createSourceOrderItemTable();
+        addSourceResource();
+        addTargetResource();
+        createTargetOrderTableRule();
+        createTargetOrderItemTableRule();
         SnowflakeKeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
-        for (int i = 0; i < TABLE_INIT_ROW_COUNT / 1000; i++) {
-            Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), 1000);
-            getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
-            getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+        JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
+        for (int i = 0; i < TABLE_INIT_ROW_COUNT / 10; i++) {

Review Comment:
   `TABLE_INIT_ROW_COUNT / 10`? Looks like it needs to be restored to `1000`.
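   
   For reference, a sketch restoring the original batch size (identifiers taken from the removed lines above):
   ```java
   for (int i = 0; i < TABLE_INIT_ROW_COUNT / 1000; i++) {
       // Generate and insert 1000 rows per batch, as before.
       Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), 1000);
       jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
       jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
   }
   ```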



##########
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java:
##########
@@ -131,12 +133,12 @@ public BaseITCase(final ScalingParameterized parameterized) {
         if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
             cleanUpDataSource();
         }
-        commonSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/command.xml")), CommonSQLCommand.class);
-        scalingWatcher = new ScalingWatcher(composedContainer, jdbcTemplate);
+        migrationDistSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/command.xml")), MigrationDistSQLCommand.class);
+        scalingWatcher = new ScalingWatcher(composedContainer);
     }
     
     private void cleanUpDataSource() {
-        for (String each : Arrays.asList(DS_0, DS_1, DS_2, DS_3, DS_4)) {
+        for (String each : Arrays.asList(DS_0, DS_2, DS_3, DS_4)) {

Review Comment:
   Looks like `DS_1` is not needed any more; we could remove it.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -188,6 +189,9 @@ public boolean equals(final @NonNull Object o) {
                     if (thisResult instanceof SQLXML && thatResult instanceof SQLXML) {
                         return ((SQLXML) thisResult).getString().equals(((SQLXML) thatResult).getString());
                     }
+                    if (thisResult instanceof Integer && thatResult instanceof Long) {
+                        return Objects.equals(Integer.toString((Integer) thisResult), Long.toString((Long) thatResult));
+                    }

Review Comment:
   1. Add a TODO here;
   
   2. Could we compare them as numbers instead of by string conversion?
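   
   For example, a sketch of a numeric comparison with the suggested TODO marker (whether this cross-type equality is wanted for the consistency check is still open):
   ```java
   // TODO: confirm cross-type numeric equality semantics for the data consistency check
   if (thisResult instanceof Integer && thatResult instanceof Long) {
       return ((Integer) thisResult).longValue() == (Long) thatResult;
   }
   ```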
   



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java:
##########
@@ -428,6 +464,52 @@ public void dropMigrationSourceResources(final Collection<String> resourceNames)
         dataSourcePersistService.persist(getJobType(), metaDataDataSource);
     }
     
+    @Override
+    public void createJobAndStart(final CreateMigrationJobParameter parameter) {
+        YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();

Review Comment:
   Variable name `result` could be `yamlJobConfig`; or else extract another method that creates and returns the YamlMigrationJobConfiguration.
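   
   A sketch of the extract-method option (the name `buildYamlJobConfiguration` is just an illustration; its body would hold the population logic currently inlined in `createJobAndStart`):
   ```java
   private YamlMigrationJobConfiguration buildYamlJobConfiguration(final CreateMigrationJobParameter parameter) {
       YamlMigrationJobConfiguration yamlJobConfig = new YamlMigrationJobConfiguration();
       // ... populate source/target data sources, database types and table names as in createJobAndStart ...
       return yamlJobConfig;
   }
   ```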



##########
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java:
##########
@@ -17,22 +17,43 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
+import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
+import org.apache.shardingsphere.sharding.yaml.swapper.YamlShardingRuleConfigurationSwapper;
+
+import java.util.Objects;
 
 /**
  * Migrate table updater.
  */
+@Slf4j
 public final class MigrateTableUpdater implements RALUpdater<MigrateTableStatement> {
     
     private static final MigrationJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
+    private static final YamlShardingRuleConfigurationSwapper SHARDING_RULE_CONFIG_SWAPPER = new YamlShardingRuleConfigurationSwapper();
+    
     @Override
     public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
-        // TODO implement migrate table
-        JOB_API.getType();
+        log.info("start migrate job by {}", sqlStatement);
+        ShardingSphereDatabase targetDatabase = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(sqlStatement.getTargetDatabaseName());
+        ShardingRuleConfiguration targetShardingRule = targetDatabase.getRuleMetaData().getConfigurations().stream().filter(each -> each instanceof ShardingRuleConfiguration)
+                .map(each -> (ShardingRuleConfiguration) each).findFirst().orElseThrow(() -> new PipelineJobCreationException("No sharding rule found."));
+        targetShardingRule.getTables().removeIf(each -> !Objects.equals(each.getLogicTable(), sqlStatement.getTargetTableName()));
+        targetShardingRule.getAutoTables().removeIf(each -> !Objects.equals(each.getLogicTable(), sqlStatement.getTargetTableName()));
+        CreateMigrationJobParameter createMigrationJobParameter = new CreateMigrationJobParameter(sqlStatement.getSourceDatabaseName(), sqlStatement.getSourceTableName(),
+                ObjectUtils.defaultIfNull(sqlStatement.getTargetDatabaseName(), sqlStatement.getSourceDatabaseName()), sqlStatement.getTargetTableName(),
+                SHARDING_RULE_CONFIG_SWAPPER.swapToYamlConfiguration(targetShardingRule), targetDatabase.getResource().getDataSources());

Review Comment:
   Looks like the second parameter of `ObjectUtils.defaultIfNull(sqlStatement.getTargetDatabaseName(), sqlStatement.getSourceDatabaseName())` should be `databaseName` from the method parameter.
   It means that if the user doesn't define a target database, the currently selected one is used.
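   
   A sketch of the suggested change, reusing the identifiers from the diff above:
   ```java
   // Fall back to the currently selected database when no target database is given.
   CreateMigrationJobParameter createMigrationJobParameter = new CreateMigrationJobParameter(
           sqlStatement.getSourceDatabaseName(), sqlStatement.getSourceTableName(),
           ObjectUtils.defaultIfNull(sqlStatement.getTargetDatabaseName(), databaseName), sqlStatement.getTargetTableName(),
           SHARDING_RULE_CONFIG_SWAPPER.swapToYamlConfiguration(targetShardingRule), targetDatabase.getResource().getDataSources());
   ```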



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateMigrationJobParameter.java:
##########
@@ -15,18 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
+package org.apache.shardingsphere.data.pipeline.api.pojo;
 
-import org.apache.shardingsphere.data.pipeline.spi.fixture.RuleAlteredJobConfigurationPreparerFixture;
-import org.junit.Test;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
+import javax.sql.DataSource;
+import java.util.Map;
 
-public final class RuleAlteredJobConfigurationPreparerFactoryTest {
+@Data
+@RequiredArgsConstructor
+public final class CreateMigrationJobParameter {
     
-    @Test
-    public void assertGetInstance() {
-        assertThat(RuleAlteredJobConfigurationPreparerFactory.getInstance(), instanceOf(RuleAlteredJobConfigurationPreparerFixture.class));
-    }
+    private final String sourceDataSourceName;

Review Comment:
   Could we rename `sourceDataSourceName` to `sourceResourceName`, to keep it consistent with `ADD RESOURCE`?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java:
##########
@@ -428,6 +464,52 @@ public void dropMigrationSourceResources(final Collection<String> resourceNames)
         dataSourcePersistService.persist(getJobType(), metaDataDataSource);
     }
     
+    @Override
+    public void createJobAndStart(final CreateMigrationJobParameter parameter) {
+        YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
+        Map<String, DataSourceProperties> metaDataDataSource = dataSourcePersistService.load(JobType.MIGRATION);
+        Map<String, Object> sourceDataSourceProps = DATA_SOURCE_CONFIG_SWAPPER.swapToMap(metaDataDataSource.get(parameter.getSourceDataSourceName()));
+        YamlPipelineDataSourceConfiguration sourcePipelineDataSourceConfiguration = createYamlPipelineDataSourceConfiguration(StandardPipelineDataSourceConfiguration.TYPE,
+                YamlEngine.marshal(sourceDataSourceProps));
+        result.setSource(sourcePipelineDataSourceConfiguration);
+        result.setSourceDatabaseType(new StandardPipelineDataSourceConfiguration(sourceDataSourceProps).getDatabaseType().getType());
+        result.setSourceDataSourceName(parameter.getSourceDataSourceName());
+        result.setSourceTableName(parameter.getSourceTableName());
+        Map<String, Map<String, Object>> targetDataSourceProperties = new HashMap<>();
+        for (Entry<String, DataSource> entry : parameter.getTargetDataSources().entrySet()) {
+            Map<String, Object> dataSourceProps = DATA_SOURCE_CONFIG_SWAPPER.swapToMap(DataSourcePropertiesCreator.create(entry.getValue()));
+            targetDataSourceProperties.put(entry.getKey(), dataSourceProps);
+        }
+        String targetDatabaseName = parameter.getTargetDatabaseName();
+        YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(targetDatabaseName, targetDataSourceProperties, parameter.getTargetShardingRuleConfig());
+        PipelineDataSourceConfiguration targetPipelineDataSource = new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
+        result.setTarget(createYamlPipelineDataSourceConfiguration(targetPipelineDataSource.getType(), YamlEngine.marshal(targetPipelineDataSource.getDataSourceConfiguration())));
+        result.setTargetDatabaseType(targetPipelineDataSource.getDatabaseType().getType());
+        result.setTargetDatabaseName(targetDatabaseName);
+        result.setTargetTableName(parameter.getTargetTableName());
+        result.setSchemaTablesMap(SchemaTableUtil.getSchemaTablesMapFromActual(PIPELINE_DATA_SOURCE_CONFIG_SWAPPER.swapToObject(sourcePipelineDataSourceConfiguration),
+                parameter.getSourceTableName()));
+        extendYamlJobConfiguration(result);
+        MigrationJobConfiguration jobConfiguration = new YamlMigrationJobConfigurationSwapper().swapToObject(result);
+        start(jobConfiguration);
+    }
+    
+    private YamlRootConfiguration getYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final YamlRuleConfiguration shardingRule) {
+        YamlRootConfiguration result = new YamlRootConfiguration();
+        result.setDatabaseName(databaseName);
+        result.setDataSources(yamlDataSources);
+        Collection<YamlRuleConfiguration> yamlRuleConfigs = Collections.singletonList(shardingRule);
+        result.setRules(yamlRuleConfigs);

Review Comment:
   ```
           Collection<YamlRuleConfiguration> yamlRuleConfigs = Collections.singletonList(shardingRule);
           result.setRules(yamlRuleConfigs);
   ```
   It will lose the other rules, e.g. the encrypt rule, so users could not do encryption and migration at the same time.
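   
   A sketch of one way to keep them, assuming the caller can pass in all rule configurations of the target database (the collection parameter is the change here):
   ```java
   private YamlRootConfiguration getYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources,
                                                          final Collection<YamlRuleConfiguration> yamlRuleConfigs) {
       YamlRootConfiguration result = new YamlRootConfiguration();
       result.setDatabaseName(databaseName);
       result.setDataSources(yamlDataSources);
       // Pass every rule configuration through (sharding, encrypt, ...) rather than only the sharding rule.
       result.setRules(yamlRuleConfigs);
       return result;
   }
   ```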
   



##########
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java:
##########
@@ -67,35 +66,39 @@ public static Collection<ScalingParameterized> getParameters() {
     }
     
     @Test
-    public void assertManualScalingSuccess() throws InterruptedException {
-        addSourceResource();
-        initShardingAlgorithm();
-        assertTrue(waitShardingAlgorithmEffect(15));
+    public void assertMigrationSuccess() {
         createScalingRule();
-        createOrderTableRule();
-        createOrderItemTableRule();
-        createNoUseTable();
-        createOrderTable();
-        createOrderItemTable();
+        createSourceOrderTable();
+        createSourceOrderItemTable();
+        addSourceResource();
+        addTargetResource();
+        createTargetOrderTableRule();
+        createTargetOrderItemTableRule();
         SnowflakeKeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
-        for (int i = 0; i < TABLE_INIT_ROW_COUNT / 1000; i++) {
-            Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), 1000);
-            getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
-            getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+        JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
+        for (int i = 0; i < TABLE_INIT_ROW_COUNT / 10; i++) {
+            Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), 10);
+            jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
+            jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
         }
-        addTargetResource();
-        startIncrementTask(new MySQLIncrementTask(getJdbcTemplate(), keyGenerateAlgorithm, true, 20));
-        executeWithLog(getCommonSQLCommand().getAlterOrderWithItemAutoTableRule());
-        String jobId = getScalingJobId();
-        waitScalingFinished(jobId);
-        stopScaling(jobId);
-        getJdbcTemplate().update("INSERT INTO t_order (id,order_id,user_id,status,t_json) VALUES (?, ?, ?, ?, ?)", keyGenerateAlgorithm.generateKey(), keyGenerateAlgorithm.generateKey(),
-                1, "afterStopScaling", "{}");
-        startScaling(jobId);
+        startMigrationOrderItem();
+        checkOrderMigration(keyGenerateAlgorithm, jdbcTemplate);
+        checkOrderItemMigration();
+    }
+    
+    private void checkOrderMigration(final KeyGenerateAlgorithm keyGenerateAlgorithm, final JdbcTemplate jdbcTemplate) {
+        startMigrationOrder();
+        startIncrementTask(new MySQLIncrementTask(jdbcTemplate, keyGenerateAlgorithm, true, 20));
+        String jobId = getJobIdByTableName("t_order");
+        waitMigrationFinished(jobId);
         assertCheckScalingSuccess(jobId);
         assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "");
-        applyScaling(jobId);
-        assertPreviewTableSuccess("t_order", Arrays.asList("ds_2", "ds_3", "ds_4"));
-        assertPreviewTableSuccess("t_order_item", Arrays.asList("ds_2", "ds_3", "ds_4"));
+    }
+    
+    private void checkOrderItemMigration() {
+        startMigrationOrderItem();
+        String jobId = getJobIdByTableName("t_order");
+        waitMigrationFinished(jobId);

Review Comment:
   Looks like `getJobIdByTableName("t_order")` should be `getJobIdByTableName("t_order_item")`.



##########
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java:
##########
@@ -70,36 +68,39 @@ public static Collection<ScalingParameterized> getParameters() {
     }
     
     @Test
-    public void assertManualScalingSuccess() throws InterruptedException {
-        addSourceResource();
-        initShardingAlgorithm();
-        assertTrue(waitShardingAlgorithmEffect(15));
+    public void assertMigrationSuccess() {
         createScalingRule();
-        createSchema("test");
-        createOrderTableRule();
-        createOrderItemTableRule();
-        createOrderTable();
-        createOrderItemTable();
-        // TODO wait kernel support create index if not exists
-        // createTableIndexList("test");
-        executeWithLog("COMMENT ON COLUMN test.t_order.user_id IS 'user id';");
+        createSourceSchema("test");
+        createSourceOrderTable();
+        createSourceOrderItemTable();
+        createSourceTableIndexList("test");
+        createSourceCommentOnList("test");
+        addSourceResource();
+        addTargetResource();
+        createTargetOrderTableRule();
+        createTargetOrderItemTableRule();
         SnowflakeKeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
         Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), TABLE_INIT_ROW_COUNT);
-        getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
-        getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
-        addTargetResource();
-        startIncrementTask(new PostgreSQLIncrementTask(getJdbcTemplate(), new SnowflakeKeyGenerateAlgorithm(), "test", true, 20));
-        executeWithLog(getCommonSQLCommand().getAlterOrderWithItemAutoTableRule());
-        String jobId = getScalingJobId();
-        waitScalingFinished(jobId);
-        stopScaling(jobId);
-        executeWithLog(String.format("INSERT INTO test.t_order (id,order_id,user_id,status) VALUES (%s, %s, %s, '%s')", keyGenerateAlgorithm.generateKey(), System.currentTimeMillis(),
-                1, "afterStopScaling"));
-        startScaling(jobId);
+        JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
+        jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
+        jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+        checkOrderMigration(jdbcTemplate);
+        checkOrderItemMigration();
+    }
+    
+    private void checkOrderMigration(final JdbcTemplate jdbcTemplate) {
+        startMigrationOrder();
+        startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate, "test", false, 20));
+        String jobId = getJobIdByTableName("t_order");
+        waitMigrationFinished(jobId);
         assertCheckScalingSuccess(jobId);
         assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "test");
-        applyScaling(jobId);
-        assertPreviewTableSuccess("t_order", Arrays.asList("ds_2", "ds_3", "ds_4"));
-        assertPreviewTableSuccess("t_order_item", Arrays.asList("ds_2", "ds_3", "ds_4"));
+    }
+    
+    private void checkOrderItemMigration() {
+        startMigrationOrderItem();
+        String jobId = getJobIdByTableName("t_order");
+        waitMigrationFinished(jobId);

Review Comment:
   Looks like `getJobIdByTableName("t_order")` should be `getJobIdByTableName("t_order_item")`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
