[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210890027 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -459,6 +460,29 @@ object SQLConf { .intConf .createWithDefault(4096) + val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit") +.doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " + + "columns are needed. So, it's better to make sure that the total size of the selected " + + "columns is about 128 MB " +) +.booleanConf +.createWithDefault(false) + + val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length") +.doc("Set the default size of struct column") +.intConf +.createWithDefault(StringType.defaultSize) + + val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length") --- End diff -- @HyukjinKwon @viirya Setting spark.sql.files.maxPartitionBytes explicitly does work. For you or other advanced users, it's convenient to set a bigger maxPartitionBytes. But for ad-hoc queries, the selected columns differ from query to query, and it's inconvenient or even impossible for users to set a different maxPartitionBytes for each query. And for general (non-advanced) users, it's not easy to calculate a proper value of maxPartitionBytes. In many big companies, there may be only one or a few teams that are familiar with the details of Spark, and they maintain the Spark cluster. Other teams are general users of Spark who care more about their business, such as data warehouse build-up and recommendation algorithms. This feature tries to handle it dynamically even when the users are not familiar with Spark.
[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...
Github user habren commented on the issue: https://github.com/apache/spark/pull/21868 @HyukjinKwon Yes, this is to handle it dynamically. For ad-hoc queries, the selected columns differ from query to query, and it's inconvenient or even impossible for users to set a different maxPartitionBytes for each query. And for general (non-advanced) users, it's not easy to set a proper value of maxPartitionBytes. So this change makes it easier. > BTW, just for clarification, you can set the bigger number to spark.sql.files.maxPartitionBytes explicitly and that resolved your issue. This one is to handle it dynamically, right?
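For context, a minimal sketch of the two approaches being compared (assuming a SparkSession named `spark`; spark.sql.files.maxPartitionBytes is an existing Spark setting, while spark.sql.parquet.adaptiveFileSplit is the flag proposed in this PR; the test2 query is the one from the PR description):

    // Today: an advanced user hand-tunes the split size per query,
    // e.g. 20x the 128 MB default when only ~1/20 of the column bytes are read.
    spark.conf.set("spark.sql.files.maxPartitionBytes", 20L * 128 * 1024 * 1024)
    spark.sql("select count(device_id) from test2 where date=20180708 and hour='23'").show()

    // With this change: opt in once and let Spark enlarge the split size
    // based on the columns the query actually selects.
    spark.conf.set("spark.sql.parquet.adaptiveFileSplit", "true")
    spark.sql("select count(device_id) from test2 where date=20180708 and hour='23'").show()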
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210887543 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -459,6 +460,29 @@ object SQLConf { .intConf .createWithDefault(4096) + val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit") +.doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " + + "columns are needed. So, it's better to make sure that the total size of the selected " + + "columns is about 128 MB " --- End diff -- I updated the description just now. Please help to review it again. Thanks a lot @HyukjinKwon
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210887308 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -459,6 +460,29 @@ object SQLConf { .intConf .createWithDefault(4096) + val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit") +.doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " + --- End diff -- I updated it accordingly
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210886442 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -25,17 +25,16 @@ import java.util.zip.Deflater import scala.collection.JavaConverters._ import scala.collection.immutable import scala.util.matching.Regex - --- End diff -- @viirya Ok, I added it back
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210876154 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -459,6 +460,29 @@ object SQLConf { .intConf .createWithDefault(4096) + val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit") --- End diff -- @HyukjinKwon Yes, setting spark.sql.files.maxPartitionBytes explicitly does work. But for ad-hoc queries, the selected columns differ from query to query, and it's inconvenient or even impossible for users to set a different maxPartitionBytes for each query. And for general (non-advanced) users, it's not easy to calculate a proper value of maxPartitionBytes.
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210871055 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -425,12 +426,44 @@ case class FileSourceScanExec( fsRelation: HadoopFsRelation): RDD[InternalRow] = { val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes -val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes +var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum val bytesPerCore = totalBytes / defaultParallelism -val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) +var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + + if(fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled && + (fsRelation.fileFormat.isInstanceOf[ParquetSource] || +fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) { + if (relation.dataSchema.map(_.dataType).forall(dataType => +dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType] + || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType] + || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) { + +def getTypeLength(dataType: DataType): Int = { + if (dataType.isInstanceOf[StructType]) { + fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength + } else if (dataType.isInstanceOf[ArrayType]) { + fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength + } else if (dataType.isInstanceOf[MapType]) { +fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength + } else { +dataType.defaultSize + } +} + +val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_)) + .reduceOption(_ + _).getOrElse(StringType.defaultSize) +val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_)) + .reduceOption(_ + _).getOrElse(StringType.defaultSize) --- End diff -- @HyukjinKwon I agree that the estimation is rough, especially for complex types. For AtomicType it works better, and at least it takes column pruning into consideration.
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210793717 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -425,12 +426,44 @@ case class FileSourceScanExec( fsRelation: HadoopFsRelation): RDD[InternalRow] = { val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes -val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes +var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum val bytesPerCore = totalBytes / defaultParallelism -val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) +var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + + if(fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled && + (fsRelation.fileFormat.isInstanceOf[ParquetSource] || +fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) { + if (relation.dataSchema.map(_.dataType).forall(dataType => +dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType] + || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType] + || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) { + +def getTypeLength(dataType: DataType): Int = { + if (dataType.isInstanceOf[StructType]) { + fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength + } else if (dataType.isInstanceOf[ArrayType]) { + fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength + } else if (dataType.isInstanceOf[MapType]) { +fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength + } else { +dataType.defaultSize + } +} + +val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_)) + .reduceOption(_ + _).getOrElse(StringType.defaultSize) +val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_)) + .reduceOption(_ + _).getOrElse(StringType.defaultSize) --- End diff -- @gatorsmile The target of this change is not to make it easy for users to set the partition size. Instead, when a user sets the partition size, this change tries its best to make sure the read size is close to the value set by the user. Without this change, when a user sets the partition size to 128MB, the actual read size may be 1MB or even smaller because of column pruning.
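To make the estimation concrete, here is a standalone sketch of the multiplier logic shown in the diff above (the hard-coded complex-type lengths stand in for the spark.sql.parquet.*.length settings, and the 40-column schema is the example from the PR description; this is an illustration, not the PR's code):

    import org.apache.spark.sql.types._

    // Stand-ins for the configurable complex-type lengths (illustrative values).
    val structLen = 8; val arrayLen = 8; val mapLen = 8

    def getTypeLength(dataType: DataType): Int = dataType match {
      case _: StructType => structLen
      case _: ArrayType  => arrayLen
      case _: MapType    => mapLen
      case other         => other.defaultSize // atomic types have a fixed default size
    }

    // dataSchema: every column in the table; requiredSchema: columns the query selects.
    val dataSchema = StructType(Seq.tabulate(20)(i => StructField(s"i$i", IntegerType)) ++
      Seq.tabulate(20)(i => StructField(s"l$i", LongType)))
    val requiredSchema = StructType(Seq(StructField("i0", IntegerType), StructField("l0", LongType)))

    val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength)
      .reduceOption(_ + _).getOrElse(StringType.defaultSize)
    val totalColumnSize = dataSchema.map(_.dataType).map(getTypeLength)
      .reduceOption(_ + _).getOrElse(StringType.defaultSize)
    val multiplier = totalColumnSize / selectedColumnSize // (20*4 + 20*8) / (4 + 8) = 20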
[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...
Github user habren commented on the issue: https://github.com/apache/spark/pull/21868 Hi @HyukjinKwon I moved the change to the master branch just now. Please help to review it.
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r210456342 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -401,12 +399,41 @@ case class FileSourceScanExec( fsRelation: HadoopFsRelation): RDD[InternalRow] = { val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes -val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes +var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum val bytesPerCore = totalBytes / defaultParallelism -val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) +var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) +if(fsRelation.fileFormat.isInstanceOf[ParquetSource] && + fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) { + if (relation.dataSchema.map(_.dataType).forall(dataType => +dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType] + || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType] + || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) { + +def getTypeLength (dataType : DataType) : Int = { + if (dataType.isInstanceOf[StructType]) { + fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength + } else if (dataType.isInstanceOf[ArrayType]) { + fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength + } else if (dataType.isInstanceOf[MapType]) { +fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength + } else { +dataType.defaultSize + } +} + +val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_)) + .reduceOption(_ + _).getOrElse(StringType.defaultSize) +val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_)) + .reduceOption(_ + _).getOrElse(StringType.defaultSize) +val multiplier = totalColumnSize / selectedColumnSize --- End diff -- @viirya Now it also supports ORC. Please help to review it.
[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...
Github user habren commented on the issue: https://github.com/apache/spark/pull/21868 @HyukjinKwon Thanks for your comments. I will submit it to master soon
[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...
Github user habren commented on the issue: https://github.com/apache/spark/pull/21868 @maropu Thanks for your comments. ORC can also benefit from this change since ORC is also a columnar file format. Do you think I should add ORC support by changing the line below ` if(fsRelation.fileFormat.isInstanceOf[ParquetSource]` to `if(fsRelation.fileFormat.isInstanceOf[ParquetSource] || fsRelation.fileFormat.isInstanceOf[OrcFileFormat]`?
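To show what that buys in practice, a small usage sketch (the paths are hypothetical; spark.sql.parquet.adaptiveFileSplit is the opt-in flag from this PR and, per the later revision of the diff, it covers both Parquet and ORC):

    spark.conf.set("spark.sql.parquet.adaptiveFileSplit", "true")

    // Parquet table: split size is enlarged based on the selected columns.
    spark.read.parquet("/warehouse/test2_parquet/date=20180708/hour=23")
      .selectExpr("count(device_id)").show()

    // ORC is columnar too, so after this change the same adaptive sizing applies.
    spark.read.orc("/warehouse/test2_orc/date=20180708/hour=23")
      .selectExpr("count(device_id)").show()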
[GitHub] spark pull request #22018: [SPARK-25038][SQL] Get block location in parallel
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/22018#discussion_r208788059 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala --- @@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging { val missingFiles = mutable.ArrayBuffer.empty[String] val filteredLeafStatuses = allLeafStatuses.filterNot( status => shouldFilterOut(status.getPath.getName)) -val resolvedLeafStatuses = filteredLeafStatuses.flatMap { +val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap { --- End diff -- Thanks @maropu for your comments. I updated the title and description. Let me explain the difference between this change and the current parallel partition discovery. The current one discovers different partitions in parallel. This change gets the block locations within a single partition in parallel. When there are only a few partitions and each contains tens of thousands of files, the current parallel partition discovery won't help, and this change can accelerate listing in that case.
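As a standalone illustration of what the one-line change above does (a sketch only; resolveBlockLocations is a made-up stand-in for the per-file block-location lookup, not a Spark API):

    // Hypothetical stand-in for resolving one file's block locations (e.g. a NameNode round trip).
    def resolveBlockLocations(path: String): String = s"locations-of-$path"

    val files = (1 to 50000).map(i => s"/warehouse/test_tbl/date=20180731/hour=21/part-$i")

    // Sequential: the files in one partition directory are handled one at a time.
    val sequential = files.map(resolveBlockLocations)

    // Parallel, as filteredLeafStatuses.par does in the diff above: the same
    // mapping is spread over the default fork-join pool, then collected back.
    val parallel = files.par.map(resolveBlockLocations).seq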
[GitHub] spark pull request #22018: [SPARK-25038][SQL] Get block location in parallel
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/22018#discussion_r208787523 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala --- @@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging { val missingFiles = mutable.ArrayBuffer.empty[String] val filteredLeafStatuses = allLeafStatuses.filterNot( status => shouldFilterOut(status.getPath.getName)) -val resolvedLeafStatuses = filteredLeafStatuses.flatMap { +val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap { case f: LocatedFileStatus => --- End diff -- Thanks. The comment was updated
[GitHub] spark pull request #22018: [SPARK-25038][SQL] Accelerate Spark Plan generati...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/22018#discussion_r208784609 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala --- @@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging { val missingFiles = mutable.ArrayBuffer.empty[String] val filteredLeafStatuses = allLeafStatuses.filterNot( status => shouldFilterOut(status.getPath.getName)) -val resolvedLeafStatuses = filteredLeafStatuses.flatMap { +val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap { --- End diff -- Thanks @viirya for the feedback. Yes, this method can be called on executors, as below. Do you think it's not thread-safe? Each partition will have its own hadoopConf and thus its own fs, and nothing is shared in this method. sparkContext .parallelize(serializedPaths, numParallelism) .mapPartitions { pathStrings => val hadoopConf = serializableConfiguration.value pathStrings.map(new Path(_)).toSeq.map { path => (path, listLeafFiles(path, hadoopConf, filter, None)) }.iterator }.map { case (path, statuses) =>
[GitHub] spark issue #22018: [SPARK-25038][SQL] Accelerate Spark Plan generation when...
Github user habren commented on the issue: https://github.com/apache/spark/pull/22018 Hi Takeshi Yamamuro, Hyukjin Kwon, and @viirya, can you take a look at this patch?
[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...
Github user habren commented on the issue: https://github.com/apache/spark/pull/21868 Hi @maropu and @viirya, do you agree with the basic idea that we should take column pruning into consideration when splitting the input files?
[GitHub] spark pull request #22018: [SPARK-25038][SQL] Accelerate Spark Plan generati...
GitHub user habren opened a pull request: https://github.com/apache/spark/pull/22018 [SPARK-25038][SQL] Accelerate Spark Plan generation when Spark SQL re… https://issues.apache.org/jira/browse/SPARK-25038 When Spark SQL reads a large amount of data, it takes a long time (more than 10 minutes) to generate the physical plan and then the ActiveJob. Example: There is a table which is partitioned by date and hour. There are more than 13 TB of data each hour and 185 TB per day. When we issue just a very simple SQL statement, it takes a long time to generate the ActiveJob. The SQL statement is select count(device_id) from test_tbl where date=20180731 and hour='21'; Before the optimization, it takes 2 minutes and 9 seconds to generate the job: the SQL is issued at 2018-08-07 09:07:41, but the job is submitted at 2018-08-07 09:09:53, which is 2 minutes and 9 seconds after the SQL issue time. After the optimization, it takes only 4 seconds to generate the job: the SQL is issued at 2018-08-07 09:20:15 and the job is submitted at 2018-08-07 09:20:19, which is 4 seconds after the SQL issue time. You can merge this pull request into a Git repository by running: $ git pull https://github.com/habren/spark SPARK-25038 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22018.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22018 commit 2bb5924e04eba5accfe58a4fbae094d46cc36488 Author: Jason Guo Date: 2018-08-07T03:13:03Z [SPARK-25038][SQL] Accelerate Spark Plan generation when Spark SQL read large amount of data
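For background (an addition for context, not part of the PR): Spark already distributes listing across partition directories once the number of paths crosses a threshold, so, as the review comments above explain, the slow case is a small number of huge partition directories. The existing settings, shown here at their default values, only help across directories, not within one:

    // Existing parallel partition discovery settings (defaults shown).
    spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", 32L)
    spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", 10000L)

    // The query from this PR touches a single date/hour partition, so this change
    // parallelizes the block-location lookups inside that one directory listing.
    spark.sql("select count(device_id) from test_tbl where date=20180731 and hour='21'").show()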
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
GitHub user habren reopened a pull request: https://github.com/apache/spark/pull/21868 [SPARK-24906][SQL] Adaptively enlarge split / partition size for Parq… Please refer to https://issues.apache.org/jira/browse/SPARK-24906 for more detail and tests. For a columnar file format such as Parquet, when Spark SQL reads the table, each split will be 128 MB by default, since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even when a user sets it to a large value, such as 512 MB, a task may read only a few MB or even hundreds of KB, because the table (Parquet) may consist of dozens of columns while the SQL needs only a few of them, and Spark will prune the unnecessary columns. In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes adaptively. For example, suppose there are 40 columns, 20 integer and 20 long. When a query reads one integer column and one long column, maxPartitionBytes should be 20 times larger: (20*4+20*8) / (4+8) = 20. With this optimization, the number of tasks will be smaller and the job will run faster. More importantly, for a very large cluster (more than 10 thousand nodes), it will relieve the RM's scheduling pressure. Here is the test: the table named test2 has more than 40 columns and there are more than 5 TB of data each hour. When we issue a very simple query ` select count(device_id) from test2 where date=20180708 and hour='23'` there are 72176 tasks and the duration of the job is 4.8 minutes. Most tasks last less than 1 second and read less than 1.5 MB of data. After the optimization, there are only 1615 tasks and the job lasts only 30 seconds, almost 10 times faster. The median read size is 44.2 MB. https://issues.apache.org/jira/browse/SPARK-24906 You can merge this pull request into a Git repository by running: $ git pull https://github.com/habren/spark SPARK-24906 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21868.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21868 commit e34aaa2fc0c1ebf87028d834ea5e9a61bc026bc6 Author: Jason Guo Date: 2018-07-25T02:18:22Z [SPARK-24906][SQL] Adaptively enlarge split / partition size for Parquet scan
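A quick worked version of the arithmetic in that description (128 MB is the spark.sql.files.maxPartitionBytes default; treating the column-size ratio as a direct scale factor on the split size is my reading of the change, sketched here for illustration):

    val defaultMaxSplitBytes = 128L * 1024 * 1024               // spark.sql.files.maxPartitionBytes default
    val totalColumnSize = 20 * 4 + 20 * 8                       // 20 int + 20 long columns = 240 bytes per row
    val selectedColumnSize = 4 + 8                               // one int + one long column = 12 bytes per row
    val multiplier = totalColumnSize / selectedColumnSize        // = 20
    val adaptiveSplitBytes = defaultMaxSplitBytes * multiplier   // = 2560 MB of file per split
    // Each task then reads roughly 2560 MB / 20 = 128 MB of the two selected columns
    // instead of a few MB, which is why the task count drops (72176 -> 1615 in the test above).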
[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...
Github user habren commented on the issue: https://github.com/apache/spark/pull/21868 @maropu If I understand correctly, your concern is about how to calculate
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren closed the pull request at: https://github.com/apache/spark/pull/21868
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r205356861 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -401,12 +399,41 @@ case class FileSourceScanExec( fsRelation: HadoopFsRelation): RDD[InternalRow] = { val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes -val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes +var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum val bytesPerCore = totalBytes / defaultParallelism -val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) +var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) +if(fsRelation.fileFormat.isInstanceOf[ParquetSource] && + fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) { + if (relation.dataSchema.map(_.dataType).forall(dataType => +dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType] + || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType] + || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) { + +def getTypeLength (dataType : DataType) : Int = { + if (dataType.isInstanceOf[StructType]) { + fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength + } else if (dataType.isInstanceOf[ArrayType]) { + fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength + } else if (dataType.isInstanceOf[MapType]) { +fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength + } else { +dataType.defaultSize + } +} + +val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_)) + .reduceOption(_ + _).getOrElse(StringType.defaultSize) +val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_)) + .reduceOption(_ + _).getOrElse(StringType.defaultSize) +val multiplier = totalColumnSize / selectedColumnSize --- End diff -- @viirya As defined in getTypeLength, users can define the complex types' lengths according to their data statistics, and the length of an AtomicType can be determined by AtomicType.defaultSize. So the multiplier is the ratio of the total length of all columns to the total length of the selected columns. def getTypeLength (dataType : DataType) : Int = { if (dataType.isInstanceOf[StructType]) { fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength } else if (dataType.isInstanceOf[ArrayType]) { fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength } else if (dataType.isInstanceOf[MapType]) { fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength } else { dataType.defaultSize } }
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r205288000 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -401,12 +399,41 @@ case class FileSourceScanExec( fsRelation: HadoopFsRelation): RDD[InternalRow] = { val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes -val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes +var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum val bytesPerCore = totalBytes / defaultParallelism -val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) +var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) +if(fsRelation.fileFormat.isInstanceOf[ParquetSource] && + fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) { + if (relation.dataSchema.map(_.dataType).forall(dataType => +dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType] + || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType] + || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) { + +def getTypeLength (dataType : DataType) : Int = { + if (dataType.isInstanceOf[StructType]) { + fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength + } else if (dataType.isInstanceOf[ArrayType]) { + fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength + } else if (dataType.isInstanceOf[MapType]) { +fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength + } else { +dataType.defaultSize + } +} + +val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_)) + .reduceOption(_ + _).getOrElse(StringType.defaultSize) +val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_)) + .reduceOption(_ + _).getOrElse(StringType.defaultSize) +val multiplier = totalColumnSize / selectedColumnSize --- End diff -- There are many data types: CalendarIntervalType, StructType, MapType, NullType, UserDefinedType, AtomicType (TimestampType, StringType, HiveStringType, BooleanType, DateType, BinaryType, NumericType), ObjectType, ArrayType. For an AtomicType, the size is fixed to defaultSize. For complex types such as StructType, MapType, and ArrayType, the size is variable, so I made it configurable with a default value. With the data type sizes, the multiplier is not just the ratio of the number of all columns to the number of selected columns, but the ratio of the total size of all columns to the total size of the selected columns.
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request: https://github.com/apache/spark/pull/21868#discussion_r205287123 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -381,6 +381,26 @@ object SQLConf { .booleanConf .createWithDefault(true) + val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit") +.doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " + + "columns are needed. So, it's better to make sure that the total size of the selected " + + "columns is about 128 MB " +) +.booleanConf +.createWithDefault(false) + + val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length") +.intConf +.createWithDefault(StructType.defaultConcreteType.defaultSize) + + val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length") +.intConf +.createWithDefault(MapType.defaultConcreteType.defaultSize) + + val PARQUET_ARRAY_LENGTH = buildConf("spark.sql.parquet.array.length") +.intConf +.createWithDefault(ArrayType.defaultConcreteType.defaultSize) --- End diff -- Thanks for your comments. I set the default value to StringType.defaultSize (8). It's just a default; users should configure it according to their real data.
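For illustration, this is how a user could override those defaults once the real data is known (a sketch assuming a SparkSession named `spark`; the byte values are made-up stand-ins for measured averages):

    // Average encoded widths observed for this table's complex columns (hypothetical values).
    spark.conf.set("spark.sql.parquet.struct.length", 128L)
    spark.conf.set("spark.sql.parquet.map.length", 64L)
    spark.conf.set("spark.sql.parquet.array.length", 64L)
    // Atomic columns keep using DataType.defaultSize, e.g. 4 for IntegerType and 8 for LongType.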
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
GitHub user habren opened a pull request: https://github.com/apache/spark/pull/21868 [SPARK-24906][SQL] Adaptively enlarge split / partition size for Parq… Please refer to https://issues.apache.org/jira/browse/SPARK-24906 for more detail and tests. For a columnar file format such as Parquet, when Spark SQL reads the table, each split will be 128 MB by default, since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even when a user sets it to a large value, such as 512 MB, a task may read only a few MB or even hundreds of KB, because the table (Parquet) may consist of dozens of columns while the SQL needs only a few of them, and Spark will prune the unnecessary columns. In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes adaptively. For example, suppose there are 40 columns, 20 integer and 20 long. When a query reads one integer column and one long column, maxPartitionBytes should be 20 times larger: (20*4+20*8) / (4+8) = 20. With this optimization, the number of tasks will be smaller and the job will run faster. More importantly, for a very large cluster (more than 10 thousand nodes), it will relieve the RM's scheduling pressure. Here is the test: the table named test2 has more than 40 columns and there are more than 5 TB of data each hour. When we issue a very simple query ` select count(device_id) from test2 where date=20180708 and hour='23'` there are 72176 tasks and the duration of the job is 4.8 minutes. Most tasks last less than 1 second and read less than 1.5 MB of data. After the optimization, there are only 1615 tasks and the job lasts only 30 seconds, almost 10 times faster. The median read size is 44.2 MB. https://issues.apache.org/jira/browse/SPARK-24906 You can merge this pull request into a Git repository by running: $ git pull https://github.com/habren/spark SPARK-24906 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21868.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21868 commit 9ff34525e346e6e1cbe4b12fc6f972a163fd920e Author: Jason Guo Date: 2018-07-25T02:07:38Z [SPARK-24906][SQL] Adaptively enlarge split / partition size for Parquet scan