This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ba19487  Optimize scaling api (#9574)
ba19487 is described below

commit ba19487a818e204131c3f2602c55699ba6fcca10
Author: 邱鹿 Lucas <[email protected]>
AuthorDate: Wed Mar 3 14:42:33 2021 +0800

    Optimize scaling api (#9574)
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../ral/impl/ShowScalingJobStatusBackendHandler.java        |  6 ++++--
 .../shardingsphere/scaling/core/api/ScalingAPIFactory.java  | 13 +++++++++++++
 .../scaling/core/api/impl/ScalingAPIImpl.java               |  5 +++--
 .../apache/shardingsphere/scaling/core/job/ScalingJob.java  |  1 +
 .../scaling/core/job/progress/JobProgress.java              | 13 +++++++++++--
 5 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobStatusBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobStatusBackendHandler.java
index 119d225..4067992 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobStatusBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobStatusBackendHandler.java
@@ -55,9 +55,10 @@ public final class ShowScalingJobStatusBackendHandler implements TextProtocolBac
     private List<QueryHeader> getQueryHeader() {
         List<QueryHeader> result = Lists.newArrayList();
         result.add(new QueryHeader("", "", "item", "", Types.BIGINT, "BIGINT", 255, 0, false, false, false, false));
+        result.add(new QueryHeader("", "", "data_source", "", Types.CHAR, "CHAR", 255, 0, false, false, false, false));
         result.add(new QueryHeader("", "", "status", "", Types.CHAR, "CHAR", 255, 0, false, false, false, false));
         result.add(new QueryHeader("", "", "inventory_finished_percentage", "", Types.TINYINT, "TINYINT", 255, 0, false, false, false, false));
-        result.add(new QueryHeader("", "", "incremental_average_delay_milliseconds", "", Types.BIGINT, "BIGINT", 255, 0, false, false, false, false));
+        result.add(new QueryHeader("", "", "incremental_delay_milliseconds", "", Types.BIGINT, "BIGINT", 255, 0, false, false, false, false));
         return result;
     }
     
@@ -74,8 +75,9 @@ public final class ShowScalingJobStatusBackendHandler implements TextProtocolBac
                     map.put("item", entry.getKey());
                     if (null != entry.getValue()) {
                         map.put("status", entry.getValue().getStatus());
+                        map.put("data_source", entry.getValue().getDataSource());
                         map.put("inventory_finished_percentage", entry.getValue().getInventoryFinishedPercentage());
-                        map.put("incremental_average_delay_milliseconds", entry.getValue().getIncrementalAverageDelayMilliseconds());
+                        map.put("incremental_delay_milliseconds", entry.getValue().getIncrementalDelayMilliseconds());
                     }
                     return map;
                 })
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
index d6c7ebf..e477d0e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
@@ -24,6 +24,7 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobStatisticsAPI;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
@@ -85,6 +86,15 @@ public final class ScalingAPIFactory {
     }
     
     /**
+     * Get job operate API.
+     *
+     * @return job operate API
+     */
+    public static JobOperateAPI getJobOperateAPI() {
+        return ElasticJobAPIHolder.getInstance().getJobOperateAPI();
+    }
+    
+    /**
      * Get registry center.
      *
      * @return Coordinator registry center
@@ -155,12 +165,15 @@ public final class ScalingAPIFactory {
         
         private final JobConfigurationAPI jobConfigurationAPI;
         
+        private final JobOperateAPI jobOperateAPI;
+        
         private ElasticJobAPIHolder() {
             checkServerConfig();
             GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
             String namespace = governanceConfig.getName() + ScalingConstant.SCALING_ROOT;
             jobStatisticsAPI = JobAPIFactory.createJobStatisticsAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(), namespace, null);
             jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(), namespace, null);
+            jobOperateAPI = JobAPIFactory.createJobOperateAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(), namespace, null);
         }
         
         public static ElasticJobAPIHolder getInstance() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index 8c3ff12..355d11a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -38,7 +38,7 @@ import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -113,13 +113,14 @@ public final class ScalingAPIImpl implements ScalingAPI {
     @Override
     public void remove(final long jobId) {
         log.info("Remove scaling job {}", jobId);
+        ScalingAPIFactory.getJobOperateAPI().remove(String.valueOf(jobId), null);
         ScalingAPIFactory.getRegistryRepositoryAPI().deleteJob(jobId);
     }
     
     @Override
     public Map<Integer, JobProgress> getProgress(final long jobId) {
         return IntStream.range(0, getJobConfig(jobId).getHandleConfig().getShardingTotalCount()).boxed()
-                .collect(HashMap::new, (map, each) -> map.put(each, ScalingAPIFactory.getRegistryRepositoryAPI().getJobProgress(jobId, each)), HashMap::putAll);
+                .collect(LinkedHashMap::new, (map, each) -> map.put(each, ScalingAPIFactory.getRegistryRepositoryAPI().getJobProgress(jobId, each)), LinkedHashMap::putAll);
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index fd62c0f..661ef20 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -45,6 +45,7 @@ public final class ScalingJob implements SimpleJob {
         JobContext jobContext = new JobContext(jobConfig);
         jobContext.setInitProgress(registryRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem()));
         jobPreparer.prepare(jobContext);
+        registryRepositoryAPI.persistJobProgress(jobContext);
         JobSchedulerCenter.start(jobContext);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
index 8664875..2844ac0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
@@ -91,6 +91,15 @@ public final class JobProgress {
     }
     
     /**
+     * Get data source.
+     *
+     * @return data source
+     */
+    public String getDataSource() {
+        return incrementalTaskProgressMap.keySet().stream().findAny().orElse("");
+    }
+    
+    /**
      * Get inventory finished percentage.
      *
      * @return finished percentage
@@ -103,11 +112,11 @@ public final class JobProgress {
     }
     
     /**
-     * Get incremental average delay milliseconds.
+     * Get incremental delay milliseconds.
      *
      * @return average delay
      */
-    public long getIncrementalAverageDelayMilliseconds() {
+    public long getIncrementalDelayMilliseconds() {
         List<Long> delays = incrementalTaskProgressMap.values().stream()
                 .map(each -> each.getIncrementalTaskDelay().getDelayMilliseconds())
                 .collect(Collectors.toList());

Reply via email to