Repository: incubator-griffin
Updated Branches:
  refs/heads/master b83c58706 -> a723f4753
Get data from data source cache in streaming mode, fix bug of time range

Author: Lionel Liu <bhlx3l...@163.com>

Closes #207 from bhlx3lyx7/tmst.

Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/a723f475
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/a723f475
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/a723f475

Branch: refs/heads/master
Commit: a723f4753ef187950aff5b2472cafca7aa9df71a
Parents: b83c587
Author: Lionel Liu <bhlx3l...@163.com>
Authored: Fri Feb 2 18:20:30 2018 +0800
Committer: Lionel Liu <bhlx3l...@163.com>
Committed: Fri Feb 2 18:20:30 2018 +0800

----------------------------------------------------------------------
 .../measure/cache/info/TimeInfoCache.scala     | 33 ++++---
 .../measure/data/connector/DataConnector.scala |  4 +-
 .../data/source/cache/DataSourceCache.scala    |  4 +-
 .../measure/process/temp/TimeRange.scala       |  9 +-
 .../rule/adaptor/GriffinDslAdaptor.scala       | 10 +-
 .../rule/trans/AccuracyRulePlanTrans.scala     |  4 +-
 .../rule/trans/DistinctnessRulePlanTrans.scala | 18 +++-
 .../rule/trans/ProfilingRulePlanTrans.scala    |  4 +-
 .../measure/rule/trans/RulePlanTrans.scala     |  8 +-
 .../rule/trans/TimelinessRulePlanTrans.scala   | 99 +++++---------------
 .../rule/trans/UniquenessRulePlanTrans.scala   |  4 +-
 11 files changed, 92 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
index efd12b9..c976453 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala @@ -62,33 +62,42 @@ object TimeInfoCache extends Loggable with Serializable { val subPath = InfoCacheInstance.listKeys(infoPath) val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" } val result = InfoCacheInstance.readInfo(keys) - val time = keys.flatMap { k => + val times = keys.flatMap { k => getLongOpt(result, k) - }.min - val map = Map[String, String]((finalReadyTime -> time.toString)) - InfoCacheInstance.cacheInfo(map) + } + if (times.nonEmpty) { + val time = times.min + val map = Map[String, String]((finalReadyTime -> time.toString)) + InfoCacheInstance.cacheInfo(map) + } } private def genFinalLastProcTime(): Unit = { val subPath = InfoCacheInstance.listKeys(infoPath) val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" } val result = InfoCacheInstance.readInfo(keys) - val time = keys.flatMap { k => + val times = keys.flatMap { k => getLongOpt(result, k) - }.min - val map = Map[String, String]((finalLastProcTime -> time.toString)) - InfoCacheInstance.cacheInfo(map) + } + if (times.nonEmpty) { + val time = times.min + val map = Map[String, String]((finalLastProcTime -> time.toString)) + InfoCacheInstance.cacheInfo(map) + } } private def genFinalCleanTime(): Unit = { val subPath = InfoCacheInstance.listKeys(infoPath) val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" } val result = InfoCacheInstance.readInfo(keys) - val time = keys.flatMap { k => + val times = keys.flatMap { k => getLongOpt(result, k) - }.min - val map = Map[String, String]((finalCleanTime -> time.toString)) - InfoCacheInstance.cacheInfo(map) + } + if (times.nonEmpty) { + val time = times.min + val map = Map[String, String]((finalCleanTime -> time.toString)) + InfoCacheInstance.cacheInfo(map) + } } private def readTimeRange(): (Long, Long) = { 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala index 1cf3f32..b858991 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala @@ -65,6 +65,8 @@ trait DataConnector extends Loggable with Serializable { val thisTable = thisName(ms) try { + saveTmst(ms) // save tmst + dfOpt.flatMap { df => val preProcRules = PreProcRuleGenerator.genPreProcRules(dcParam.preProc, suffix(ms)) @@ -104,7 +106,7 @@ trait DataConnector extends Loggable with Serializable { val withTmstDf = outDf.withColumn(tmstColName, lit(ms)) // tmst cache - saveTmst(ms) +// saveTmst(ms) // drop temp tables cleanData(timeInfo) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala index 1a0366d..419b141 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala @@ -129,7 +129,7 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable { def readData(): (Option[DataFrame], TimeRange) = { // time range: [a, b) val timeRange = TimeInfoCache.getTimeRange - val reviseTimeRange = (timeRange._1 + 
deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) + val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2 + 1) // read partition info val filterStr = s"`${InternalColumns.tmst}` >= ${reviseTimeRange._1} AND `${InternalColumns.tmst}` < ${reviseTimeRange._2}" @@ -326,7 +326,7 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable { submitLastProcTime(timeRange._2) // next clean time - val nextCleanTime = timeRange._2 + deltaTimeRange._1 + val nextCleanTime = timeRange._2 + deltaTimeRange._1 + 1 submitCleanTime(nextCleanTime) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala index db92533..9e79396 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala @@ -20,10 +20,17 @@ package org.apache.griffin.measure.process.temp import scala.math.{min, max} - case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializable { +case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializable { def merge(tr: TimeRange): TimeRange = { TimeRange(min(begin, tr.begin), max(end, tr.end), tmsts ++ tr.tmsts) } + def beginTmstOpt: Option[Long] = { + try { + if (tmsts.nonEmpty) Some(tmsts.min) else None + } catch { + case _: Throwable => None + } + } } object TimeRange { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala ---------------------------------------------------------------------- diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala index 3b4ec31..d07aa02 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala @@ -24,6 +24,8 @@ import org.apache.griffin.measure.rule.dsl.parser.GriffinDslParser import org.apache.griffin.measure.rule.plan.{TimeInfo, _} import org.apache.griffin.measure.rule.trans._ +import scala.util.{Failure, Success} + case class GriffinDslAdaptor(dataSourceNames: Seq[String], functionNames: Seq[String] ) extends RuleAdaptor { @@ -49,7 +51,13 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], val expr = result.get val rulePlanTrans = RulePlanTrans(dqType, dataSourceNames, timeInfo, name, expr, param, processType, dsTimeRanges) - rulePlanTrans.trans + rulePlanTrans.trans match { + case Success(rp) => rp + case Failure(ex) => { + warn(s"translate rule [ ${rule} ] fails: \n${ex.getMessage}") + emptyRulePlan + } + } } else { warn(s"parse rule [ ${rule} ] fails: \n${result}") emptyRulePlan http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala index 2ff8feb..904b087 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala @@ -30,6 +30,8 @@ import org.apache.griffin.measure.utils.ParamUtil._ import org.apache.griffin.measure.rule.trans.RuleExportFactory._ import 
org.apache.griffin.measure.rule.trans.DsUpdateFactory._ +import scala.util.Try + case class AccuracyRulePlanTrans(dataSourceNames: Seq[String], timeInfo: TimeInfo, name: String, expr: Expr, param: Map[String, Any], procType: ProcessType @@ -44,7 +46,7 @@ case class AccuracyRulePlanTrans(dataSourceNames: Seq[String], } import AccuracyKeys._ - def trans(): RulePlan = { + def trans(): Try[RulePlan] = Try { val details = getDetails(param) val sourceName = details.getString(_source, dataSourceNames.head) val targetName = details.getString(_target, dataSourceNames.tail.head) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala index 0f4e7c4..40a8102 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala @@ -29,6 +29,8 @@ import org.apache.griffin.measure.rule.plan._ import org.apache.griffin.measure.rule.trans.RuleExportFactory._ import org.apache.griffin.measure.utils.ParamUtil._ +import scala.util.Try + case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], timeInfo: TimeInfo, name: String, expr: Expr, param: Map[String, Any], procType: ProcessType, @@ -49,7 +51,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], } import DistinctnessKeys._ - def trans(): RulePlan = { + def trans(): Try[RulePlan] = Try { val details = getDetails(param) val sourceName = details.getString(_source, dataSourceNames.head) val targetName = details.getString(_target, dataSourceNames.tail.head) @@ -62,6 +64,12 @@ case class 
DistinctnessRulePlanTrans(dataSourceNames: Seq[String], val sourceTimeRange = dsTimeRanges.get(sourceName).getOrElse(TimeRange(ct)) val beginTime = sourceTimeRange.begin + val beginTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.beginTmstOpt) + val beginTmst = beginTmstOpt match { + case Some(t) => t + case _ => throw new Exception(s"empty begin tmst from ${sourceName}") + } + if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { println(s"[${ct}] data source ${sourceName} not exists") emptyRulePlan @@ -93,7 +101,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], } val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap) val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) - val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, beginTime, mode) + val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, beginTmst, mode) // 3. group by self val selfGroupTableName = "__selfGroup" @@ -188,7 +196,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], } val distStep = SparkSqlStep(distTableName, distSql, emptyMap) val distMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) - val distMetricExport = genMetricExport(distMetricParam, distColName, distTableName, beginTime, mode) + val distMetricExport = genMetricExport(distMetricParam, distColName, distTableName, beginTmst, mode) val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: Nil) @@ -208,7 +216,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], } val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true) val dupRecordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val dupRecordExport = genRecordExport(dupRecordParam, dupRecordTableName, dupRecordTableName, beginTime, mode) + val dupRecordExport = genRecordExport(dupRecordParam, 
dupRecordTableName, dupRecordTableName, beginTmst, mode) // 10. duplicate metric val dupMetricTableName = "__dupMetric" @@ -221,7 +229,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], } val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap) val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) - val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, beginTime, mode) + val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, beginTmst, mode) RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil) } else emptyRulePlan http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala index d9d2d4e..f80f3c1 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala @@ -28,6 +28,8 @@ import org.apache.griffin.measure.rule.plan._ import org.apache.griffin.measure.rule.trans.RuleExportFactory._ import org.apache.griffin.measure.utils.ParamUtil._ +import scala.util.Try + case class ProfilingRulePlanTrans(dataSourceNames: Seq[String], timeInfo: TimeInfo, name: String, expr: Expr, param: Map[String, Any], procType: ProcessType @@ -38,7 +40,7 @@ case class ProfilingRulePlanTrans(dataSourceNames: Seq[String], } import ProfilingKeys._ - def trans(): RulePlan = { + def trans(): Try[RulePlan] = Try { val details = getDetails(param) val profilingClause = expr.asInstanceOf[ProfilingClause] 
val sourceName = profilingClause.fromClauseOpt match { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala index b7226ba..9289053 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala @@ -25,18 +25,20 @@ import org.apache.griffin.measure.rule.dsl._ import org.apache.griffin.measure.rule.dsl.expr.Expr import org.apache.griffin.measure.rule.plan._ +import scala.util.Try + trait RulePlanTrans extends Loggable with Serializable { protected val emptyRulePlan = RulePlan(Nil, Nil) protected val emptyMap = Map[String, Any]() - def trans(): RulePlan + def trans(): Try[RulePlan] } object RulePlanTrans { private val emptyRulePlanTrans = new RulePlanTrans { - def trans(): RulePlan = emptyRulePlan + def trans(): Try[RulePlan] = Try(emptyRulePlan) } def apply(dqType: DqType, @@ -50,7 +52,7 @@ object RulePlanTrans { case ProfilingType => ProfilingRulePlanTrans(dsNames, ti, name, expr, param, procType) case UniquenessType => UniquenessRulePlanTrans(dsNames, ti, name, expr, param, procType) case DistinctnessType => DistinctnessRulePlanTrans(dsNames, ti, name, expr, param, procType, dsTimeRanges) - case TimelinessType => TimelinessRulePlanTrans(dsNames, ti, name, expr, param, procType) + case TimelinessType => TimelinessRulePlanTrans(dsNames, ti, name, expr, param, procType, dsTimeRanges) case _ => emptyRulePlanTrans } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala 
---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala index 9a01553..7e9b8fb 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala @@ -18,7 +18,7 @@ under the License. */ package org.apache.griffin.measure.rule.trans -import org.apache.griffin.measure.process.temp.TableRegisters +import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange} import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, ProcessType, StreamingProcessType} import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._ import org.apache.griffin.measure.rule.adaptor._ @@ -30,9 +30,12 @@ import org.apache.griffin.measure.rule.trans.RuleExportFactory._ import org.apache.griffin.measure.utils.ParamUtil._ import org.apache.griffin.measure.utils.TimeUtil +import scala.util.Try + case class TimelinessRulePlanTrans(dataSourceNames: Seq[String], timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], procType: ProcessType + param: Map[String, Any], procType: ProcessType, + dsTimeRanges: Map[String, TimeRange] ) extends RulePlanTrans { private object TimelinessKeys { @@ -49,14 +52,20 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String], } import TimelinessKeys._ - def trans(): RulePlan = { + def trans(): Try[RulePlan] = Try { val details = getDetails(param) val timelinessClause = expr.asInstanceOf[TimelinessClause] val sourceName = details.getString(_source, dataSourceNames.head) val mode = ExportMode.defaultMode(procType) - val ct = timeInfo.calcTime +// val ct = timeInfo.calcTime + + val beginTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.beginTmstOpt) + val beginTmst = beginTmstOpt 
match { + case Some(t) => t + case _ => throw new Exception(s"empty begin tmst from ${sourceName}") + } if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { emptyRulePlan @@ -120,7 +129,7 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String], } val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap) val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - val metricExports = genMetricExport(metricParam, name, metricTableName, ct, mode) :: Nil + val metricExports = genMetricExport(metricParam, name, metricTableName, beginTmst, mode) :: Nil // current timeliness plan val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil @@ -136,64 +145,12 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String], } val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap) val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val recordExports = genRecordExport(recordParam, recordTableName, recordTableName, ct, mode) :: Nil + val recordExports = genRecordExport(recordParam, recordTableName, recordTableName, beginTmst, mode) :: Nil RulePlan(recordStep :: Nil, recordExports) } case _ => emptyRulePlan } -// 5. ranges -// val rangePlan = details.get(_rangeSplit) match { -// case Some(arr: Seq[String]) => { -// val ranges = splitTimeRanges(arr) -// if (ranges.size > 0) { -// try { -// // 5.1. range -// val rangeTableName = "__range" -// val rangeColName = details.getStringOrKey(_range) -// val caseClause = { -// val whenClause = ranges.map { range => -// s"WHEN `${latencyColName}` < ${range._1} THEN '<${range._2}'" -// }.mkString("\n") -// s"CASE ${whenClause} ELSE '>=${ranges.last._2}' END AS `${rangeColName}`" -// } -// val rangeSql = { -// s"SELECT *, ${caseClause} FROM `${latencyTableName}`" -// } -// val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap) -// -// // 5.2. 
range metric -// val rangeMetricTableName = "__rangeMetric" -// val countColName = details.getStringOrKey(_count) -// val rangeMetricSql = procType match { -// case BatchProcessType => { -// s""" -// |SELECT `${rangeColName}`, COUNT(*) AS `${countColName}` -// |FROM `${rangeTableName}` GROUP BY `${rangeColName}` -// """.stripMargin -// } -// case StreamingProcessType => { -// s""" -// |SELECT `${InternalColumns.tmst}`, `${rangeColName}`, COUNT(*) AS `${countColName}` -// |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${rangeColName}` -// """.stripMargin -// } -// } -// val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap) -// val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) -// val rangeMetricExports = genMetricExport(rangeMetricParam, rangeColName, rangeMetricTableName, ct, mode) :: Nil -// -// RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports) -// } catch { -// case _: Throwable => emptyRulePlan -// } -// } else emptyRulePlan -// } -// case _ => emptyRulePlan -// } - -// return timeliness plan - // 5. 
ranges val rangePlan = TimeUtil.milliseconds(details.getString(_stepSize, "")) match { case Some(stepSize) => { @@ -227,7 +184,7 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String], } val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap) val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) - val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, ct, mode) :: Nil + val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, beginTmst, mode) :: Nil RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports) } @@ -243,23 +200,15 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String], val pctName = (pct * 100).toInt.toString s"floor(percentile_approx(${latencyColName}, ${pct})) AS `${percentileColName}_${pctName}`" }.mkString(", ") - val percentileSql = procType match { - case BatchProcessType => { - s""" - |SELECT ${percentileCols} - |FROM `${latencyTableName}` - """.stripMargin - } - case StreamingProcessType => { - s""" - |SELECT `${InternalColumns.tmst}`, ${percentileCols} - |FROM `${latencyTableName}` GROUP BY `${InternalColumns.tmst}` - """.stripMargin - } + val percentileSql = { + s""" + |SELECT ${percentileCols} + |FROM `${latencyTableName}` + """.stripMargin } val percentileStep = SparkSqlStep(percentileTableName, percentileSql, emptyMap) val percentileParam = emptyMap - val percentielExports = genMetricExport(percentileParam, percentileTableName, percentileTableName, ct, mode) :: Nil + val percentielExports = genMetricExport(percentileParam, percentileTableName, percentileTableName, beginTmst, mode) :: Nil RulePlan(percentileStep :: Nil, percentielExports) } else emptyRulePlan @@ -269,10 +218,6 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String], } private def getPercentiles(details: Map[String, Any]): Seq[Double] = { -// details.get(_percentiles) match { -// case 
Some(seq: Seq[Double]) => seq -// case _ => Nil -// } details.getArr[Double](_percentileValues).filter(d => (d >= 0 && d <= 1)) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a723f475/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala index 326d80b..baa5572 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala @@ -29,6 +29,8 @@ import org.apache.griffin.measure.rule.plan._ import org.apache.griffin.measure.rule.trans.RuleExportFactory._ import org.apache.griffin.measure.utils.ParamUtil._ +import scala.util.Try + case class UniquenessRulePlanTrans(dataSourceNames: Seq[String], timeInfo: TimeInfo, name: String, expr: Expr, param: Map[String, Any], procType: ProcessType @@ -46,7 +48,7 @@ case class UniquenessRulePlanTrans(dataSourceNames: Seq[String], } import UniquenessKeys._ - def trans(): RulePlan = { + def trans(): Try[RulePlan] = Try { val details = getDetails(param) val sourceName = details.getString(_source, dataSourceNames.head) val targetName = details.getString(_target, dataSourceNames.tail.head)