This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b76937ea73a Refactor PipelineJobManager (#29077)
b76937ea73a is described below
commit b76937ea73a467521d9916b9a605d0c521ac777f
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 19:38:55 2023 +0800
Refactor PipelineJobManager (#29077)
---
.../data/pipeline/core/job/service/PipelineJobManager.java | 13 ++++---------
.../distsql/handler/query/ShowStreamingListExecutor.java | 2 +-
.../distsql/handler/query/ShowMigrationListExecutor.java | 2 +-
3 files changed, 6 insertions(+), 11 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 3eab31d08af..4ffeaafbee9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -35,7 +35,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -47,7 +46,6 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
* Pipeline job manager.
@@ -191,18 +189,15 @@ public final class PipelineJobManager {
* @param contextKey context key
* @return jobs info
*/
- public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey
contextKey) {
+ public List<PipelineJobInfo> getJobInfos(final PipelineContextKey
contextKey) {
if (jobAPI instanceof InventoryIncrementalJobAPI) {
- return getJobBriefInfos(contextKey, jobAPI.getType()).map(each ->
((InventoryIncrementalJobAPI)
jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
+ return
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
+ .filter(each -> !each.getJobName().startsWith("_") &&
jobAPI.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
+ .map(each -> ((InventoryIncrementalJobAPI)
jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
}
return Collections.emptyList();
}
- private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey
contextKey, final String jobType) {
- return
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each
-> !each.getJobName().startsWith("_"))
- .filter(each ->
jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
- }
-
/**
* Get job item progress.
*
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index f6ca1ea172a..c5c64bb35ff 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -40,7 +40,7 @@ public final class ShowStreamingListExecutor implements
QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingListStatement sqlStatement) {
- return pipelineJobManager.getPipelineJobInfos(new
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+ return pipelineJobManager.getJobInfos(new
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getDatabaseName(),
((TableBasedPipelineJobInfo) each).getTable(),
each.getJobMetaData().getJobItemCount(),
each.getJobMetaData().isActive() ? Boolean.TRUE.toString() :
Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(),
Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
index a731a088a97..ce1877a5719 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
@@ -39,7 +39,7 @@ public final class ShowMigrationListExecutor implements
QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationListStatement sqlStatement) {
- return pipelineJobManager.getPipelineJobInfos(new
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+ return pipelineJobManager.getJobInfos(new
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getTable(),
each.getJobMetaData().getJobItemCount(),
each.getJobMetaData().isActive() ? Boolean.TRUE.toString() :
Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(),
each.getJobMetaData().getStopTime())).collect(Collectors.toList());