This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ba19487 Optimize scaling api (#9574)
ba19487 is described below
commit ba19487a818e204131c3f2602c55699ba6fcca10
Author: 邱鹿 Lucas <[email protected]>
AuthorDate: Wed Mar 3 14:42:33 2021 +0800
Optimize scaling api (#9574)
Co-authored-by: qiulu3 <Lucas209910>
---
.../ral/impl/ShowScalingJobStatusBackendHandler.java | 6 ++++--
.../shardingsphere/scaling/core/api/ScalingAPIFactory.java | 13 +++++++++++++
.../scaling/core/api/impl/ScalingAPIImpl.java | 5 +++--
.../apache/shardingsphere/scaling/core/job/ScalingJob.java | 1 +
.../scaling/core/job/progress/JobProgress.java | 13 +++++++++++--
5 files changed, 32 insertions(+), 6 deletions(-)
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobStatusBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobStatusBackendHandler.java
index 119d225..4067992 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobStatusBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobStatusBackendHandler.java
@@ -55,9 +55,10 @@ public final class ShowScalingJobStatusBackendHandler
implements TextProtocolBac
private List<QueryHeader> getQueryHeader() {
List<QueryHeader> result = Lists.newArrayList();
result.add(new QueryHeader("", "", "item", "", Types.BIGINT, "BIGINT",
255, 0, false, false, false, false));
+ result.add(new QueryHeader("", "", "data_source", "", Types.CHAR,
"CHAR", 255, 0, false, false, false, false));
result.add(new QueryHeader("", "", "status", "", Types.CHAR, "CHAR",
255, 0, false, false, false, false));
result.add(new QueryHeader("", "", "inventory_finished_percentage",
"", Types.TINYINT, "TINYINT", 255, 0, false, false, false, false));
- result.add(new QueryHeader("", "",
"incremental_average_delay_milliseconds", "", Types.BIGINT, "BIGINT", 255, 0,
false, false, false, false));
+ result.add(new QueryHeader("", "", "incremental_delay_milliseconds",
"", Types.BIGINT, "BIGINT", 255, 0, false, false, false, false));
return result;
}
@@ -74,8 +75,9 @@ public final class ShowScalingJobStatusBackendHandler
implements TextProtocolBac
map.put("item", entry.getKey());
if (null != entry.getValue()) {
map.put("status", entry.getValue().getStatus());
+ map.put("data_source",
entry.getValue().getDataSource());
map.put("inventory_finished_percentage",
entry.getValue().getInventoryFinishedPercentage());
- map.put("incremental_average_delay_milliseconds",
entry.getValue().getIncrementalAverageDelayMilliseconds());
+ map.put("incremental_delay_milliseconds",
entry.getValue().getIncrementalDelayMilliseconds());
}
return map;
})
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
index d6c7ebf..e477d0e 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
@@ -24,6 +24,7 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobStatisticsAPI;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
@@ -85,6 +86,15 @@ public final class ScalingAPIFactory {
}
/**
+ * Get job operate API.
+ *
+ * @return job operate API
+ */
+ public static JobOperateAPI getJobOperateAPI() {
+ return ElasticJobAPIHolder.getInstance().getJobOperateAPI();
+ }
+
+ /**
* Get registry center.
*
* @return Coordinator registry center
@@ -155,12 +165,15 @@ public final class ScalingAPIFactory {
private final JobConfigurationAPI jobConfigurationAPI;
+ private final JobOperateAPI jobOperateAPI;
+
private ElasticJobAPIHolder() {
checkServerConfig();
GovernanceConfiguration governanceConfig =
ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
String namespace = governanceConfig.getName() +
ScalingConstant.SCALING_ROOT;
jobStatisticsAPI =
JobAPIFactory.createJobStatisticsAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
namespace, null);
jobConfigurationAPI =
JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
namespace, null);
+ jobOperateAPI =
JobAPIFactory.createJobOperateAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
namespace, null);
}
public static ElasticJobAPIHolder getInstance() {
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index 8c3ff12..355d11a 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -38,7 +38,7 @@ import
org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -113,13 +113,14 @@ public final class ScalingAPIImpl implements ScalingAPI {
@Override
public void remove(final long jobId) {
log.info("Remove scaling job {}", jobId);
+ ScalingAPIFactory.getJobOperateAPI().remove(String.valueOf(jobId),
null);
ScalingAPIFactory.getRegistryRepositoryAPI().deleteJob(jobId);
}
@Override
public Map<Integer, JobProgress> getProgress(final long jobId) {
return IntStream.range(0,
getJobConfig(jobId).getHandleConfig().getShardingTotalCount()).boxed()
- .collect(HashMap::new, (map, each) -> map.put(each,
ScalingAPIFactory.getRegistryRepositoryAPI().getJobProgress(jobId, each)),
HashMap::putAll);
+ .collect(LinkedHashMap::new, (map, each) -> map.put(each,
ScalingAPIFactory.getRegistryRepositoryAPI().getJobProgress(jobId, each)),
LinkedHashMap::putAll);
}
@Override
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index fd62c0f..661ef20 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -45,6 +45,7 @@ public final class ScalingJob implements SimpleJob {
JobContext jobContext = new JobContext(jobConfig);
jobContext.setInitProgress(registryRepositoryAPI.getJobProgress(jobContext.getJobId(),
jobContext.getShardingItem()));
jobPreparer.prepare(jobContext);
+ registryRepositoryAPI.persistJobProgress(jobContext);
JobSchedulerCenter.start(jobContext);
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
index 8664875..2844ac0 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
@@ -91,6 +91,15 @@ public final class JobProgress {
}
/**
+ * Get data source.
+ *
+ * @return data source
+ */
+ public String getDataSource() {
+ return
incrementalTaskProgressMap.keySet().stream().findAny().orElse("");
+ }
+
+ /**
* Get inventory finished percentage.
*
* @return finished percentage
@@ -103,11 +112,11 @@ public final class JobProgress {
}
/**
- * Get incremental average delay milliseconds.
+ * Get incremental delay milliseconds.
*
* @return average delay
*/
- public long getIncrementalAverageDelayMilliseconds() {
+ public long getIncrementalDelayMilliseconds() {
List<Long> delays = incrementalTaskProgressMap.values().stream()
.map(each ->
each.getIncrementalTaskDelay().getDelayMilliseconds())
.collect(Collectors.toList());