[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21868 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210890027

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
+      "columns are needed. So, it's better to make sure that the total size of the selected " +
+      "columns is about 128 MB ")
+    .booleanConf
+    .createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+    .doc("Set the default size of struct column")
+    .intConf
+    .createWithDefault(StringType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
--- End diff --

@HyukjinKwon @viirya Setting spark.sql.files.maxPartitionBytes explicitly does work. For you or other advanced users, it is convenient to set a bigger maxPartitionBytes. But for ad-hoc queries, the selected columns differ from query to query, and it is not convenient, or even impossible, for users to set a different maxPartitionBytes for each query. For general (non-advanced) users, it is also not easy to calculate a proper value of maxPartitionBytes. In many big companies, only one or a few teams are familiar with the details of Spark, and they maintain the Spark cluster. Other teams are general users of Spark who care more about their business, such as data warehouse build-up or recommendation algorithms. This feature tries to handle the split size dynamically even when users are not familiar with Spark.
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210887543

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
+      "columns are needed. So, it's better to make sure that the total size of the selected " +
+      "columns is about 128 MB "
--- End diff --

I updated the description just now. Please help review it again. Thanks a lot @HyukjinKwon
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210887308

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
--- End diff --

I updated it accordingly.
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210886442

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -25,17 +25,16 @@ import java.util.zip.Deflater
 import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.util.matching.Regex
-
--- End diff --

@viirya Ok, I added it back.
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210876154

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
--- End diff --

@HyukjinKwon Yes, setting spark.sql.files.maxPartitionBytes explicitly does work. But for ad-hoc queries, the selected columns differ from query to query, and it is not convenient, or even impossible, for users to set a different maxPartitionBytes for each query. For general (non-advanced) users, it is also not easy to calculate a proper value of maxPartitionBytes.
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210871055

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -425,12 +426,44 @@ case class FileSourceScanExec(
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+
+    if (fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
+      (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
+        fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
+      if (relation.dataSchema.map(_.dataType).forall(dataType =>
+        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+        def getTypeLength(dataType: DataType): Int = {
+          if (dataType.isInstanceOf[StructType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+          } else if (dataType.isInstanceOf[ArrayType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+          } else if (dataType.isInstanceOf[MapType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+          } else {
+            dataType.defaultSize
+          }
+        }
+
+        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
--- End diff --

@HyukjinKwon I agree that the estimation is rough, especially for complex types. For AtomicType it works better. And at least it takes column pruning into consideration.
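The per-column size estimate being debated can be reproduced outside Spark with a few lines of plain Scala. This is a minimal sketch, not Spark's actual `DataType` hierarchy: column types are modeled as strings, and the complex-type lengths are assumed stand-ins for the proposed spark.sql.parquet.{struct,array,map}.length configs.

```scala
// Sketch of the size estimate from the diff above, under assumed type sizes.
// The atomic sizes mirror Spark's defaultSize values (4 for int, 8 for long,
// 20 for StringType); the complex-type lengths are illustrative assumptions.
object TypeLengthSketch {
  val structLength = 20  // stand-in for spark.sql.parquet.struct.length
  val arrayLength = 20   // stand-in for spark.sql.parquet.array.length
  val mapLength = 20     // stand-in for spark.sql.parquet.map.length
  val stringDefault = 20 // StringType.defaultSize

  def getTypeLength(dataType: String): Int = dataType match {
    case "struct" => structLength
    case "array"  => arrayLength
    case "map"    => mapLength
    case "int"    => 4 // IntegerType.defaultSize
    case "long"   => 8 // LongType.defaultSize
    case _        => stringDefault
  }

  // Mirrors requiredSchema.map(...).reduceOption(_ + _).getOrElse(...)
  def schemaSize(types: Seq[String]): Int =
    types.map(getTypeLength).reduceOption(_ + _).getOrElse(stringDefault)
}
```

With an empty schema the estimate falls back to the StringType default, exactly as the `getOrElse` in the diff does; this is why the estimate can never be zero and the later division is safe.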
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210799950

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -425,12 +426,44 @@ case class FileSourceScanExec(
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+
+    if (fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
+      (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
+        fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
+      if (relation.dataSchema.map(_.dataType).forall(dataType =>
+        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+        def getTypeLength(dataType: DataType): Int = {
+          if (dataType.isInstanceOf[StructType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+          } else if (dataType.isInstanceOf[ArrayType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+          } else if (dataType.isInstanceOf[MapType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+          } else {
+            dataType.defaultSize
+          }
+        }
+
+        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
--- End diff --

I think his point is that the estimation is super rough, which I agree with. I am less sure whether we should go ahead, partly for this reason as well.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210799970

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
+      "columns are needed. So, it's better to make sure that the total size of the selected " +
+      "columns is about 128 MB ")
+    .booleanConf
+    .createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+    .doc("Set the default size of struct column")
+    .intConf
+    .createWithDefault(StringType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
--- End diff --

Yeah, I was thinking that.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210799891

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
+      "columns are needed. So, it's better to make sure that the total size of the selected " +
+      "columns is about 128 MB ")
+    .booleanConf
+    .createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+    .doc("Set the default size of struct column")
+    .intConf
+    .createWithDefault(StringType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
--- End diff --

I wouldn't do this. It makes things more complicated, and I would just set a bigger number for `maxPartitionBytes`.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210799770

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
+      "columns are needed. So, it's better to make sure that the total size of the selected " +
+      "columns is about 128 MB "
--- End diff --

This does not sound like it describes what the configuration actually does.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210799731

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
--- End diff --

`it's`: I would avoid the abbreviation.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210799600

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
--- End diff --

This configuration doesn't look specific to Parquet anymore.
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210793717

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -425,12 +426,44 @@ case class FileSourceScanExec(
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+
+    if (fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
+      (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
+        fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
+      if (relation.dataSchema.map(_.dataType).forall(dataType =>
+        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+        def getTypeLength(dataType: DataType): Int = {
+          if (dataType.isInstanceOf[StructType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+          } else if (dataType.isInstanceOf[ArrayType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+          } else if (dataType.isInstanceOf[MapType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+          } else {
+            dataType.defaultSize
+          }
+        }
+
+        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
--- End diff --

@gatorsmile The goal of this change is not to make it easier for users to set the partition size. Instead, when the user sets the partition size, this change tries its best to make sure the read size is close to the value set by the user. Without this change, when the user sets the partition size to 128 MB, the actual read size may be 1 MB or even smaller because of column pruning.
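The shrinkage described here can be made concrete with a rough model: after column pruning, the bytes actually read from a split are approximately the split size scaled by the selected-to-total column-width ratio. This sketch uses assumed per-type sizes (4 bytes per int, 8 per long, 20 per string); it is an illustration of the effect, not PR code.

```scala
// Rough model of the effect described above: a 128 MB split of a wide
// table yields a much smaller actual read once columns are pruned.
object ReadSizeSketch {
  // Assumed type widths, matching Spark's atomic defaultSize values.
  def sizeOf(t: String): Int = t match {
    case "int"  => 4
    case "long" => 8
    case _      => 20 // StringType.defaultSize
  }

  // Approximate bytes read = split size * (selected width / total width).
  def approxReadBytes(splitBytes: Long, all: Seq[String], sel: Seq[String]): Long =
    splitBytes * sel.map(sizeOf).sum / all.map(sizeOf).sum
}
```

For a 50-string-column table with one int column selected, a 128 MB split reads on the order of half a megabyte, which is the "1 MB or even smaller" situation described in the comment.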
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210785081

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +458,29 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
+      "columns are needed. So, it's better to make sure that the total size of the selected " +
+      "columns is about 128 MB ")
+    .booleanConf
+    .createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+    .doc("Set the default size of struct column")
+    .intConf
+    .createWithDefault(StringType.defaultSize)
--- End diff --

And these configs assume that different storage formats use the same size?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210779310

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +458,29 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
+      "columns are needed. So, it's better to make sure that the total size of the selected " +
+      "columns is about 128 MB ")
+    .booleanConf
+    .createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+    .doc("Set the default size of struct column")
+    .intConf
+    .createWithDefault(StringType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
+    .doc("Set the default size of map column")
+    .intConf
+    .createWithDefault(StringType.defaultSize)
+
+  val PARQUET_ARRAY_LENGTH = buildConf("spark.sql.parquet.array.length")
+    .doc("Set the default size of array column")
+    .intConf
+    .createWithDefault(StringType.defaultSize)
--- End diff --

This feature includes so many configs; my concern is that it is hard for end users to set them.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210765335

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -25,17 +25,16 @@ import java.util.zip.Deflater
 import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.util.matching.Regex
-
--- End diff --

Please don't remove these blank lines. Can you revert them?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210494497

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -425,12 +426,44 @@ case class FileSourceScanExec(
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+
+    if (fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
+      (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
+        fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
+      if (relation.dataSchema.map(_.dataType).forall(dataType =>
+        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+        def getTypeLength(dataType: DataType): Int = {
+          if (dataType.isInstanceOf[StructType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+          } else if (dataType.isInstanceOf[ArrayType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+          } else if (dataType.isInstanceOf[MapType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+          } else {
+            dataType.defaultSize
+          }
+        }
+
+        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
--- End diff --

The type-based estimation is very rough. This still makes it hard for end users to decide the initial size.
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210456342

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -401,12 +399,41 @@ case class FileSourceScanExec(
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+      fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) {
+      if (relation.dataSchema.map(_.dataType).forall(dataType =>
+        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+        def getTypeLength(dataType: DataType): Int = {
+          if (dataType.isInstanceOf[StructType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+          } else if (dataType.isInstanceOf[ArrayType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+          } else if (dataType.isInstanceOf[MapType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+          } else {
+            dataType.defaultSize
+          }
+        }
+
+        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val multiplier = totalColumnSize / selectedColumnSize
--- End diff --

@viirya It now also supports ORC. Please help review.
GitHub user habren reopened a pull request: https://github.com/apache/spark/pull/21868

[SPARK-24906][SQL] Adaptively enlarge split / partition size for Parquet scan

Please refer to https://issues.apache.org/jira/browse/SPARK-24906 for more detail and test results.

For a columnar file format, when Spark SQL reads the table, each split will be 128 MB by default, since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even when the user sets it to a large value, such as 512 MB, a task may read only a few MB or even hundreds of KB, because the table (Parquet) may consist of dozens of columns while the SQL needs only a few of them, and Spark prunes the unnecessary columns. In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes adaptively. For example, suppose there are 40 columns: 20 integer and 20 long. When a query uses one integer column and one long column, maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20. With this optimization, the number of tasks is smaller and the job runs faster. More importantly, for a very large cluster (more than 10 thousand nodes), it relieves the RM's scheduling pressure.

Here is the test. The table named test2 has more than 40 columns and more than 5 TB of data each hour. When we issue a very simple query, `select count(device_id) from test2 where date=20180708 and hour='23'`, there are 72176 tasks and the duration of the job is 4.8 minutes. Most tasks last less than 1 second and read less than 1.5 MB of data. After the optimization, there are only 1615 tasks and the job lasts only 30 seconds, almost 10 times faster. The median read size is 44.2 MB.

https://issues.apache.org/jira/browse/SPARK-24906

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/habren/spark SPARK-24906

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21868.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21868

commit e34aaa2fc0c1ebf87028d834ea5e9a61bc026bc6
Author: Jason Guo
Date: 2018-07-25T02:18:22Z

[SPARK-24906][SQL] Adaptively enlarge split / partition size for Parquet scan
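The 40-column arithmetic in the description above can be checked with a small sketch. The atomic type sizes (4 bytes per int, 8 per long) match the PR's use of `defaultSize`; the `enlargedSplitBytes` helper is this sketch's reading of how the patch scales the split size, not verbatim PR code.

```scala
// Checks the worked example from the PR description: a table of 20 int
// and 20 long columns, with a query touching one int and one long,
// should enlarge the split size by a factor of 20.
object SplitMultiplierSketch {
  // Assumed widths matching Spark's atomic defaultSize values.
  def sizeOf(t: String): Int = t match {
    case "int"  => 4
    case "long" => 8
    case _      => 20 // StringType.defaultSize
  }

  // multiplier = totalColumnSize / selectedColumnSize, as in the diff.
  def multiplier(allColumns: Seq[String], selected: Seq[String]): Long =
    allColumns.map(sizeOf).sum.toLong / selected.map(sizeOf).sum

  // Sketch of the enlargement: scale maxSplitBytes by the ratio.
  def enlargedSplitBytes(maxSplitBytes: Long,
                         all: Seq[String],
                         sel: Seq[String]): Long =
    maxSplitBytes * multiplier(all, sel)
}
```

With the example schema, (20*4 + 20*8) / (4 + 8) = 240 / 12 = 20, so a 128 MB split target becomes a 2560 MB one, which is what produces the roughly 20x fewer, larger tasks reported in the test.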
Github user habren closed the pull request at: https://github.com/apache/spark/pull/21868
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r205356861

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -401,12 +399,41 @@ case class FileSourceScanExec(
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+      fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) {
+      if (relation.dataSchema.map(_.dataType).forall(dataType =>
+        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+        def getTypeLength(dataType: DataType): Int = {
+          if (dataType.isInstanceOf[StructType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+          } else if (dataType.isInstanceOf[ArrayType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+          } else if (dataType.isInstanceOf[MapType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+          } else {
+            dataType.defaultSize
+          }
+        }
+
+        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val multiplier = totalColumnSize / selectedColumnSize
--- End diff --

@viirya As defined in getTypeLength, users can configure the complex types' lengths according to their data statistics, and the length for an AtomicType is determined by its defaultSize. So the multiplier is the ratio of the total length of all columns to the total length of the selected columns:

    def getTypeLength(dataType: DataType): Int = {
      if (dataType.isInstanceOf[StructType]) {
        fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
      } else if (dataType.isInstanceOf[ArrayType]) {
        fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
      } else if (dataType.isInstanceOf[MapType]) {
        fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
      } else {
        dataType.defaultSize
      }
    }
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r205288000

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (same hunk as quoted above; the comment anchors on its last line) ---

+        val multiplier = totalColumnSize / selectedColumnSize

--- End diff --

There are many data types: CalendarIntervalType, StructType, MapType, NullType, UserDefinedType, AtomicType (TimestampType, StringType, HiveStringType, BooleanType, DateType, BinaryType, NumericType), ObjectType, and ArrayType. For an AtomicType, the size is fixed at its defaultSize. For a complex type, such as StructType, MapType, or ArrayType, the size varies, so I made it configurable with a default value. With these per-type sizes, the multiplier is not just the ratio of the number of selected columns to the number of all columns, but the ratio of the total size of all columns to the total size of the selected columns.
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r205287123

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

@@ -381,6 +381,26 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
+      "columns are needed. So, it's better to make sure that the total size of the selected " +
+      "columns is about 128 MB")
+    .booleanConf
+    .createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+    .intConf
+    .createWithDefault(StructType.defaultConcreteType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
+    .intConf
+    .createWithDefault(MapType.defaultConcreteType.defaultSize)
+
+  val PARQUET_ARRAY_LENGTH = buildConf("spark.sql.parquet.array.length")
+    .intConf
+    .createWithDefault(ArrayType.defaultConcreteType.defaultSize)

--- End diff --

Thanks for your comments. I set the default value to StringType.defaultSize (8). It is only a default; users should configure it according to their real data.
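For reference, this is how the feature and the per-type lengths from this patch might be enabled on a session. The configuration keys are taken from the diff above; the numeric values are hypothetical and should come from the real data's statistics.

```scala
// Config-only sketch; assumes `spark` is an existing SparkSession built
// with this patch applied. The feature is off by default.
spark.conf.set("spark.sql.parquet.adaptiveFileSplit", "true")

// Override the assumed per-column byte lengths for complex types
// (hypothetical values; tune to the actual data):
spark.conf.set("spark.sql.parquet.struct.length", "64")
spark.conf.set("spark.sql.parquet.map.length", "32")
spark.conf.set("spark.sql.parquet.array.length", "16")
```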
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r205278858

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (same hunk as quoted above; the comment anchors on its last line) ---

+        val multiplier = totalColumnSize / selectedColumnSize

--- End diff --

It seems that here you can only get the ratio of selected columns to total columns; the actual type sizes are not taken into consideration.
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r205278202

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (same hunk as quoted above; the comment anchors on its last line) ---

+    .createWithDefault(ArrayType.defaultConcreteType.defaultSize)

--- End diff --

`ArrayType.defaultConcreteType` is `ArrayType(NullType, containsNull = true)`. I think using this you won't get a reasonable number.
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
GitHub user habren opened a pull request: https://github.com/apache/spark/pull/21868 [SPARK-24906][SQL] Adaptively enlarge split / partition size for Parquet scan

Please refer to https://issues.apache.org/jira/browse/SPARK-24906 for more detail and tests.

When Spark SQL reads a table in a columnar file format such as Parquet, each split is 128 MB by default, since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even when a user sets it to a larger value, such as 512 MB, a task may read only a few MB or even hundreds of KB, because the table may consist of dozens of columns while the SQL needs only a few of them, and Spark prunes the unnecessary columns. In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes adaptively. For example, if there are 40 columns, 20 of them integer and the other 20 long, and a query reads one integer column and one long column, maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4+8) = 20. With this optimization, the number of tasks is smaller and the job runs faster. More importantly, for a very large cluster (more than 10 thousand nodes), it relieves the resource manager's scheduling pressure.

Here is the test. The table named test2 has more than 40 columns and receives more than 5 TB of data each hour. When we issue a very simple query

  select count(device_id) from test2 where date=20180708 and hour='23'

there are 72176 tasks and the job takes 4.8 minutes; most tasks last less than 1 second and read less than 1.5 MB of data. After the optimization, there are only 1615 tasks and the job takes only 30 seconds, almost 10 times faster. The median of data read per task is 44.2 MB.
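The arithmetic in the example above can be worked through directly. This is a sketch of the enlargement math only (using the stated defaultSize values of 4 bytes per integer and 8 bytes per long, and the 128 MB default for maxPartitionBytes), not the actual Spark code path.

```scala
// 40 columns: 20 integer (4 bytes each) + 20 long (8 bytes each);
// the query selects one integer column and one long column.
val totalColumnSize    = 20 * 4 + 20 * 8   // 240 bytes per row across all columns
val selectedColumnSize = 4 + 8             // 12 bytes per row for the selected columns
val multiplier         = totalColumnSize / selectedColumnSize  // 20

// The split size is then enlarged by that factor, so a task reading an
// enlarged split still decodes roughly the original 128 MB of needed columns.
val defaultMaxSplitBytes = 128L * 1024 * 1024
val maxSplitBytes        = defaultMaxSplitBytes * multiplier   // 2.5 GB of file per split

println(s"multiplier=$multiplier, maxSplitBytes=$maxSplitBytes")
```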
https://issues.apache.org/jira/browse/SPARK-24906

You can merge this pull request into a Git repository by running:

  $ git pull https://github.com/habren/spark SPARK-24906

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21868.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21868

commit 9ff34525e346e6e1cbe4b12fc6f972a163fd920e
Author: habren
Date: 2018-07-25T02:07:38Z

  [SPARK-24906][SQL] Adaptively enlarge split / partition size for Parquet scan