Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1876#discussion_r164655879
  
    --- Diff: 
integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
 ---
    @@ -414,6 +416,75 @@ class StandardPartitionTableLoadingTestCase extends 
QueryTest with BeforeAndAfte
           sql("select * from  casesensitivepartition where empno=17"))
       }
     
    +  test("Partition LOAD with small files") {
    +    sql("DROP TABLE IF EXISTS smallpartitionfiles")
    +    sql(
    +      """
    +        | CREATE TABLE smallpartitionfiles(id INT, name STRING, age INT) 
PARTITIONED BY(city STRING)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    val inputPath = new File("target/small_files").getCanonicalPath
    +    val folder = new File(inputPath)
    +    if (folder.exists()) {
    +      FileUtils.deleteDirectory(folder)
    +    }
    +    folder.mkdir()
    +    for (i <- 0 to 100) {
    +      val file = s"$folder/file$i.csv"
    +      val writer = new FileWriter(file)
    +      writer.write("id,name,city,age\n")
    +      writer.write(s"$i,name_$i,city_${i % 5},${ i % 100 }")
    +      writer.close()
    +    }
    +    sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE 
smallpartitionfiles")
    +    FileUtils.deleteDirectory(folder)
    +    val carbonTable = 
CarbonMetadata.getInstance().getCarbonTable("default", "smallpartitionfiles")
    +    val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
    +    val segmentDir = carbonTablePath.getSegmentDir("0", "0")
    +    assert(new File(segmentDir).listFiles().length < 50)
    +  }
    +
    +  test("verify partition read with small files") {
    +    try {
    +      
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
    +        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)
    +      sql("DROP TABLE IF EXISTS smallpartitionfilesread")
    +      sql(
    +        """
    +          | CREATE TABLE smallpartitionfilesread(id INT, name STRING, age 
INT) PARTITIONED BY
    +          | (city STRING)
    +          | STORED BY 'org.apache.carbondata.format'
    +        """.stripMargin)
    +      val inputPath = new File("target/small_files").getCanonicalPath
    +      val folder = new File(inputPath)
    +      if (folder.exists()) {
    +        FileUtils.deleteDirectory(folder)
    +      }
    +      folder.mkdir()
    +      for (i <- 0 until 100) {
    +        val file = s"$folder/file$i.csv"
    +        val writer = new FileWriter(file)
    +        writer.write("id,name,city,age\n")
    +        writer.write(s"$i,name_$i,city_${ i },${ i % 100 }")
    +        writer.close()
    +      }
    +      sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE 
smallpartitionfilesread")
    +      FileUtils.deleteDirectory(folder)
    +      val dataFrame = sql("select * from smallpartitionfilesread")
    +      val scanRdd = dataFrame.queryExecution.sparkPlan.collect {
    +        case b: BatchedDataSourceScanExec if 
b.rdd.isInstanceOf[CarbonScanRDD] => b.rdd
    +          .asInstanceOf[CarbonScanRDD]
    +      }.head
    +      assert(scanRdd.getPartitions.length < 10)
    +      assertResult(100)(dataFrame.collect().length)
    --- End diff --
    
    suggest to use dataFrame.count


---

Reply via email to