Github user habren commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21868#discussion_r210793717
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -425,12 +426,44 @@ case class FileSourceScanExec(
           fsRelation: HadoopFsRelation): RDD[InternalRow] = {
         val defaultMaxSplitBytes =
           fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
    -    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    +    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
         val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
         val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
         val bytesPerCore = totalBytes / defaultParallelism
     
    -    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    +    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    +
    +    if(fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
    +      (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
    +        fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
    +      if (relation.dataSchema.map(_.dataType).forall(dataType =>
    +        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
    +          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
    +          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
    +
    +        def getTypeLength(dataType: DataType): Int = {
    +          if (dataType.isInstanceOf[StructType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
    +          } else if (dataType.isInstanceOf[ArrayType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
    +          } else if (dataType.isInstanceOf[MapType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
    +          } else {
    +            dataType.defaultSize
    +          }
    +        }
    +
    +        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
    +          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
    +        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
    +          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
    --- End diff --
    
    @gatorsmile  The goal of this change is not to make it easier for users to set the
    partition size. Rather, once a user has set the partition size, this change tries its
    best to keep the actual read size close to that configured value. Without it, when a
    user sets the partition size to 128MB, the actual read size may be 1MB or even smaller
    because of column pruning.
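    
    For reference, the diff above is truncated before the adjustment itself. A minimal
    sketch of the intended scaling, assuming the split size is simply multiplied by the
    ratio of total to selected column width (the exact code in the PR may differ):
    
        // Hypothetical continuation of the truncated diff above, not the PR's exact code.
        // Assumption: scale maxSplitBytes by totalColumnSize / selectedColumnSize so that,
        // after column pruning, each task still reads roughly the configured partition size.
        if (selectedColumnSize > 0 && totalColumnSize >= selectedColumnSize) {
          val multiplier = totalColumnSize / selectedColumnSize
          maxSplitBytes = maxSplitBytes * multiplier
          openCostInBytes = openCostInBytes * multiplier
        }
    
    With such a multiplier, if only 1 of 128 equally wide columns is selected, a 128MB
    target split would cover roughly 16GB of file data, which after pruning reads back
    down to about 128MB per task.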


---
