[CARBONDATA-2096] Add query test case for 'merge_small_files' distribution

Add a query test case for the 'merge_small_files' task distribution.
This closes #1882 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d90280af Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d90280af Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d90280af Branch: refs/heads/fgdatamap Commit: d90280afc8adcab741c7aa29a99b450af78cd8e9 Parents: 24ba2fe Author: QiangCai <qiang...@qq.com> Authored: Tue Jan 30 17:07:24 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Wed Jan 31 19:21:04 2018 +0800 ---------------------------------------------------------------------- .../dataload/TestGlobalSortDataLoad.scala | 27 ++++++++++++++++++-- .../apache/spark/sql/test/util/QueryTest.scala | 1 + 2 files changed, 26 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d90280af/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala index 9ce9675..50a38f1 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala @@ -25,14 +25,15 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.BatchedDataSourceScanExec import 
org.apache.spark.sql.test.TestQueryExecutor.projectPath import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore import org.apache.carbondata.core.metadata.CarbonMetadata -import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.spark.rdd.CarbonScanRDD class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { var filePath: String = s"$resourcesPath/globalsort" @@ -272,7 +273,29 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort") val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) val segmentDir = carbonTablePath.getSegmentDir("0", "0") - assertResult(5)(new File(segmentDir).listFiles().length) + assertResult(Math.max(4, defaultParallelism) + 1)(new File(segmentDir).listFiles().length) + } + + test("Query with small files") { + try { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES) + for (i <- 0 until 10) { + sql(s"insert into carbon_globalsort select $i, 'name_$i', 'city_$i', ${ i % 100 }") + } + val df = sql("select * from carbon_globalsort") + val scanRdd = df.queryExecution.sparkPlan.collect { + case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] => + b.rdd.asInstanceOf[CarbonScanRDD] + }.head + assertResult(defaultParallelism)(scanRdd.getPartitions.length) + assertResult(10)(df.count) + } finally { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT) + } } // ----------------------------------- INSERT 
INTO ----------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d90280af/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala index 0079d1e..b87473a 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala @@ -107,6 +107,7 @@ class QueryTest extends PlanTest { val metastoredb = TestQueryExecutor.metastoredb val integrationPath = TestQueryExecutor.integrationPath val dblocation = TestQueryExecutor.location + val defaultParallelism = sqlContext.sparkContext.defaultParallelism } object QueryTest {