sandynz commented on code in PR #20062:
URL: https://github.com/apache/shardingsphere/pull/20062#discussion_r942385098


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java:
##########
@@ -90,27 +89,25 @@ private void prepareAndCheckTargetWithLock(final 
RuleAlteredJobContext jobContex
         String lockName = "prepare-" + jobConfig.getJobId();
         LockContext lockContext = 
PipelineContext.getContextManager().getInstanceContext().getLockContext();
         LockDefinition lockDefinition = new ExclusiveLockDefinition(lockName);
-        if (lockContext.tryLock(lockDefinition, 3000)) {
+        if (lockContext.tryLock(lockDefinition, 180000)) {
             log.info("try lock success, jobId={}, shardingItem={}", 
jobConfig.getJobId(), jobContext.getShardingItem());
             try {
-                prepareAndCheckTarget(jobContext);
+                JobProgress jobProgress = 
RuleAlteredJobAPIFactory.getInstance().getJobProgress(jobContext.getJobId(), 
jobContext.getShardingItem());
+                boolean prepareFlag = null == jobProgress || 
JobStatus.RUNNING.equals(jobProgress.getStatus()) || 
JobStatus.PREPARING_FAILURE.equals(jobProgress.getStatus());
+                if (prepareFlag) {
+                    log.info("execute prepare, jobId={}, shardingItem={}", 
jobConfig.getJobId(), jobContext.getShardingItem());
+                    jobContext.setStatus(JobStatus.PREPARING);
+                    
RuleAlteredJobAPIFactory.getInstance().persistJobProgress(jobContext);
+                    prepareAndCheckTarget(jobContext);
+                    jobContext.setStatus(JobStatus.PREPARE_SUCCESS);
+                    for (int i = 0; i <= 
jobContext.getJobConfig().getJobShardingCount(); i++) {
+                        
RuleAlteredJobAPIFactory.getInstance().persistJobProgress(jobContext);
+                    }

Review Comment:
   Seems `persistJobProgress(jobContext)` in for loop will cause all sharding 
item's progress will overwritten by current sharding item's progress, it might 
cause issue.
   
   Could we just update every sharding item progress's status? Try `void 
updateShardingJobStatus(String jobId, int shardingItem, JobStatus status);`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to