[CARBONDATA-2096] Add query test case for 'merge_small_files' distribution

Add query test case for 'merge_small_files' distribution

This closes #1882


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d90280af
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d90280af
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d90280af

Branch: refs/heads/fgdatamap
Commit: d90280afc8adcab741c7aa29a99b450af78cd8e9
Parents: 24ba2fe
Author: QiangCai <qiang...@qq.com>
Authored: Tue Jan 30 17:07:24 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Wed Jan 31 19:21:04 2018 +0800

----------------------------------------------------------------------
 .../dataload/TestGlobalSortDataLoad.scala       | 27 ++++++++++++++++++--
 .../apache/spark/sql/test/util/QueryTest.scala  |  1 +
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
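For readers who just want the shape of the new test without parsing the line-wrapped diff below: it switches CarbonData's task-distribution property to merge-small-files mode, runs a query against the carbon_globalsort table, checks that the CarbonScanRDD collapses the many small files into defaultParallelism partitions, and restores the default distribution afterwards. The following is a minimal sketch of that pattern condensed from the diff, not part of the commit itself; it assumes it runs inside the QueryTest suite (which provides sql(...) and, after this change, defaultParallelism), and the constants and classes it uses are the ones imported by the change.

    // Minimal sketch (assumes the surrounding QueryTest suite provides
    // sql(...) and defaultParallelism): enable merge-small-files task
    // distribution, verify the scan partition count, restore the default.
    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties
    import org.apache.carbondata.spark.rdd.CarbonScanRDD
    import org.apache.spark.sql.execution.BatchedDataSourceScanExec

    try {
      CarbonProperties.getInstance().addProperty(
        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)

      val df = sql("select * from carbon_globalsort")
      // Pull the CarbonScanRDD out of the physical plan; the test expects the
      // merged small files to yield exactly defaultParallelism partitions.
      val scanRdd = df.queryExecution.sparkPlan.collect {
        case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] =>
          b.rdd.asInstanceOf[CarbonScanRDD]
      }.head
      assert(scanRdd.getPartitions.length == defaultParallelism)
    } finally {
      // Restore the default distribution so later tests are unaffected.
      CarbonProperties.getInstance().addProperty(
        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
    }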


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d90280af/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 9ce9675..50a38f1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -25,14 +25,15 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.BatchedDataSourceScanExec
 import org.apache.spark.sql.test.TestQueryExecutor.projectPath
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   var filePath: String = s"$resourcesPath/globalsort"
@@ -272,7 +273,29 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val segmentDir = carbonTablePath.getSegmentDir("0", "0")
-    assertResult(5)(new File(segmentDir).listFiles().length)
+    assertResult(Math.max(4, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
+  }
+
+  test("Query with small files") {
+    try {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)
+      for (i <- 0 until 10) {
+        sql(s"insert into carbon_globalsort select $i, 'name_$i', 'city_$i', ${ i % 100 }")
+      }
+      val df = sql("select * from carbon_globalsort")
+      val scanRdd = df.queryExecution.sparkPlan.collect {
+        case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] =>
+          b.rdd.asInstanceOf[CarbonScanRDD]
+      }.head
+      assertResult(defaultParallelism)(scanRdd.getPartitions.length)
+      assertResult(10)(df.count)
+    } finally {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+    }
   }
 
   // ----------------------------------- INSERT INTO -----------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d90280af/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index 0079d1e..b87473a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -107,6 +107,7 @@ class QueryTest extends PlanTest {
   val metastoredb = TestQueryExecutor.metastoredb
   val integrationPath = TestQueryExecutor.integrationPath
   val dblocation = TestQueryExecutor.location
+  val defaultParallelism = sqlContext.sparkContext.defaultParallelism
 }
 
 object QueryTest {
