Fix bugs and update info

Fix:
        1. Fix the bug related to the rule that a data connector name cannot be repeated within one measure
        2. Fix the bug in reading properties
        3. Fix the bug where the data unit may be an empty string
        4. Fix the range and data time zone bugs
        5. Fix the incorrect check in the file predicator
Update:
       1. Add an API to get the job config
       2. Update Rule so that some fields must not be null
       3. Update the measure rule description and add null checks
       4. Update the return type of add job
       5. Update code style

Author: ahutsunshine <ahutsunsh...@gmail.com>
Author: He Wang <wanghe...@qq.com>
Author: dodobel <1254288...@qq.com>

Closes #211 from ahutsunshine/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/fd31809a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/fd31809a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/fd31809a

Branch: refs/heads/master
Commit: fd31809a60755c9069eba405e7af456e3e242eff
Parents: 0f5d742
Author: ahutsunshine <ahutsunsh...@gmail.com>
Authored: Tue Feb 6 10:36:25 2018 +0800
Committer: Lionel Liu <bhlx3l...@163.com>
Committed: Tue Feb 6 10:36:25 2018 +0800

----------------------------------------------------------------------
 .../apache/griffin/measure/util/MailUtil.java   |  20 ++--
 .../griffin/core/common/CacheEvictor.java       |   6 +-
 .../griffin/core/config/PropertiesConfig.java   |  49 ++-------
 .../griffin/core/config/SchedulerConfig.java    |   7 +-
 .../core/exception/GriffinExceptionMessage.java |   1 +
 .../griffin/core/job/FileExistPredicator.java   |  10 +-
 .../apache/griffin/core/job/JobController.java  |   5 +
 .../apache/griffin/core/job/JobInstance.java    |  77 +++++++------
 .../org/apache/griffin/core/job/JobService.java |   2 +
 .../apache/griffin/core/job/JobServiceImpl.java | 110 +++++++++++--------
 .../apache/griffin/core/job/SparkSubmitJob.java |  24 ++--
 .../griffin/core/job/entity/JobSchedule.java    |  10 +-
 .../core/job/entity/SegmentPredicate.java       |  10 +-
 .../griffin/core/job/entity/SegmentRange.java   |   2 +-
 .../core/job/factory/PredicatorFactory.java     |   4 +
 .../griffin/core/job/repo/JobInstanceRepo.java  |   2 +-
 .../griffin/core/job/repo/JobScheduleRepo.java  |   2 +
 .../measure/ExternalMeasureOperationImpl.java   |  15 +--
 .../measure/GriffinMeasureOperationImpl.java    |  55 ++--------
 .../griffin/core/measure/MeasureController.java |   6 +
 .../griffin/core/measure/MeasureService.java    |  10 +-
 .../core/measure/MeasureServiceImpl.java        |   9 ++
 .../core/measure/entity/DataConnector.java      |  27 ++++-
 .../core/measure/entity/GriffinMeasure.java     |  41 ++++++-
 .../griffin/core/measure/entity/Rule.java       |  87 ++++++++++++---
 .../core/metastore/hive/HiveMetaStoreProxy.java |   2 +-
 .../hive/HiveMetaStoreServiceImpl.java          |  25 +++--
 .../griffin/core/metric/MetricStoreImpl.java    |   7 +-
 .../apache/griffin/core/util/MeasureUtil.java   |  76 +++++++++++++
 .../griffin/core/util/PropertiesUtil.java       |  52 +++++++++
 .../org/apache/griffin/core/util/TimeUtil.java  |  81 +++++++++-----
 service/src/main/resources/sparkJob.properties  |  15 ++-
 .../core/config/PropertiesConfigTest.java       |  48 +++-----
 .../griffin/core/job/JobControllerTest.java     |   3 +-
 .../griffin/core/job/JobInstanceTest.java       |   8 +-
 .../griffin/core/job/JobServiceImplTest.java    |  42 +++----
 .../GriffinMeasureOperationImplTest.java        |  11 --
 .../core/measure/MeasureControllerTest.java     |  33 +++++-
 .../core/measure/MeasureServiceImplTest.java    |  46 +++++++-
 .../griffin/core/util/PropertiesUtilTest.java   |  38 ++++++-
 .../apache/griffin/core/util/TimeUtilTest.java  |  27 ++---
 service/src/test/resources/sparkJob.properties  |  21 ++--
 42 files changed, 723 insertions(+), 403 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java 
b/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java
index 4bbc581..1d27838 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java
+++ b/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java
@@ -23,7 +23,10 @@ package org.apache.griffin.measure.util;
 import org.apache.griffin.measure.config.params.env.EmailParam;
 
 import javax.mail.*;
-import javax.mail.internet.*;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeBodyPart;
+import javax.mail.internet.MimeMessage;
+import javax.mail.internet.MimeMultipart;
 import java.util.Properties;
 
 /**
@@ -41,18 +44,17 @@ public class MailUtil {
         Message msg = new MimeMessage(session);
         msg.setFrom(new InternetAddress(emailParam.mail()));
         //msg.setRecipient(Message.RecipientType.TO, new 
InternetAddress(UserArr));
-        String[] arr=null;
-        if (UserArr.indexOf(",")==-1) {
-            arr=new String[]{UserArr};
-        }
-        else {
-            arr=UserArr.split(",");
+        String[] arr = null;
+        if (UserArr.indexOf(",") == -1) {
+            arr = new String[]{UserArr};
+        } else {
+            arr = UserArr.split(",");
         }
         Address[] tos = new InternetAddress[arr.length];
-        for (int i=0; i<arr.length; i++){
+        for (int i = 0; i < arr.length; i++) {
             tos[i] = new InternetAddress(arr[i]);
         }
-        msg.setRecipients(Message.RecipientType.TO,tos);
+        msg.setRecipients(Message.RecipientType.TO, tos);
         msg.setSubject(Title);
         Multipart mainPart = new MimeMultipart();
         BodyPart html = new MimeBodyPart();

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/common/CacheEvictor.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/common/CacheEvictor.java 
b/service/src/main/java/org/apache/griffin/core/common/CacheEvictor.java
index c40f4e1..17c0f91 100644
--- a/service/src/main/java/org/apache/griffin/core/common/CacheEvictor.java
+++ b/service/src/main/java/org/apache/griffin/core/common/CacheEvictor.java
@@ -30,8 +30,12 @@ import org.springframework.stereotype.Component;
 public class CacheEvictor {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CacheEvictor.class);
 
+    private final HiveMetaStoreService hiveMetaStoreService;
+
     @Autowired
-    private HiveMetaStoreService hiveMetaStoreService;
+    public CacheEvictor(HiveMetaStoreService hiveMetaStoreService) {
+        this.hiveMetaStoreService = hiveMetaStoreService;
+    }
 
     @Scheduled(fixedRateString = 
"${cache.evict.hive.fixedRate.in.milliseconds}")
     @CacheEvict(cacheNames = "hive", allEntries = true, beforeInvocation = 
true)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java 
b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
index bfaba35..0f27513 100644
--- a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
@@ -19,22 +19,19 @@ under the License.
 
 package org.apache.griffin.core.config;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.griffin.core.util.PropertiesUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.core.io.ClassPathResource;
-import org.springframework.core.io.InputStreamResource;
-import org.springframework.core.io.Resource;
 
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.util.Properties;
 
+import static org.apache.griffin.core.util.PropertiesUtil.getConf;
+import static org.apache.griffin.core.util.PropertiesUtil.getProperties;
+
 @Configuration
 public class PropertiesConfig {
 
@@ -47,50 +44,24 @@ public class PropertiesConfig {
         this.location = location;
     }
 
-    private String getPath(String defaultPath, String name) throws 
FileNotFoundException {
-        String path = defaultPath;
-        File file = new File(location);
-        LOGGER.info("File absolute path:" + file.getAbsolutePath());
-        File[] files = file.listFiles();
-        if (files == null || files.length == 0) {
-            LOGGER.error("The defaultPath {} does not exist.Please check your 
config in application.properties.", location);
-            throw new FileNotFoundException();
-        }
-        for (File f : files) {
-            if (f.getName().equals(name)) {
-                path = location + File.separator + name;
-                LOGGER.info("config real path: {}", path);
-            }
-        }
-        return path;
-    }
-
 
     @Bean(name = "appConf")
     public Properties appConf() {
         String path = "/application.properties";
-        return PropertiesUtil.getProperties(path, new ClassPathResource(path));
+        return getProperties(path, new ClassPathResource(path));
     }
 
     @Bean(name = "livyConf")
     public Properties livyConf() throws FileNotFoundException {
-        String path = "/sparkJob.properties";
-        if (StringUtils.isEmpty(location)) {
-            return PropertiesUtil.getProperties(path, new 
ClassPathResource(path));
-        }
-        path = getPath(path, "sparkJob.properties");
-        Resource resource = new InputStreamResource(new FileInputStream(path));
-        return PropertiesUtil.getProperties(path, resource);
+        String name = "sparkJob.properties";
+        String defaultPath = "/" + name;
+        return getConf(name, defaultPath, location);
     }
 
     @Bean(name = "quartzConf")
     public Properties quartzConf() throws FileNotFoundException {
-        String path = "/quartz.properties";
-        if (StringUtils.isEmpty(location)) {
-            return PropertiesUtil.getProperties(path, new 
ClassPathResource(path));
-        }
-        path = getPath(path, "quartz.properties");
-        Resource resource = new InputStreamResource(new FileInputStream(path));
-        return PropertiesUtil.getProperties(path, resource);
+        String name = "quartz.properties";
+        String defaultPath = "/" + name;
+        return getConf(name, defaultPath, location);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java 
b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
index 896ab00..7295168 100644
--- a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java
@@ -35,9 +35,12 @@ import java.util.Properties;
 @Configuration
 public class SchedulerConfig {
 
+    private final Properties quartzConf;
+
     @Autowired
-    @Qualifier("quartzConf")
-    private Properties quartzConf;
+    public SchedulerConfig(@Qualifier("quartzConf") Properties quartzConf) {
+        this.quartzConf = quartzConf;
+    }
 
     @Bean
     public JobFactory jobFactory(ApplicationContext applicationContext) {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
 
b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
index 83fbc23..aab8769 100644
--- 
a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
+++ 
b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
@@ -31,6 +31,7 @@ public enum GriffinExceptionMessage {
     INVALID_METRIC_RECORDS_SIZE(40007, "Size must not be less than zero"),
     INVALID_METRIC_VALUE_FORMAT(40008, "Metric value format is invalid"),
     INVALID_MEASURE_ID(40009, "Property 'measure.id' is invalid"),
+    INVALID_CRON_EXPRESSION(40010, "Property 'cron.expression' is invalid"),
 
     //404, "Not Found"
     MEASURE_ID_DOES_NOT_EXIST(40401, "Measure id does not exist"),

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java 
b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
index 7354176..7ee8642 100644
--- a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
@@ -19,10 +19,13 @@ under the License.
 
 package org.apache.griffin.core.job;
 
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.griffin.core.job.entity.SegmentPredicate;
 import org.apache.griffin.core.util.FSUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
 
 import java.io.IOException;
 import java.util.Map;
@@ -45,11 +48,12 @@ public class FileExistPredicator implements Predicator {
     public boolean predicate() throws IOException {
         Map<String, String> config = predicate.getConfigMap();
         String[] paths = null;
-        if (config.get(PREDICT_PATH) != null) {
+        String rootPath = null;
+        if (config != null && !StringUtils.isEmpty(config.get(PREDICT_PATH))) {
             paths = config.get(PREDICT_PATH).split(PATH_CONNECTOR_CHARACTER);
+            rootPath = config.get(PREDICT_ROOT_PATH);
         }
-        String rootPath = config.get(PREDICT_ROOT_PATH);
-        if (paths == null || rootPath == null) {
+        if (ArrayUtils.isEmpty(paths) || StringUtils.isEmpty(rootPath)) {
             LOGGER.error("Predicate path is null.Please check predicates 
config root.path and path.");
             throw new NullPointerException();
         }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/JobController.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/JobController.java 
b/service/src/main/java/org/apache/griffin/core/job/JobController.java
index 3f03f8c..2d09d8b 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobController.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java
@@ -41,6 +41,11 @@ public class JobController {
         return jobService.getAliveJobs();
     }
 
+    @RequestMapping(value = "/jobs/config/{jobName}")
+    public JobSchedule getJobSchedule(@PathVariable("jobName") String jobName) 
{
+        return jobService.getJobSchedule(jobName);
+    }
+
     @RequestMapping(value = "/jobs", method = RequestMethod.POST)
     @ResponseStatus(HttpStatus.CREATED)
     public JobSchedule addJob(@RequestBody JobSchedule jobSchedule) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java 
b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index ba0b1fb..5cffceb 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -38,13 +38,13 @@ import 
org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
 
 import java.io.IOException;
-import java.text.ParseException;
 import java.util.*;
 
 import static org.apache.griffin.core.job.JobServiceImpl.GRIFFIN_JOB_ID;
 import static org.apache.griffin.core.job.JobServiceImpl.JOB_SCHEDULE_ID;
 import static org.quartz.JobBuilder.newJob;
 import static org.quartz.JobKey.jobKey;
+import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
 import static org.quartz.TriggerBuilder.newTrigger;
 import static org.quartz.TriggerKey.triggerKey;
 
@@ -55,7 +55,7 @@ public class JobInstance implements Job {
     public static final String MEASURE_KEY = "measure";
     public static final String PREDICATES_KEY = "predicts";
     public static final String PREDICATE_JOB_NAME = "predicateJobName";
-    public static final String JOB_NAME = "jobName";
+    static final String JOB_NAME = "jobName";
     static final String PATH_CONNECTOR_CHARACTER = ",";
 
     @Autowired
@@ -73,12 +73,12 @@ public class JobInstance implements Job {
     private JobSchedule jobSchedule;
     private GriffinMeasure measure;
     private GriffinJob griffinJob;
-    private List<SegmentPredicate> mPredicts;
+    private List<SegmentPredicate> mPredicates;
     private Long jobStartTime;
 
 
     @Override
-    public void execute(JobExecutionContext context) throws 
JobExecutionException {
+    public void execute(JobExecutionContext context) {
         try {
             initParam(context);
             setSourcesPartitionsAndPredicates(measure.getDataSources());
@@ -89,7 +89,7 @@ public class JobInstance implements Job {
     }
 
     private void initParam(JobExecutionContext context) throws 
SchedulerException {
-        mPredicts = new ArrayList<>();
+        mPredicates = new ArrayList<>();
         JobDetail jobDetail = context.getJobDetail();
         Long jobScheduleId = 
jobDetail.getJobDataMap().getLong(JOB_SCHEDULE_ID);
         Long griffinJobId = jobDetail.getJobDataMap().getLong(GRIFFIN_JOB_ID);
@@ -102,7 +102,7 @@ public class JobInstance implements Job {
     }
 
     private void setJobStartTime(JobDetail jobDetail) throws 
SchedulerException {
-        Scheduler scheduler = factory.getObject();
+        Scheduler scheduler = factory.getScheduler();
         JobKey jobKey = jobDetail.getKey();
         List<Trigger> triggers = (List<Trigger>) 
scheduler.getTriggersOfJob(jobKey);
         Date triggerTime = triggers.get(0).getPreviousFireTime();
@@ -145,13 +145,14 @@ public class JobInstance implements Job {
      * split data into several part and get every part start timestamp
      *
      * @param segRange config of data
+     * @param dc data connector
      * @return split timestamps of data
      */
-    private Long[] genSampleTs(SegmentRange segRange, DataConnector dc) throws 
IOException {
+    private Long[] genSampleTs(SegmentRange segRange, DataConnector dc) {
         Long offset = TimeUtil.str2Long(segRange.getBegin());
         Long range = TimeUtil.str2Long(segRange.getLength());
         String unit = dc.getDataUnit();
-        Long dataUnit = TimeUtil.str2Long(unit != null ? unit : 
dc.getDefaultDataUnit());
+        Long dataUnit = TimeUtil.str2Long(StringUtils.isEmpty(unit) ? 
dc.getDefaultDataUnit() : unit);
         //offset usually is negative
         Long dataStartTime = jobStartTime + offset;
         if (range < 0) {
@@ -172,59 +173,68 @@ public class JobInstance implements Job {
     /**
      * set data connector predicates
      *
+     * @param dc data connector
      * @param sampleTs collection of data split start timestamp
      */
     private void setConnectorPredicates(DataConnector dc, Long[] sampleTs) 
throws IOException {
         List<SegmentPredicate> predicates = dc.getPredicates();
-        if (predicates != null) {
-            for (SegmentPredicate predicate : predicates) {
-                genConfMap(predicate.getConfigMap(), sampleTs);
-                //Do not forget to update origin string config
-                predicate.setConfigMap(predicate.getConfigMap());
-                mPredicts.add(predicate);
-            }
+        for (SegmentPredicate predicate : predicates) {
+            genConfMap(dc, sampleTs);
+            //Do not forget to update origin string config
+            predicate.setConfigMap(predicate.getConfigMap());
+            mPredicates.add(predicate);
         }
     }
 
-    /**
-     * set data connector configs
-     *
-     * @param sampleTs collection of data split start timestamp
-     */
     private void setConnectorConf(DataConnector dc, Long[] sampleTs) throws 
IOException {
-        genConfMap(dc.getConfigMap(), sampleTs);
+        genConfMap(dc, sampleTs);
         dc.setConfigMap(dc.getConfigMap());
     }
 
 
     /**
-     * @param conf     map with file predicate,data split and partitions info
+     * @param dc     data connector
      * @param sampleTs collection of data split start timestamp
      * @return all config data combine,like {"where": "year=2017 AND month=11 
AND dt=15 AND hour=09,year=2017 AND month=11 AND dt=15 AND hour=10"}
      * or like {"path": 
"/year=2017/month=11/dt=15/hour=09/_DONE,/year=2017/month=11/dt=15/hour=10/_DONE"}
      */
-    private void genConfMap(Map<String, String> conf, Long[] sampleTs) {
+    private void genConfMap(DataConnector dc,  Long[] sampleTs) {
+        Map<String, String> conf = dc.getConfigMap();
+        if (conf == null) {
+            LOGGER.warn("Predicate config is null.");
+            return;
+        }
         for (Map.Entry<String, String> entry : conf.entrySet()) {
             String value = entry.getValue();
             Set<String> set = new HashSet<>();
+            if (StringUtils.isEmpty(value)) {
+                continue;
+            }
             for (Long timestamp : sampleTs) {
-                set.add(TimeUtil.format(value, 
timestamp,jobSchedule.getTimeZone()));
+                set.add(TimeUtil.format(value, timestamp, getTimeZone(dc)));
             }
             conf.put(entry.getKey(), StringUtils.join(set, 
PATH_CONNECTOR_CHARACTER));
         }
     }
 
+    private TimeZone getTimeZone(DataConnector dc) {
+        if (StringUtils.isEmpty(dc.getDataTimeZone())) {
+            return TimeZone.getDefault();
+        }
+        return TimeZone.getTimeZone(dc.getDataTimeZone());
+    }
+
     private boolean createJobInstance(Map<String, Object> confMap) throws 
Exception {
         Map<String, Object> config = (Map<String, Object>) 
confMap.get("checkdonefile.schedule");
         Long interval = TimeUtil.str2Long((String) config.get("interval"));
         Integer repeat = Integer.valueOf(config.get("repeat").toString());
         String groupName = "PG";
         String jobName = griffinJob.getJobName() + "_predicate_" + 
System.currentTimeMillis();
-        Scheduler scheduler = factory.getObject();
+        Scheduler scheduler = factory.getScheduler();
         TriggerKey triggerKey = triggerKey(jobName, groupName);
         return !(scheduler.checkExists(triggerKey)
                 || !saveGriffinJob(jobName, groupName)
-                || !createJobInstance(scheduler, triggerKey, interval, repeat, 
jobName));
+                || !createJobInstance(triggerKey, interval, repeat, jobName));
     }
 
     private boolean saveGriffinJob(String pName, String pGroup) {
@@ -236,26 +246,27 @@ public class JobInstance implements Job {
         return true;
     }
 
-    private boolean createJobInstance(Scheduler scheduler, TriggerKey 
triggerKey, Long interval, Integer repeatCount, String pJobName) throws 
Exception {
-        JobDetail jobDetail = addJobDetail(scheduler, triggerKey, pJobName);
-        scheduler.scheduleJob(newTriggerInstance(triggerKey, jobDetail, 
interval, repeatCount));
+    private boolean createJobInstance(TriggerKey triggerKey, Long interval, 
Integer repeatCount, String pJobName) throws Exception {
+        JobDetail jobDetail = addJobDetail(triggerKey, pJobName);
+        factory.getScheduler().scheduleJob(newTriggerInstance(triggerKey, 
jobDetail, interval, repeatCount));
         return true;
     }
 
 
-    private Trigger newTriggerInstance(TriggerKey triggerKey, JobDetail jd, 
Long interval, Integer repeatCount) throws ParseException {
+    private Trigger newTriggerInstance(TriggerKey triggerKey, JobDetail jd, 
Long interval, Integer repeatCount) {
         return newTrigger()
                 .withIdentity(triggerKey)
                 .forJob(jd)
                 .startNow()
-                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
+                .withSchedule(simpleSchedule()
                         .withIntervalInMilliseconds(interval)
                         .withRepeatCount(repeatCount)
                 )
                 .build();
     }
 
-    private JobDetail addJobDetail(Scheduler scheduler, TriggerKey triggerKey, 
String pJobName) throws SchedulerException, JsonProcessingException {
+    private JobDetail addJobDetail(TriggerKey triggerKey, String pJobName) 
throws SchedulerException, JsonProcessingException {
+        Scheduler scheduler = factory.getScheduler();
         JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup());
         JobDetail jobDetail;
         Boolean isJobKeyExist = scheduler.checkExists(jobKey);
@@ -275,7 +286,7 @@ public class JobInstance implements Job {
     private void setJobDataMap(JobDetail jobDetail, String pJobName) throws 
JsonProcessingException {
         JobDataMap dataMap = jobDetail.getJobDataMap();
         dataMap.put(MEASURE_KEY, JsonUtil.toJson(measure));
-        dataMap.put(PREDICATES_KEY, JsonUtil.toJson(mPredicts));
+        dataMap.put(PREDICATES_KEY, JsonUtil.toJson(mPredicates));
         dataMap.put(JOB_NAME, griffinJob.getJobName());
         dataMap.put(PREDICATE_JOB_NAME, pJobName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/JobService.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobService.java 
b/service/src/main/java/org/apache/griffin/core/job/JobService.java
index 3c15f8f..a238311 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobService.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobService.java
@@ -31,6 +31,8 @@ public interface JobService {
 
     List<JobDataBean> getAliveJobs();
 
+    JobSchedule getJobSchedule(String jobName);
+
     JobSchedule addJob(JobSchedule jobSchedule) throws Exception;
 
     void pauseJob(String group, String name) throws SchedulerException;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java 
b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index aa84f11..b2faa72 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -47,10 +47,12 @@ import org.springframework.web.client.RestClientException;
 import org.springframework.web.client.RestTemplate;
 
 import java.io.IOException;
-import java.text.ParseException;
 import java.util.*;
 
+import static java.util.TimeZone.getTimeZone;
 import static org.apache.griffin.core.exception.GriffinExceptionMessage.*;
+import static org.quartz.CronExpression.isValidExpression;
+import static org.quartz.CronScheduleBuilder.cronSchedule;
 import static org.quartz.JobBuilder.newJob;
 import static org.quartz.JobKey.jobKey;
 import static org.quartz.TriggerBuilder.newTrigger;
@@ -61,8 +63,8 @@ public class JobServiceImpl implements JobService {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(JobServiceImpl.class);
     public static final String JOB_SCHEDULE_ID = "jobScheduleId";
     public static final String GRIFFIN_JOB_ID = "griffinJobId";
-    static final int MAX_PAGE_SIZE = 1024;
-    static final int DEFAULT_PAGE_SIZE = 10;
+    private static final int MAX_PAGE_SIZE = 1024;
+    private static final int DEFAULT_PAGE_SIZE = 10;
 
     @Autowired
     private SchedulerFactoryBean factory;
@@ -86,12 +88,11 @@ public class JobServiceImpl implements JobService {
 
     @Override
     public List<JobDataBean> getAliveJobs() {
-        Scheduler scheduler = factory.getObject();
         List<JobDataBean> dataList = new ArrayList<>();
         try {
             List<GriffinJob> jobs = jobRepo.findByDeleted(false);
             for (GriffinJob job : jobs) {
-                JobDataBean jobData = genJobData(scheduler, 
jobKey(job.getQuartzName(), job.getQuartzGroup()), job);
+                JobDataBean jobData = genJobData(jobKey(job.getQuartzName(), 
job.getQuartzGroup()), job);
                 if (jobData != null) {
                     dataList.add(jobData);
                 }
@@ -103,7 +104,8 @@ public class JobServiceImpl implements JobService {
         return dataList;
     }
 
-    private JobDataBean genJobData(Scheduler scheduler, JobKey jobKey, 
GriffinJob job) throws SchedulerException {
+    private JobDataBean genJobData(JobKey jobKey, GriffinJob job) throws 
SchedulerException {
+        Scheduler scheduler = factory.getScheduler();
         List<Trigger> triggers = (List<Trigger>) 
scheduler.getTriggersOfJob(jobKey);
         if (CollectionUtils.isEmpty(triggers)) {
             return null;
@@ -136,15 +138,25 @@ public class JobServiceImpl implements JobService {
     }
 
     @Override
+    public JobSchedule getJobSchedule(String jobName) {
+        JobSchedule jobSchedule = jobScheduleRepo.findByJobName(jobName);
+        if (jobSchedule == null) {
+            LOGGER.warn("Job name {} does not exist.", jobName);
+            throw new 
GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST);
+        }
+        return jobSchedule;
+    }
+
+    @Override
     @Transactional(rollbackFor = Exception.class)
     public JobSchedule addJob(JobSchedule js) throws Exception {
         Long measureId = js.getMeasureId();
         GriffinMeasure measure = getMeasureIfValid(measureId);
-        checkJobScheduleParams(js, measure);
+        validateJobScheduleParams(js, measure);
         String qName = getQuartzName(js);
-        String qGroup = getQuartzGroupName();
+        String qGroup = getQuartzGroup();
         TriggerKey triggerKey = triggerKey(qName, qGroup);
-        if (factory.getObject().checkExists(triggerKey)) {
+        if (factory.getScheduler().checkExists(triggerKey)) {
             throw new 
GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST);
         }
         GriffinJob job = new GriffinJob(measure.getId(), js.getJobName(), 
qName, qGroup, false);
@@ -155,33 +167,35 @@ public class JobServiceImpl implements JobService {
     }
 
     private void addJob(TriggerKey triggerKey, JobSchedule js, GriffinJob job) 
throws Exception {
-        Scheduler scheduler = factory.getObject();
-        JobDetail jobDetail = addJobDetail(scheduler, triggerKey, js, job);
-        scheduler.scheduleJob(genTriggerInstance(triggerKey, jobDetail, js));
+        JobDetail jobDetail = addJobDetail(triggerKey, js, job);
+        factory.getScheduler().scheduleJob(genTriggerInstance(triggerKey, 
jobDetail, js));
     }
 
     private String getQuartzName(JobSchedule js) {
         return js.getJobName() + "_" + System.currentTimeMillis();
     }
 
-    private String getQuartzGroupName() {
+    private String getQuartzGroup() {
         return "BA";
     }
 
-    private void checkJobScheduleParams(JobSchedule js, GriffinMeasure 
measure) {
-        if (!isJobNameValid(js.getJobName())) {
+    private void validateJobScheduleParams(JobSchedule js, GriffinMeasure 
measure) {
+        if (!isValidJobName(js.getJobName())) {
             throw new GriffinException.BadRequestException(INVALID_JOB_NAME);
         }
-        if (!isBaseLineValid(js.getSegments())) {
+        if (!isValidCronExpression(js.getCronExpression())) {
+            throw new 
GriffinException.BadRequestException(INVALID_CRON_EXPRESSION);
+        }
+        if (!isValidBaseLine(js.getSegments())) {
             throw new 
GriffinException.BadRequestException(MISSING_BASELINE_CONFIG);
         }
         List<String> names = getConnectorNames(measure);
-        if (!isConnectorNamesValid(js.getSegments(), names)) {
+        if (!isValidConnectorNames(js.getSegments(), names)) {
             throw new 
GriffinException.BadRequestException(INVALID_CONNECTOR_NAME);
         }
     }
 
-    private boolean isJobNameValid(String jobName) {
+    private boolean isValidJobName(String jobName) {
         if (StringUtils.isEmpty(jobName)) {
             LOGGER.warn("Job name cannot be empty.");
             return false;
@@ -194,7 +208,19 @@ public class JobServiceImpl implements JobService {
         return true;
     }
 
-    private boolean isBaseLineValid(List<JobDataSegment> segments) {
+    private boolean isValidCronExpression(String cronExpression) {
+        if (StringUtils.isEmpty(cronExpression)) {
+            LOGGER.warn("Cron Expression is empty.");
+            return false;
+        }
+        if (!isValidExpression(cronExpression)) {
+            LOGGER.warn("Cron Expression is invalid.");
+            return false;
+        }
+        return true;
+    }
+
+    private boolean isValidBaseLine(List<JobDataSegment> segments) {
         for (JobDataSegment jds : segments) {
             if (jds.getBaseline()) {
                 return true;
@@ -204,34 +230,25 @@ public class JobServiceImpl implements JobService {
         return false;
     }
 
-    private boolean isConnectorNamesValid(List<JobDataSegment> segments, 
List<String> names) {
-        Set<String> dcSets = new HashSet<>();
+    private boolean isValidConnectorNames(List<JobDataSegment> segments, 
List<String> names) {
+        Set<String> sets = new HashSet<>();
         for (JobDataSegment segment : segments) {
             String dcName = segment.getDataConnectorName();
-            dcSets.add(dcName);
-            if (!isConnectorNameValid(dcName, names)) {
+            sets.add(dcName);
+            boolean exist = names.stream().anyMatch(name -> 
name.equals(dcName));
+            if (!exist) {
+                LOGGER.warn("Param {} is a illegal string. Please input one of 
strings in {}.", dcName, names);
                 return false;
             }
         }
-        if (dcSets.size() < segments.size()) {
-            LOGGER.warn("Connector names in job data segment cannot be 
repeated.");
+        if (sets.size() < segments.size()) {
+            LOGGER.warn("Connector names in job data segment cannot 
duplicate.");
             return false;
         }
         return true;
     }
 
-    private boolean isConnectorNameValid(String param, List<String> names) {
-        for (String name : names) {
-            if (name.equals(param)) {
-                return true;
-            }
-        }
-        LOGGER.warn("Param {} is a illegal string. Please input one of strings 
in {}.", param, names);
-        return false;
-    }
-
     private List<String> getConnectorNames(GriffinMeasure measure) {
-        List<String> names = new ArrayList<>();
         Set<String> sets = new HashSet<>();
         List<DataSource> sources = measure.getDataSources();
         for (DataSource source : sources) {
@@ -241,8 +258,7 @@ public class JobServiceImpl implements JobService {
             LOGGER.warn("Connector names cannot be repeated.");
             return Collections.emptyList();
         }
-        names.addAll(sets);
-        return names;
+        return new ArrayList<>(sets);
     }
 
     private GriffinMeasure getMeasureIfValid(Long measureId) {
@@ -254,18 +270,18 @@ public class JobServiceImpl implements JobService {
         return measure;
     }
 
-
-    private Trigger genTriggerInstance(TriggerKey triggerKey, JobDetail jd, 
JobSchedule js) throws ParseException {
+    private Trigger genTriggerInstance(TriggerKey triggerKey, JobDetail jd, 
JobSchedule js) {
         return newTrigger()
                 .withIdentity(triggerKey)
                 .forJob(jd)
-                .withSchedule(CronScheduleBuilder.cronSchedule(new 
CronExpression(js.getCronExpression()))
-                        .inTimeZone(TimeZone.getTimeZone(js.getTimeZone()))
+                .withSchedule(cronSchedule(js.getCronExpression())
+                        .inTimeZone(getTimeZone(js.getTimeZone()))
                 )
                 .build();
     }
 
-    private JobDetail addJobDetail(Scheduler scheduler, TriggerKey triggerKey, 
JobSchedule js, GriffinJob job) throws SchedulerException {
+    private JobDetail addJobDetail(TriggerKey triggerKey, JobSchedule js, 
GriffinJob job) throws SchedulerException {
+        Scheduler scheduler = factory.getScheduler();
         JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup());
         JobDetail jobDetail;
         Boolean isJobKeyExist = scheduler.checkExists(jobKey);
@@ -315,7 +331,7 @@ public class JobServiceImpl implements JobService {
 
     @Override
     public void pauseJob(String group, String name) throws SchedulerException {
-        Scheduler scheduler = factory.getObject();
+        Scheduler scheduler = factory.getScheduler();
         JobKey jobKey = new JobKey(name, group);
         if (!scheduler.checkExists(jobKey)) {
             LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), 
jobKey.getName());
@@ -388,7 +404,7 @@ public class JobServiceImpl implements JobService {
     }
 
     private void deleteJob(String group, String name) throws 
SchedulerException {
-        Scheduler scheduler = factory.getObject();
+        Scheduler scheduler = factory.getScheduler();
         JobKey jobKey = new JobKey(name, group);
         if (!scheduler.checkExists(jobKey)) {
             LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), 
jobKey.getName());
@@ -450,7 +466,6 @@ public class JobServiceImpl implements JobService {
         }
     }
 
-
     /**
      * call livy to update part of job instance table data associated with 
group and jobName in mysql.
      *
@@ -476,7 +491,6 @@ public class JobServiceImpl implements JobService {
         }
     }
 
-
     private void setJobInstanceIdAndUri(JobInstanceBean instance, 
HashMap<String, Object> resultMap) {
         if (resultMap != null && resultMap.size() != 0 && 
resultMap.get("state") != null) {
             
instance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString()));
@@ -528,7 +542,7 @@ public class JobServiceImpl implements JobService {
         JobKey jobKey = new JobKey(job.getQuartzName(), job.getQuartzGroup());
         List<Trigger> triggers;
         try {
-            triggers = (List<Trigger>) 
factory.getObject().getTriggersOfJob(jobKey);
+            triggers = (List<Trigger>) 
factory.getScheduler().getTriggersOfJob(jobKey);
         } catch (SchedulerException e) {
             LOGGER.error("Job schedule exception. {}", e.getMessage());
             throw new GriffinException.ServiceException("Fail to Get 
HealthInfo", e);

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java 
b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index 9a9785f..f1b254a 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
 import org.springframework.web.client.RestTemplate;
 
 import java.io.IOException;
@@ -58,7 +59,7 @@ public class SparkSubmitJob implements Job {
 
     private GriffinMeasure measure;
     private String livyUri;
-    private List<SegmentPredicate> mPredicts;
+    private List<SegmentPredicate> mPredicates;
     private JobInstanceBean jobInstance;
     private RestTemplate restTemplate = new RestTemplate();
     private LivyConf livyConf = new LivyConf();
@@ -69,7 +70,7 @@ public class SparkSubmitJob implements Job {
         try {
             initParam(jd);
             setLivyConf();
-            if (!success(mPredicts)) {
+            if (!success(mPredicates)) {
                 updateJobInstanceState(context);
                 return;
             }
@@ -107,7 +108,7 @@ public class SparkSubmitJob implements Job {
         for (SegmentPredicate segPredicate : predicates) {
             Predicator predicator = 
PredicatorFactory.newPredicateInstance(segPredicate);
             try {
-                if (!predicator.predicate()) {
+                if (predicator != null && !predicator.predicate()) {
                     return false;
                 }
             } catch (Exception e) {
@@ -119,7 +120,7 @@ public class SparkSubmitJob implements Job {
     }
 
     private void initParam(JobDetail jd) throws IOException {
-        mPredicts = new ArrayList<>();
+        mPredicates = new ArrayList<>();
         livyUri = livyConfProps.getProperty("livy.uri");
         jobInstance = 
jobInstanceRepo.findByPredicateName(jd.getJobDataMap().getString(PREDICATE_JOB_NAME));
         measure = JsonUtil.toEntity(jd.getJobDataMap().getString(MEASURE_KEY), 
GriffinMeasure.class);
@@ -128,15 +129,16 @@ public class SparkSubmitJob implements Job {
     }
 
     private void setPredicts(String json) throws IOException {
+        if (StringUtils.isEmpty(json)) {
+            return;
+        }
         List<Map<String, Object>> maps = JsonUtil.toEntity(json, new 
TypeReference<List<Map>>() {
         });
-        if (maps != null) {
-            for (Map<String, Object> map : maps) {
-                SegmentPredicate sp = new SegmentPredicate();
-                sp.setType((String) map.get("type"));
-                sp.setConfigMap((Map<String, String>) map.get("config"));
-                mPredicts.add(sp);
-            }
+        for (Map<String, Object> map : maps) {
+            SegmentPredicate sp = new SegmentPredicate();
+            sp.setType((String) map.get("type"));
+            sp.setConfigMap((Map<String, String>) map.get("config"));
+            mPredicates.add(sp);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java 
b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
index d1dd44f..0a1c2c4 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
@@ -129,13 +129,15 @@ public class JobSchedule extends AbstractAuditableEntity {
     }
 
     private void setPredicateConfig(String config) throws IOException {
-        this.predicateConfig = config;
-        this.configMap = JsonUtil.toEntity(config, new 
TypeReference<Map<String, Object>>() {
-        });
+        if (!StringUtils.isEmpty(config)) {
+            this.predicateConfig = config;
+            this.configMap = JsonUtil.toEntity(config, new 
TypeReference<Map<String, Object>>() {
+            });
+        }
     }
 
     @JsonProperty("predicate.config")
-    public Map<String, Object> getConfigMap() throws IOException {
+    public Map<String, Object> getConfigMap() {
         return configMap;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
 
b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
index 78b2794..ac51f97 100644
--- 
a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
+++ 
b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
@@ -60,13 +60,15 @@ public class SegmentPredicate extends 
AbstractAuditableEntity {
     }
 
     public void setConfig(String config) throws IOException {
-        this.config = config;
-        this.configMap = JsonUtil.toEntity(config, new 
TypeReference<Map<String, String>>() {
-        });
+        if (!StringUtils.isEmpty(config)) {
+            this.config = config;
+            this.configMap = JsonUtil.toEntity(config, new 
TypeReference<Map<String, String>>() {
+            });
+        }
     }
 
     @JsonProperty("config")
-    public Map<String, String> getConfigMap() throws IOException {
+    public Map<String, String> getConfigMap(){
         return configMap;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java 
b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
index 5393f22..fbd5fbc 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
@@ -29,7 +29,7 @@ import javax.persistence.Entity;
 public class SegmentRange extends AbstractAuditableEntity {
 
     @Column(name = "data_begin")
-    private String begin = "1h";
+    private String begin = "-1h";
 
     private String length = "1h";
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
 
b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
index 8af39f4..3aa7403 100644
--- 
a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
+++ 
b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
@@ -22,8 +22,11 @@ package org.apache.griffin.core.job.factory;
 import org.apache.griffin.core.job.FileExistPredicator;
 import org.apache.griffin.core.job.Predicator;
 import org.apache.griffin.core.job.entity.SegmentPredicate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PredicatorFactory {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PredicatorFactory.class);
     public static Predicator newPredicateInstance(SegmentPredicate 
segPredicate) {
         Predicator predicate = null;
         switch (segPredicate.getType()) {
@@ -31,6 +34,7 @@ public class PredicatorFactory {
                 predicate = new FileExistPredicator(segPredicate);
                 break;
             default:
+                LOGGER.warn("There is no predicate type that you input.");
                 break;
         }
         return predicate;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java 
b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
index 1714789..c873a97 100644
--- 
a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
+++ 
b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
@@ -40,7 +40,7 @@ public interface JobInstanceRepo extends 
CrudRepository<JobInstanceBean, Long> {
 
     List<JobInstanceBean> findByExpireTmsLessThanEqual(Long expireTms);
 
-    @Transactional
+    @Transactional(rollbackFor = Exception.class)
     @Modifying
     @Query("delete from JobInstanceBean j where j.expireTms <= ?1")
     int deleteByExpireTimestamp(Long expireTms);

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java 
b/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
index 1b360e4..49e5db9 100644
--- 
a/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
+++ 
b/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
@@ -23,4 +23,6 @@ import org.apache.griffin.core.job.entity.JobSchedule;
 import org.springframework.data.repository.CrudRepository;
 
 public interface JobScheduleRepo extends CrudRepository<JobSchedule, Long> {
+
+    JobSchedule findByJobName(String jobName);
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
 
b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
index dc7c056..bb76c09 100644
--- 
a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
+++ 
b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
@@ -19,8 +19,6 @@ under the License.
 
 package org.apache.griffin.core.measure;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.griffin.core.exception.GriffinException;
 import org.apache.griffin.core.job.entity.VirtualJob;
 import org.apache.griffin.core.job.repo.VirtualJobRepo;
 import org.apache.griffin.core.measure.entity.ExternalMeasure;
@@ -32,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
-import static 
org.apache.griffin.core.exception.GriffinExceptionMessage.MISSING_METRIC_NAME;
+import static org.apache.griffin.core.util.MeasureUtil.validateMeasure;
 
 @Component("externalOperation")
 public class ExternalMeasureOperationImpl implements MeasureOperation {
@@ -47,11 +45,7 @@ public class ExternalMeasureOperationImpl implements 
MeasureOperation {
     @Transactional
     public Measure create(Measure measure) {
         ExternalMeasure em = (ExternalMeasure) measure;
-        if (StringUtils.isBlank(em.getMetricName())) {
-            LOGGER.warn("Failed to create external measure {}. Its metric name 
is blank.", measure.getName());
-            throw new 
GriffinException.BadRequestException(MISSING_METRIC_NAME);
-
-        }
+        validateMeasure(em);
         em.setVirtualJob(new VirtualJob());
         em = measureRepo.save(em);
         VirtualJob vj = genVirtualJob(em, em.getVirtualJob());
@@ -62,10 +56,7 @@ public class ExternalMeasureOperationImpl implements 
MeasureOperation {
     @Override
     public void update(Measure measure) {
         ExternalMeasure latestMeasure = (ExternalMeasure) measure;
-        if (StringUtils.isBlank(latestMeasure.getMetricName())) {
-            LOGGER.warn("Failed to update external measure {}. Its metric name 
is blank.", measure.getName());
-            throw new 
GriffinException.BadRequestException(MISSING_METRIC_NAME);
-        }
+        validateMeasure(latestMeasure);
         ExternalMeasure originMeasure = 
measureRepo.findOne(latestMeasure.getId());
         VirtualJob vj = genVirtualJob(latestMeasure, 
originMeasure.getVirtualJob());
         latestMeasure.setVirtualJob(vj);

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
 
b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
index 22561a4..c15ff23 100644
--- 
a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
+++ 
b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
@@ -19,46 +19,41 @@ under the License.
 
 package org.apache.griffin.core.measure;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.griffin.core.exception.GriffinException;
 import org.apache.griffin.core.job.JobServiceImpl;
-import org.apache.griffin.core.measure.entity.DataConnector;
-import org.apache.griffin.core.measure.entity.DataSource;
-import org.apache.griffin.core.measure.entity.GriffinMeasure;
 import org.apache.griffin.core.measure.entity.Measure;
-import org.apache.griffin.core.measure.repo.DataConnectorRepo;
 import org.apache.griffin.core.measure.repo.MeasureRepo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import org.springframework.util.CollectionUtils;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import static 
org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_CONNECTOR_NAME;
+import static org.apache.griffin.core.util.MeasureUtil.validateMeasure;
 
 @Component("griffinOperation")
 public class GriffinMeasureOperationImpl implements MeasureOperation {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(GriffinMeasureOperationImpl.class);
 
+    private final MeasureRepo<Measure> measureRepo;
+
+    private final JobServiceImpl jobService;
+
     @Autowired
-    private MeasureRepo<Measure> measureRepo;
-    @Autowired
-    private DataConnectorRepo dcRepo;
-    @Autowired
-    private JobServiceImpl jobService;
+    public GriffinMeasureOperationImpl(MeasureRepo<Measure> measureRepo, 
JobServiceImpl jobService) {
+        this.measureRepo = measureRepo;
+        this.jobService = jobService;
+    }
 
 
     @Override
     public Measure create(Measure measure) {
-        checkConnectorNames((GriffinMeasure) measure);
+        validateMeasure(measure);
         return measureRepo.save(measure);
     }
 
     @Override
     public void update(Measure measure) {
+        validateMeasure(measure);
+        measure.setDeleted(false);
         measureRepo.save(measure);
     }
 
@@ -68,30 +63,4 @@ public class GriffinMeasureOperationImpl implements 
MeasureOperation {
         measure.setDeleted(true);
         measureRepo.save(measure);
     }
-
-    private void checkConnectorNames(GriffinMeasure measure) {
-        List<String> names = getConnectorNames(measure);
-        if (names.size() == 0) {
-            LOGGER.warn("Connector names cannot be empty.");
-            throw new 
GriffinException.BadRequestException(INVALID_CONNECTOR_NAME);
-        }
-        List<DataConnector> connectors = dcRepo.findByConnectorNames(names);
-        if (!CollectionUtils.isEmpty(connectors)) {
-            LOGGER.warn("Failed to create new measure {}. It's connector names 
already exist. ", measure.getName());
-            throw new 
GriffinException.BadRequestException(INVALID_CONNECTOR_NAME);
-        }
-    }
-
-    private List<String> getConnectorNames(GriffinMeasure measure) {
-        List<String> names = new ArrayList<>();
-        for (DataSource source : measure.getDataSources()) {
-            for (DataConnector dc : source.getConnectors()) {
-                String name = dc.getName();
-                if (!StringUtils.isEmpty(name)) {
-                    names.add(name);
-                }
-            }
-        }
-        return names;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
index 91ffaac..eda0a1f 100644
--- 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
+++ 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
@@ -48,6 +48,12 @@ public class MeasureController {
         measureService.deleteMeasureById(id);
     }
 
+    @RequestMapping(value = "/measures", method = RequestMethod.DELETE)
+    @ResponseStatus(HttpStatus.NO_CONTENT)
+    public void deleteMeasures() {
+        measureService.deleteMeasures();
+    }
+
     @RequestMapping(value = "/measures", method = RequestMethod.PUT)
     @ResponseStatus(HttpStatus.NO_CONTENT)
     public void updateMeasure(@RequestBody Measure measure) {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
index 818db47..a3d9640 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
@@ -30,17 +30,9 @@ public interface MeasureService {
 
     Measure getMeasureById(long id);
 
-/*
-    Measure getMeasureByName(String measureName);
-*/
-
-
     void deleteMeasureById(Long id);
 
-
-/*
-    GriffinOperationMessage deleteMeasureByName(String measureName) ;
-*/
+    void deleteMeasures();
 
     void updateMeasure(Measure measure);
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
index d9d7cd8..66d7252 100644
--- 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
+++ 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
@@ -102,6 +102,15 @@ public class MeasureServiceImpl implements MeasureService {
         op.delete(measure);
     }
 
+    @Override
+    public void deleteMeasures() {
+        List<Measure> measures = measureRepo.findByDeleted(false);
+        for (Measure m : measures) {
+            MeasureOperation op = getOperation(m);
+            op.delete(m);
+        }
+    }
+
     private MeasureOperation getOperation(Measure measure) {
         if (measure instanceof GriffinMeasure) {
             return griffinOp;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
 
b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
index 8a87ea5..dae35e1 100644
--- 
a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
+++ 
b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
@@ -53,6 +53,9 @@ public class DataConnector extends AbstractAuditableEntity {
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private String dataUnit;
 
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private String dataTimeZone;
+
     @JsonIgnore
     @Transient
     private String defaultDataUnit = "365000d";
@@ -77,7 +80,7 @@ public class DataConnector extends AbstractAuditableEntity {
     }
 
     @JsonProperty("config")
-    public Map<String, String> getConfigMap() throws IOException {
+    public Map<String, String> getConfigMap() {
         return configMap;
     }
 
@@ -88,12 +91,14 @@ public class DataConnector extends AbstractAuditableEntity {
     }
 
     public void setConfig(String config) throws IOException {
-        this.config = config;
-        this.configMap = JsonUtil.toEntity(config, new 
TypeReference<Map<String, String>>() {
-        });
+        if (!StringUtils.isEmpty(config)) {
+            this.config = config;
+            this.configMap = JsonUtil.toEntity(config, new 
TypeReference<Map<String, String>>() {
+            });
+        }
     }
 
-    public String getConfig() throws IOException {
+    public String getConfig() {
         return config;
     }
 
@@ -107,6 +112,16 @@ public class DataConnector extends AbstractAuditableEntity 
{
         this.dataUnit = dataUnit;
     }
 
+    @JsonProperty("data.time.zone")
+    public String getDataTimeZone() {
+        return dataTimeZone;
+    }
+
+    @JsonProperty("data.time.zone")
+    public void setDataTimeZone(String dataTimeZone) {
+        this.dataTimeZone = dataTimeZone;
+    }
+
     public String getDefaultDataUnit() {
         return defaultDataUnit;
     }
@@ -156,7 +171,7 @@ public class DataConnector extends AbstractAuditableEntity {
         });
     }
 
-    public DataConnector(String name, String dataUnit, Map 
configMap,List<SegmentPredicate> predicates) throws IOException {
+    public DataConnector(String name, String dataUnit, Map configMap, 
List<SegmentPredicate> predicates) throws IOException {
         this.name = name;
         this.dataUnit = dataUnit;
         this.configMap = configMap;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
 
b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
index 8576b86..d597a6d 100644
--- 
a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
+++ 
b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
@@ -19,14 +19,21 @@ under the License.
 
 package org.apache.griffin.core.measure.entity;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.griffin.core.util.JsonUtil;
+import org.springframework.util.StringUtils;
 
 import javax.persistence.*;
 import javax.validation.constraints.NotNull;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Measures processed on Griffin
@@ -40,6 +47,15 @@ public class GriffinMeasure extends Measure {
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private Long timestamp;
 
+    @JsonIgnore
+    @Access(AccessType.PROPERTY)
+    @Column(length = 1024)
+    private String ruleDescription;
+
+    @Transient
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private Map<String, Object> ruleDescriptionMap;
+
     @NotNull
     @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, 
CascadeType.REMOVE, CascadeType.MERGE})
     @JoinColumn(name = "measure_id")
@@ -86,6 +102,29 @@ public class GriffinMeasure extends Measure {
         this.evaluateRule = evaluateRule;
     }
 
+    public String getRuleDescription() {
+        return ruleDescription;
+    }
+
+    public void setRuleDescription(String ruleDescription) throws IOException {
+        if (!StringUtils.isEmpty(ruleDescription)) {
+            this.ruleDescription = ruleDescription;
+            this.ruleDescriptionMap = JsonUtil.toEntity(ruleDescription, new 
TypeReference<Map<String, Object>>() {
+            });
+        }
+    }
+
+    @JsonProperty("rule.description")
+    public Map<String, Object> getRuleDescriptionMap() {
+        return ruleDescriptionMap;
+    }
+
+    @JsonProperty("rule.description")
+    public void setRuleDescriptionMap(Map<String, Object> ruleDescriptionMap) 
throws JsonProcessingException {
+        this.ruleDescriptionMap = ruleDescriptionMap;
+        this.ruleDescription = JsonUtil.toJson(ruleDescriptionMap);
+    }
+
     public Long getTimestamp() {
         return timestamp;
     }
@@ -110,7 +149,7 @@ public class GriffinMeasure extends Measure {
         this.evaluateRule = evaluateRule;
     }
 
-    public GriffinMeasure(Long measureId,String name, String owner, 
List<DataSource> dataSources, EvaluateRule evaluateRule) {
+    public GriffinMeasure(Long measureId, String name, String owner, 
List<DataSource> dataSources, EvaluateRule evaluateRule) {
         this.setId(measureId);
         this.name = name;
         this.owner = owner;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java 
b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
index 8a61bfb..a7b424d 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
@@ -22,10 +22,13 @@ package org.apache.griffin.core.measure.entity;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.griffin.core.util.JsonUtil;
+import org.springframework.util.StringUtils;
 
 import javax.persistence.*;
+import javax.validation.constraints.NotNull;
 import java.io.IOException;
 import java.util.Map;
 
@@ -36,18 +39,18 @@ public class Rule extends AbstractAuditableEntity {
     /**
      * three type:1.griffin-dsl 2.df-opr 3.spark-sql
      */
+    @NotNull
     private String dslType;
 
+    @NotNull
     private String dqType;
 
     @Column(length = 8 * 1024)
+    @NotNull
     private String rule;
 
     private String name;
 
-    @Column(length = 1024)
-    private String description;
-
     @JsonIgnore
     @Access(AccessType.PROPERTY)
     @Column(length = 1024)
@@ -57,6 +60,22 @@ public class Rule extends AbstractAuditableEntity {
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private Map<String, Object> detailsMap;
 
+    @JsonIgnore
+    @Access(AccessType.PROPERTY)
+    private String metric;
+
+    @Transient
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private Map<String, Object> metricMap;
+
+    @JsonIgnore
+    @Access(AccessType.PROPERTY)
+    private String record;
+
+    @Transient
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private Map<String, Object> recordMap;
+
     @JsonProperty("dsl.type")
     public String getDslType() {
         return dslType;
@@ -90,9 +109,11 @@ public class Rule extends AbstractAuditableEntity {
     }
 
     private void setDetails(String details) throws IOException {
-        this.details = details;
-        detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, 
Object>>() {
-        });
+        if (!StringUtils.isEmpty(details)) {
+            this.details = details;
+            this.detailsMap = JsonUtil.toEntity(details, new 
TypeReference<Map<String, Object>>() {
+            });
+        }
     }
 
     @JsonProperty("details")
@@ -106,20 +127,58 @@ public class Rule extends AbstractAuditableEntity {
         this.details = JsonUtil.toJson(details);
     }
 
-    public String getName() {
-        return name;
+    public String getMetric() {
+        return metric;
     }
 
-    public void setName(String name) {
-        this.name = name;
+    public void setMetric(String metric) throws IOException {
+        if (!StringUtils.isEmpty(metric)) {
+            this.metric = metric;
+            this.metricMap = JsonUtil.toEntity(metric, new 
TypeReference<Map<String, Object>>() {
+            });
+        }
+    }
+
+    @JsonProperty("metric")
+    public Map<String, Object> getMetricMap() {
+        return metricMap;
+    }
+
+    @JsonProperty("metric")
+    public void setMetricMap(Map<String, Object> metricMap) throws 
JsonProcessingException {
+        this.metricMap = metricMap;
+        this.metric = JsonUtil.toJson(metricMap);
+    }
+
+    public String getRecord() {
+        return record;
+    }
+
+    public void setRecord(String record) throws IOException {
+        if (!StringUtils.isEmpty(record)) {
+            this.record = record;
+            this.recordMap = JsonUtil.toEntity(record, new 
TypeReference<Map<String, Object>>() {
+            });
+        }
     }
 
-    public String getDescription() {
-        return description;
+    @JsonProperty("record")
+    public Map<String, Object> getRecordMap() {
+        return recordMap;
     }
 
-    public void setDescription(String description) {
-        this.description = description;
+    @JsonProperty("record")
+    public void setRecordMap(Map<String, Object> recordMap) throws 
JsonProcessingException {
+        this.recordMap = recordMap;
+        this.record = JsonUtil.toJson(recordMap);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
     }
 
     public Rule() {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java
 
b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java
index 4df5796..f632f14 100644
--- 
a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java
+++ 
b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreProxy.java
@@ -70,7 +70,7 @@ public class HiveMetaStoreProxy {
     }
 
     @PreDestroy
-    public void destroy() throws Exception {
+    public void destroy() {
         if (null != client) {
             client.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
 
b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
index 8c41008..e941aaa 100644
--- 
a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
+++ 
b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
@@ -59,14 +59,6 @@ public class HiveMetaStoreServiceImpl implements 
HiveMetaStoreService {
         LOGGER.info("HiveMetaStoreServiceImpl single thread pool created.");
     }
 
-    private String getUseDbName(String dbName) {
-        if (!StringUtils.hasText(dbName)) {
-            return defaultDbName;
-        } else {
-            return dbName;
-        }
-    }
-
     @Override
     @Cacheable
     public Iterable<String> getAllDatabases() {
@@ -121,10 +113,11 @@ public class HiveMetaStoreServiceImpl implements 
HiveMetaStoreService {
             return results;
         }
         dbs = getAllDatabases();
-        if (dbs != null) {
-            for (String db : dbs) {
-                results.put(db, getTables(db));
-            }
+        if (dbs == null) {
+            return results;
+        }
+        for (String db : dbs) {
+            results.put(db, getTables(db));
         }
         return results;
     }
@@ -168,6 +161,14 @@ public class HiveMetaStoreServiceImpl implements 
HiveMetaStoreService {
         return allTables;
     }
 
+    private String getUseDbName(String dbName) {
+        if (!StringUtils.hasText(dbName)) {
+            return defaultDbName;
+        } else {
+            return dbName;
+        }
+    }
+
     private void reconnect() {
         if (singleThreadExecutor.getActiveCount() == 0) {
             System.out.println("execute create thread.");

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java 
b/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java
index 360d4ec..a080f30 100644
--- a/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metric/MetricStoreImpl.java
@@ -116,9 +116,10 @@ public class MetricStoreImpl implements MetricStore {
         if (jsonNode.hasNonNull("hits") && 
jsonNode.get("hits").hasNonNull("hits")) {
             for (JsonNode node : jsonNode.get("hits").get("hits")) {
                 JsonNode sourceNode = node.get("_source");
-                metricValues.add(new 
MetricValue(sourceNode.get("name").asText(), 
Long.parseLong(sourceNode.get("tmst").asText()),
-                        JsonUtil.toEntity(sourceNode.get("value").toString(), 
new TypeReference<Map<String, Object>>() {
-                        })));
+                Map<String, Object> value = 
JsonUtil.toEntity(sourceNode.get("value").toString(), new 
TypeReference<Map<String, Object>>() {
+                });
+                MetricValue metricValue = new 
MetricValue(sourceNode.get("name").asText(), 
Long.parseLong(sourceNode.get("tmst").asText()), value);
+                metricValues.add(metricValue);
             }
         }
         return metricValues;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java 
b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
new file mode 100644
index 0000000..f59d614
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
@@ -0,0 +1,76 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.util;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.exception.GriffinException;
+import org.apache.griffin.core.measure.entity.DataSource;
+import org.apache.griffin.core.measure.entity.ExternalMeasure;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.measure.entity.Measure;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_CONNECTOR_NAME;
+import static 
org.apache.griffin.core.exception.GriffinExceptionMessage.MISSING_METRIC_NAME;
+
+public class MeasureUtil {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MeasureUtil.class);
+
+    public static void validateMeasure(Measure measure) {
+        if (measure instanceof GriffinMeasure) {
+            validateGriffinMeasure((GriffinMeasure) measure);
+        } else if (measure instanceof ExternalMeasure) {
+            validateExternalMeasure((ExternalMeasure) measure);
+        }
+
+    }
+
+    private static void validateGriffinMeasure(GriffinMeasure measure) {
+        if (getConnectorNamesIfValid(measure) == null) {
+            throw new 
GriffinException.BadRequestException(INVALID_CONNECTOR_NAME);
+        }
+    }
+
+    private static void validateExternalMeasure(ExternalMeasure measure) {
+        if (StringUtils.isBlank(measure.getMetricName())) {
+            LOGGER.warn("Failed to create external measure {}. Its metric name 
is blank.", measure.getName());
+            throw new 
GriffinException.BadRequestException(MISSING_METRIC_NAME);
+        }
+    }
+
+    private static List<String> getConnectorNamesIfValid(GriffinMeasure 
measure) {
+        Set<String> sets = new HashSet<>();
+        List<DataSource> sources = measure.getDataSources();
+        for (DataSource source : sources) {
+            source.getConnectors().stream().filter(dc -> dc.getName() != 
null).forEach(dc -> sets.add(dc.getName()));
+        }
+        if (sets.size() == 0 || sets.size() < sources.size()) {
+            LOGGER.warn("Connector names cannot be repeated or empty.");
+            return null;
+        }
+        return new ArrayList<>(sets);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java 
b/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java
index 728ee9e..415c9c1 100644
--- a/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/PropertiesUtil.java
@@ -19,11 +19,17 @@ under the License.
 
 package org.apache.griffin.core.util;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.config.PropertiesFactoryBean;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.InputStreamResource;
 import org.springframework.core.io.Resource;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Properties;
 
@@ -43,4 +49,50 @@ public class PropertiesUtil {
         }
         return properties;
     }
+
+    /**
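+     * @param name properties file name, like sparkJob.properties
+     * @param defaultPath default classpath resource, like /application.properties
+     * @param location custom directory searched for a file called name; may be empty
+     * @return properties from the custom file if found, otherwise from defaultPath
+     * @throws FileNotFoundException if location is set but its files cannot be 
listed, e.g. it is not an existing directory.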
+     */
+    public static Properties getConf(String name, String defaultPath, String 
location) throws FileNotFoundException {
+        String path = getConfPath(name, location);
+        Resource resource;
+        if (path == null) {
+            resource = new ClassPathResource(defaultPath);
+            path = defaultPath;
+        } else {
+            resource = new InputStreamResource(new FileInputStream(path));
+        }
+        return PropertiesUtil.getProperties(path, resource);
+    }
+
+    private static String getConfPath(String name, String location) throws 
FileNotFoundException {
+        if (StringUtils.isEmpty(location)) {
+            LOGGER.info("Config location is empty. Read from default path.");
+            return null;
+        }
+        File file = new File(location);
+        LOGGER.info("File absolute path:" + file.getAbsolutePath());
+        File[] files = file.listFiles();
+        if (files == null) {
+            LOGGER.warn("The defaultPath {} does not exist.Please check your 
config in application.properties.", location);
+            throw new FileNotFoundException();
+        }
+        return getConfPath(name, files, location);
+    }
+
+    private static String getConfPath(String name, File[] files, String 
location) {
+        String path = null;
+        for (File f : files) {
+            if (f.getName().equals(name)) {
+                path = location + File.separator + name;
+                LOGGER.info("config real path: {}", path);
+            }
+        }
+        return path;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java 
b/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
index 5f4396a..75f068d 100644
--- a/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/TimeUtil.java
@@ -23,13 +23,31 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public class TimeUtil {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TimeUtil.class);
+    private static final String MILLISECONDS_PATTERN = 
"(?i)m(illi)?s(ec(ond)?)?";
+    private static final String SECONDS_PATTERN = "(?i)s(ec(ond)?)?";
+    private static final String MINUTES_PATTERN = "(?i)m(in(ute)?)?";
+    private static final String HOURS_PATTERN = "(?i)h((ou)?r)?";
+    private static final String DAYS_PATTERN = "(?i)d(ay)?";
+
+    private static class TimeUnitPair {
+        private long t;
+        private String unit;
+
+        TimeUnitPair(long t, String unit) {
+            this.t = t;
+            this.unit = unit;
+        }
+    }
 
     public static Long str2Long(String timeStr) {
         if (timeStr == null) {
@@ -42,18 +60,14 @@ public class TimeUtil {
             trimTimeStr = trimTimeStr.substring(1);
             positive = false;
         }
+        List<TimeUnitPair> list = getTimeUnitPairs(trimTimeStr);
+        return str2Long(positive, list);
+    }
 
-        String timePattern = "(?i)\\d+(ms|s|m|h|d)";
-        Pattern pattern = Pattern.compile(timePattern);
-        Matcher matcher = pattern.matcher(trimTimeStr);
-        List<String> list = new ArrayList<>();
-        while (matcher.find()) {
-            String group = matcher.group();
-            list.add(group.toLowerCase());
-        }
+    private static Long str2Long(boolean positive, List<TimeUnitPair> list) {
         long time = 0;
-        for (String aList : list) {
-            long t = milliseconds(aList.toLowerCase());
+        for (TimeUnitPair tu : list) {
+            long t = milliseconds(tu);
             if (positive) {
                 time += t;
             } else {
@@ -63,19 +77,36 @@ public class TimeUtil {
         return time;
     }
 
-    private static Long milliseconds(String str) {
-        if (str.endsWith("ms")) {
-            return milliseconds(Long.parseLong(str.substring(0, str.length() - 
2)), TimeUnit.MILLISECONDS);
-        } else if (str.endsWith("s")) {
-            return milliseconds(Long.parseLong(str.substring(0, str.length() - 
1)), TimeUnit.SECONDS);
-        } else if (str.endsWith("m")) {
-            return milliseconds(Long.parseLong(str.substring(0, str.length() - 
1)), TimeUnit.MINUTES);
-        } else if (str.endsWith("h")) {
-            return milliseconds(Long.parseLong(str.substring(0, str.length() - 
1)), TimeUnit.HOURS);
-        } else if (str.endsWith("d")) {
-            return milliseconds(Long.parseLong(str.substring(0, str.length() - 
1)), TimeUnit.DAYS);
+    private static List<TimeUnitPair> getTimeUnitPairs(String timeStr) {
+        // "1d2h3m" -> (1, "d"), (2, "h"), (3, "m")
+        String timePattern = "(?i)(\\d+)([a-zA-Z]+)";
+        Pattern pattern = Pattern.compile(timePattern);
+        Matcher matcher = pattern.matcher(timeStr);
+        List<TimeUnitPair> list = new ArrayList<>();
+        while (matcher.find()) {
+            String num = matcher.group(1);
+            String unit = matcher.group(2);
+            TimeUnitPair tu = new TimeUnitPair(Long.valueOf(num), unit);
+            list.add(tu);
+        }
+        return list;
+    }
+
+    private static Long milliseconds(TimeUnitPair tu) {
+        long t = tu.t;
+        String unit = tu.unit;
+        if (unit.matches(MILLISECONDS_PATTERN)) {
+            return milliseconds(t, TimeUnit.MILLISECONDS);
+        } else if (unit.matches(SECONDS_PATTERN)) {
+            return milliseconds(t, TimeUnit.SECONDS);
+        } else if (unit.matches(MINUTES_PATTERN)) {
+            return milliseconds(t, TimeUnit.MINUTES);
+        } else if (unit.matches(HOURS_PATTERN)) {
+            return milliseconds(t, TimeUnit.HOURS);
+        } else if (unit.matches(DAYS_PATTERN)) {
+            return milliseconds(t, TimeUnit.DAYS);
         } else {
-            LOGGER.error("Time string format error.It only supports 
d(day),h(hour),m(minute),s(second),ms(millsecond).Please check your time 
format.)");
+            LOGGER.error("Time string format error.It only supports 
d(day),h(hour),m(minute),s(second),ms(millsecond).Please check your time 
format.");
             return 0L;
         }
     }
@@ -84,7 +115,7 @@ public class TimeUtil {
         return unit.toMillis(duration);
     }
 
-    public static String format(String timeFormat, long time,String timeZone) {
+    public static String format(String timeFormat, long time, TimeZone 
timeZone) {
         String timePattern = "#(?:\\\\#|[^#])*#";
         Date t = new Date(time);
         Pattern ptn = Pattern.compile(timePattern);
@@ -95,7 +126,7 @@ public class TimeUtil {
             String content = group.substring(1, group.length() - 1);
             String pattern = refreshEscapeHashTag(content);
             SimpleDateFormat sdf = new SimpleDateFormat(pattern);
-            sdf.setTimeZone(TimeZone.getTimeZone(timeZone));
+            sdf.setTimeZone(timeZone);
             matcher.appendReplacement(sb, sdf.format(t));
         }
         matcher.appendTail(sb);

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fd31809a/service/src/main/resources/sparkJob.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/sparkJob.properties 
b/service/src/main/resources/sparkJob.properties
index a9be693..6323914 100644
--- a/service/src/main/resources/sparkJob.properties
+++ b/service/src/main/resources/sparkJob.properties
@@ -27,18 +27,17 @@ sparkJob.name=griffin
 sparkJob.queue=default
 
 # options
-sparkJob.numExecutors=10
+sparkJob.numExecutors=2
 sparkJob.executorCores=1
-sparkJob.driverMemory=2g
-sparkJob.executorMemory=2g
+sparkJob.driverMemory=1g
+sparkJob.executorMemory=1g
 
 # shouldn't config in server, but in
-sparkJob.jars = hdfs://livy/spark-avro_2.11-2.0.1.jar;\
-  hdfs://livy/datanucleus-api-jdo-3.2.6.jar;\
-  hdfs://livy/datanucleus-core-3.2.10.jar;\
-  hdfs://livy/datanucleus-rdbms-3.2.9.jar
+sparkJob.jars = hdfs:///livy/datanucleus-api-jdo-3.2.6.jar;\
+  hdfs:///livy/datanucleus-core-3.2.10.jar;\
+  hdfs:///livy/datanucleus-rdbms-3.2.9.jar
 
-spark.yarn.dist.files = hdfs://livy/hive-site.xml
+spark.yarn.dist.files = hdfs:///home/spark_conf/hive-site.xml
 
 # livy
 # livy.uri=http://10.9.246.187:8998/batches


Reply via email to