Github user habren commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21868#discussion_r210793717

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -425,12 +426,44 @@ case class FileSourceScanExec(
           fsRelation: HadoopFsRelation): RDD[InternalRow] = {
         val defaultMaxSplitBytes =
           fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
    -    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    +    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
         val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
         val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
         val bytesPerCore = totalBytes / defaultParallelism
    -    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    +    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    +
    +    if (fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
    +      (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
    +        fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
    +      if (relation.dataSchema.map(_.dataType).forall(dataType =>
    +        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
    +          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
    +          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
    +
    +        def getTypeLength(dataType: DataType): Int = {
    +          if (dataType.isInstanceOf[StructType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
    +          } else if (dataType.isInstanceOf[ArrayType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
    +          } else if (dataType.isInstanceOf[MapType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
    +          } else {
    +            dataType.defaultSize
    +          }
    +        }
    +
    +        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
    +          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
    +        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
    +          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
    --- End diff --

    @gatorsmile The goal of this change is not to make it easier for users to set the partition size. Rather, once a user has set the partition size, this change does its best to keep the actual read size close to that value. Without it, when a user sets the partition size to 128MB, the actual read size may be 1MB or even smaller because of column pruning.
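    The quoted hunk ends right after selectedColumnSize and totalColumnSize are computed, before the split size is actually adjusted. Below is a minimal, hypothetical sketch of the adjustment the diff appears to be building toward: the names and the uncapped formula are assumptions, not the PR's exact code. The idea is to enlarge the split by the inverse of the column-pruning ratio, so each task still reads roughly the configured number of bytes.

    // Hypothetical sketch; SplitSizeSketch and adjustedMaxSplitBytes are
    // illustrative names, not part of the PR.
    object SplitSizeSketch {
      def adjustedMaxSplitBytes(
          maxSplitBytes: Long,
          selectedColumnSize: Long, // estimated byte width of requiredSchema
          totalColumnSize: Long // estimated byte width of the full dataSchema
      ): Long = {
        // Fraction of each row that survives column pruning.
        val ratio = selectedColumnSize.toDouble / totalColumnSize.toDouble
        // Enlarge the split so that ratio * adjusted ~= maxSplitBytes.
        (maxSplitBytes / ratio).toLong
      }

      def main(args: Array[String]): Unit = {
        val target = 128L * 1024 * 1024 // user-configured 128MB partition size
        // Reading 8 bytes of a 1024-byte-wide row: without the adjustment each
        // task would read only ~1MB; the enlarged split restores a ~128MB read.
        println(adjustedMaxSplitBytes(target, selectedColumnSize = 8, totalColumnSize = 1024))
      }
    }

    In a real implementation one would presumably also cap the enlarged value (for example against defaultMaxSplitBytes times some bound) so a very aggressive pruning ratio cannot produce arbitrarily large splits.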