Repository: incubator-griffin Updated Branches: refs/heads/master 60c022176 -> 90f3b16d7
fix bugs and add record triggered time of measure Author: ahutsunshine <ahutsunsh...@gmail.com> Closes #171 from ahutsunshine/master. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/90f3b16d Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/90f3b16d Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/90f3b16d Branch: refs/heads/master Commit: 90f3b16d7828965af22f65e41da8ceed6dde8b9e Parents: 60c0221 Author: ahutsunshine <ahutsunsh...@gmail.com> Authored: Mon Nov 6 17:13:15 2017 +0800 Committer: Lionel Liu <bhlx3l...@163.com> Committed: Mon Nov 6 17:13:15 2017 +0800 ---------------------------------------------------------------------- griffin-doc/postman/griffin.json | 2 +- .../apache/griffin/core/job/JobServiceImpl.java | 10 ++++++ .../apache/griffin/core/job/SparkSubmitJob.java | 1 + .../core/measure/MeasureOrgServiceImpl.java | 4 +-- .../core/measure/MeasureServiceImpl.java | 16 ++++----- .../griffin/core/measure/entity/Measure.java | 15 ++++++++ .../griffin/core/measure/repo/MeasureRepo.java | 10 +++--- .../hive/HiveMetaStoreServiceImpl.java | 2 +- .../griffin/core/job/JobServiceImplTest.java | 38 ++++++++++++++++---- .../core/measure/MeasureOrgServiceImplTest.java | 4 +-- .../core/measure/MeasureServiceImplTest.java | 8 ++--- .../core/measure/repo/MeasureRepoTest.java | 4 +-- 12 files changed, 82 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/90f3b16d/griffin-doc/postman/griffin.json ---------------------------------------------------------------------- diff --git a/griffin-doc/postman/griffin.json b/griffin-doc/postman/griffin.json index c692125..468658c 100644 --- a/griffin-doc/postman/griffin.json +++ b/griffin-doc/postman/griffin.json @@ -910,7 +910,7 @@ "helperAttributes": {}, "time": 1509333182624, "name": "Update measure", - "description": "`PUT /api/v1/measure`\n\n#### Request Header\nkey | value\n--- | ---\nContent-Type | application/json\n\n#### Request Body\n\nname | description | type\n--- | --- | --- \nmeasure | measure entity | Measure\n\n#### Response Body Sample\n```\n{\n \"code\": 204,\n \"description\": \"Update Measure Succeed\"\n}\n```\n\nIt may return failed messages.Such as,\n\n```\n {\n \"code\": 400,\n \"description\": \"Resource Not Found\"\n}\n\n```\n\nThe reason for failure may be that measure id doesn't exist.You should check your measure.", + "description": "`PUT /api/v1/measure`\n\n#### Request Header\nkey | value\n--- | ---\nContent-Type | application/json\n\n#### Request Body\n\nname | description | type\n--- | --- | --- \nmeasure | measure entity | Measure\n\n#### Response Body Sample\n```\n{\n \"code\": 204,\n \"description\": \"Update Measure Succeed\"\n}\n```\n\nIt may return failed messages.Such as,\n\n```\n {\n \"code\": 400,\n \"description\": \"Resource Not Found\"\n}\n\n```\n\nThe reason for failure may be that measure id doesn't exist or the measure has been deleted by logically.You should check your measure.", "collectionId": "689bb3f2-1c6a-b45e-5409-4df1ef07554c", "responses": [ { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/90f3b16d/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java index 425368c..b878854 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java @@ -298,6 +298,16 @@ public class JobServiceImpl implements JobService { @Override public List<JobInstance> findInstancesOfJob(String group, String jobName, int page, int size) { + try { + Scheduler scheduler = factory.getObject(); + JobKey jobKey = new JobKey(jobName, group); + if (!scheduler.checkExists(jobKey) || isJobDeleted(scheduler, jobKey)) { + return new ArrayList<>(); + } + } catch (SchedulerException e) { + LOGGER.error("Quartz schedule error. {}", e.getMessage()); + return new ArrayList<>(); + } //query and return instances Pageable pageRequest = new PageRequest(page, size, Sort.Direction.DESC, "timestamp"); return jobInstanceRepo.findByGroupNameAndJobName(group, jobName, pageRequest); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/90f3b16d/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java index 7ae52cc..d5502e5 100644 --- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java +++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java @@ -229,6 +229,7 @@ public class SparkSubmitJob implements Job { args.add(sparkJobProps.getProperty("sparkJob.args_1")); // measure String measureJson; + measure.setTriggerTimeStamp(System.currentTimeMillis()); measureJson = JsonUtil.toJsonWithFormat(measure); args.add(measureJson); args.add(sparkJobProps.getProperty("sparkJob.args_3")); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/90f3b16d/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java index bd987a9..d4cb6a9 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java @@ -38,12 +38,12 @@ public class MeasureOrgServiceImpl implements MeasureOrgService { @Override public List<String> getOrgs() { - return measureRepo.findOrganizations(); + return measureRepo.findOrganizations(false); } @Override public List<String> getMetricNameListByOrg(String org) { - return measureRepo.findNameByOrganization(org); + return measureRepo.findNameByOrganization(org,false); } @Override http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/90f3b16d/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java index 0a880cc..8c088c8 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java @@ -32,10 +32,7 @@ import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; @Service public class MeasureServiceImpl implements MeasureService { @@ -53,7 +50,7 @@ public class MeasureServiceImpl implements MeasureService { @Override public Measure getMeasureById(@PathVariable("id") long id) { - return measureRepo.findOne(id); + return measureRepo.findByIdAndDeleted(id, false); } @Override @@ -62,13 +59,13 @@ public class MeasureServiceImpl implements MeasureService { return GriffinOperationMessage.RESOURCE_NOT_FOUND; } else { Measure measure = measureRepo.findOne(measureId); - try{ + try { //pause all jobs related to the measure jobService.deleteJobsRelateToMeasure(measure); measure.setDeleted(true); measureRepo.save(measure); - }catch (SchedulerException e){ - LOGGER.error("Delete measure id: {} name: {} failure. {}", measure.getId(), measure.getName(),e.getMessage()); + } catch (SchedulerException e) { + LOGGER.error("Delete measure id: {} name: {} failure. {}", measure.getId(), measure.getName(), e.getMessage()); return GriffinOperationMessage.DELETE_MEASURE_BY_ID_FAIL; } @@ -83,8 +80,7 @@ public class MeasureServiceImpl implements MeasureService { try { if (measureRepo.save(measure) != null) { return GriffinOperationMessage.CREATE_MEASURE_SUCCESS; - } - else { + } else { return GriffinOperationMessage.CREATE_MEASURE_FAIL; } } catch (Exception e) { @@ -105,7 +101,7 @@ public class MeasureServiceImpl implements MeasureService { @Override public GriffinOperationMessage updateMeasure(@RequestBody Measure measure) { - if (!measureRepo.exists(measure.getId())) { + if (measureRepo.findByIdAndDeleted(measure.getId(), false) == null) { return GriffinOperationMessage.RESOURCE_NOT_FOUND; } else { try { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/90f3b16d/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java index 60e8147..d8afba4 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java @@ -36,6 +36,11 @@ public class Measure extends AbstractAuditableEntity { private String processType; + /** + * record triggered time of measure + */ + private Long triggerTimeStamp = -1L; + @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) @JoinColumn(name = "measure_id") @@ -116,6 +121,16 @@ public class Measure extends AbstractAuditableEntity { this.deleted = deleted; } + @JsonProperty("timestamp") + public Long getTriggerTimeStamp() { + return triggerTimeStamp; + } + + @JsonProperty("timestamp") + public void setTriggerTimeStamp(Long triggerTimeStamp) { + this.triggerTimeStamp = triggerTimeStamp; + } + public Measure() { } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/90f3b16d/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java b/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java index 1e6ac0d..b324f1e 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java +++ b/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java @@ -35,12 +35,14 @@ public interface MeasureRepo extends CrudRepository<Measure, Long> { List<Measure> findByOwnerAndDeleted(String owner, Boolean deleted); - @Query("select DISTINCT m.organization from Measure m") - List<String> findOrganizations(); + Measure findByIdAndDeleted(Long id, Boolean deleted); + + @Query("select DISTINCT m.organization from Measure m where m.deleted = ?1") + List<String> findOrganizations(Boolean deleted); @Query("select m.name from Measure m " + - "where m.organization= ?1") - List<String> findNameByOrganization(String organization); + "where m.organization= ?1 and m.deleted= ?2") + List<String> findNameByOrganization(String organization, Boolean deleted); @Query("select m.organization from Measure m " + "where m.name= ?1") http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/90f3b16d/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java index d5861e7..759e370 100644 --- a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java @@ -55,7 +55,7 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService { private ThreadPoolExecutor singleThreadExecutor; public HiveMetaStoreServiceImpl() { - singleThreadExecutor = new ThreadPoolExecutor(1, 1, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); + singleThreadExecutor = new ThreadPoolExecutor(1, 5, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),new ThreadPoolExecutor.DiscardPolicy()); LOGGER.info("HiveMetaStoreServiceImpl single thread pool created."); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/90f3b16d/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java index ef9b34b..cda4104 100644 --- a/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java +++ b/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java @@ -214,18 +214,40 @@ public class JobServiceImplTest { } @Test - public void testFindInstancesOfJob() { + public void testFindInstancesOfJob() throws SchedulerException { + Scheduler scheduler = Mockito.mock(Scheduler.class); String groupName = "BA"; String jobName = "job1"; int page = 0; int size = 2; + JobKey jobKey = new JobKey(jobName,groupName); JobInstance jobInstance = new JobInstance(groupName, jobName, 1, LivySessionStates.State.dead, "app_id", "app_uri", System.currentTimeMillis()); Pageable pageRequest = new PageRequest(page, size, Sort.Direction.DESC, "timestamp"); given(jobInstanceRepo.findByGroupNameAndJobName(groupName, jobName, pageRequest)).willReturn(Arrays.asList(jobInstance)); + given(factory.getObject()).willReturn(scheduler); + given(scheduler.checkExists(jobKey)).willReturn(true); + mockJsonDataMap(scheduler, jobKey,false); assertEquals(service.findInstancesOfJob(groupName, jobName, page, size).size(), 1); } @Test + public void testFindInstancesOfJobForDeleted() throws SchedulerException { + Scheduler scheduler = Mockito.mock(Scheduler.class); + String groupName = "BA"; + String jobName = "job1"; + int page = 0; + int size = 2; + JobKey jobKey = new JobKey(jobName,groupName); + JobInstance jobInstance = new JobInstance(groupName, jobName, 1, LivySessionStates.State.dead, "app_id", "app_uri", System.currentTimeMillis()); + Pageable pageRequest = new PageRequest(page, size, Sort.Direction.DESC, "timestamp"); + given(jobInstanceRepo.findByGroupNameAndJobName(groupName, jobName, pageRequest)).willReturn(Arrays.asList(jobInstance)); + given(factory.getObject()).willReturn(scheduler); + given(scheduler.checkExists(jobKey)).willReturn(true); + mockJsonDataMap(scheduler, jobKey,true); + assertEquals(service.findInstancesOfJob(groupName, jobName, page, size).size(), 0); + } + + @Test public void testSyncInstancesOfJobForSuccess() { JobInstance instance = newJobInstance(); String group = "groupName"; @@ -285,11 +307,7 @@ public class JobServiceImplTest { List<Trigger> triggers = new ArrayList<>(); triggers.add(trigger); given((List<Trigger>) scheduler.getTriggersOfJob(jobKey)).willReturn(triggers); - JobDataMap jobDataMap = mock(JobDataMap.class); - JobDetailImpl jobDetail = new JobDetailImpl(); - jobDetail.setJobDataMap(jobDataMap); - given(scheduler.getJobDetail(jobKey)).willReturn(jobDetail); - given(jobDataMap.getBooleanFromString("deleted")).willReturn(false); + mockJsonDataMap(scheduler, jobKey, false); Set<JobKey> jobKeySet = new HashSet<>(); jobKeySet.add(jobKey); given(scheduler.getJobKeys(GroupMatcher.anyGroup())).willReturn((jobKeySet)); @@ -321,6 +339,14 @@ public class JobServiceImplTest { assertEquals(service.getHealthInfo().getHealthyJobCount(), 0); } + private void mockJsonDataMap(Scheduler scheduler,JobKey jobKey,Boolean deleted) throws SchedulerException { + JobDataMap jobDataMap = mock(JobDataMap.class); + JobDetailImpl jobDetail = new JobDetailImpl(); + jobDetail.setJobDataMap(jobDataMap); + given(scheduler.getJobDetail(jobKey)).willReturn(jobDetail); + given(jobDataMap.getBooleanFromString("deleted")).willReturn(deleted); + } + private Trigger newTriggerInstance(String name, String group, int internalInSeconds) { return newTrigger().withIdentity(TriggerKey.triggerKey(name, group)). withSchedule(SimpleScheduleBuilder.simpleSchedule() http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/90f3b16d/service/src/test/java/org/apache/griffin/core/measure/MeasureOrgServiceImplTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/java/org/apache/griffin/core/measure/MeasureOrgServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/measure/MeasureOrgServiceImplTest.java index dfb49d6..ad9520b 100644 --- a/service/src/test/java/org/apache/griffin/core/measure/MeasureOrgServiceImplTest.java +++ b/service/src/test/java/org/apache/griffin/core/measure/MeasureOrgServiceImplTest.java @@ -49,7 +49,7 @@ public class MeasureOrgServiceImplTest { @Test public void testGetOrgs(){ String orgName = "orgName"; - given(measureRepo.findOrganizations()).willReturn(Arrays.asList(orgName)); + given(measureRepo.findOrganizations(false)).willReturn(Arrays.asList(orgName)); List<String> orgs =service.getOrgs(); assertThat(orgs.size()).isEqualTo(1); assertThat(orgs.get(0)).isEqualTo(orgName); @@ -59,7 +59,7 @@ public class MeasureOrgServiceImplTest { public void testGetMetricNameListByOrg(){ String orgName = "orgName"; String measureName = "measureName"; - given(measureRepo.findNameByOrganization(orgName)).willReturn(Arrays.asList(measureName)); + given(measureRepo.findNameByOrganization(orgName,false)).willReturn(Arrays.asList(measureName)); List<String> measureNames=service.getMetricNameListByOrg(orgName); assertThat(measureNames.size()).isEqualTo(1); assertThat(measureNames.get(0)).isEqualTo(measureName); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/90f3b16d/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java index d1e4cd4..b9859bd 100644 --- a/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java +++ b/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java @@ -69,7 +69,7 @@ public class MeasureServiceImplTest { @Test public void testGetMeasuresById() throws Exception { Measure measure = createATestMeasure("view_item_hourly", "test"); - given(measureRepo.findOne(1L)).willReturn(measure); + given(measureRepo.findByIdAndDeleted(1L,false)).willReturn(measure); Measure m = service.getMeasureById(1); assertEquals(m.getName(), measure.getName()); } @@ -137,7 +137,7 @@ public class MeasureServiceImplTest { @Test public void testUpdateMeasureForSuccess() throws Exception { Measure measure = createATestMeasure("view_item_hourly", "test"); - given(measureRepo.exists(measure.getId())).willReturn(true); + given(measureRepo.findByIdAndDeleted(measure.getId(),false)).willReturn(new Measure()); given(measureRepo.save(measure)).willReturn(measure); GriffinOperationMessage message = service.updateMeasure(measure); assertEquals(message, GriffinOperationMessage.UPDATE_MEASURE_SUCCESS); @@ -146,7 +146,7 @@ public class MeasureServiceImplTest { @Test public void testUpdateMeasureForNotFound() throws Exception { Measure measure = createATestMeasure("view_item_hourly", "test"); - given(measureRepo.exists(measure.getId())).willReturn(false); + given(measureRepo.findByIdAndDeleted(measure.getId(),false)).willReturn(null); GriffinOperationMessage message = service.updateMeasure(measure); assertEquals(message, GriffinOperationMessage.RESOURCE_NOT_FOUND); } @@ -154,7 +154,7 @@ public class MeasureServiceImplTest { @Test public void testUpdateMeasureForFailWithSaveException() throws Exception { Measure measure = createATestMeasure("view_item_hourly", "test"); - given(measureRepo.exists(measure.getId())).willReturn(true); + given(measureRepo.findByIdAndDeleted(measure.getId(),false)).willReturn(new Measure()); given(measureRepo.save(measure)).willThrow(Exception.class); GriffinOperationMessage message = service.updateMeasure(measure); assertEquals(message, GriffinOperationMessage.UPDATE_MEASURE_FAIL); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/90f3b16d/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java index cd9e00e..f6b8725 100644 --- a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java +++ b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java @@ -52,14 +52,14 @@ public class MeasureRepoTest { @Test public void testFindAllOrganizations() { - List<String> orgs = measureRepo.findOrganizations(); + List<String> orgs = measureRepo.findOrganizations(false); assertThat(orgs.size()).isEqualTo(3); } @Test public void testFindNameByOrganization() { - List<String> orgs = measureRepo.findNameByOrganization("org1"); + List<String> orgs = measureRepo.findNameByOrganization("org1",false); assertThat(orgs.size()).isEqualTo(1); assertThat(orgs.get(0)).isEqualToIgnoringCase("m1");