This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b15367e3e2682b195966f5b59dbf09f8867a792e Author: Hang Jia <754332...@qq.com> AuthorDate: Thu Jun 29 14:24:12 2023 +0800 KYLIN-5760 Support matching model exactly for list jobs --- .../kap/secondstorage/metadata/TableFlow.java | 13 ++++++ .../kylin/rest/controller/JobController.java | 4 +- .../kylin/rest/controller/v2/JobControllerV2.java | 2 +- .../kylin/rest/controller/JobControllerTest.java | 6 +-- .../rest/controller/v2/JobControllerV2Test.java | 11 ++--- .../org/apache/kylin/rest/request/JobFilter.java | 2 + .../org/apache/kylin/rest/service/JobService.java | 5 +++ .../apache/kylin/rest/service/JobServiceTest.java | 49 ++++++++++++++++++---- .../apache/kylin/rest/service/ModelService.java | 9 ++-- 9 files changed, 77 insertions(+), 24 deletions(-) diff --git a/outdated/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/metadata/TableFlow.java b/outdated/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/metadata/TableFlow.java index a4add8edca..908f591e13 100644 --- a/outdated/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/metadata/TableFlow.java +++ b/outdated/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/metadata/TableFlow.java @@ -29,6 +29,7 @@ import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.cube.model.LayoutEntity; import com.fasterxml.jackson.annotation.JsonManagedReference; @@ -72,6 +73,8 @@ public class TableFlow extends RootPersistentEntity return new Builder(); } + public static final String TABLEFLOW_RESOURCE_ROOT = "/clickhouse_data"; + protected transient Manager<TableFlow> manager; @Override public void setManager(Manager<TableFlow> manager) { @@ -171,6 +174,16 @@ public class TableFlow extends RootPersistentEntity }); } + @Override + public String getResourcePath() { + return 
concatResourcePath(getUuid(), manager.project); + } + + public static String concatResourcePath(String name, String project) { + return new StringBuilder().append("/").append(project).append(TABLEFLOW_RESOURCE_ROOT) + .append("/").append(name).append(MetadataConstants.FILE_SURFIX).toString(); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/JobController.java index ea3e921baa..e47d01b658 100644 --- a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -94,6 +94,7 @@ public class JobController extends BaseController { @RequestParam(value = "time_filter") Integer timeFilter, @RequestParam(value = "subject", required = false) String subject, @RequestParam(value = "key", required = false) String key, + @RequestParam(value = "exact", required = false, defaultValue = "false") boolean exactMatch, @RequestParam(value = "project", required = false) String project, @RequestParam(value = "page_offset", required = false, defaultValue = "0") Integer pageOffset, @RequestParam(value = "page_size", required = false, defaultValue = "10") Integer pageSize, @@ -101,7 +102,8 @@ public class JobController extends BaseController { @RequestParam(value = "reverse", required = false, defaultValue = "true") boolean reverse) { jobService.checkJobStatus(statuses); checkRequiredArg("time_filter", timeFilter); - JobFilter jobFilter = new JobFilter(statuses, jobNames, timeFilter, subject, key, project, sortBy, reverse); + JobFilter jobFilter = new JobFilter(statuses, jobNames, timeFilter, subject, key, exactMatch, project, sortBy, + reverse); DataResult<List<ExecutableResponse>> executables; if (!StringUtils.isEmpty(project)) { executables = 
jobService.listJobs(jobFilter, pageOffset, pageSize); diff --git a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/v2/JobControllerV2.java b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/v2/JobControllerV2.java index 720238fb53..6844578785 100644 --- a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/v2/JobControllerV2.java +++ b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/v2/JobControllerV2.java @@ -108,7 +108,7 @@ public class JobControllerV2 extends BaseController { JobFilter jobFilter = new JobFilter(statuses, Objects.isNull(jobName) ? Lists.newArrayList() : Lists.newArrayList(jobName), timeFilter, null, key, - project, sortBy, reverse); + false, project, sortBy, reverse); List<ExecutableResponse> executables = null; if (!StringUtils.isEmpty(project)) { executables = jobService.listJobs(jobFilter); diff --git a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java index 1cc841cd8e..560bf6ee1f 100644 --- a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java +++ b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java @@ -92,7 +92,7 @@ public class JobControllerTest extends NLocalFileMetadataTestCase { List<ExecutableResponse> jobs = new ArrayList<>(); List<String> jobNames = Lists.newArrayList(); List<String> statuses = Lists.newArrayList("NEW", "RUNNING"); - JobFilter jobFilter = new JobFilter(statuses, jobNames, 4, "", "", "default", "job_name", false); + JobFilter jobFilter = new JobFilter(statuses, jobNames, 4, "", "", false, "default", "job_name", false); Mockito.when(jobService.listJobs(jobFilter)).thenReturn(jobs); mockMvc.perform(MockMvcRequestBuilders.get("/api/jobs").contentType(MediaType.APPLICATION_JSON) .param("project", 
"default").param("page_offset", "0").param("page_size", "10") @@ -100,8 +100,8 @@ public class JobControllerTest extends NLocalFileMetadataTestCase { .param("statuses", "NEW,RUNNING").accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) .andExpect(MockMvcResultMatchers.status().isOk()).andReturn(); - Mockito.verify(jobController).getJobList(statuses, jobNames, 1, "", "", "default", 0, 10, "last_modified", - true); + Mockito.verify(jobController).getJobList(statuses, jobNames, 1, "", "", false, "default", 0, 10, + "last_modified", true); } @Test diff --git a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/v2/JobControllerV2Test.java b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/v2/JobControllerV2Test.java index eec40babd4..ad66615c13 100644 --- a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/v2/JobControllerV2Test.java +++ b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/v2/JobControllerV2Test.java @@ -107,8 +107,8 @@ public class JobControllerV2Test extends NLocalFileMetadataTestCase { public void testGetJobs() throws Exception { List<ExecutableResponse> jobs = new ArrayList<>(); List<String> jobNames = Lists.newArrayList(); - JobFilter jobFilter = new JobFilter(Lists.newArrayList("NEW"), jobNames, 4, "", "", "default", "job_name", - false); + JobFilter jobFilter = new JobFilter(Lists.newArrayList("NEW"), jobNames, 4, "", "", false, "default", + "job_name", false); Mockito.when(jobService.listJobs(jobFilter)).thenReturn(jobs); mockMvc.perform(MockMvcRequestBuilders.get("/api/jobs").contentType(MediaType.APPLICATION_JSON) .param("projectName", "default").param("pageOffset", "0").param("pageSize", "10") @@ -124,7 +124,8 @@ public class JobControllerV2Test extends NLocalFileMetadataTestCase { public void testGetJobsWithoutProjectAndSortby() throws Exception { List<ExecutableResponse> jobs = new ArrayList<>(); List<String> jobNames = Lists.newArrayList(); - 
JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, null, null, null, "job_name", true); + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, null, null, false, null, "job_name", + true); Mockito.when(jobService.listGlobalJobs(jobFilter, 0, Integer.MAX_VALUE)) .thenReturn(new DataResult<>(jobs, 0, 0, 0)); mockMvc.perform( @@ -171,8 +172,8 @@ public class JobControllerV2Test extends NLocalFileMetadataTestCase { public void testGetJobsException_pageOffset_pageSize() throws Exception { List<ExecutableResponse> jobs = new ArrayList<>(); List<String> jobNames = Lists.newArrayList(); - JobFilter jobFilter = new JobFilter(Lists.newArrayList("NEW"), jobNames, 4, "", "", "default", "job_name", - false); + JobFilter jobFilter = new JobFilter(Lists.newArrayList("NEW"), jobNames, 4, "", "", false, "default", + "job_name", false); Mockito.when(jobService.listJobs(jobFilter)).thenReturn(jobs); mockMvc.perform(MockMvcRequestBuilders.get("/api/jobs").contentType(MediaType.APPLICATION_JSON) .param("projectName", "default").param("pageOffset", "a").param("pageSize", "10") diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/request/JobFilter.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/request/JobFilter.java index 05f8618b43..d184b15fc4 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/request/JobFilter.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/request/JobFilter.java @@ -39,6 +39,8 @@ public class JobFilter { private String key; + private boolean exactMatch; + private String project; private String sortBy; diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java index 71d18e1771..212f9f4365 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java +++ 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -257,6 +257,11 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl modelManagerMaps.put(executablePO.getProject(), modelManager); } return createExecutablePOSortBean(executablePO, modelManager); + }).filter(executablePO -> { + if (!jobFilter.isExactMatch() || StringUtils.isEmpty(jobFilter.getKey())) { + return true; + } + return jobFilter.getKey().equalsIgnoreCase(executablePO.getTargetSubject()); }).sorted(comparator).collect(Collectors.toList()); } diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java index 7e076453fc..5a98b308b9 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java @@ -265,7 +265,7 @@ public class JobServiceTest extends LogOutputTestCase { getTestConfig().setProperty("kylin.streaming.enabled", "false"); // test size List<String> jobNames = Lists.newArrayList(); - JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", "default", "", true); + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", false, "default", "", true); List<ExecutableResponse> jobs = jobService.listJobs(jobFilter); Assert.assertEquals(3, jobs.size()); jobService.addOldParams(jobs); @@ -399,8 +399,8 @@ public class JobServiceTest extends LogOutputTestCase { Mockito.when(executableDao.getJobs(Mockito.anyLong(), Mockito.anyLong())).thenReturn(mockJobs); { List<String> jobNames = Lists.newArrayList(); - JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "", "default", "total_duration", - true); + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "", false, "default", + "total_duration", 
true); List<ExecutableResponse> jobs = jobService.listJobs(jobFilter); val totalDurationArrays = jobs.stream().map(ExecutableResponse::getTotalDuration) @@ -419,11 +419,41 @@ public class JobServiceTest extends LogOutputTestCase { } } List<String> jobNames = Lists.newArrayList(); - JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "default", "default", "", false); + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "default", false, "default", "", + false); List<ExecutableResponse> jobs = jobService.listJobs(jobFilter); Assert.assertEquals(2, jobs.size()); } + @Test + public void testFilterJobExactMatch() throws Exception { + NExecutableManager executableManager = NExecutableManager.getInstance(getTestConfig(), "default"); + Mockito.when(jobService.getManager(NExecutableManager.class, "default")).thenReturn(executableManager); + ReflectionTestUtils.setField(executableManager, "executableDao", executableDao); + val mockJobs = mockDetailJobs(false); + Mockito.when(executableDao.getJobs(Mockito.anyLong(), Mockito.anyLong())).thenReturn(mockJobs); + + for (int i = 0; i < 3; i++) { + if (i < 2) { + mockJobs.get(i).setJobType(JobTypeEnum.SECOND_STORAGE_NODE_CLEAN); + } else { + mockJobs.get(i).setJobType(JobTypeEnum.TABLE_SAMPLING); + } + } + List<String> jobNames = Lists.newArrayList(); + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "def", false, "default", "", false); + List<ExecutableResponse> jobs = jobService.listJobs(jobFilter); + Assert.assertEquals(2, jobs.size()); + + JobFilter jobFilter2 = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "def", true, "default", "", false); + List<ExecutableResponse> jobs2 = jobService.listJobs(jobFilter2); + Assert.assertEquals(0, jobs2.size()); + + JobFilter jobFilter3 = new JobFilter(Lists.newArrayList(), jobNames, 0, "", null, true, "default", "", false); + List<ExecutableResponse> jobs3 = jobService.listJobs(jobFilter3); + 
Assert.assertEquals(3, jobs3.size()); + } + private List<ProjectInstance> mockProjects() { ProjectInstance defaultProject = new ProjectInstance(); defaultProject.setName("default"); @@ -467,7 +497,7 @@ public class JobServiceTest extends LogOutputTestCase { Mockito.when(executableManager1.getAllExecutables(Mockito.anyLong(), Mockito.anyLong())).thenReturn(mockJobs1); List<String> jobNames = Lists.newArrayList(); - JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", "default", "", true); + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", false, "default", "", true); List<ExecutableResponse> jobs = jobService.listGlobalJobs(jobFilter, 0, 10).getValue(); Assert.assertEquals(4, jobs.size()); Assert.assertEquals("default1", jobs.get(3).getProject()); @@ -1107,7 +1137,7 @@ public class JobServiceTest extends LogOutputTestCase { executable.setName("test_create_time"); manager.addJob(executable); List<String> jobNames = Lists.newArrayList(); - JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", "default", "", true); + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", false, "default", "", true); List<ExecutableResponse> jobs = jobService.listJobs(jobFilter); Assert.assertTrue(jobs.get(0).getCreateTime() > 0); } @@ -1125,7 +1155,7 @@ public class JobServiceTest extends LogOutputTestCase { manager.addJob(job1); manager.addJob(samplingJob); List<String> jobNames = Lists.newArrayList(); - JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", "default", "", true); + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", false, "default", "", true); jobFilter.setSortBy("job_name"); List<ExecutableResponse> jobs = jobService.listJobs(jobFilter); @@ -1300,7 +1330,8 @@ public class JobServiceTest extends LogOutputTestCase { Mockito.when(manager.getAllJobs(Mockito.anyLong(), Mockito.anyLong())) 
.thenReturn(Collections.singletonList(job1)); - JobFilter jobFilter = new JobFilter(Lists.newArrayList(), Lists.newArrayList(), 4, "", "", "default", "", true); + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), Lists.newArrayList(), 4, "", "", false, "default", "", + true); List<ExecutableResponse> jobs = jobService.listJobs(jobFilter); Assert.assertEquals(1, jobs.size()); @@ -1959,7 +1990,7 @@ public class JobServiceTest extends LogOutputTestCase { getTestConfig().setProperty("kylin.streaming.enabled", "false"); // test size List<String> jobNames = Lists.newArrayList(); - JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", "default", "", true); + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", false, "default", "", true); List<ExecutableResponse> jobs = jobService.listJobs(jobFilter); List<ExecutableResponse> executableResponses = jobService.addOldParams(jobs); ExecutableResponse executable = executableResponses.get(0); diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java index b4b1721dee..1ad9ce6cd1 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java @@ -372,11 +372,10 @@ public class ModelService extends AbstractModelService implements TableModelSupp private void addOldSegmentParams(NDataModel model, NDataModelOldParams oldParams, List<AbstractExecutable> executables) { - List<NDataSegmentResponse> segments = getSegmentsResponse(model.getId(), model.getProject(), "1", - String.valueOf(Long.MAX_VALUE - 1), null, executables, LAST_MODIFY, true); - calculateRecordSizeAndCount(segments, oldParams); - - if (model instanceof NDataModelResponse) { + if (((model instanceof NDataModelResponse) && !(model instanceof NDataModelLiteResponse)) || 
model.isFusionModel()) { + List<NDataSegmentResponse> segments = getSegmentsResponse(model.getId(), model.getProject(), "1", + String.valueOf(Long.MAX_VALUE - 1), null, executables, LAST_MODIFY, true); + calculateRecordSizeAndCount(segments, oldParams); ((NDataModelResponse) model).setSegments(segments); ((NDataModelResponse) model).setHasSegments( ((NDataModelResponse) model).isHasSegments() || CollectionUtils.isNotEmpty(segments));