spark git commit: [SPARK-10143] [SQL] Use parquet's block size (row group size) setting as the min split size if necessary.
Repository: spark
Updated Branches:
  refs/heads/master f5b028ed2 -> e3355090d

[SPARK-10143] [SQL] Use parquet's block size (row group size) setting as the min split size if necessary.

https://issues.apache.org/jira/browse/SPARK-10143

With this PR, we will set the min split size to parquet's block size (row group size) from the conf if the min split size is smaller. So, we can avoid having too many tasks, and even useless tasks, when reading parquet data.

I tested it locally. The table I have is 343MB and it is in my local FS. Because I did not set any min/max split size, the default split size was 32MB and the map stage had 11 tasks, but only three of those tasks actually read data. With my PR, the map stage had only three tasks. Here is the difference.

Without this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399179/8587dba6-4765-11e5-9189-7ebba52a2b6d.png)

With this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399185/a4735d74-4765-11e5-8848-1f1e361a6b4b.png)

Even if the block size setting does not match the actual block size of a parquet file, I think it is still generally good to use parquet's block size setting when the min split size is smaller than this block size.

Tested it on a cluster using
```
val count = sqlContext.table("""store_sales""").groupBy().count().queryExecution.executedPlan(3).execute().count
```
Basically, it reads zero columns of table `store_sales`. My table has 1824 parquet files with sizes from 80MB to 280MB (1 to 3 row groups). Without this patch, on a 16-worker cluster, the job had 5023 tasks and took 102s. With this patch, the job had 2893 tasks and took 64s. It is still not as good as using one mapper per file (1824 tasks and 42s), but it is much better than our master.

Author: Yin Huai

Closes #8346 from yhuai/parquetMinSplit.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3355090
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3355090
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3355090

Branch: refs/heads/master
Commit: e3355090d4030daffed5efb0959bf1d724c13c13
Parents: f5b028e
Author: Yin Huai
Authored: Fri Aug 21 14:30:00 2015 -0700
Committer: Yin Huai
Committed: Fri Aug 21 14:30:00 2015 -0700

--
 .../datasources/parquet/ParquetRelation.scala | 41 +++-
 1 file changed, 39 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e3355090/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 68169d4..bbf682a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
 import scala.util.{Failure, Try}

 import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -281,12 +282,18 @@ private[sql] class ParquetRelation(
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
     val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec

+    // Parquet row group size. We will use this value as the value for
+    // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
+    // of these flags are smaller than the parquet row group size.
+    val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)
+
     // Create the function to set variable Parquet confs at both driver and executor side.
     val initLocalJobFuncOpt =
       ParquetRelation.initializeLocalJobFunc(
         requiredColumns,
         filters,
         dataSchema,
+        parquetBlockSize,
         useMetadataCache,
         parquetFilterPushDown,
         assumeBinaryIsString,
@@ -294,7 +301,8 @@ private[sql] class ParquetRelation(
         followParquetFormatSpec) _

     // Create the function to set input paths at the driver side.
-    val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles) _
+    val setInputPaths =
+      ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _

     Utils.withDummyCallSite(sqlContext.sparkContext) {
       new SqlNewHadoopRDD(
@@ -482,11 +490,35 @@ private[sql] object ParquetRelation extends Logging {
   // internally
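The last hunk above is cut off, but the comment added in the second hunk spells out the intended behavior: if the configured min split size is smaller than Parquet's block size (row group size), raise it to the block size. Here is a minimal sketch of that logic, assuming a hypothetical helper named `overrideMinSplitSize`; the two configuration keys and the comparison come directly from the commit message and the added comment:

```scala
import org.apache.hadoop.conf.Configuration

// Sketch only: raise the Hadoop min split size to Parquet's block size
// (row group size) when the configured value is smaller, so that no input
// split covers less than one row group.
def overrideMinSplitSize(parquetBlockSize: Long, conf: Configuration): Unit = {
  // Hadoop 1.x and 2.x spell the min split size setting differently;
  // take the larger of whatever is currently set under either key.
  val minSplitSize = math.max(
    conf.getLong("mapred.min.split.size", 0L),
    conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L))
  if (parquetBlockSize > minSplitSize) {
    // Set both keys, avoiding splits (and therefore tasks) that cover
    // less than a single row group and end up reading no data.
    conf.set("mapred.min.split.size", parquetBlockSize.toString)
    conf.set("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize.toString)
  }
}
```

The block size itself is read via `ParquetOutputFormat.getLongBlockSize`, which consults the `parquet.block.size` entry of the Hadoop configuration, so a deployment that writes larger row groups automatically gets a larger minimum split.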
spark git commit: [SPARK-10143] [SQL] Use parquet's block size (row group size) setting as the min split size if necessary.
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 e7db8761b -> 14c8c0c0d

[SPARK-10143] [SQL] Use parquet's block size (row group size) setting as the min split size if necessary.

https://issues.apache.org/jira/browse/SPARK-10143

Author: Yin Huai

Closes #8346 from yhuai/parquetMinSplit.
(cherry picked from commit e3355090d4030daffed5efb0959bf1d724c13c13)
Signed-off-by: Yin Huai

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14c8c0c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14c8c0c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14c8c0c0

Branch: refs/heads/branch-1.5
Commit: 14c8c0c0da1184c587f0d5ab60f1d56feaa588e4
Parents: e7db876
Author: Yin Huai
Authored: Fri Aug 21 14:30:00 2015 -0700
Committer: Yin Huai
Committed: Fri Aug 21 14:30:12 2015 -0700

--
 .../datasources/parquet/ParquetRelation.scala | 41 +++-
 1 file changed, 39 insertions(+), 2 deletions(-)
--