Fix bugs and update info. Fix: 1. fix the bug around the rule that a data connector name cannot be repeated in one measure 2. fix the bug in reading properties 3. fix the bug where the data unit may be an empty string 4. fix the range and data timezone bug 5. fix the incorrect check in the file predicator. Update: 1. add an API to get the job config 2. make some Rule fields not null 3. update the measure rule description and add a null check 4. update the return type of add job 5. update code style
Author: ahutsunshine <ahutsunsh...@gmail.com> Author: He Wang <wanghe...@qq.com> Author: dodobel <1254288...@qq.com> Closes #211 from ahutsunshine/master. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/fd31809a Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/fd31809a Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/fd31809a Branch: refs/heads/master Commit: fd31809a60755c9069eba405e7af456e3e242eff Parents: 0f5d742 Author: ahutsunshine <ahutsunsh...@gmail.com> Authored: Tue Feb 6 10:36:25 2018 +0800 Committer: Lionel Liu <bhlx3l...@163.com> Committed: Tue Feb 6 10:36:25 2018 +0800 ---------------------------------------------------------------------- .../apache/griffin/measure/util/MailUtil.java | 20 ++-- .../griffin/core/common/CacheEvictor.java | 6 +- .../griffin/core/config/PropertiesConfig.java | 49 ++------- .../griffin/core/config/SchedulerConfig.java | 7 +- .../core/exception/GriffinExceptionMessage.java | 1 + .../griffin/core/job/FileExistPredicator.java | 10 +- .../apache/griffin/core/job/JobController.java | 5 + .../apache/griffin/core/job/JobInstance.java | 77 +++++++------ .../org/apache/griffin/core/job/JobService.java | 2 + .../apache/griffin/core/job/JobServiceImpl.java | 110 +++++++++++-------- .../apache/griffin/core/job/SparkSubmitJob.java | 24 ++-- .../griffin/core/job/entity/JobSchedule.java | 10 +- .../core/job/entity/SegmentPredicate.java | 10 +- .../griffin/core/job/entity/SegmentRange.java | 2 +- .../core/job/factory/PredicatorFactory.java | 4 + .../griffin/core/job/repo/JobInstanceRepo.java | 2 +- .../griffin/core/job/repo/JobScheduleRepo.java | 2 + .../measure/ExternalMeasureOperationImpl.java | 15 +-- .../measure/GriffinMeasureOperationImpl.java | 55 ++-------- .../griffin/core/measure/MeasureController.java | 6 + .../griffin/core/measure/MeasureService.java | 10 +- .../core/measure/MeasureServiceImpl.java 
| 9 ++ .../core/measure/entity/DataConnector.java | 27 ++++- .../core/measure/entity/GriffinMeasure.java | 41 ++++++- .../griffin/core/measure/entity/Rule.java | 87 ++++++++++++--- .../core/metastore/hive/HiveMetaStoreProxy.java | 2 +- .../hive/HiveMetaStoreServiceImpl.java | 25 +++-- .../griffin/core/metric/MetricStoreImpl.java | 7 +- .../apache/griffin/core/util/MeasureUtil.java | 76 +++++++++++++ .../griffin/core/util/PropertiesUtil.java | 52 +++++++++ .../org/apache/griffin/core/util/TimeUtil.java | 81 +++++++++----- service/src/main/resources/sparkJob.properties | 15 ++- .../core/config/PropertiesConfigTest.java | 48 +++----- .../griffin/core/job/JobControllerTest.java | 3 +- .../griffin/core/job/JobInstanceTest.java | 8 +- .../griffin/core/job/JobServiceImplTest.java | 42 +++---- .../GriffinMeasureOperationImplTest.java | 11 -- .../core/measure/MeasureControllerTest.java | 33 +++++- .../core/measure/MeasureServiceImplTest.java | 46 +++++++- .../griffin/core/util/PropertiesUtilTest.java | 38 ++++++- .../apache/griffin/core/util/TimeUtilTest.java | 27 ++--- service/src/test/resources/sparkJob.properties | 21 ++-- 42 files changed, 723 insertions(+), 403 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java b/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java index 4bbc581..1d27838 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java +++ b/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java @@ -23,7 +23,10 @@ package org.apache.griffin.measure.util; import org.apache.griffin.measure.config.params.env.EmailParam; import javax.mail.*; -import javax.mail.internet.*; +import 
javax.mail.internet.InternetAddress; +import javax.mail.internet.MimeBodyPart; +import javax.mail.internet.MimeMessage; +import javax.mail.internet.MimeMultipart; import java.util.Properties; /** @@ -41,18 +44,17 @@ public class MailUtil { Message msg = new MimeMessage(session); msg.setFrom(new InternetAddress(emailParam.mail())); //msg.setRecipient(Message.RecipientType.TO, new InternetAddress(UserArr)); - String[] arr=null; - if (UserArr.indexOf(",")==-1) { - arr=new String[]{UserArr}; - } - else { - arr=UserArr.split(","); + String[] arr = null; + if (UserArr.indexOf(",") == -1) { + arr = new String[]{UserArr}; + } else { + arr = UserArr.split(","); } Address[] tos = new InternetAddress[arr.length]; - for (int i=0; i<arr.length; i++){ + for (int i = 0; i < arr.length; i++) { tos[i] = new InternetAddress(arr[i]); } - msg.setRecipients(Message.RecipientType.TO,tos); + msg.setRecipients(Message.RecipientType.TO, tos); msg.setSubject(Title); Multipart mainPart = new MimeMultipart(); BodyPart html = new MimeBodyPart(); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/common/CacheEvictor.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/common/CacheEvictor.java b/service/src/main/java/org/apache/griffin/core/common/CacheEvictor.java index c40f4e1..17c0f91 100644 --- a/service/src/main/java/org/apache/griffin/core/common/CacheEvictor.java +++ b/service/src/main/java/org/apache/griffin/core/common/CacheEvictor.java @@ -30,8 +30,12 @@ import org.springframework.stereotype.Component; public class CacheEvictor { private static final Logger LOGGER = LoggerFactory.getLogger(CacheEvictor.class); + private final HiveMetaStoreService hiveMetaStoreService; + @Autowired - private HiveMetaStoreService hiveMetaStoreService; + public CacheEvictor(HiveMetaStoreService hiveMetaStoreService) { + this.hiveMetaStoreService = 
hiveMetaStoreService; + } @Scheduled(fixedRateString = "${cache.evict.hive.fixedRate.in.milliseconds}") @CacheEvict(cacheNames = "hive", allEntries = true, beforeInvocation = true) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java index bfaba35..0f27513 100644 --- a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java +++ b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java @@ -19,22 +19,19 @@ under the License. package org.apache.griffin.core.config; -import org.apache.commons.lang.StringUtils; -import org.apache.griffin.core.util.PropertiesUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; -import org.springframework.core.io.InputStreamResource; -import org.springframework.core.io.Resource; -import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.util.Properties; +import static org.apache.griffin.core.util.PropertiesUtil.getConf; +import static org.apache.griffin.core.util.PropertiesUtil.getProperties; + @Configuration public class PropertiesConfig { @@ -47,50 +44,24 @@ public class PropertiesConfig { this.location = location; } - private String getPath(String defaultPath, String name) throws FileNotFoundException { - String path = defaultPath; - File file = new File(location); - LOGGER.info("File absolute path:" + file.getAbsolutePath()); - File[] files = file.listFiles(); - if (files == null || files.length == 
0) { - LOGGER.error("The defaultPath {} does not exist.Please check your config in application.properties.", location); - throw new FileNotFoundException(); - } - for (File f : files) { - if (f.getName().equals(name)) { - path = location + File.separator + name; - LOGGER.info("config real path: {}", path); - } - } - return path; - } - @Bean(name = "appConf") public Properties appConf() { String path = "/application.properties"; - return PropertiesUtil.getProperties(path, new ClassPathResource(path)); + return getProperties(path, new ClassPathResource(path)); } @Bean(name = "livyConf") public Properties livyConf() throws FileNotFoundException { - String path = "/sparkJob.properties"; - if (StringUtils.isEmpty(location)) { - return PropertiesUtil.getProperties(path, new ClassPathResource(path)); - } - path = getPath(path, "sparkJob.properties"); - Resource resource = new InputStreamResource(new FileInputStream(path)); - return PropertiesUtil.getProperties(path, resource); + String name = "sparkJob.properties"; + String defaultPath = "/" + name; + return getConf(name, defaultPath, location); } @Bean(name = "quartzConf") public Properties quartzConf() throws FileNotFoundException { - String path = "/quartz.properties"; - if (StringUtils.isEmpty(location)) { - return PropertiesUtil.getProperties(path, new ClassPathResource(path)); - } - path = getPath(path, "quartz.properties"); - Resource resource = new InputStreamResource(new FileInputStream(path)); - return PropertiesUtil.getProperties(path, resource); + String name = "quartz.properties"; + String defaultPath = "/" + name; + return getConf(name, defaultPath, location); } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java 
b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java index 896ab00..7295168 100644 --- a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java +++ b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java @@ -35,9 +35,12 @@ import java.util.Properties; @Configuration public class SchedulerConfig { + private final Properties quartzConf; + @Autowired - @Qualifier("quartzConf") - private Properties quartzConf; + public SchedulerConfig(@Qualifier("quartzConf") Properties quartzConf) { + this.quartzConf = quartzConf; + } @Bean public JobFactory jobFactory(ApplicationContext applicationContext) { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java index 83fbc23..aab8769 100644 --- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java +++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java @@ -31,6 +31,7 @@ public enum GriffinExceptionMessage { INVALID_METRIC_RECORDS_SIZE(40007, "Size must not be less than zero"), INVALID_METRIC_VALUE_FORMAT(40008, "Metric value format is invalid"), INVALID_MEASURE_ID(40009, "Property 'measure.id' is invalid"), + INVALID_CRON_EXPRESSION(40010, "Property 'cron.expression' is invalid"), //404, "Not Found" MEASURE_ID_DOES_NOT_EXIST(40401, "Measure id does not exist"), http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java ---------------------------------------------------------------------- diff --git 
a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java index 7354176..7ee8642 100644 --- a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java +++ b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java @@ -19,10 +19,13 @@ under the License. package org.apache.griffin.core.job; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; import org.apache.griffin.core.job.entity.SegmentPredicate; import org.apache.griffin.core.util.FSUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; import java.io.IOException; import java.util.Map; @@ -45,11 +48,12 @@ public class FileExistPredicator implements Predicator { public boolean predicate() throws IOException { Map<String, String> config = predicate.getConfigMap(); String[] paths = null; - if (config.get(PREDICT_PATH) != null) { + String rootPath = null; + if (config != null && !StringUtils.isEmpty(config.get(PREDICT_PATH))) { paths = config.get(PREDICT_PATH).split(PATH_CONNECTOR_CHARACTER); + rootPath = config.get(PREDICT_ROOT_PATH); } - String rootPath = config.get(PREDICT_ROOT_PATH); - if (paths == null || rootPath == null) { + if (ArrayUtils.isEmpty(paths) || StringUtils.isEmpty(rootPath)) { LOGGER.error("Predicate path is null.Please check predicates config root.path and path."); throw new NullPointerException(); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/JobController.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java index 3f03f8c..2d09d8b 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobController.java +++ 
b/service/src/main/java/org/apache/griffin/core/job/JobController.java @@ -41,6 +41,11 @@ public class JobController { return jobService.getAliveJobs(); } + @RequestMapping(value = "/jobs/config/{jobName}") + public JobSchedule getJobSchedule(@PathVariable("jobName") String jobName) { + return jobService.getJobSchedule(jobName); + } + @RequestMapping(value = "/jobs", method = RequestMethod.POST) @ResponseStatus(HttpStatus.CREATED) public JobSchedule addJob(@RequestBody JobSchedule jobSchedule) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java index ba0b1fb..5cffceb 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java @@ -38,13 +38,13 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import java.io.IOException; -import java.text.ParseException; import java.util.*; import static org.apache.griffin.core.job.JobServiceImpl.GRIFFIN_JOB_ID; import static org.apache.griffin.core.job.JobServiceImpl.JOB_SCHEDULE_ID; import static org.quartz.JobBuilder.newJob; import static org.quartz.JobKey.jobKey; +import static org.quartz.SimpleScheduleBuilder.simpleSchedule; import static org.quartz.TriggerBuilder.newTrigger; import static org.quartz.TriggerKey.triggerKey; @@ -55,7 +55,7 @@ public class JobInstance implements Job { public static final String MEASURE_KEY = "measure"; public static final String PREDICATES_KEY = "predicts"; public static final String PREDICATE_JOB_NAME = "predicateJobName"; - public static final String JOB_NAME = "jobName"; + static final String JOB_NAME = 
"jobName"; static final String PATH_CONNECTOR_CHARACTER = ","; @Autowired @@ -73,12 +73,12 @@ public class JobInstance implements Job { private JobSchedule jobSchedule; private GriffinMeasure measure; private GriffinJob griffinJob; - private List<SegmentPredicate> mPredicts; + private List<SegmentPredicate> mPredicates; private Long jobStartTime; @Override - public void execute(JobExecutionContext context) throws JobExecutionException { + public void execute(JobExecutionContext context) { try { initParam(context); setSourcesPartitionsAndPredicates(measure.getDataSources()); @@ -89,7 +89,7 @@ public class JobInstance implements Job { } private void initParam(JobExecutionContext context) throws SchedulerException { - mPredicts = new ArrayList<>(); + mPredicates = new ArrayList<>(); JobDetail jobDetail = context.getJobDetail(); Long jobScheduleId = jobDetail.getJobDataMap().getLong(JOB_SCHEDULE_ID); Long griffinJobId = jobDetail.getJobDataMap().getLong(GRIFFIN_JOB_ID); @@ -102,7 +102,7 @@ public class JobInstance implements Job { } private void setJobStartTime(JobDetail jobDetail) throws SchedulerException { - Scheduler scheduler = factory.getObject(); + Scheduler scheduler = factory.getScheduler(); JobKey jobKey = jobDetail.getKey(); List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey); Date triggerTime = triggers.get(0).getPreviousFireTime(); @@ -145,13 +145,14 @@ public class JobInstance implements Job { * split data into several part and get every part start timestamp * * @param segRange config of data + * @param dc data connector * @return split timestamps of data */ - private Long[] genSampleTs(SegmentRange segRange, DataConnector dc) throws IOException { + private Long[] genSampleTs(SegmentRange segRange, DataConnector dc) { Long offset = TimeUtil.str2Long(segRange.getBegin()); Long range = TimeUtil.str2Long(segRange.getLength()); String unit = dc.getDataUnit(); - Long dataUnit = TimeUtil.str2Long(unit != null ? 
unit : dc.getDefaultDataUnit()); + Long dataUnit = TimeUtil.str2Long(StringUtils.isEmpty(unit) ? dc.getDefaultDataUnit() : unit); //offset usually is negative Long dataStartTime = jobStartTime + offset; if (range < 0) { @@ -172,59 +173,68 @@ public class JobInstance implements Job { /** * set data connector predicates * + * @param dc data connector * @param sampleTs collection of data split start timestamp */ private void setConnectorPredicates(DataConnector dc, Long[] sampleTs) throws IOException { List<SegmentPredicate> predicates = dc.getPredicates(); - if (predicates != null) { - for (SegmentPredicate predicate : predicates) { - genConfMap(predicate.getConfigMap(), sampleTs); - //Do not forget to update origin string config - predicate.setConfigMap(predicate.getConfigMap()); - mPredicts.add(predicate); - } + for (SegmentPredicate predicate : predicates) { + genConfMap(dc, sampleTs); + //Do not forget to update origin string config + predicate.setConfigMap(predicate.getConfigMap()); + mPredicates.add(predicate); } } - /** - * set data connector configs - * - * @param sampleTs collection of data split start timestamp - */ private void setConnectorConf(DataConnector dc, Long[] sampleTs) throws IOException { - genConfMap(dc.getConfigMap(), sampleTs); + genConfMap(dc, sampleTs); dc.setConfigMap(dc.getConfigMap()); } /** - * @param conf map with file predicate,data split and partitions info + * @param dc data connector * @param sampleTs collection of data split start timestamp * @return all config data combine,like {"where": "year=2017 AND month=11 AND dt=15 AND hour=09,year=2017 AND month=11 AND dt=15 AND hour=10"} * or like {"path": "/year=2017/month=11/dt=15/hour=09/_DONE,/year=2017/month=11/dt=15/hour=10/_DONE"} */ - private void genConfMap(Map<String, String> conf, Long[] sampleTs) { + private void genConfMap(DataConnector dc, Long[] sampleTs) { + Map<String, String> conf = dc.getConfigMap(); + if (conf == null) { + LOGGER.warn("Predicate config is null."); + 
return; + } for (Map.Entry<String, String> entry : conf.entrySet()) { String value = entry.getValue(); Set<String> set = new HashSet<>(); + if (StringUtils.isEmpty(value)) { + continue; + } for (Long timestamp : sampleTs) { - set.add(TimeUtil.format(value, timestamp,jobSchedule.getTimeZone())); + set.add(TimeUtil.format(value, timestamp, getTimeZone(dc))); } conf.put(entry.getKey(), StringUtils.join(set, PATH_CONNECTOR_CHARACTER)); } } + private TimeZone getTimeZone(DataConnector dc) { + if (StringUtils.isEmpty(dc.getDataTimeZone())) { + return TimeZone.getDefault(); + } + return TimeZone.getTimeZone(dc.getDataTimeZone()); + } + private boolean createJobInstance(Map<String, Object> confMap) throws Exception { Map<String, Object> config = (Map<String, Object>) confMap.get("checkdonefile.schedule"); Long interval = TimeUtil.str2Long((String) config.get("interval")); Integer repeat = Integer.valueOf(config.get("repeat").toString()); String groupName = "PG"; String jobName = griffinJob.getJobName() + "_predicate_" + System.currentTimeMillis(); - Scheduler scheduler = factory.getObject(); + Scheduler scheduler = factory.getScheduler(); TriggerKey triggerKey = triggerKey(jobName, groupName); return !(scheduler.checkExists(triggerKey) || !saveGriffinJob(jobName, groupName) - || !createJobInstance(scheduler, triggerKey, interval, repeat, jobName)); + || !createJobInstance(triggerKey, interval, repeat, jobName)); } private boolean saveGriffinJob(String pName, String pGroup) { @@ -236,26 +246,27 @@ public class JobInstance implements Job { return true; } - private boolean createJobInstance(Scheduler scheduler, TriggerKey triggerKey, Long interval, Integer repeatCount, String pJobName) throws Exception { - JobDetail jobDetail = addJobDetail(scheduler, triggerKey, pJobName); - scheduler.scheduleJob(newTriggerInstance(triggerKey, jobDetail, interval, repeatCount)); + private boolean createJobInstance(TriggerKey triggerKey, Long interval, Integer repeatCount, String pJobName) 
throws Exception { + JobDetail jobDetail = addJobDetail(triggerKey, pJobName); + factory.getScheduler().scheduleJob(newTriggerInstance(triggerKey, jobDetail, interval, repeatCount)); return true; } - private Trigger newTriggerInstance(TriggerKey triggerKey, JobDetail jd, Long interval, Integer repeatCount) throws ParseException { + private Trigger newTriggerInstance(TriggerKey triggerKey, JobDetail jd, Long interval, Integer repeatCount) { return newTrigger() .withIdentity(triggerKey) .forJob(jd) .startNow() - .withSchedule(SimpleScheduleBuilder.simpleSchedule() + .withSchedule(simpleSchedule() .withIntervalInMilliseconds(interval) .withRepeatCount(repeatCount) ) .build(); } - private JobDetail addJobDetail(Scheduler scheduler, TriggerKey triggerKey, String pJobName) throws SchedulerException, JsonProcessingException { + private JobDetail addJobDetail(TriggerKey triggerKey, String pJobName) throws SchedulerException, JsonProcessingException { + Scheduler scheduler = factory.getScheduler(); JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup()); JobDetail jobDetail; Boolean isJobKeyExist = scheduler.checkExists(jobKey); @@ -275,7 +286,7 @@ public class JobInstance implements Job { private void setJobDataMap(JobDetail jobDetail, String pJobName) throws JsonProcessingException { JobDataMap dataMap = jobDetail.getJobDataMap(); dataMap.put(MEASURE_KEY, JsonUtil.toJson(measure)); - dataMap.put(PREDICATES_KEY, JsonUtil.toJson(mPredicts)); + dataMap.put(PREDICATES_KEY, JsonUtil.toJson(mPredicates)); dataMap.put(JOB_NAME, griffinJob.getJobName()); dataMap.put(PREDICATE_JOB_NAME, pJobName); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/JobService.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobService.java b/service/src/main/java/org/apache/griffin/core/job/JobService.java index 
3c15f8f..a238311 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobService.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobService.java @@ -31,6 +31,8 @@ public interface JobService { List<JobDataBean> getAliveJobs(); + JobSchedule getJobSchedule(String jobName); + JobSchedule addJob(JobSchedule jobSchedule) throws Exception; void pauseJob(String group, String name) throws SchedulerException; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java index aa84f11..b2faa72 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java @@ -47,10 +47,12 @@ import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; import java.io.IOException; -import java.text.ParseException; import java.util.*; +import static java.util.TimeZone.getTimeZone; import static org.apache.griffin.core.exception.GriffinExceptionMessage.*; +import static org.quartz.CronExpression.isValidExpression; +import static org.quartz.CronScheduleBuilder.cronSchedule; import static org.quartz.JobBuilder.newJob; import static org.quartz.JobKey.jobKey; import static org.quartz.TriggerBuilder.newTrigger; @@ -61,8 +63,8 @@ public class JobServiceImpl implements JobService { private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class); public static final String JOB_SCHEDULE_ID = "jobScheduleId"; public static final String GRIFFIN_JOB_ID = "griffinJobId"; - static final int MAX_PAGE_SIZE = 1024; - static final int DEFAULT_PAGE_SIZE = 10; + private static final int MAX_PAGE_SIZE = 1024; + private static final 
int DEFAULT_PAGE_SIZE = 10; @Autowired private SchedulerFactoryBean factory; @@ -86,12 +88,11 @@ public class JobServiceImpl implements JobService { @Override public List<JobDataBean> getAliveJobs() { - Scheduler scheduler = factory.getObject(); List<JobDataBean> dataList = new ArrayList<>(); try { List<GriffinJob> jobs = jobRepo.findByDeleted(false); for (GriffinJob job : jobs) { - JobDataBean jobData = genJobData(scheduler, jobKey(job.getQuartzName(), job.getQuartzGroup()), job); + JobDataBean jobData = genJobData(jobKey(job.getQuartzName(), job.getQuartzGroup()), job); if (jobData != null) { dataList.add(jobData); } @@ -103,7 +104,8 @@ public class JobServiceImpl implements JobService { return dataList; } - private JobDataBean genJobData(Scheduler scheduler, JobKey jobKey, GriffinJob job) throws SchedulerException { + private JobDataBean genJobData(JobKey jobKey, GriffinJob job) throws SchedulerException { + Scheduler scheduler = factory.getScheduler(); List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey); if (CollectionUtils.isEmpty(triggers)) { return null; @@ -136,15 +138,25 @@ public class JobServiceImpl implements JobService { } @Override + public JobSchedule getJobSchedule(String jobName) { + JobSchedule jobSchedule = jobScheduleRepo.findByJobName(jobName); + if (jobSchedule == null) { + LOGGER.warn("Job name {} does not exist.", jobName); + throw new GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST); + } + return jobSchedule; + } + + @Override @Transactional(rollbackFor = Exception.class) public JobSchedule addJob(JobSchedule js) throws Exception { Long measureId = js.getMeasureId(); GriffinMeasure measure = getMeasureIfValid(measureId); - checkJobScheduleParams(js, measure); + validateJobScheduleParams(js, measure); String qName = getQuartzName(js); - String qGroup = getQuartzGroupName(); + String qGroup = getQuartzGroup(); TriggerKey triggerKey = triggerKey(qName, qGroup); - if 
(factory.getObject().checkExists(triggerKey)) { + if (factory.getScheduler().checkExists(triggerKey)) { throw new GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST); } GriffinJob job = new GriffinJob(measure.getId(), js.getJobName(), qName, qGroup, false); @@ -155,33 +167,35 @@ public class JobServiceImpl implements JobService { } private void addJob(TriggerKey triggerKey, JobSchedule js, GriffinJob job) throws Exception { - Scheduler scheduler = factory.getObject(); - JobDetail jobDetail = addJobDetail(scheduler, triggerKey, js, job); - scheduler.scheduleJob(genTriggerInstance(triggerKey, jobDetail, js)); + JobDetail jobDetail = addJobDetail(triggerKey, js, job); + factory.getScheduler().scheduleJob(genTriggerInstance(triggerKey, jobDetail, js)); } private String getQuartzName(JobSchedule js) { return js.getJobName() + "_" + System.currentTimeMillis(); } - private String getQuartzGroupName() { + private String getQuartzGroup() { return "BA"; } - private void checkJobScheduleParams(JobSchedule js, GriffinMeasure measure) { - if (!isJobNameValid(js.getJobName())) { + private void validateJobScheduleParams(JobSchedule js, GriffinMeasure measure) { + if (!isValidJobName(js.getJobName())) { throw new GriffinException.BadRequestException(INVALID_JOB_NAME); } - if (!isBaseLineValid(js.getSegments())) { + if (!isValidCronExpression(js.getCronExpression())) { + throw new GriffinException.BadRequestException(INVALID_CRON_EXPRESSION); + } + if (!isValidBaseLine(js.getSegments())) { throw new GriffinException.BadRequestException(MISSING_BASELINE_CONFIG); } List<String> names = getConnectorNames(measure); - if (!isConnectorNamesValid(js.getSegments(), names)) { + if (!isValidConnectorNames(js.getSegments(), names)) { throw new GriffinException.BadRequestException(INVALID_CONNECTOR_NAME); } } - private boolean isJobNameValid(String jobName) { + private boolean isValidJobName(String jobName) { if (StringUtils.isEmpty(jobName)) { LOGGER.warn("Job name cannot be 
empty."); return false; @@ -194,7 +208,19 @@ public class JobServiceImpl implements JobService { return true; } - private boolean isBaseLineValid(List<JobDataSegment> segments) { + private boolean isValidCronExpression(String cronExpression) { + if (StringUtils.isEmpty(cronExpression)) { + LOGGER.warn("Cron Expression is empty."); + return false; + } + if (!isValidExpression(cronExpression)) { + LOGGER.warn("Cron Expression is invalid."); + return false; + } + return true; + } + + private boolean isValidBaseLine(List<JobDataSegment> segments) { for (JobDataSegment jds : segments) { if (jds.getBaseline()) { return true; @@ -204,34 +230,25 @@ public class JobServiceImpl implements JobService { return false; } - private boolean isConnectorNamesValid(List<JobDataSegment> segments, List<String> names) { - Set<String> dcSets = new HashSet<>(); + private boolean isValidConnectorNames(List<JobDataSegment> segments, List<String> names) { + Set<String> sets = new HashSet<>(); for (JobDataSegment segment : segments) { String dcName = segment.getDataConnectorName(); - dcSets.add(dcName); - if (!isConnectorNameValid(dcName, names)) { + sets.add(dcName); + boolean exist = names.stream().anyMatch(name -> name.equals(dcName)); + if (!exist) { + LOGGER.warn("Param {} is a illegal string. Please input one of strings in {}.", dcName, names); return false; } } - if (dcSets.size() < segments.size()) { - LOGGER.warn("Connector names in job data segment cannot be repeated."); + if (sets.size() < segments.size()) { + LOGGER.warn("Connector names in job data segment cannot duplicate."); return false; } return true; } - private boolean isConnectorNameValid(String param, List<String> names) { - for (String name : names) { - if (name.equals(param)) { - return true; - } - } - LOGGER.warn("Param {} is a illegal string. 
Please input one of strings in {}.", param, names); - return false; - } - private List<String> getConnectorNames(GriffinMeasure measure) { - List<String> names = new ArrayList<>(); Set<String> sets = new HashSet<>(); List<DataSource> sources = measure.getDataSources(); for (DataSource source : sources) { @@ -241,8 +258,7 @@ public class JobServiceImpl implements JobService { LOGGER.warn("Connector names cannot be repeated."); return Collections.emptyList(); } - names.addAll(sets); - return names; + return new ArrayList<>(sets); } private GriffinMeasure getMeasureIfValid(Long measureId) { @@ -254,18 +270,18 @@ public class JobServiceImpl implements JobService { return measure; } - - private Trigger genTriggerInstance(TriggerKey triggerKey, JobDetail jd, JobSchedule js) throws ParseException { + private Trigger genTriggerInstance(TriggerKey triggerKey, JobDetail jd, JobSchedule js) { return newTrigger() .withIdentity(triggerKey) .forJob(jd) - .withSchedule(CronScheduleBuilder.cronSchedule(new CronExpression(js.getCronExpression())) - .inTimeZone(TimeZone.getTimeZone(js.getTimeZone())) + .withSchedule(cronSchedule(js.getCronExpression()) + .inTimeZone(getTimeZone(js.getTimeZone())) ) .build(); } - private JobDetail addJobDetail(Scheduler scheduler, TriggerKey triggerKey, JobSchedule js, GriffinJob job) throws SchedulerException { + private JobDetail addJobDetail(TriggerKey triggerKey, JobSchedule js, GriffinJob job) throws SchedulerException { + Scheduler scheduler = factory.getScheduler(); JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup()); JobDetail jobDetail; Boolean isJobKeyExist = scheduler.checkExists(jobKey); @@ -315,7 +331,7 @@ public class JobServiceImpl implements JobService { @Override public void pauseJob(String group, String name) throws SchedulerException { - Scheduler scheduler = factory.getObject(); + Scheduler scheduler = factory.getScheduler(); JobKey jobKey = new JobKey(name, group); if (!scheduler.checkExists(jobKey)) { 
LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName()); @@ -388,7 +404,7 @@ public class JobServiceImpl implements JobService { } private void deleteJob(String group, String name) throws SchedulerException { - Scheduler scheduler = factory.getObject(); + Scheduler scheduler = factory.getScheduler(); JobKey jobKey = new JobKey(name, group); if (!scheduler.checkExists(jobKey)) { LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName()); @@ -450,7 +466,6 @@ public class JobServiceImpl implements JobService { } } - /** * call livy to update part of job instance table data associated with group and jobName in mysql. * @@ -476,7 +491,6 @@ public class JobServiceImpl implements JobService { } } - private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String, Object> resultMap) { if (resultMap != null && resultMap.size() != 0 && resultMap.get("state") != null) { instance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString())); @@ -528,7 +542,7 @@ public class JobServiceImpl implements JobService { JobKey jobKey = new JobKey(job.getQuartzName(), job.getQuartzGroup()); List<Trigger> triggers; try { - triggers = (List<Trigger>) factory.getObject().getTriggersOfJob(jobKey); + triggers = (List<Trigger>) factory.getScheduler().getTriggersOfJob(jobKey); } catch (SchedulerException e) { LOGGER.error("Job schedule exception. 
{}", e.getMessage()); throw new GriffinException.ServiceException("Fail to Get HealthInfo", e); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java index 9a9785f..f1b254a 100644 --- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java +++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; import org.springframework.web.client.RestTemplate; import java.io.IOException; @@ -58,7 +59,7 @@ public class SparkSubmitJob implements Job { private GriffinMeasure measure; private String livyUri; - private List<SegmentPredicate> mPredicts; + private List<SegmentPredicate> mPredicates; private JobInstanceBean jobInstance; private RestTemplate restTemplate = new RestTemplate(); private LivyConf livyConf = new LivyConf(); @@ -69,7 +70,7 @@ public class SparkSubmitJob implements Job { try { initParam(jd); setLivyConf(); - if (!success(mPredicts)) { + if (!success(mPredicates)) { updateJobInstanceState(context); return; } @@ -107,7 +108,7 @@ public class SparkSubmitJob implements Job { for (SegmentPredicate segPredicate : predicates) { Predicator predicator = PredicatorFactory.newPredicateInstance(segPredicate); try { - if (!predicator.predicate()) { + if (predicator != null && !predicator.predicate()) { return false; } } catch (Exception e) { @@ -119,7 +120,7 @@ public class SparkSubmitJob implements Job { } private void initParam(JobDetail jd) throws IOException { - 
mPredicts = new ArrayList<>(); + mPredicates = new ArrayList<>(); livyUri = livyConfProps.getProperty("livy.uri"); jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap().getString(PREDICATE_JOB_NAME)); measure = JsonUtil.toEntity(jd.getJobDataMap().getString(MEASURE_KEY), GriffinMeasure.class); @@ -128,15 +129,16 @@ public class SparkSubmitJob implements Job { } private void setPredicts(String json) throws IOException { + if (StringUtils.isEmpty(json)) { + return; + } List<Map<String, Object>> maps = JsonUtil.toEntity(json, new TypeReference<List<Map>>() { }); - if (maps != null) { - for (Map<String, Object> map : maps) { - SegmentPredicate sp = new SegmentPredicate(); - sp.setType((String) map.get("type")); - sp.setConfigMap((Map<String, String>) map.get("config")); - mPredicts.add(sp); - } + for (Map<String, Object> map : maps) { + SegmentPredicate sp = new SegmentPredicate(); + sp.setType((String) map.get("type")); + sp.setConfigMap((Map<String, String>) map.get("config")); + mPredicates.add(sp); } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java index d1dd44f..0a1c2c4 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java @@ -129,13 +129,15 @@ public class JobSchedule extends AbstractAuditableEntity { } private void setPredicateConfig(String config) throws IOException { - this.predicateConfig = config; - this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, Object>>() { - }); + if (!StringUtils.isEmpty(config)) { + this.predicateConfig = config; + this.configMap = JsonUtil.toEntity(config, new 
TypeReference<Map<String, Object>>() { + }); + } } @JsonProperty("predicate.config") - public Map<String, Object> getConfigMap() throws IOException { + public Map<String, Object> getConfigMap() { return configMap; } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java index 78b2794..ac51f97 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java @@ -60,13 +60,15 @@ public class SegmentPredicate extends AbstractAuditableEntity { } public void setConfig(String config) throws IOException { - this.config = config; - this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() { - }); + if (!StringUtils.isEmpty(config)) { + this.config = config; + this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() { + }); + } } @JsonProperty("config") - public Map<String, String> getConfigMap() throws IOException { + public Map<String, String> getConfigMap(){ return configMap; } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java index 5393f22..fbd5fbc 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java @@ -29,7 +29,7 @@ import 
javax.persistence.Entity; public class SegmentRange extends AbstractAuditableEntity { @Column(name = "data_begin") - private String begin = "1h"; + private String begin = "-1h"; private String length = "1h"; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java index 8af39f4..3aa7403 100644 --- a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java +++ b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java @@ -22,8 +22,11 @@ package org.apache.griffin.core.job.factory; import org.apache.griffin.core.job.FileExistPredicator; import org.apache.griffin.core.job.Predicator; import org.apache.griffin.core.job.entity.SegmentPredicate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PredicatorFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(PredicatorFactory.class); public static Predicator newPredicateInstance(SegmentPredicate segPredicate) { Predicator predicate = null; switch (segPredicate.getType()) { @@ -31,6 +34,7 @@ public class PredicatorFactory { predicate = new FileExistPredicator(segPredicate); break; default: + LOGGER.warn("There is no predicate type that you input."); break; } return predicate; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java index 1714789..c873a97 100644 --- 
a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java +++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java @@ -40,7 +40,7 @@ public interface JobInstanceRepo extends CrudRepository<JobInstanceBean, Long> { List<JobInstanceBean> findByExpireTmsLessThanEqual(Long expireTms); - @Transactional + @Transactional(rollbackFor = Exception.class) @Modifying @Query("delete from JobInstanceBean j where j.expireTms <= ?1") int deleteByExpireTimestamp(Long expireTms); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java index 1b360e4..49e5db9 100644 --- a/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java +++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java @@ -23,4 +23,6 @@ import org.apache.griffin.core.job.entity.JobSchedule; import org.springframework.data.repository.CrudRepository; public interface JobScheduleRepo extends CrudRepository<JobSchedule, Long> { + + JobSchedule findByJobName(String jobName); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java index dc7c056..bb76c09 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java +++ b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java @@ -19,8 
+19,6 @@ under the License. package org.apache.griffin.core.measure; -import org.apache.commons.lang.StringUtils; -import org.apache.griffin.core.exception.GriffinException; import org.apache.griffin.core.job.entity.VirtualJob; import org.apache.griffin.core.job.repo.VirtualJobRepo; import org.apache.griffin.core.measure.entity.ExternalMeasure; @@ -32,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import static org.apache.griffin.core.exception.GriffinExceptionMessage.MISSING_METRIC_NAME; +import static org.apache.griffin.core.util.MeasureUtil.validateMeasure; @Component("externalOperation") public class ExternalMeasureOperationImpl implements MeasureOperation { @@ -47,11 +45,7 @@ public class ExternalMeasureOperationImpl implements MeasureOperation { @Transactional public Measure create(Measure measure) { ExternalMeasure em = (ExternalMeasure) measure; - if (StringUtils.isBlank(em.getMetricName())) { - LOGGER.warn("Failed to create external measure {}. Its metric name is blank.", measure.getName()); - throw new GriffinException.BadRequestException(MISSING_METRIC_NAME); - - } + validateMeasure(em); em.setVirtualJob(new VirtualJob()); em = measureRepo.save(em); VirtualJob vj = genVirtualJob(em, em.getVirtualJob()); @@ -62,10 +56,7 @@ public class ExternalMeasureOperationImpl implements MeasureOperation { @Override public void update(Measure measure) { ExternalMeasure latestMeasure = (ExternalMeasure) measure; - if (StringUtils.isBlank(latestMeasure.getMetricName())) { - LOGGER.warn("Failed to update external measure {}. 
Its metric name is blank.", measure.getName()); - throw new GriffinException.BadRequestException(MISSING_METRIC_NAME); - } + validateMeasure(latestMeasure); ExternalMeasure originMeasure = measureRepo.findOne(latestMeasure.getId()); VirtualJob vj = genVirtualJob(latestMeasure, originMeasure.getVirtualJob()); latestMeasure.setVirtualJob(vj); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java index 22561a4..c15ff23 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java +++ b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java @@ -19,46 +19,41 @@ under the License. 
package org.apache.griffin.core.measure; -import org.apache.commons.lang.StringUtils; -import org.apache.griffin.core.exception.GriffinException; import org.apache.griffin.core.job.JobServiceImpl; -import org.apache.griffin.core.measure.entity.DataConnector; -import org.apache.griffin.core.measure.entity.DataSource; -import org.apache.griffin.core.measure.entity.GriffinMeasure; import org.apache.griffin.core.measure.entity.Measure; -import org.apache.griffin.core.measure.repo.DataConnectorRepo; import org.apache.griffin.core.measure.repo.MeasureRepo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_CONNECTOR_NAME; +import static org.apache.griffin.core.util.MeasureUtil.validateMeasure; @Component("griffinOperation") public class GriffinMeasureOperationImpl implements MeasureOperation { private static final Logger LOGGER = LoggerFactory.getLogger(GriffinMeasureOperationImpl.class); + private final MeasureRepo<Measure> measureRepo; + + private final JobServiceImpl jobService; + @Autowired - private MeasureRepo<Measure> measureRepo; - @Autowired - private DataConnectorRepo dcRepo; - @Autowired - private JobServiceImpl jobService; + public GriffinMeasureOperationImpl(MeasureRepo<Measure> measureRepo, JobServiceImpl jobService) { + this.measureRepo = measureRepo; + this.jobService = jobService; + } @Override public Measure create(Measure measure) { - checkConnectorNames((GriffinMeasure) measure); + validateMeasure(measure); return measureRepo.save(measure); } @Override public void update(Measure measure) { + validateMeasure(measure); + measure.setDeleted(false); measureRepo.save(measure); } @@ -68,30 +63,4 @@ public class GriffinMeasureOperationImpl implements 
MeasureOperation { measure.setDeleted(true); measureRepo.save(measure); } - - private void checkConnectorNames(GriffinMeasure measure) { - List<String> names = getConnectorNames(measure); - if (names.size() == 0) { - LOGGER.warn("Connector names cannot be empty."); - throw new GriffinException.BadRequestException(INVALID_CONNECTOR_NAME); - } - List<DataConnector> connectors = dcRepo.findByConnectorNames(names); - if (!CollectionUtils.isEmpty(connectors)) { - LOGGER.warn("Failed to create new measure {}. It's connector names already exist. ", measure.getName()); - throw new GriffinException.BadRequestException(INVALID_CONNECTOR_NAME); - } - } - - private List<String> getConnectorNames(GriffinMeasure measure) { - List<String> names = new ArrayList<>(); - for (DataSource source : measure.getDataSources()) { - for (DataConnector dc : source.getConnectors()) { - String name = dc.getName(); - if (!StringUtils.isEmpty(name)) { - names.add(name); - } - } - } - return names; - } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java index 91ffaac..eda0a1f 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java @@ -48,6 +48,12 @@ public class MeasureController { measureService.deleteMeasureById(id); } + @RequestMapping(value = "/measures", method = RequestMethod.DELETE) + @ResponseStatus(HttpStatus.NO_CONTENT) + public void deleteMeasures() { + measureService.deleteMeasures(); + } + @RequestMapping(value = "/measures", method = RequestMethod.PUT) @ResponseStatus(HttpStatus.NO_CONTENT) public void updateMeasure(@RequestBody Measure 
measure) { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java index 818db47..a3d9640 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java @@ -30,17 +30,9 @@ public interface MeasureService { Measure getMeasureById(long id); -/* - Measure getMeasureByName(String measureName); -*/ - - void deleteMeasureById(Long id); - -/* - GriffinOperationMessage deleteMeasureByName(String measureName) ; -*/ + void deleteMeasures(); void updateMeasure(Measure measure); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java index d9d7cd8..66d7252 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java @@ -102,6 +102,15 @@ public class MeasureServiceImpl implements MeasureService { op.delete(measure); } + @Override + public void deleteMeasures() { + List<Measure> measures = measureRepo.findByDeleted(false); + for (Measure m : measures) { + MeasureOperation op = getOperation(m); + op.delete(m); + } + } + private MeasureOperation getOperation(Measure measure) { if (measure instanceof GriffinMeasure) { return griffinOp; 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java index 8a87ea5..dae35e1 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java @@ -53,6 +53,9 @@ public class DataConnector extends AbstractAuditableEntity { @JsonInclude(JsonInclude.Include.NON_NULL) private String dataUnit; + @JsonInclude(JsonInclude.Include.NON_NULL) + private String dataTimeZone; + @JsonIgnore @Transient private String defaultDataUnit = "365000d"; @@ -77,7 +80,7 @@ public class DataConnector extends AbstractAuditableEntity { } @JsonProperty("config") - public Map<String, String> getConfigMap() throws IOException { + public Map<String, String> getConfigMap() { return configMap; } @@ -88,12 +91,14 @@ public class DataConnector extends AbstractAuditableEntity { } public void setConfig(String config) throws IOException { - this.config = config; - this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() { - }); + if (!StringUtils.isEmpty(config)) { + this.config = config; + this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() { + }); + } } - public String getConfig() throws IOException { + public String getConfig() { return config; } @@ -107,6 +112,16 @@ public class DataConnector extends AbstractAuditableEntity { this.dataUnit = dataUnit; } + @JsonProperty("data.time.zone") + public String getDataTimeZone() { + return dataTimeZone; + } + + @JsonProperty("data.time.zone") + public void setDataTimeZone(String dataTimeZone) { + this.dataTimeZone = dataTimeZone; + } + public String 
getDefaultDataUnit() { return defaultDataUnit; } @@ -156,7 +171,7 @@ public class DataConnector extends AbstractAuditableEntity { }); } - public DataConnector(String name, String dataUnit, Map configMap,List<SegmentPredicate> predicates) throws IOException { + public DataConnector(String name, String dataUnit, Map configMap, List<SegmentPredicate> predicates) throws IOException { this.name = name; this.dataUnit = dataUnit; this.configMap = configMap; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java index 8576b86..d597a6d 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java @@ -19,14 +19,21 @@ under the License. 
package org.apache.griffin.core.measure.entity; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.commons.collections.CollectionUtils; +import org.apache.griffin.core.util.JsonUtil; +import org.springframework.util.StringUtils; import javax.persistence.*; import javax.validation.constraints.NotNull; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Measures processed on Griffin @@ -40,6 +47,15 @@ public class GriffinMeasure extends Measure { @JsonInclude(JsonInclude.Include.NON_NULL) private Long timestamp; + @JsonIgnore + @Access(AccessType.PROPERTY) + @Column(length = 1024) + private String ruleDescription; + + @Transient + @JsonInclude(JsonInclude.Include.NON_NULL) + private Map<String, Object> ruleDescriptionMap; + @NotNull @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) @JoinColumn(name = "measure_id") @@ -86,6 +102,29 @@ public class GriffinMeasure extends Measure { this.evaluateRule = evaluateRule; } + public String getRuleDescription() { + return ruleDescription; + } + + public void setRuleDescription(String ruleDescription) throws IOException { + if (!StringUtils.isEmpty(ruleDescription)) { + this.ruleDescription = ruleDescription; + this.ruleDescriptionMap = JsonUtil.toEntity(ruleDescription, new TypeReference<Map<String, Object>>() { + }); + } + } + + @JsonProperty("rule.description") + public Map<String, Object> getRuleDescriptionMap() { + return ruleDescriptionMap; + } + + @JsonProperty("rule.description") + public void setRuleDescriptionMap(Map<String, Object> ruleDescriptionMap) throws JsonProcessingException { + this.ruleDescriptionMap = ruleDescriptionMap; + this.ruleDescription = 
JsonUtil.toJson(ruleDescriptionMap); + } + public Long getTimestamp() { return timestamp; } @@ -110,7 +149,7 @@ public class GriffinMeasure extends Measure { this.evaluateRule = evaluateRule; } - public GriffinMeasure(Long measureId,String name, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) { + public GriffinMeasure(Long measureId, String name, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) { this.setId(measureId); this.name = name; this.owner = owner; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java index 8a61bfb..a7b424d 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java @@ -22,10 +22,13 @@ package org.apache.griffin.core.measure.entity; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import org.apache.griffin.core.util.JsonUtil; +import org.springframework.util.StringUtils; import javax.persistence.*; +import javax.validation.constraints.NotNull; import java.io.IOException; import java.util.Map; @@ -36,18 +39,18 @@ public class Rule extends AbstractAuditableEntity { /** * three type:1.griffin-dsl 2.df-opr 3.spark-sql */ + @NotNull private String dslType; + @NotNull private String dqType; @Column(length = 8 * 1024) + @NotNull private String rule; private String name; - @Column(length = 1024) - private String description; - @JsonIgnore @Access(AccessType.PROPERTY) 
@Column(length = 1024) @@ -57,6 +60,22 @@ public class Rule extends AbstractAuditableEntity { @JsonInclude(JsonInclude.Include.NON_NULL) private Map<String, Object> detailsMap; + @JsonIgnore + @Access(AccessType.PROPERTY) + private String metric; + + @Transient + @JsonInclude(JsonInclude.Include.NON_NULL) + private Map<String, Object> metricMap; + + @JsonIgnore + @Access(AccessType.PROPERTY) + private String record; + + @Transient + @JsonInclude(JsonInclude.Include.NON_NULL) + private Map<String, Object> recordMap; + @JsonProperty("dsl.type") public String getDslType() { return dslType; @@ -90,9 +109,11 @@ public class Rule extends AbstractAuditableEntity { } private void setDetails(String details) throws IOException { - this.details = details; - detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, Object>>() { - }); + if (!StringUtils.isEmpty(details)) { + this.details = details; + this.detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, Object>>() { + }); + } } @JsonProperty("details") @@ -106,20 +127,58 @@ public class Rule extends AbstractAuditableEntity { this.details = JsonUtil.toJson(details); } - public String getName() { - return name; + public String getMetric() { + return metric; } - public void setName(String name) { - this.name = name; + public void setMetric(String metric) throws IOException { + if (!StringUtils.isEmpty(metric)) { + this.metric = metric; + this.metricMap = JsonUtil.toEntity(metric, new TypeReference<Map<String, Object>>() { + }); + } + } + + @JsonProperty("metric") + public Map<String, Object> getMetricMap() { + return metricMap; + } + + @JsonProperty("metric") + public void setMetricMap(Map<String, Object> metricMap) throws JsonProcessingException { + this.metricMap = metricMap; + this.metric = JsonUtil.toJson(metricMap); + } + + public String getRecord() { + return record; + } + + public void setRecord(String record) throws IOException { + if (!StringUtils.isEmpty(record)) { + this.record = record; 
+ this.recordMap = JsonUtil.toEntity(record, new TypeReference<Map<String, Object>>() { + }); + } } - public String getDescription() { - return description; + @JsonProperty("record") + public Map<String, Object> getRecordMap() { + return recordMap; } - public void setDescription(String description) { - this.description = description; + @JsonProperty("record") + public void setRecordMap(Map<String, Object> recordMap) throws JsonProcessingException { + this.recordMap = recordMap; + this.record = JsonUtil.toJson(recordMap); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; } public Rule() { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java index 4df5796..f632f14 100644 --- a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java @@ -70,7 +70,7 @@ public class HiveMetaStoreProxy { } @PreDestroy - public void destroy() throws Exception { + public void destroy() { if (null != client) { client.close(); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java index 8c41008..e941aaa 100644 --- 
a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java @@ -59,14 +59,6 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService { LOGGER.info("HiveMetaStoreServiceImpl single thread pool created."); } - private String getUseDbName(String dbName) { - if (!StringUtils.hasText(dbName)) { - return defaultDbName; - } else { - return dbName; - } - } - @Override @Cacheable public Iterable<String> getAllDatabases() { @@ -121,10 +113,11 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService { return results; } dbs = getAllDatabases(); - if (dbs != null) { - for (String db : dbs) { - results.put(db, getTables(db)); - } + if (dbs == null) { + return results; + } + for (String db : dbs) { + results.put(db, getTables(db)); } return results; } @@ -168,6 +161,14 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService { return allTables; } + private String getUseDbName(String dbName) { + if (!StringUtils.hasText(dbName)) { + return defaultDbName; + } else { + return dbName; + } + } + private void reconnect() { if (singleThreadExecutor.getActiveCount() == 0) { System.out.println("execute create thread."); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java b/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java index 360d4ec..a080f30 100644 --- a/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java +++ b/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java @@ -116,9 +116,10 @@ public class MetricStoreImpl implements MetricStore { if (jsonNode.hasNonNull("hits") && 
jsonNode.get("hits").hasNonNull("hits")) { for (JsonNode node : jsonNode.get("hits").get("hits")) { JsonNode sourceNode = node.get("_source"); - metricValues.add(new MetricValue(sourceNode.get("name").asText(), Long.parseLong(sourceNode.get("tmst").asText()), - JsonUtil.toEntity(sourceNode.get("value").toString(), new TypeReference<Map<String, Object>>() { - }))); + Map<String, Object> value = JsonUtil.toEntity(sourceNode.get("value").toString(), new TypeReference<Map<String, Object>>() { + }); + MetricValue metricValue = new MetricValue(sourceNode.get("name").asText(), Long.parseLong(sourceNode.get("tmst").asText()), value); + metricValues.add(metricValue); } } return metricValues; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java new file mode 100644 index 0000000..f59d614 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java @@ -0,0 +1,76 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.griffin.core.util; + +import org.apache.commons.lang.StringUtils; +import org.apache.griffin.core.exception.GriffinException; +import org.apache.griffin.core.measure.entity.DataSource; +import org.apache.griffin.core.measure.entity.ExternalMeasure; +import org.apache.griffin.core.measure.entity.GriffinMeasure; +import org.apache.griffin.core.measure.entity.Measure; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_CONNECTOR_NAME; +import static org.apache.griffin.core.exception.GriffinExceptionMessage.MISSING_METRIC_NAME; + +public class MeasureUtil { + private static final Logger LOGGER = LoggerFactory.getLogger(MeasureUtil.class); + + public static void validateMeasure(Measure measure) { + if (measure instanceof GriffinMeasure) { + validateGriffinMeasure((GriffinMeasure) measure); + } else if (measure instanceof ExternalMeasure) { + validateExternalMeasure((ExternalMeasure) measure); + } + + } + + private static void validateGriffinMeasure(GriffinMeasure measure) { + if (getConnectorNamesIfValid(measure) == null) { + throw new GriffinException.BadRequestException(INVALID_CONNECTOR_NAME); + } + } + + private static void validateExternalMeasure(ExternalMeasure measure) { + if (StringUtils.isBlank(measure.getMetricName())) { + LOGGER.warn("Failed to create external measure {}. 
Its metric name is blank.", measure.getName()); + throw new GriffinException.BadRequestException(MISSING_METRIC_NAME); + } + } + + private static List<String> getConnectorNamesIfValid(GriffinMeasure measure) { + Set<String> sets = new HashSet<>(); + List<DataSource> sources = measure.getDataSources(); + for (DataSource source : sources) { + source.getConnectors().stream().filter(dc -> dc.getName() != null).forEach(dc -> sets.add(dc.getName())); + } + if (sets.size() == 0 || sets.size() < sources.size()) { + LOGGER.warn("Connector names cannot be repeated or empty."); + return null; + } + return new ArrayList<>(sets); + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java b/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java index 728ee9e..415c9c1 100644 --- a/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java +++ b/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java @@ -19,11 +19,17 @@ under the License. 
package org.apache.griffin.core.util; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.config.PropertiesFactoryBean; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.InputStreamResource; import org.springframework.core.io.Resource; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Properties; @@ -43,4 +49,50 @@ public class PropertiesUtil { } return properties; } + + /** + * @param name properties name like sparkJob.properties + * @param defaultPath properties classpath like /application.properties + * @param location custom properties path + * @return Properties + * @throws FileNotFoundException location setting is wrong that there is no target file. + */ + public static Properties getConf(String name, String defaultPath, String location) throws FileNotFoundException { + String path = getConfPath(name, location); + Resource resource; + if (path == null) { + resource = new ClassPathResource(defaultPath); + path = defaultPath; + } else { + resource = new InputStreamResource(new FileInputStream(path)); + } + return PropertiesUtil.getProperties(path, resource); + } + + private static String getConfPath(String name, String location) throws FileNotFoundException { + if (StringUtils.isEmpty(location)) { + LOGGER.info("Config location is empty. 
Read from default path."); + return null; + } + File file = new File(location); + LOGGER.info("File absolute path:" + file.getAbsolutePath()); + File[] files = file.listFiles(); + if (files == null) { + LOGGER.warn("The defaultPath {} does not exist.Please check your config in application.properties.", location); + throw new FileNotFoundException(); + } + return getConfPath(name, files, location); + } + + private static String getConfPath(String name, File[] files, String location) { + String path = null; + for (File f : files) { + if (f.getName().equals(name)) { + path = location + File.separator + name; + LOGGER.info("config real path: {}", path); + } + } + return path; + } + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java b/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java index 5f4396a..75f068d 100644 --- a/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java +++ b/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java @@ -23,13 +23,31 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.TimeZone; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; public class TimeUtil { private static final Logger LOGGER = LoggerFactory.getLogger(TimeUtil.class); + private static final String MILLISECONDS_PATTERN = "(?i)m(illi)?s(ec(ond)?)?"; + private static final String SECONDS_PATTERN = "(?i)s(ec(ond)?)?"; + private static final String MINUTES_PATTERN = "(?i)m(in(ute)?)?"; + private static final String HOURS_PATTERN = "(?i)h((ou)?r)?"; + private static final String DAYS_PATTERN = "(?i)d(ay)?"; + + 
private static class TimeUnitPair { + private long t; + private String unit; + + TimeUnitPair(long t, String unit) { + this.t = t; + this.unit = unit; + } + } public static Long str2Long(String timeStr) { if (timeStr == null) { @@ -42,18 +60,14 @@ public class TimeUtil { trimTimeStr = trimTimeStr.substring(1); positive = false; } + List<TimeUnitPair> list = getTimeUnitPairs(trimTimeStr); + return str2Long(positive, list); + } - String timePattern = "(?i)\\d+(ms|s|m|h|d)"; - Pattern pattern = Pattern.compile(timePattern); - Matcher matcher = pattern.matcher(trimTimeStr); - List<String> list = new ArrayList<>(); - while (matcher.find()) { - String group = matcher.group(); - list.add(group.toLowerCase()); - } + private static Long str2Long(boolean positive, List<TimeUnitPair> list) { long time = 0; - for (String aList : list) { - long t = milliseconds(aList.toLowerCase()); + for (TimeUnitPair tu : list) { + long t = milliseconds(tu); if (positive) { time += t; } else { @@ -63,19 +77,36 @@ public class TimeUtil { return time; } - private static Long milliseconds(String str) { - if (str.endsWith("ms")) { - return milliseconds(Long.parseLong(str.substring(0, str.length() - 2)), TimeUnit.MILLISECONDS); - } else if (str.endsWith("s")) { - return milliseconds(Long.parseLong(str.substring(0, str.length() - 1)), TimeUnit.SECONDS); - } else if (str.endsWith("m")) { - return milliseconds(Long.parseLong(str.substring(0, str.length() - 1)), TimeUnit.MINUTES); - } else if (str.endsWith("h")) { - return milliseconds(Long.parseLong(str.substring(0, str.length() - 1)), TimeUnit.HOURS); - } else if (str.endsWith("d")) { - return milliseconds(Long.parseLong(str.substring(0, str.length() - 1)), TimeUnit.DAYS); + private static List<TimeUnitPair> getTimeUnitPairs(String timeStr) { + // "1d2h3m" -> "1d", "2h", "3m" + String timePattern = "(?i)(\\d+)([a-zA-Z]+)"; + Pattern pattern = Pattern.compile(timePattern); + Matcher matcher = pattern.matcher(timeStr); + List<TimeUnitPair> list = new 
ArrayList<>(); + while (matcher.find()) { + String num = matcher.group(1); + String unit = matcher.group(2); + TimeUnitPair tu = new TimeUnitPair(Long.valueOf(num), unit); + list.add(tu); + } + return list; + } + + private static Long milliseconds(TimeUnitPair tu) { + long t = tu.t; + String unit = tu.unit; + if (unit.matches(MILLISECONDS_PATTERN)) { + return milliseconds(t, TimeUnit.MILLISECONDS); + } else if (unit.matches(SECONDS_PATTERN)) { + return milliseconds(t, TimeUnit.SECONDS); + } else if (unit.matches(MINUTES_PATTERN)) { + return milliseconds(t, TimeUnit.MINUTES); + } else if (unit.matches(HOURS_PATTERN)) { + return milliseconds(t, TimeUnit.HOURS); + } else if (unit.matches(DAYS_PATTERN)) { + return milliseconds(t, TimeUnit.DAYS); } else { - LOGGER.error("Time string format error.It only supports d(day),h(hour),m(minute),s(second),ms(millsecond).Please check your time format.)"); + LOGGER.error("Time string format error.It only supports d(day),h(hour),m(minute),s(second),ms(millsecond).Please check your time format."); return 0L; } } @@ -84,7 +115,7 @@ public class TimeUtil { return unit.toMillis(duration); } - public static String format(String timeFormat, long time,String timeZone) { + public static String format(String timeFormat, long time, TimeZone timeZone) { String timePattern = "#(?:\\\\#|[^#])*#"; Date t = new Date(time); Pattern ptn = Pattern.compile(timePattern); @@ -95,7 +126,7 @@ public class TimeUtil { String content = group.substring(1, group.length() - 1); String pattern = refreshEscapeHashTag(content); SimpleDateFormat sdf = new SimpleDateFormat(pattern); - sdf.setTimeZone(TimeZone.getTimeZone(timeZone)); + sdf.setTimeZone(timeZone); matcher.appendReplacement(sb, sdf.format(t)); } matcher.appendTail(sb); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/resources/sparkJob.properties ---------------------------------------------------------------------- diff --git 
a/service/src/main/resources/sparkJob.properties b/service/src/main/resources/sparkJob.properties index a9be693..6323914 100644 --- a/service/src/main/resources/sparkJob.properties +++ b/service/src/main/resources/sparkJob.properties @@ -27,18 +27,17 @@ sparkJob.name=griffin sparkJob.queue=default # options -sparkJob.numExecutors=10 +sparkJob.numExecutors=2 sparkJob.executorCores=1 -sparkJob.driverMemory=2g -sparkJob.executorMemory=2g +sparkJob.driverMemory=1g +sparkJob.executorMemory=1g # shouldn't config in server, but in -sparkJob.jars = hdfs://livy/spark-avro_2.11-2.0.1.jar;\ - hdfs://livy/datanucleus-api-jdo-3.2.6.jar;\ - hdfs://livy/datanucleus-core-3.2.10.jar;\ - hdfs://livy/datanucleus-rdbms-3.2.9.jar +sparkJob.jars = hdfs:///livy/datanucleus-api-jdo-3.2.6.jar;\ + hdfs:///livy/datanucleus-core-3.2.10.jar;\ + hdfs:///livy/datanucleus-rdbms-3.2.9.jar -spark.yarn.dist.files = hdfs://livy/hive-site.xml +spark.yarn.dist.files = hdfs:///home/spark_conf/hive-site.xml # livy # livy.uri=http://10.9.246.187:8998/batches