sandynz commented on code in PR #22013:
URL: https://github.com/apache/shardingsphere/pull/22013#discussion_r1016478579
##########
test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java:
##########
@@ -86,20 +91,27 @@ public void assertMigrationSuccess() throws SQLException,
InterruptedException {
createTargetOrderTableRule();
createTargetOrderTableEncryptRule();
createTargetOrderItemTableRule();
- AutoIncrementKeyGenerateAlgorithm keyGenerateAlgorithm = new
AutoIncrementKeyGenerateAlgorithm();
+ AutoIncrementKeyGenerateAlgorithm orderKeyGenerate = new
AutoIncrementKeyGenerateAlgorithm();
+ AutoIncrementKeyGenerateAlgorithm orderItemKeyGenerate = new
AutoIncrementKeyGenerateAlgorithm();
JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
- Pair<List<Object[]>, List<Object[]>> dataPair =
ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm,
parameterized.getDatabaseType(), 3000);
+ Pair<List<Object[]>, List<Object[]>> dataPair =
ScalingCaseHelper.generateFullInsertData(orderKeyGenerate,
orderItemKeyGenerate, parameterized.getDatabaseType(), 3000);
log.info("init data begin: {}", LocalDateTime.now());
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()),
dataPair.getLeft());
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(),
dataPair.getRight());
log.info("init data end: {}", LocalDateTime.now());
startMigration(getSourceTableOrderName(), getTargetTableOrderName());
startMigration("t_order_item", "t_order_item");
- // TODO wait preparation done (binlog position got), else
insert/update/delete might be consumed in inventory task
- startIncrementTask(new MySQLIncrementTask(jdbcTemplate,
getSourceTableOrderName(), keyGenerateAlgorithm, 30));
String orderJobId = getJobIdByTableName(getSourceTableOrderName());
- String orderItemJobId = getJobIdByTableName("t_order_item");
+ for (int i = 0; i < 5; i++) {
+ List<Map<String, Object>> jobStatus =
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'",
orderJobId));
+ Set<String> statusSet = jobStatus.stream().map(each ->
String.valueOf(each.get("status"))).collect(Collectors.toSet());
+ if (statusSet.contains(JobStatus.PREPARING.name())) {
+ ThreadUtil.sleep(2, TimeUnit.SECONDS);
+ }
+ }
Review Comment:
1, Seems job status might be RUNNING or empty, sleep might not take effect.
2, Could we extract this code block to super class for common usage?
##########
test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java:
##########
@@ -53,19 +53,21 @@ public static long generateSnowflakeKey() {
* Generate MySQL insert data, contains full fields.
*
* @param orderIdGenerate order id generate algorithm
+ * @param orderItemIdGenerate order item id generate algorithm
* @param databaseType database type
* @param insertRows insert rows
* @return insert data list
*/
- public static Pair<List<Object[]>, List<Object[]>>
generateFullInsertData(final AutoIncrementKeyGenerateAlgorithm orderIdGenerate,
final DatabaseType databaseType, final int insertRows) {
+ public static Pair<List<Object[]>, List<Object[]>>
generateFullInsertData(final AutoIncrementKeyGenerateAlgorithm orderIdGenerate,
final AutoIncrementKeyGenerateAlgorithm orderItemIdGenerate,
+
final DatabaseType databaseType, final int insertRows) {
Review Comment:
Could we keep one generateAlgorithm parameter, or just remove
generateAlgorithm parameter?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]