Github user habren commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21868#discussion_r210871055

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -425,12 +426,44 @@ case class FileSourceScanExec(
           fsRelation: HadoopFsRelation): RDD[InternalRow] = {
         val defaultMaxSplitBytes =
           fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
    -    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    +    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
         val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
         val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
         val bytesPerCore = totalBytes / defaultParallelism
    
    -    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    +    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    +
    +    if(fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
    +      (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
    +        fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
    +      if (relation.dataSchema.map(_.dataType).forall(dataType =>
    +        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
    +          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
    +          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
    +
    +        def getTypeLength(dataType: DataType): Int = {
    +          if (dataType.isInstanceOf[StructType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
    +          } else if (dataType.isInstanceOf[ArrayType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
    +          } else if (dataType.isInstanceOf[MapType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
    +          } else {
    +            dataType.defaultSize
    +          }
    +        }
    +
    +        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
    +          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
    +        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
    +          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
    --- End diff --
    
    @HyukjinKwon I agree that the estimation is rough, especially for complex types. For AtomicType it works better, and at least it takes column pruning into consideration.
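    To make the discussion concrete, here is a minimal, self-contained Scala sketch of the estimation as I read it from the diff. The three complex-type lengths are hard-coded stand-ins for the SQLConf entries this patch adds, and the schemas are made up for illustration:
    
        import org.apache.spark.sql.types._
    
        object SplitSizeEstimationSketch {
          // Stand-in constants for the configurable complex-type lengths;
          // in the patch these come from SQLConf.
          val structTypeLength = 100
          val arrayTypeLength = 100
          val mapTypeLength = 100
    
          // Estimate a column's byte width: fixed guesses for nested types,
          // Catalyst's defaultSize for everything else.
          def getTypeLength(dataType: DataType): Int = dataType match {
            case _: StructType => structTypeLength
            case _: ArrayType  => arrayTypeLength
            case _: MapType    => mapTypeLength
            case other         => other.defaultSize
          }
    
          // Sum of estimated widths over a schema, falling back to
          // StringType's default size for an empty schema, as the diff does.
          def schemaSize(schema: StructType): Int =
            schema.map(f => getTypeLength(f.dataType))
              .reduceOption(_ + _)
              .getOrElse(StringType.defaultSize)
    
          def main(args: Array[String]): Unit = {
            // Hypothetical table schema and the pruned schema of a query.
            val dataSchema = StructType(Seq(
              StructField("id", LongType),
              StructField("payload", StringType),
              StructField("tags", ArrayType(StringType))))
            val requiredSchema = StructType(Seq(StructField("id", LongType)))
    
            val selectedColumnSize = schemaSize(requiredSchema) // 8
            val totalColumnSize = schemaSize(dataSchema)        // 8 + 20 + 100 = 128
            // With column pruning, each row contributes roughly
            // selected/total of its on-disk width, so a split can cover
            // proportionally more file bytes for the same output size.
            println(s"selected=$selectedColumnSize total=$totalColumnSize " +
              s"ratio=${selectedColumnSize.toDouble / totalColumnSize}")
          }
        }
    
    This also shows why the forall guard in the diff matters: defaultSize only gives a usable per-column estimate for the listed type families.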