This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b76937ea73a Refactor PipelineJobManager (#29077)
b76937ea73a is described below

commit b76937ea73a467521d9916b9a605d0c521ac777f
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 19:38:55 2023 +0800

    Refactor PipelineJobManager (#29077)
---
 .../data/pipeline/core/job/service/PipelineJobManager.java  | 13 ++++---------
 .../distsql/handler/query/ShowStreamingListExecutor.java    |  2 +-
 .../distsql/handler/query/ShowMigrationListExecutor.java    |  2 +-
 3 files changed, 6 insertions(+), 11 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 3eab31d08af..4ffeaafbee9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -35,7 +35,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -47,7 +46,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * Pipeline job manager.
@@ -191,18 +189,15 @@ public final class PipelineJobManager {
      * @param contextKey context key
      * @return jobs info
      */
-    public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey contextKey) {
+    public List<PipelineJobInfo> getJobInfos(final PipelineContextKey contextKey) {
         if (jobAPI instanceof InventoryIncrementalJobAPI) {
-            return getJobBriefInfos(contextKey, jobAPI.getType()).map(each -> ((InventoryIncrementalJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
+            return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
+                    .filter(each -> !each.getJobName().startsWith("_") && jobAPI.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
+                    .map(each -> ((InventoryIncrementalJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
         }
         return Collections.emptyList();
     }
     
-    private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey contextKey, final String jobType) {
-        return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
-                .filter(each -> jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
-    }
-    
     /**
      * Get job item progress.
      *
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index f6ca1ea172a..c5c64bb35ff 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -40,7 +40,7 @@ public final class ShowStreamingListExecutor implements QueryableRALExecutor<Sho
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingListStatement sqlStatement) {
-        return pipelineJobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+        return pipelineJobManager.getJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
                 ((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(),
                 each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
                 each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
index a731a088a97..ce1877a5719 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
@@ -39,7 +39,7 @@ public final class ShowMigrationListExecutor implements QueryableRALExecutor<Sho
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationListStatement sqlStatement) {
-        return pipelineJobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+        return pipelineJobManager.getJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
                 ((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(),
                 each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
                 each.getJobMetaData().getCreateTime(), each.getJobMetaData().getStopTime())).collect(Collectors.toList());

Reply via email to