Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21868#discussion_r210799950
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -425,12 +426,44 @@ case class FileSourceScanExec(
           fsRelation: HadoopFsRelation): RDD[InternalRow] = {
         val defaultMaxSplitBytes =
           fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
    -    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    +    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
         val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
         val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
         val bytesPerCore = totalBytes / defaultParallelism
     
    -    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    +    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    +
    +    if(fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
    +      (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
    +        fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
    +      if (relation.dataSchema.map(_.dataType).forall(dataType =>
    +        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
    +          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
    +          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
    +
    +        def getTypeLength(dataType: DataType): Int = {
    +          if (dataType.isInstanceOf[StructType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
    +          } else if (dataType.isInstanceOf[ArrayType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
    +          } else if (dataType.isInstanceOf[MapType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
    +          } else {
    +            dataType.defaultSize
    +          }
    +        }
    +
    +        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
    +          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
    +        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
    +          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
    --- End diff ---
    
    I think his point is that the estimation is very rough, which I agree with. Partly for that reason, I am less sure whether we should go ahead with this.
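    
    To make the point concrete, here is a minimal standalone sketch (not the PR's code) of the kind of ratio-based estimate being discussed: weigh the projected columns against the full schema using rough per-type widths, then scale maxSplitBytes by that ratio. The fixed widths for struct/array/map and the names SplitSizeEstimate/estimatedMaxSplitBytes are illustrative stand-ins for the configurable parquetStructTypeLength/parquetArrayTypeLength/parquetMapTypeLength values shown in the diff.
    
        import org.apache.spark.sql.types._
    
        // Illustrative sketch only: estimate how much wider splits could be when a
        // scan projects a narrow subset of a wide Parquet/ORC schema.
        object SplitSizeEstimate {
          // Rough per-column width; complex types get a fixed guess, which is
          // exactly the roughness this review thread is pointing at.
          def typeLength(dt: DataType): Int = dt match {
            case _: StructType => 128  // placeholder for parquetStructTypeLength
            case _: ArrayType  => 64   // placeholder for parquetArrayTypeLength
            case _: MapType    => 64   // placeholder for parquetMapTypeLength
            case other         => other.defaultSize
          }
    
          def estimatedMaxSplitBytes(
              dataSchema: StructType,
              requiredSchema: StructType,
              maxSplitBytes: Long): Long = {
            // Estimated bytes per row for the projected columns vs. the full schema.
            val selectedColumnSize = requiredSchema.map(f => typeLength(f.dataType)).sum.max(1)
            val totalColumnSize = dataSchema.map(f => typeLength(f.dataType)).sum.max(1)
            // Widen splits when only a narrow projection of a wide schema is read,
            // so each task still decodes a comparable amount of data.
            val multiplier = totalColumnSize.toDouble / selectedColumnSize
            (maxSplitBytes * multiplier).toLong
          }
    
          def main(args: Array[String]): Unit = {
            // Example: 2 int columns projected out of 50 -> multiplier of roughly 25.
            val full = StructType((1 to 50).map(i => StructField(s"c$i", IntegerType)))
            val needed = StructType(full.take(2))
            println(estimatedMaxSplitBytes(full, needed, 128L * 1024 * 1024))
          }
        }
    
    The 50-ints example shows both the intent (keep the decoded data per task comparable) and why the estimate is rough: it only counts declared column widths and ignores encoding, compression, and actual value sizes.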


---
