Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210456342 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -401,12 +399,41 @@ case class FileSourceScanExec( fsRelation: HadoopFsRelation): RDD[InternalRow] = { val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes - val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes + var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum val bytesPerCore = totalBytes / defaultParallelism - val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + if(fsRelation.fileFormat.isInstanceOf[ParquetSource] && + fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) { + if (relation.dataSchema.map(_.dataType).forall(dataType => + dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType] + || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType] + || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) { + + def getTypeLength (dataType : DataType) : Int = { + if (dataType.isInstanceOf[StructType]) { + fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength + } else if (dataType.isInstanceOf[ArrayType]) { + fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength + } else if (dataType.isInstanceOf[MapType]) { + fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength + } else { + dataType.defaultSize + } + } + + val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_)) + .reduceOption(_ + _).getOrElse(StringType.defaultSize) + val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_)) + .reduceOption(_ + _).getOrElse(StringType.defaultSize) + val multiplier = totalColumnSize / selectedColumnSize --- End diff -- @viirya Now it also support ORC. Please help to review
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org