This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 91224b81d67 Improve pipeline barrier register time (#21777)
91224b81d67 is described below
commit 91224b81d676906574bf3380037110d9a9d1b483
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Oct 27 10:00:25 2022 +0800
Improve pipeline barrier register time (#21777)
---
.../data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index ff09f119145..f2ca78eb99e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -124,9 +124,9 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
jobConfigPOJO.setDisabled(false);
jobConfigPOJO.getProps().remove("stop_time");
jobConfigPOJO.getProps().remove("stop_time_millis");
- PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
String barrierPath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
+ PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
}
@@ -138,9 +138,9 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
jobConfigPOJO.setDisabled(true);
jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
jobConfigPOJO.getProps().setProperty("stop_time_millis", System.currentTimeMillis() + "");
- PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
+ PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
}