Github user QiangCai commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1876#discussion_r164655879 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala --- @@ -414,6 +416,75 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte sql("select * from casesensitivepartition where empno=17")) } + test("Partition LOAD with small files") { + sql("DROP TABLE IF EXISTS smallpartitionfiles") + sql( + """ + | CREATE TABLE smallpartitionfiles(id INT, name STRING, age INT) PARTITIONED BY(city STRING) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + val inputPath = new File("target/small_files").getCanonicalPath + val folder = new File(inputPath) + if (folder.exists()) { + FileUtils.deleteDirectory(folder) + } + folder.mkdir() + for (i <- 0 to 100) { + val file = s"$folder/file$i.csv" + val writer = new FileWriter(file) + writer.write("id,name,city,age\n") + writer.write(s"$i,name_$i,city_${i % 5},${ i % 100 }") + writer.close() + } + sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfiles") + FileUtils.deleteDirectory(folder) + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "smallpartitionfiles") + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) + val segmentDir = carbonTablePath.getSegmentDir("0", "0") + assert(new File(segmentDir).listFiles().length < 50) + } + + test("verify partition read with small files") { + try { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES) + sql("DROP TABLE IF EXISTS smallpartitionfilesread") + sql( + """ + | CREATE TABLE smallpartitionfilesread(id INT, name STRING, age INT) PARTITIONED BY + | (city STRING) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + val 
inputPath = new File("target/small_files").getCanonicalPath + val folder = new File(inputPath) + if (folder.exists()) { + FileUtils.deleteDirectory(folder) + } + folder.mkdir() + for (i <- 0 until 100) { + val file = s"$folder/file$i.csv" + val writer = new FileWriter(file) + writer.write("id,name,city,age\n") + writer.write(s"$i,name_$i,city_${ i },${ i % 100 }") + writer.close() + } + sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfilesread") + FileUtils.deleteDirectory(folder) + val dataFrame = sql("select * from smallpartitionfilesread") + val scanRdd = dataFrame.queryExecution.sparkPlan.collect { + case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] => b.rdd + .asInstanceOf[CarbonScanRDD] + }.head + assert(scanRdd.getPartitions.length < 10) + assertResult(100)(dataFrame.collect().length) --- End diff -- I suggest using `dataFrame.count` instead of `dataFrame.collect().length`: `count` is computed on the executors, whereas `collect()` pulls every row back to the driver just to measure its length.
---