Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 526243b09 -> f5ecfbf5c


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonFunSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonFunSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonFunSuite.scala
new file mode 100644
index 0000000..4647e78
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonFunSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.common.util
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.scalatest.{FunSuite, Outcome}
+
+
+private[spark] abstract class CarbonFunSuite extends FunSuite {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Log the suite name and the test name before and after each test.
+   *
+   * Subclasses should never override this method. If they wish to run
+   * custom code before and after each test, they should mix in
+   * the {{org.scalatest.BeforeAndAfter}} trait instead.
+   */
+  final protected override def withFixture(test: NoArgTest): Outcome = {
+    val testName = test.text
+    val suiteName = this.getClass.getName
+    val shortSuiteName = suiteName.replaceAll("org.apache.spark", "o.a.s")
+    try {
+      LOGGER.info(s"\n\n===== TEST OUTPUT FOR $shortSuiteName: '$testName' 
=====\n")
+      test()
+    } finally {
+      LOGGER.info(s"\n\n===== FINISHED $shortSuiteName: '$testName' =====\n")
+    }
+  }
+
+}
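
A minimal usage sketch (illustrative only, not part of this commit; the suite
name is hypothetical): subclasses get the per-test log banners from withFixture
automatically and, as the comment above suggests, mix in BeforeAndAfter for
custom setup instead of overriding withFixture.

    package org.apache.spark.sql.common.util

    import org.scalatest.BeforeAndAfter

    private[spark] class ExampleSuite extends CarbonFunSuite with BeforeAndAfter {
      before {
        // per-test setup goes here rather than in an override of withFixture
      }
      test("example") {
        assert(1 + 1 == 2)
      }
    }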

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
new file mode 100644
index 0000000..d29196e
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.common.util
+
+import java.io.File
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.commons.io.FileUtils
+
+object CarbonSessionTest {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+    val spark = {
+
+        // clean data folders left over from previous runs
+        val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+        clean(storeLocation)
+        clean(warehouse)
+        clean(metastoredb)
+
+        val spark = SparkSession
+          .builder()
+          .master("local")
+          .appName("CarbonExample")
+          .enableHiveSupport()
+          .config("spark.sql.warehouse.dir", warehouse)
+          .config("javax.jdo.option.ConnectionURL",
+              s"jdbc:derby:;databaseName=$metastoredb;create=true")
+          .getOrCreate()
+
+        CarbonProperties.getInstance()
+          .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+          .addProperty("carbon.storelocation", storeLocation)
+
+        spark.sparkContext.setLogLevel("WARN")
+
+        spark
+    }
+
+    val sc = spark.sparkContext
+
+    lazy val implicits = spark.implicits
+
+    def sql(sqlText: String): DataFrame = spark.sql(sqlText)
+
+}
+
+
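
A minimal usage sketch (illustrative only, not part of this commit): a test can
import the shared object and run SQL through its sql helper, so all suites reuse
the single local SparkSession configured with the Carbon store location.

    import org.apache.spark.sql.common.util.CarbonSessionTest

    object CarbonSessionUsage {
      def main(args: Array[String]): Unit = {
        // first access to CarbonSessionTest builds the session and cleans the folders
        CarbonSessionTest.sql("SHOW DATABASES").show()
      }
    }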

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/PlanTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/PlanTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/PlanTest.scala
new file mode 100644
index 0000000..cdd415f
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/PlanTest.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.common.util
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.util._
+
+/**
+ * Provides helper methods for comparing plans.
+ */
+class PlanTest extends CarbonFunSuite {
+
+  /** Fails the test if the two expressions do not match */
+  protected def compareExpressions(e1: Expression, e2: Expression): Unit = {
+    comparePlans(Filter(e1, OneRowRelation), Filter(e2, OneRowRelation))
+  }
+
+  /** Fails the test if the two plans do not match */
+  protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
+    val normalized1 = normalizeExprIds(plan1)
+    val normalized2 = normalizeExprIds(plan2)
+    if (normalized1 != normalized2) {
+      fail(
+        s"""
+           |== FAIL: Plans do not match ===
+           |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
+         """.stripMargin)
+    }
+  }
+
+  /**
+   * Since attribute references are given globally unique ids during analysis,
+   * we must normalize them to check if two different queries are identical.
+   */
+  protected def normalizeExprIds(plan: LogicalPlan) = {
+    plan transformAllExpressions {
+      case a: AttributeReference =>
+        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
+      case a: Alias =>
+        Alias(a.child, a.name)(exprId = ExprId(0))
+    }
+  }
+}
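
A minimal usage sketch (illustrative only, not part of this commit; the suite
name is hypothetical): two structurally identical expressions compare equal
once normalizeExprIds has zeroed out their expression ids.

    import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
    import org.apache.spark.sql.common.util.PlanTest

    class ExamplePlanSuite extends PlanTest {
      test("equivalent expressions compare equal") {
        // both sides become Filter(1 = 1, OneRowRelation) with normalized ids
        compareExpressions(EqualTo(Literal(1), Literal(1)),
          EqualTo(Literal(1), Literal(1)))
      }
    }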

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
new file mode 100644
index 0000000..44d3bfa
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.common.util
+
+import java.util.{Locale, TimeZone}
+
+import scala.collection.JavaConversions._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+
+class QueryTest extends PlanTest {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
+  TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+  // Add Locale setting
+  Locale.setDefault(Locale.US)
+
+  /**
+   * Runs the plan and makes sure the answer either contains all of the keywords
+   * or contains none of them.
+   * @param df the [[DataFrame]] to be executed
+   * @param exists if true, assert that every keyword appears in the output;
+   *               otherwise, assert that none of the keywords appear
+   * @param keywords the keywords to look for
+   */
+  def checkExistence(df: DataFrame, exists: Boolean, keywords: String*) {
+    val outputs = df.collect().map(_.mkString).mkString
+    for (key <- keywords) {
+      if (exists) {
+        assert(outputs.contains(key), s"Failed for $df ($key doesn't exist in result)")
+      } else {
+        assert(!outputs.contains(key), s"Failed for $df ($key existed in the result)")
+      }
+    }
+  }
+
+  def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext) {
+    test(sqlString) {
+      checkAnswer(sqlContext.sql(sqlString), expectedAnswer)
+    }
+  }
+
+  /**
+   * Runs the plan and makes sure the answer matches the expected result.
+   * @param df the [[DataFrame]] to be executed
+   * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
+   */
+  protected def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Unit = {
+    QueryTest.checkAnswer(df, expectedAnswer) match {
+      case Some(errorMessage) => fail(errorMessage)
+      case None =>
+    }
+  }
+
+  protected def checkAnswer(df: DataFrame, expectedAnswer: Row): Unit = {
+    checkAnswer(df, Seq(expectedAnswer))
+  }
+
+  protected def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = {
+    checkAnswer(df, expectedAnswer.collect())
+  }
+}
+
+object QueryTest {
+  def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]): String = {
+    checkAnswer(df, expectedAnswer.toSeq) match {
+      case Some(errorMessage) => errorMessage
+      case None => null
+    }
+  }
+
+  /**
+   * Runs the plan and makes sure the answer matches the expected result.
+   * If an exception is thrown during execution, or the contents of the DataFrame do
+   * not match the expected result, an error message is returned; otherwise [[None]]
+   * is returned.
+   * @param df the [[DataFrame]] to be executed
+   * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
+   */
+  def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
+    val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
+    def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
+      // Convert data to types on which we can do equality comparisons using Scala
+      // collections. For BigDecimal, the Scala type has a better definition of equality
+      // (similar to Java's java.math.BigDecimal.compareTo). Binary arrays are converted
+      // to Seq to avoid calling java.util.Arrays.equals for the equality test.
+      val converted: Seq[Row] = answer.map { s =>
+        Row.fromSeq(s.toSeq.map {
+          case d: java.math.BigDecimal => BigDecimal(d)
+          case b: Array[Byte] => b.toSeq
+          case o => o
+        })
+      }
+      if (!isSorted) converted.sortBy(_.toString()) else converted
+    }
+    val sparkAnswer = try df.collect().toSeq catch {
+      case e: Exception =>
+        val errorMessage =
+          s"""
+             |Exception thrown while executing query:
+             |${df.queryExecution}
+             |== Exception ==
+             |$e
+             |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
+          """.stripMargin
+        return Some(errorMessage)
+    }
+
+    if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
+      val errorMessage =
+        s"""
+           |Results do not match for query:
+           |${df.queryExecution}
+           |== Results ==
+           |${
+          sideBySide(
+            s"== Correct Answer - ${expectedAnswer.size} ==" +:
+              prepareAnswer(expectedAnswer).map(_.toString()),
+            s"== Spark Answer - ${sparkAnswer.size} ==" +:
+              prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")
+        }
+      """.stripMargin
+      return Some(errorMessage)
+    }
+
+    return None
+  }
+}
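
A minimal usage sketch (illustrative only, not part of this commit; the queries
are placeholders): a suite extends QueryTest to combine exact-result checks with
keyword checks over the collected output.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.common.util.{CarbonSessionTest, QueryTest}

    class ExampleQuerySuite extends QueryTest {
      test("constant query returns expected row") {
        // exact comparison; rows are sorted before comparing for unsorted plans
        checkAnswer(CarbonSessionTest.sql("SELECT 1"), Row(1))
      }
      test("output mentions the default database") {
        // assert the keyword appears somewhere in the collected result
        checkExistence(CarbonSessionTest.sql("SHOW DATABASES"), true, "default")
      }
    }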
