Github user chemikadze commented on a diff in the pull request: https://github.com/apache/incubator-griffin/pull/434#discussion_r224151599 --- Diff: measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala --- @@ -0,0 +1,169 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.transformations + +import com.holdenkarau.spark.testing.DataFrameSuiteBase +import org.apache.spark.sql.DataFrame +import org.scalatest._ + +import org.apache.griffin.measure.configuration.dqdefinition._ +import org.apache.griffin.measure.configuration.enums.BatchProcessType +import org.apache.griffin.measure.context.{ContextId, DQContext} +import org.apache.griffin.measure.datasource.DataSourceFactory +import org.apache.griffin.measure.job.builder.DQJobBuilder + +case class AccuracyResult(total: Long, miss: Long, matched: Long, matchedFraction: Double) + +class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers with DataFrameSuiteBase { + import spark.implicits._ + + private val EMPTY_PERSON_TABLE = "empty_person" + private val PERSON_TABLE = "person" + + override def beforeAll(): Unit = { + super.beforeAll() + + createPersonTable() + createEmptyPersonTable() + + spark.conf.set("spark.sql.crossJoin.enabled", "true") + } + + "accuracy" should "basically work" in { + checkAccuracy( + sourceName = PERSON_TABLE, + targetName = PERSON_TABLE, + expectedResult = AccuracyResult(total = 2, miss = 0, matched = 2, matchedFraction = 1.0)) + } + + "accuracy" should "work with empty target" in { + checkAccuracy( + sourceName = PERSON_TABLE, + targetName = EMPTY_PERSON_TABLE, + expectedResult = AccuracyResult(total = 2, miss = 2, matched = 0, matchedFraction = 0.0) + ) + } + + "accuracy" should "work with empty source" in { + checkAccuracy( + sourceName = EMPTY_PERSON_TABLE, + targetName = PERSON_TABLE, + expectedResult = AccuracyResult(total = 0, miss = 0, matched = 0, matchedFraction = 1.0)) + } + + "accuracy" should "work with empty source and target" in { + checkAccuracy( + sourceName = EMPTY_PERSON_TABLE, + targetName = EMPTY_PERSON_TABLE, + expectedResult = AccuracyResult(total = 0, miss = 0, matched = 0, matchedFraction = 1.0)) + } + + private def checkAccuracy(sourceName: String, targetName: String, expectedResult: AccuracyResult) = { + val dqContext: DQContext = getDqContext( + dataSourcesParam = List( + DataSourceParam( + name = "source", + connectors = List(dataConnectorParam(tableName = sourceName)) + ), + DataSourceParam( + name = "target", + connectors = List(dataConnectorParam(tableName = targetName)) + ) + )) + + val accuracyRule = RuleParam( + dslType = "griffin-dsl", + dqType = "ACCURACY", + outDfName = "person_accuracy", + rule = "source.name = target.name" + ) + + val res = getRuleResults(dqContext, accuracyRule) + .as[AccuracyResult] + .collect() + + res.length shouldBe 1 + + res(0) shouldEqual expectedResult + } + + private def getRuleResults(dqContext: DQContext, rule: RuleParam): DataFrame = { + val dqJob = DQJobBuilder.buildDQJob( + dqContext, + evaluateRuleParam = EvaluateRuleParam(List(rule)) + ) + + dqJob.execute(dqContext) + + spark.sql(s"select * from ${rule.getOutDfName()}") + } + + private def createPersonTable(): Unit = { + val personCsvPath = getClass.getResource("/hive/person_table.csv").getFile + + spark.sql( + s"CREATE TABLE IF NOT EXISTS ${PERSON_TABLE} " + + "( " + + " name String," + + " age int " + + ") " + + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' " + + "STORED AS TEXTFILE" + ) + + spark.sql(s"LOAD DATA LOCAL INPATH '$personCsvPath' OVERWRITE INTO TABLE ${PERSON_TABLE}") + } + + private def createEmptyPersonTable(): Unit = { + spark.sql( + s"CREATE TABLE IF NOT EXISTS ${EMPTY_PERSON_TABLE} " + + "( " + + " name String," + + " age int " + + ") " + + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' " + + "STORED AS TEXTFILE" + ) + + spark.sql(s"select * from ${EMPTY_PERSON_TABLE}").show() --- End diff -- assert on emptiness would be useful, what do you think?
---