azexcy commented on code in PR #20222:
URL: https://github.com/apache/shardingsphere/pull/20222#discussion_r946770123


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java:
##########
@@ -85,23 +87,31 @@ private String convertJobConfigurationToText(final PipelineJobConfiguration jobC
     @Override
     public void startDisabledJob(final String jobId) {
         log.info("Start disabled pipeline job {}", jobId);
+        MigrationDistributedCountDownLatch.getInstance().removeParentNode(PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         if (!jobConfigPOJO.isDisabled()) {
             throw new PipelineVerifyFailedException("Job is already started.");
         }
         jobConfigPOJO.setDisabled(false);
         jobConfigPOJO.getProps().remove("stop_time");
         PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
+        String barrierPath = PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId);
+        MigrationDistributedCountDownLatch.getInstance().register(barrierPath, jobConfigPOJO.getShardingTotalCount());
+        MigrationDistributedCountDownLatch.getInstance().await(barrierPath, 10, TimeUnit.SECONDS);

Review Comment:
   No need to clean up on timeout: the cleanup is done by `removeParentNode`,
   which will be executed by the opposite method.
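
   The lifecycle described above can be sketched roughly as follows. This is a minimal in-memory illustration, not the ShardingSphere implementation: `BarrierLifecycleSketch`, its `start`/`stop` methods, and the path layout are hypothetical names, and it assumes the stop path mirrors `startDisabledJob` by removing the enable barrier before registering its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in for a distributed barrier; names are illustrative only.
final class BarrierLifecycleSketch {

    private final Map<String, CountDownLatch> barriers = new ConcurrentHashMap<>();

    // Assumed path layout, for illustration only.
    static String enablePath(final String jobId) {
        return "/barrier/" + jobId + "/enable";
    }

    static String disablePath(final String jobId) {
        return "/barrier/" + jobId + "/disable";
    }

    // Start: first clean up whatever the previous stop left behind, then register and wait.
    boolean start(final String jobId, final int shardingTotalCount, final long timeoutMillis) {
        barriers.remove(disablePath(jobId));
        return registerAndAwait(enablePath(jobId), shardingTotalCount, timeoutMillis);
    }

    // Stop: the opposite method, which cleans up the barrier registered by start.
    boolean stop(final String jobId, final int shardingTotalCount, final long timeoutMillis) {
        barriers.remove(enablePath(jobId));
        return registerAndAwait(disablePath(jobId), shardingTotalCount, timeoutMillis);
    }

    // One participant acknowledges the barrier; a missing barrier is ignored.
    void acknowledge(final String path) {
        CountDownLatch latch = barriers.get(path);
        if (null != latch) {
            latch.countDown();
        }
    }

    boolean hasBarrier(final String path) {
        return barriers.containsKey(path);
    }

    // Returns false on timeout and deliberately leaves the barrier in place.
    private boolean registerAndAwait(final String path, final int parties, final long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(parties);
        barriers.put(path, latch);
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

   In this sketch a timed-out `start` leaves its enable barrier registered, and the next `stop` removes it, so no extra cleanup branch is needed on the timeout path.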



