Repository: incubator-griffin
Updated Branches:
  refs/heads/master b83c58706 -> a723f4753


Get data from data source cache in streaming mode, fix bug of time range

Author: Lionel Liu <bhlx3l...@163.com>

Closes #207 from bhlx3lyx7/tmst.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/a723f475
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/a723f475
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/a723f475

Branch: refs/heads/master
Commit: a723f4753ef187950aff5b2472cafca7aa9df71a
Parents: b83c587
Author: Lionel Liu <bhlx3l...@163.com>
Authored: Fri Feb 2 18:20:30 2018 +0800
Committer: Lionel Liu <bhlx3l...@163.com>
Committed: Fri Feb 2 18:20:30 2018 +0800

----------------------------------------------------------------------
 .../measure/cache/info/TimeInfoCache.scala      | 33 ++++---
 .../measure/data/connector/DataConnector.scala  |  4 +-
 .../data/source/cache/DataSourceCache.scala     |  4 +-
 .../measure/process/temp/TimeRange.scala        |  9 +-
 .../rule/adaptor/GriffinDslAdaptor.scala        | 10 +-
 .../rule/trans/AccuracyRulePlanTrans.scala      |  4 +-
 .../rule/trans/DistinctnessRulePlanTrans.scala  | 18 +++-
 .../rule/trans/ProfilingRulePlanTrans.scala     |  4 +-
 .../measure/rule/trans/RulePlanTrans.scala      |  8 +-
 .../rule/trans/TimelinessRulePlanTrans.scala    | 99 +++++---------------
 .../rule/trans/UniquenessRulePlanTrans.scala    |  4 +-
 11 files changed, 92 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
index efd12b9..c976453 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
@@ -62,33 +62,42 @@ object TimeInfoCache extends Loggable with Serializable {
     val subPath = InfoCacheInstance.listKeys(infoPath)
     val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" }
     val result = InfoCacheInstance.readInfo(keys)
-    val time = keys.flatMap { k =>
+    val times = keys.flatMap { k =>
       getLongOpt(result, k)
-    }.min
-    val map = Map[String, String]((finalReadyTime -> time.toString))
-    InfoCacheInstance.cacheInfo(map)
+    }
+    if (times.nonEmpty) {
+      val time = times.min
+      val map = Map[String, String]((finalReadyTime -> time.toString))
+      InfoCacheInstance.cacheInfo(map)
+    }
   }
 
   private def genFinalLastProcTime(): Unit = {
     val subPath = InfoCacheInstance.listKeys(infoPath)
     val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" }
     val result = InfoCacheInstance.readInfo(keys)
-    val time = keys.flatMap { k =>
+    val times = keys.flatMap { k =>
       getLongOpt(result, k)
-    }.min
-    val map = Map[String, String]((finalLastProcTime -> time.toString))
-    InfoCacheInstance.cacheInfo(map)
+    }
+    if (times.nonEmpty) {
+      val time = times.min
+      val map = Map[String, String]((finalLastProcTime -> time.toString))
+      InfoCacheInstance.cacheInfo(map)
+    }
   }
 
   private def genFinalCleanTime(): Unit = {
     val subPath = InfoCacheInstance.listKeys(infoPath)
     val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" }
     val result = InfoCacheInstance.readInfo(keys)
-    val time = keys.flatMap { k =>
+    val times = keys.flatMap { k =>
       getLongOpt(result, k)
-    }.min
-    val map = Map[String, String]((finalCleanTime -> time.toString))
-    InfoCacheInstance.cacheInfo(map)
+    }
+    if (times.nonEmpty) {
+      val time = times.min
+      val map = Map[String, String]((finalCleanTime -> time.toString))
+      InfoCacheInstance.cacheInfo(map)
+    }
   }
 
   private def readTimeRange(): (Long, Long) = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
index 1cf3f32..b858991 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
@@ -65,6 +65,8 @@ trait DataConnector extends Loggable with Serializable {
     val thisTable = thisName(ms)
 
     try {
+      saveTmst(ms)    // save tmst
+
       dfOpt.flatMap { df =>
         val preProcRules = PreProcRuleGenerator.genPreProcRules(dcParam.preProc, suffix(ms))
 
@@ -104,7 +106,7 @@ trait DataConnector extends Loggable with Serializable {
         val withTmstDf = outDf.withColumn(tmstColName, lit(ms))
 
         // tmst cache
-        saveTmst(ms)
+//        saveTmst(ms)
 
         // drop temp tables
         cleanData(timeInfo)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
index 1a0366d..419b141 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
@@ -129,7 +129,7 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
   def readData(): (Option[DataFrame], TimeRange) = {
     // time range: [a, b)
     val timeRange = TimeInfoCache.getTimeRange
-    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
+    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2 + 1)
 
     // read partition info
     val filterStr = s"`${InternalColumns.tmst}` >= ${reviseTimeRange._1} AND `${InternalColumns.tmst}` < ${reviseTimeRange._2}"
@@ -326,7 +326,7 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
     submitLastProcTime(timeRange._2)
 
     // next clean time
-    val nextCleanTime = timeRange._2 + deltaTimeRange._1
+    val nextCleanTime = timeRange._2 + deltaTimeRange._1 + 1
     submitCleanTime(nextCleanTime)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
index db92533..9e79396 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
@@ -20,10 +20,17 @@ package org.apache.griffin.measure.process.temp
 
 import scala.math.{min, max}
 
-  case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializable {
+case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializable {
   def merge(tr: TimeRange): TimeRange = {
     TimeRange(min(begin, tr.begin), max(end, tr.end), tmsts ++ tr.tmsts)
   }
+  def beginTmstOpt: Option[Long] = {
+    try {
+      if (tmsts.nonEmpty) Some(tmsts.min) else None
+    } catch {
+      case _: Throwable => None
+    }
+  }
 }
 
 object TimeRange {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
index 3b4ec31..d07aa02 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -24,6 +24,8 @@ import org.apache.griffin.measure.rule.dsl.parser.GriffinDslParser
 import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 import org.apache.griffin.measure.rule.trans._
 
+import scala.util.{Failure, Success}
+
 case class GriffinDslAdaptor(dataSourceNames: Seq[String],
                              functionNames: Seq[String]
                             ) extends RuleAdaptor {
@@ -49,7 +51,13 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         val expr = result.get
         val rulePlanTrans = RulePlanTrans(dqType, dataSourceNames, timeInfo,
           name, expr, param, processType, dsTimeRanges)
-        rulePlanTrans.trans
+        rulePlanTrans.trans match {
+          case Success(rp) => rp
+          case Failure(ex) => {
+            warn(s"translate rule [ ${rule} ] fails: \n${ex.getMessage}")
+            emptyRulePlan
+          }
+        }
       } else {
         warn(s"parse rule [ ${rule} ] fails: \n${result}")
         emptyRulePlan

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
index 2ff8feb..904b087 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
@@ -30,6 +30,8 @@ import org.apache.griffin.measure.utils.ParamUtil._
 import org.apache.griffin.measure.rule.trans.RuleExportFactory._
 import org.apache.griffin.measure.rule.trans.DsUpdateFactory._
 
+import scala.util.Try
+
 case class AccuracyRulePlanTrans(dataSourceNames: Seq[String],
                                  timeInfo: TimeInfo, name: String, expr: Expr,
                                  param: Map[String, Any], procType: ProcessType
@@ -44,7 +46,7 @@ case class AccuracyRulePlanTrans(dataSourceNames: Seq[String],
   }
   import AccuracyKeys._
 
-  def trans(): RulePlan = {
+  def trans(): Try[RulePlan] = Try {
     val details = getDetails(param)
     val sourceName = details.getString(_source, dataSourceNames.head)
     val targetName = details.getString(_target, dataSourceNames.tail.head)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
index 0f4e7c4..40a8102 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
@@ -29,6 +29,8 @@ import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.rule.trans.RuleExportFactory._
 import org.apache.griffin.measure.utils.ParamUtil._
 
+import scala.util.Try
+
 case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
                                      timeInfo: TimeInfo, name: String, expr: 
Expr,
                                      param: Map[String, Any], procType: 
ProcessType,
@@ -49,7 +51,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
   }
   import DistinctnessKeys._
 
-  def trans(): RulePlan = {
+  def trans(): Try[RulePlan] = Try {
     val details = getDetails(param)
     val sourceName = details.getString(_source, dataSourceNames.head)
     val targetName = details.getString(_target, dataSourceNames.tail.head)
@@ -62,6 +64,12 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
     val sourceTimeRange = dsTimeRanges.get(sourceName).getOrElse(TimeRange(ct))
     val beginTime = sourceTimeRange.begin
 
+    val beginTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.beginTmstOpt)
+    val beginTmst = beginTmstOpt match {
+      case Some(t) => t
+      case _ => throw new Exception(s"empty begin tmst from ${sourceName}")
+    }
+
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
       println(s"[${ct}] data source ${sourceName} not exists")
       emptyRulePlan
@@ -93,7 +101,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
       }
       val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
       val totalMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName, beginTime, mode)
+      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName, beginTmst, mode)
 
       // 3. group by self
       val selfGroupTableName = "__selfGroup"
@@ -188,7 +196,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
       }
       val distStep = SparkSqlStep(distTableName, distSql, emptyMap)
       val distMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val distMetricExport = genMetricExport(distMetricParam, distColName, 
distTableName, beginTime, mode)
+      val distMetricExport = genMetricExport(distMetricParam, distColName, 
distTableName, beginTmst, mode)
 
       val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: 
Nil)
 
@@ -208,7 +216,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
         }
         val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, 
emptyMap, true)
         val dupRecordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-        val dupRecordExport = genRecordExport(dupRecordParam, 
dupRecordTableName, dupRecordTableName, beginTime, mode)
+        val dupRecordExport = genRecordExport(dupRecordParam, 
dupRecordTableName, dupRecordTableName, beginTmst, mode)
 
         // 10. duplicate metric
         val dupMetricTableName = "__dupMetric"
@@ -221,7 +229,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
         }
         val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, 
emptyMap)
         val dupMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-        val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName, beginTime, mode)
+        val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName, beginTmst, mode)
 
         RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: 
dupMetricExport :: Nil)
       } else emptyRulePlan

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
index d9d2d4e..f80f3c1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
@@ -28,6 +28,8 @@ import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.rule.trans.RuleExportFactory._
 import org.apache.griffin.measure.utils.ParamUtil._
 
+import scala.util.Try
+
 case class ProfilingRulePlanTrans(dataSourceNames: Seq[String],
                                   timeInfo: TimeInfo, name: String, expr: Expr,
                                   param: Map[String, Any], procType: 
ProcessType
@@ -38,7 +40,7 @@ case class ProfilingRulePlanTrans(dataSourceNames: Seq[String],
   }
   import ProfilingKeys._
 
-  def trans(): RulePlan = {
+  def trans(): Try[RulePlan] = Try {
     val details = getDetails(param)
     val profilingClause = expr.asInstanceOf[ProfilingClause]
     val sourceName = profilingClause.fromClauseOpt match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
index b7226ba..9289053 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
@@ -25,18 +25,20 @@ import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.dsl.expr.Expr
 import org.apache.griffin.measure.rule.plan._
 
+import scala.util.Try
+
 trait RulePlanTrans extends Loggable with Serializable {
 
   protected val emptyRulePlan = RulePlan(Nil, Nil)
   protected val emptyMap = Map[String, Any]()
 
-  def trans(): RulePlan
+  def trans(): Try[RulePlan]
 
 }
 
 object RulePlanTrans {
   private val emptyRulePlanTrans = new RulePlanTrans {
-    def trans(): RulePlan = emptyRulePlan
+    def trans(): Try[RulePlan] = Try(emptyRulePlan)
   }
 
   def apply(dqType: DqType,
@@ -50,7 +52,7 @@ object RulePlanTrans {
       case ProfilingType => ProfilingRulePlanTrans(dsNames, ti, name, expr, 
param, procType)
       case UniquenessType => UniquenessRulePlanTrans(dsNames, ti, name, expr, 
param, procType)
       case DistinctnessType => DistinctnessRulePlanTrans(dsNames, ti, name, 
expr, param, procType, dsTimeRanges)
-      case TimelinessType => TimelinessRulePlanTrans(dsNames, ti, name, expr, 
param, procType)
+      case TimelinessType => TimelinessRulePlanTrans(dsNames, ti, name, expr, 
param, procType, dsTimeRanges)
       case _ => emptyRulePlanTrans
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
index 9a01553..7e9b8fb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
@@ -18,7 +18,7 @@ under the License.
 */
 package org.apache.griffin.measure.rule.trans
 
-import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
 import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, 
ProcessType, StreamingProcessType}
 import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
 import org.apache.griffin.measure.rule.adaptor._
@@ -30,9 +30,12 @@ import org.apache.griffin.measure.rule.trans.RuleExportFactory._
 import org.apache.griffin.measure.utils.ParamUtil._
 import org.apache.griffin.measure.utils.TimeUtil
 
+import scala.util.Try
+
 case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
                                    timeInfo: TimeInfo, name: String, expr: 
Expr,
-                                   param: Map[String, Any], procType: 
ProcessType
+                                   param: Map[String, Any], procType: 
ProcessType,
+                                   dsTimeRanges: Map[String, TimeRange]
                                   ) extends RulePlanTrans {
 
   private object TimelinessKeys {
@@ -49,14 +52,20 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
   }
   import TimelinessKeys._
 
-  def trans(): RulePlan = {
+  def trans(): Try[RulePlan] =  Try {
     val details = getDetails(param)
     val timelinessClause = expr.asInstanceOf[TimelinessClause]
     val sourceName = details.getString(_source, dataSourceNames.head)
 
     val mode = ExportMode.defaultMode(procType)
 
-    val ct = timeInfo.calcTime
+//    val ct = timeInfo.calcTime
+
+    val beginTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.beginTmstOpt)
+    val beginTmst = beginTmstOpt match {
+      case Some(t) => t
+      case _ => throw new Exception(s"empty begin tmst from ${sourceName}")
+    }
 
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
       emptyRulePlan
@@ -120,7 +129,7 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
       }
       val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap)
       val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-      val metricExports = genMetricExport(metricParam, name, metricTableName, 
ct, mode) :: Nil
+      val metricExports = genMetricExport(metricParam, name, metricTableName, 
beginTmst, mode) :: Nil
 
       // current timeliness plan
       val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil
@@ -136,64 +145,12 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
           }
           val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap)
           val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          val recordExports = genRecordExport(recordParam, recordTableName, 
recordTableName, ct, mode) :: Nil
+          val recordExports = genRecordExport(recordParam, recordTableName, 
recordTableName, beginTmst, mode) :: Nil
           RulePlan(recordStep :: Nil, recordExports)
         }
         case _ => emptyRulePlan
       }
 
-// 5. ranges
-//      val rangePlan = details.get(_rangeSplit) match {
-//        case Some(arr: Seq[String]) => {
-//          val ranges = splitTimeRanges(arr)
-//          if (ranges.size > 0) {
-//            try {
-//              // 5.1. range
-//              val rangeTableName = "__range"
-//              val rangeColName = details.getStringOrKey(_range)
-//              val caseClause = {
-//                val whenClause = ranges.map { range =>
-//                  s"WHEN `${latencyColName}` < ${range._1} THEN 
'<${range._2}'"
-//                }.mkString("\n")
-//                s"CASE ${whenClause} ELSE '>=${ranges.last._2}' END AS 
`${rangeColName}`"
-//              }
-//              val rangeSql = {
-//                s"SELECT *, ${caseClause} FROM `${latencyTableName}`"
-//              }
-//              val rangeStep = SparkSqlStep(rangeTableName, rangeSql, 
emptyMap)
-//
-//              // 5.2. range metric
-//              val rangeMetricTableName = "__rangeMetric"
-//              val countColName = details.getStringOrKey(_count)
-//              val rangeMetricSql = procType match {
-//                case BatchProcessType => {
-//                  s"""
-//                     |SELECT `${rangeColName}`, COUNT(*) AS `${countColName}`
-//                     |FROM `${rangeTableName}` GROUP BY `${rangeColName}`
-//                  """.stripMargin
-//                }
-//                case StreamingProcessType => {
-//                  s"""
-//                     |SELECT `${InternalColumns.tmst}`, `${rangeColName}`, 
COUNT(*) AS `${countColName}`
-//                     |FROM `${rangeTableName}` GROUP BY 
`${InternalColumns.tmst}`, `${rangeColName}`
-//                  """.stripMargin
-//                }
-//              }
-//              val rangeMetricStep = SparkSqlStep(rangeMetricTableName, 
rangeMetricSql, emptyMap)
-//              val rangeMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-//              val rangeMetricExports = genMetricExport(rangeMetricParam, 
rangeColName, rangeMetricTableName, ct, mode) :: Nil
-//
-//              RulePlan(rangeStep :: rangeMetricStep :: Nil, 
rangeMetricExports)
-//            } catch {
-//              case _: Throwable => emptyRulePlan
-//            }
-//          } else emptyRulePlan
-//        }
-//        case _ => emptyRulePlan
-//      }
-
-// return timeliness plan
-
       // 5. ranges
       val rangePlan = TimeUtil.milliseconds(details.getString(_stepSize, "")) 
match {
         case Some(stepSize) => {
@@ -227,7 +184,7 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
           }
           val rangeMetricStep = SparkSqlStep(rangeMetricTableName, 
rangeMetricSql, emptyMap)
           val rangeMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-          val rangeMetricExports = genMetricExport(rangeMetricParam, 
stepColName, rangeMetricTableName, ct, mode) :: Nil
+          val rangeMetricExports = genMetricExport(rangeMetricParam, 
stepColName, rangeMetricTableName, beginTmst, mode) :: Nil
 
           RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
         }
@@ -243,23 +200,15 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
           val pctName = (pct * 100).toInt.toString
           s"floor(percentile_approx(${latencyColName}, ${pct})) AS 
`${percentileColName}_${pctName}`"
         }.mkString(", ")
-        val percentileSql = procType match {
-          case BatchProcessType => {
-            s"""
-               |SELECT ${percentileCols}
-               |FROM `${latencyTableName}`
-              """.stripMargin
-          }
-          case StreamingProcessType => {
-            s"""
-               |SELECT `${InternalColumns.tmst}`, ${percentileCols}
-               |FROM `${latencyTableName}` GROUP BY `${InternalColumns.tmst}`
-              """.stripMargin
-          }
+        val percentileSql = {
+          s"""
+             |SELECT ${percentileCols}
+             |FROM `${latencyTableName}`
+            """.stripMargin
         }
         val percentileStep = SparkSqlStep(percentileTableName, percentileSql, 
emptyMap)
         val percentileParam = emptyMap
-        val percentielExports = genMetricExport(percentileParam, 
percentileTableName, percentileTableName, ct, mode) :: Nil
+        val percentielExports = genMetricExport(percentileParam, 
percentileTableName, percentileTableName, beginTmst, mode) :: Nil
 
         RulePlan(percentileStep :: Nil, percentielExports)
       } else emptyRulePlan
@@ -269,10 +218,6 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
   }
 
   private def getPercentiles(details: Map[String, Any]): Seq[Double] = {
-//    details.get(_percentiles) match {
-//      case Some(seq: Seq[Double]) => seq
-//      case _ => Nil
-//    }
     details.getArr[Double](_percentileValues).filter(d => (d >= 0 && d <= 1))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
index 326d80b..baa5572 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
@@ -29,6 +29,8 @@ import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.rule.trans.RuleExportFactory._
 import org.apache.griffin.measure.utils.ParamUtil._
 
+import scala.util.Try
+
 case class UniquenessRulePlanTrans(dataSourceNames: Seq[String],
                                    timeInfo: TimeInfo, name: String, expr: 
Expr,
                                    param: Map[String, Any], procType: 
ProcessType
@@ -46,7 +48,7 @@ case class UniquenessRulePlanTrans(dataSourceNames: Seq[String],
   }
   import UniquenessKeys._
 
-  def trans(): RulePlan = {
+  def trans(): Try[RulePlan] = Try {
     val details = getDetails(param)
     val sourceName = details.getString(_source, dataSourceNames.head)
     val targetName = details.getString(_target, dataSourceNames.tail.head)

Reply via email to