Repository: incubator-griffin
Updated Branches:
  refs/heads/master 6e03314fa -> 34f06afee
Modify the partitions config to a where clause, and fix a parser bug when parsing back-quoted field names. Author: Lionel Liu <bhlx3l...@163.com> Closes #180 from bhlx3lyx7/docker. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/34f06afe Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/34f06afe Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/34f06afe Branch: refs/heads/master Commit: 34f06afee6307a0c7042fbe6d8153943d5fa719b Parents: 6e03314 Author: Lionel Liu <bhlx3l...@163.com> Authored: Thu Nov 23 10:31:27 2017 +0800 Committer: Lionel Liu <bhlx3l...@163.com> Committed: Thu Nov 23 10:31:27 2017 +0800 ---------------------------------------------------------------------- .gitignore | 2 + .../batch/HiveBatchDataConnector.scala | 31 +++-- .../measure/process/StreamingDqThread.scala | 6 +- .../process/engine/DataFrameOprEngine.scala | 5 +- .../measure/process/engine/DqEngine.scala | 2 + .../measure/process/engine/DqEngines.scala | 1 - .../measure/process/engine/SparkDqEngine.scala | 120 ++++++++++--------- .../measure/process/engine/SparkSqlEngine.scala | 2 + .../rule/adaptor/GriffinDslAdaptor.scala | 5 +- .../measure/rule/dsl/expr/SelectExpr.scala | 14 ++- .../measure/rule/dsl/parser/BasicParser.scala | 22 +++- .../apache/griffin/measure/utils/TimeUtil.scala | 2 +- measure/src/test/resources/config1.json | 4 +- .../rule/adaptor/GriffinDslAdaptorTest.scala | 2 +- 14 files changed, 123 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 9270ccc..405d693 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,5 @@ ui/tmp derby.log metastore_db + +measure/src/test/scala/org/apache/griffin/measure/process/* http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala index 20c9e24..cf51d6c 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala @@ -41,23 +41,24 @@ case class HiveBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines, val Database = "database" val TableName = "table.name" - val Partitions = "partitions" + val Where = "where" val database = config.getString(Database, "default") val tableName = config.getString(TableName, "") - val partitionsString = config.getString(Partitions, "") + val whereString = config.getString(Where, "") val concreteTableName = s"${database}.${tableName}" // val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim)) - val partitions: Array[Array[String]] = partitionsString.split(";").flatMap { s => - val arr = s.trim.split(",").flatMap { t => - t.trim match { - case p if (p.nonEmpty) => Some(p) - case _ => None - } - } - if (arr.size > 0) Some(arr) else None - } + val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty) +// val wheres: 
Array[Array[String]] = whereString.split(",").flatMap { s => +// val arr = s.trim.split(",").flatMap { t => +// t.trim match { +// case p if (p.nonEmpty) => Some(p) +// case _ => None +// } +// } +// if (arr.size > 0) Some(arr) else None +// } def data(ms: Long): Option[DataFrame] = { try { @@ -143,11 +144,9 @@ case class HiveBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines, private def dataSql(): String = { val tableClause = s"SELECT * FROM ${concreteTableName}" - val validPartitions = partitions.filter(_.size > 0) - if (validPartitions.size > 0) { - val clauses = validPartitions.map { prtn => - val cls = prtn.mkString(" AND ") - s"${tableClause} WHERE ${cls}" + if (wheres.size > 0) { + val clauses = wheres.map { w => + s"${tableClause} WHERE ${w}" } clauses.mkString(" UNION ALL ") } else tableClause http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala index df1cc1b..c90e572 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala @@ -83,9 +83,9 @@ case class StreamingDqThread(dqEngines: DqEngines, } val lt = new Date().getTime - val collectoRddTimeStr = s"collect records using time: ${lt - rt} ms" - println(collectoRddTimeStr) - appPersist.log(lt, collectoRddTimeStr) + val collectRddTimeStr = s"collect records using time: ${lt - rt} ms" + println(collectRddTimeStr) + appPersist.log(lt, collectRddTimeStr) // persist records dqEngines.persistAllRecords(rdds, persistFactory) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala index b409b8d..c3205b5 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala @@ -96,12 +96,13 @@ object DataFrameOprs { val _miss = "miss" val _total = "total" val _matched = "matched" - val _tmst = "tmst" +// val _tmst = "tmst" val dfName = details.getOrElse(_dfName, _dfName).toString val miss = details.getOrElse(_miss, _miss).toString val total = details.getOrElse(_total, _total).toString val matched = details.getOrElse(_matched, _matched).toString - val tmst = details.getOrElse(_tmst, _tmst).toString +// val tmst = details.getOrElse(_tmst, _tmst).toString + val tmst = GroupByColumn.tmst val updateTime = new Date().getTime http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala index 84d5917..e28dfa4 100644 --- 
a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala @@ -31,6 +31,8 @@ trait DqEngine extends Loggable with Serializable { def runRuleStep(ruleStep: ConcreteRuleStep): Boolean + protected def collectable(): Boolean = false + def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]] // def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala index 1bafa15..1af2ae3 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala @@ -143,7 +143,6 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine { val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) => ret ++ engine.collectMetrics(ruleStep) } -// if (ret.isEmpty) warn(s"collect metrics warn: no metrics collected for ${ruleStep}") ret } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala index ee994fd..e8a7b16 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala @@ -31,81 +31,85 @@ trait SparkDqEngine extends DqEngine { val sqlContext: SQLContext def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]] = { - val emptyMap = Map[String, Any]() - ruleStep match { - case step: ConcreteRuleStep if (step.persistType == MetricPersistType) => { - val name = step.name - try { - val pdf = sqlContext.table(s"`${name}`") - val records = pdf.toJSON.collect() + if (collectable) { + val emptyMap = Map[String, Any]() + ruleStep match { + case step: ConcreteRuleStep if (step.persistType == MetricPersistType) => { + val name = step.name + try { + val pdf = sqlContext.table(s"`${name}`") + val records = pdf.toJSON.collect() - val pairs = records.flatMap { rec => - try { - val value = JsonUtil.toAnyMap(rec) - value.get(GroupByColumn.tmst) match { - case Some(t) => { - val key = t.toString.toLong - Some((key, value)) + val pairs = records.flatMap { rec => + try { + val value = JsonUtil.toAnyMap(rec) + value.get(GroupByColumn.tmst) match { + case Some(t) => { + val key = t.toString.toLong + Some((key, value)) + } + case _ => None } - case _ => None + } catch { + case e: Throwable => None } - } catch { - case e: Throwable => None } - } - val groupedPairs = pairs.foldLeft(Map[Long, Seq[Map[String, Any]]]()) { (ret, pair) => - val (k, v) = pair - ret.get(k) match { - case Some(seq) => ret + (k -> (seq :+ v)) - case _ => ret + (k -> (v :: Nil)) + val groupedPairs = pairs.foldLeft(Map[Long, Seq[Map[String, Any]]]()) { (ret, pair) => + 
val (k, v) = pair + ret.get(k) match { + case Some(seq) => ret + (k -> (seq :+ v)) + case _ => ret + (k -> (v :: Nil)) + } } - } - groupedPairs.mapValues { vs => - if (vs.size > 1) { - Map[String, Any]((name -> vs)) - } else { - vs.headOption.getOrElse(emptyMap) + groupedPairs.mapValues { vs => + if (vs.size > 1) { + Map[String, Any]((name -> vs)) + } else { + vs.headOption.getOrElse(emptyMap) + } + } + } catch { + case e: Throwable => { + error(s"collect metrics ${name} error: ${e.getMessage}") + Map[Long, Map[String, Any]]() } - } - } catch { - case e: Throwable => { - error(s"collect metrics ${name} error: ${e.getMessage}") - Map[Long, Map[String, Any]]() } } + case _ => Map[Long, Map[String, Any]]() } - case _ => Map[Long, Map[String, Any]]() - } + } else Map[Long, Map[String, Any]]() } def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long] ): Option[RDD[(Long, Iterable[String])]] = { - ruleStep match { - case step: ConcreteRuleStep if ((step.persistType == RecordPersistType) - || (step.updateDataSource.nonEmpty)) => { - val name = step.name - try { - val pdf = sqlContext.table(s"`${name}`") - val cols = pdf.columns - val rdd = pdf.flatMap { row => - val values = cols.flatMap { col => - Some((col, row.getAs[Any](col))) - }.toMap - values.get(GroupByColumn.tmst) match { - case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values))) - case _ => None + if (collectable) { + ruleStep match { + case step: ConcreteRuleStep if ((step.persistType == RecordPersistType) + || (step.updateDataSource.nonEmpty)) => { + val name = step.name + try { + val pdf = sqlContext.table(s"`${name}`") + val cols = pdf.columns + val rdd = pdf.flatMap { row => + val values = cols.flatMap { col => + Some((col, row.getAs[Any](col))) + }.toMap + values.get(GroupByColumn.tmst) match { + case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values))) + case _ => None + } + }.groupByKey() + Some(rdd) + } catch { + case e: Throwable => { + error(s"collect records ${name} error: ${e.getMessage}") + None } - }.groupByKey() - Some(rdd) - } catch { - case e: Throwable => { - error(s"collect records ${name} error: ${e.getMessage}") - None } } + case _ => None } - case _ => None - } + } else None } // def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala index 15df3b5..9c47d77 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala @@ -33,6 +33,8 @@ import org.apache.spark.streaming.StreamingContext case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine { + override protected def collectable(): Boolean = true + def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = { ruleStep match { case SparkSqlStep(name, rule, _, _, _) => { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala 
---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala index 8199d80..1e3ecb1 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala @@ -230,7 +230,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], |`${totalTableName}`.`${totalColName}` AS `${totalColName}` |FROM `${totalTableName}` FULL JOIN `${missTableName}` |ON `${totalTableName}`.`${GroupByColumn.tmst}` = `${missTableName}`.`${GroupByColumn.tmst}` - """.stripMargin + """.stripMargin } val accuracyMetricName = resultName(details, AccuracyInfo._Accuracy) val accuracyMetricStep = SparkSqlStep( @@ -250,8 +250,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], ("df.name" -> accuracyMetricName), ("miss" -> missColName), ("total" -> totalColName), - ("matched" -> matchedColName), - ("tmst" -> GroupByColumn.tmst) + ("matched" -> matchedColName) ), resultPersistType(details, AccuracyInfo._Accuracy, MetricPersistType), None http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala index d1cc86e..6525c88 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala @@ -30,7 +30,12 @@ case class DataSourceHeadExpr(name: String) extends HeadExpr { case class FieldNameHeadExpr(field: String) extends HeadExpr { def desc: String = field def coalesceDesc: String = desc - override def alias: Option[String] = Some(field) + override def alias: Option[String] = { + val innerField = if (field.startsWith("`") && field.endsWith("`")) { + field.substring(1, field.length - 1) + } else field + Some(innerField) + } } case class ALLSelectHeadExpr() extends HeadExpr { @@ -61,7 +66,12 @@ case class AllFieldsSelectExpr() extends SelectExpr { case class FieldSelectExpr(field: String) extends SelectExpr { def desc: String = s".${field}" def coalesceDesc: String = desc - def alias: Option[String] = Some(field) + override def alias: Option[String] = { + val innerField = if (field.startsWith("`") && field.endsWith("`")) { + field.substring(1, field.length - 1) + } else field + Some(innerField) + } } case class IndexSelectExpr(index: Expr) extends SelectExpr { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala index 1b7c374..6415a02 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala @@ -159,9 +159,12 @@ trait BasicParser extends JavaTokenParsers with Serializable { import 
Operator._ object Strings { - def AnyString: Parser[String] = """"(?:[^\"]|\")*"""".r | """'(?:[^']|\')*'""".r - def UQuoteTableFieldName: Parser[String] = """`(?:[^`]|[\\][`])*`""".r - def FieldName: Parser[String] = UQuoteTableFieldName | """[a-zA-Z_]\w*""".r + def innerString(s: String): String = s.substring(1, s.size - 1) + + def AnyString: Parser[String] = """"(?:\"|[^\"])*"""".r | """'(?:\'|[^'])*'""".r + def SimpleTableFieldName: Parser[String] = """[a-zA-Z_]\w*""".r + def UnQuoteTableFieldName: Parser[String] = """`(?:[\\][`]|[^`])*`""".r +// def FieldName: Parser[String] = UnQuoteTableFieldName | SimpleTableFieldName def DataSourceName: Parser[String] = genDataSourceNamesParser(dataSourceNames) def FunctionName: Parser[String] = genFunctionNamesParser(functionNames) @@ -209,14 +212,21 @@ trait BasicParser extends JavaTokenParsers with Serializable { DataSourceHeadExpr(_) } | function ^^ { OtherHeadExpr(_) - } | FieldName ^^ { + } | SimpleTableFieldName ^^ { FieldNameHeadExpr(_) + } | UnQuoteTableFieldName ^^ { s => + FieldNameHeadExpr(innerString(s)) } | ALLSL ^^ { _ => ALLSelectHeadExpr() } def selector: Parser[SelectExpr] = functionSelect | allFieldsSelect | fieldSelect | indexSelect def allFieldsSelect: Parser[AllFieldsSelectExpr] = DOT ~> ALLSL ^^ { _ => AllFieldsSelectExpr() } - def fieldSelect: Parser[FieldSelectExpr] = DOT ~> FieldName ^^ { FieldSelectExpr(_) } + def fieldSelect: Parser[FieldSelectExpr] = DOT ~> ( + SimpleTableFieldName ^^ { + FieldSelectExpr(_) + } | UnQuoteTableFieldName ^^ {s => + FieldSelectExpr(innerString(s)) + }) def indexSelect: Parser[IndexSelectExpr] = LSQBR ~> argument <~ RSQBR ^^ { IndexSelectExpr(_) } def functionSelect: Parser[FunctionSelectExpr] = DOT ~ FunctionName ~ LBR ~ repsep(argument, COMMA) ~ RBR ^^ { case _ ~ name ~ _ ~ args ~ _ => FunctionSelectExpr(name, args) @@ -226,7 +236,7 @@ trait BasicParser extends JavaTokenParsers with Serializable { * -- as alias -- * <as-alias> ::= <as> <field-name> */ - def asAlias: Parser[String] = AS ~> FieldName + def asAlias: Parser[String] = AS ~> (SimpleTableFieldName | UnQuoteTableFieldName ^^ { innerString(_) }) /** * -- math expr -- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala index fe721d2..a8c079b 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala @@ -22,7 +22,7 @@ import scala.util.{Failure, Success, Try} object TimeUtil { - final val TimeRegex = """^([+\-]?\d+)(d|h|m|s|ms)$""".r + final val TimeRegex = """^([+\-]?\d+)(ms|s|m|h|d)$""".r final val PureTimeRegex = """^([+\-]?\d+)$""".r def milliseconds(timeString: String): Option[Long] = { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/test/resources/config1.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config1.json b/measure/src/test/resources/config1.json index 16c265d..883f4e2 100644 --- a/measure/src/test/resources/config1.json +++ b/measure/src/test/resources/config1.json @@ -9,7 +9,7 @@ "version": "1.2", "config": { "table.name": "rheos_view_event", - "partitions": "dt=20170410, hour=15" + "where": 
"dt=20170410 AND hour=15" } }, @@ -18,7 +18,7 @@ "version": "1.2", "config": { "table.name": "be_view_event_queue", - "partitions": "dt=20170410, hour=15; dt=20170410, hour=16" + "where": "dt=20170410 AND hour=15, dt=20170410 AND hour=16" } }, http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/34f06afe/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala index 8097964..4d51691 100644 --- a/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala +++ b/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala @@ -36,7 +36,7 @@ class GriffinDslAdaptorTest extends FunSuite with Matchers with BeforeAndAfter w |{ | "dsl.type": "griffin-dsl", | "dq.type": "profiling", - | "rule": "source.age, source.age.count(), (source.user_id.COUNT() + 1s) as cnt group by source.age having source.desc.count() > 5 or false order by user_id desc, user_name asc limit 5", + | "rule": "source.age, source.`age`.count(), (source.user_id.COUNT() + 1s) as cnt group by source.age having source.desc.count() > 5 or false order by user_id desc, user_name asc limit 5", | "details": { | "source": "source", | "profiling": {