Repository: incubator-carbondata Updated Branches: refs/heads/master 526243b09 -> f5ecfbf5c
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonFunSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonFunSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonFunSuite.scala new file mode 100644 index 0000000..4647e78 --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonFunSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.common.util + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.scalatest.{FunSuite, Outcome} + + +private[spark] abstract class CarbonFunSuite extends FunSuite { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * Log the suite name and the test name before and after each test. + * + * Subclasses should never override this method. If they wish to run + * custom code before and after each test, they should mix in + * the {{org.scalatest.BeforeAndAfter}} trait instead. 
+ */ + final protected override def withFixture(test: NoArgTest): Outcome = { + val testName = test.text + val suiteName = this.getClass.getName + val shortSuiteName = suiteName.replaceAll("org.apache.spark", "o.a.s") + try { + LOGGER.info(s"\n\n===== TEST OUTPUT FOR $shortSuiteName: '$testName' =====\n") + test() + } finally { + LOGGER.info(s"\n\n===== FINISHED $shortSuiteName: '$testName' =====\n") + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala new file mode 100644 index 0000000..d29196e --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.common.util + +import java.io.File + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.commons.io.FileUtils + +object CarbonSessionTest extends{ + + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + val storeLocation = s"$rootPath/examples/spark2/target/store" + val warehouse = s"$rootPath/examples/spark2/target/warehouse" + val metastoredb = s"$rootPath/examples/spark2/target/metastore_db" + + val spark = { + + // clean data folder + if (true) { + val clean = (path: String) => FileUtils.deleteDirectory(new File(path)) + clean(storeLocation) + clean(warehouse) + clean(metastoredb) + } + + val spark = SparkSession + .builder() + .master("local") + .appName("CarbonExample") + .enableHiveSupport() + .config("spark.sql.warehouse.dir", warehouse) + .config("javax.jdo.option.ConnectionURL", + s"jdbc:derby:;databaseName=$metastoredb;create=true") + .getOrCreate() + + CarbonProperties.getInstance() + .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins") + .addProperty("carbon.storelocation", storeLocation) + + spark.sparkContext.setLogLevel("WARN") + + spark + } + + val sc = spark.sparkContext + + lazy val implicits = spark.implicits + + def sql(sqlText: String): DataFrame = spark.sql(sqlText) + +} + + http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/PlanTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/PlanTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/PlanTest.scala new file mode 100644 index 0000000..cdd415f --- /dev/null +++ 
b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/PlanTest.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.common.util + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, OneRowRelation} +import org.apache.spark.sql.catalyst.util._ + +/** + * Provides helper methods for comparing plans. 
+ */ +class PlanTest extends CarbonFunSuite { + + /** Fails the test if the two expressions do not match */ + protected def compareExpressions(e1: Expression, e2: Expression): Unit = { + comparePlans(Filter(e1, OneRowRelation), Filter(e2, OneRowRelation)) + } + + /** Fails the test if the two plans do not match */ + protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) { + val normalized1 = normalizeExprIds(plan1) + val normalized2 = normalizeExprIds(plan2) + if (normalized1 != normalized2) { + fail( + s""" + |== FAIL: Plans do not match === + |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")} + """.stripMargin) + } + } + + /** + * Since attribute references are given globally unique ids during analysis, + * we must normalize them to check if two different queries are identical. + */ + protected def normalizeExprIds(plan: LogicalPlan) = { + plan transformAllExpressions { + case a: AttributeReference => + AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0)) + case a: Alias => + Alias(a.child, a.name)(exprId = ExprId(0)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala new file mode 100644 index 0000000..44d3bfa --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.common.util + +import java.util.{Locale, TimeZone} + +import org.apache.carbondata.common.logging.LogServiceFactory + +import scala.collection.JavaConversions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.{DataFrame, Row, SQLContext} + +class QueryTest extends PlanTest { + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*) + TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) + // Add Locale setting + Locale.setDefault(Locale.US) + + /** + * Runs the plan and makes sure the answer contains all of the keywords, or that + * none of the keywords are listed in the answer + * @param df the [[DataFrame]] to be executed + * @param exists true to make sure the keywords are listed in the output, otherwise + * to make sure none of the keywords are listed in the output + * @param keywords keyword in string array + */ + def checkExistence(df: DataFrame, exists: Boolean, keywords: String*) { + val outputs = df.collect().map(_.mkString).mkString + for (key <- keywords) { + if (exists) { + assert(outputs.contains(key), s"Failed for $df ($key doesn't exist in result)") + } else { + assert(!outputs.contains(key), s"Failed for $df ($key existed in the result)") 
+ } + } + } + + def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext) { + test(sqlString) { + checkAnswer(sqlContext.sql(sqlString), expectedAnswer) + } + } + + /** + * Runs the plan and makes sure the answer matches the expected result. + * @param df the [[DataFrame]] to be executed + * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s. + */ + protected def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Unit = { + QueryTest.checkAnswer(df, expectedAnswer) match { + case Some(errorMessage) => fail(errorMessage) + case None => + } + } + + protected def checkAnswer(df: DataFrame, expectedAnswer: Row): Unit = { + checkAnswer(df, Seq(expectedAnswer)) + } + + protected def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = { + checkAnswer(df, expectedAnswer.collect()) + } +} + +object QueryTest { + def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]): String = { + checkAnswer(df, expectedAnswer.toSeq) match { + case Some(errorMessage) => errorMessage + case None => null + } + } + + /** + * Runs the plan and makes sure the answer matches the expected result. + * If there was exception during the execution or the contents of the DataFrame does not + * match the expected result, an error message will be returned. Otherwise, a [[None]] will + * be returned. + * @param df the [[DataFrame]] to be executed + * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s. + */ + def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = { + val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty + def prepareAnswer(answer: Seq[Row]): Seq[Row] = { + // Converts data to types that we can do equality comparison using Scala collections. + // For BigDecimal type, the Scala type has a better definition of equality test (similar to + // Java's java.math.BigDecimal.compareTo). 
+ // For binary arrays, we convert them to Seq to avoid calling java.util.Arrays.equals for + // equality test. + val converted: Seq[Row] = answer.map { s => + Row.fromSeq(s.toSeq.map { + case d: java.math.BigDecimal => BigDecimal(d) + case b: Array[Byte] => b.toSeq + case o => o + }) + } + if (!isSorted) converted.sortBy(_.toString()) else converted + } + val sparkAnswer = try df.collect().toSeq catch { + case e: Exception => + val errorMessage = + s""" + |Exception thrown while executing query: + |${df.queryExecution} + |== Exception == + |$e + |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)} + """.stripMargin + return Some(errorMessage) + } + + if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) { + val errorMessage = + s""" + |Results do not match for query: + |${df.queryExecution} + |== Results == + |${ + sideBySide( + s"== Correct Answer - ${expectedAnswer.size} ==" +: + prepareAnswer(expectedAnswer).map(_.toString()), + s"== Spark Answer - ${sparkAnswer.size} ==" +: + prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n") + } + """.stripMargin + return Some(errorMessage) + } + + return None + } +}