Repository: spark
Updated Branches:
  refs/heads/master f5b028ed2 -> e3355090d


[SPARK-10143] [SQL] Use parquet's block size (row group size) setting as the 
min split size if necessary.

https://issues.apache.org/jira/browse/SPARK-10143

With this PR, we set the min split size to parquet's block size (row group 
size) configured in the conf whenever the min split size is smaller. This 
avoids creating too many tasks, and in particular useless tasks, when reading 
parquet data.
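
For background (my reading of Hadoop's FileInputFormat split sizing, not part of this patch): each split is sized as `max(minSplitSize, min(maxSplitSize, blockSize))`, so raising the min split size to the parquet row group size guarantees no split is smaller than one row group. A minimal sketch, with illustrative sizes:

```scala
// Sketch of Hadoop FileInputFormat's split sizing (assumed, not from this patch):
//   splitSize = max(minSplitSize, min(maxSplitSize, fsBlockSize))
def computeSplitSize(minSize: Long, maxSize: Long, fsBlockSize: Long): Long =
  math.max(minSize, math.min(maxSize, fsBlockSize))

val mb = 1024L * 1024L

// Without the patch: a 32MB max split size (and a tiny min) yields 32MB
// splits, most of which do not contain the start of any 128MB row group.
val before = computeSplitSize(1 * mb, 32 * mb, 64 * mb)

// With the patch: the min split size is raised to the 128MB row group size,
// so every split covers at least one whole row group.
val after = computeSplitSize(128 * mb, 32 * mb, 64 * mb)
```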

I tested it locally. The table I used is 343MB and lives on my local FS. 
Because I did not set any min/max split size, the default split size was 32MB 
and the map stage had 11 tasks, but only three of them actually read data. 
With my PR, the map stage has only three tasks. Here is the difference.

Without this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399179/8587dba6-4765-11e5-9189-7ebba52a2b6d.png)

With this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399185/a4735d74-4765-11e5-8848-1f1e361a6b4b.png)

Even if the block size setting does not match the actual block size of the 
parquet file, I think it is still generally good to use parquet's block size 
setting whenever the min split size is smaller than this block size.
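
The override this patch adds boils down to taking the max of the two split-size flags and bumping both when the row group size is larger. A self-contained sketch of that logic, using a plain mutable map standing in for a Hadoop Configuration (the real patch operates on the job's Configuration):

```scala
import scala.collection.mutable

// Illustration only: a mutable.Map plays the role of Hadoop's Configuration.
def overrideMinSplitSize(parquetBlockSize: Long, conf: mutable.Map[String, String]): Unit = {
  val minSplitSize = math.max(
    conf.getOrElse("mapred.min.split.size", "0").toLong,
    conf.getOrElse("mapreduce.input.fileinputformat.split.minsize", "0").toLong)
  if (parquetBlockSize > minSplitSize) {
    // Bump both the old (mapred.*) and new (mapreduce.*) flags so either
    // InputFormat API sees the larger minimum.
    conf("mapred.min.split.size") = parquetBlockSize.toString
    conf("mapreduce.input.fileinputformat.split.minsize") = parquetBlockSize.toString
  }
}

val conf = mutable.Map("mapred.min.split.size" -> (32L * 1024 * 1024).toString)
overrideMinSplitSize(128L * 1024 * 1024, conf)
```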

Tested it on a cluster using
```
val count = 
sqlContext.table("""store_sales""").groupBy().count().queryExecution.executedPlan(3).execute().count
```
Basically, it reads 0 columns of table `store_sales`. My table has 1824 parquet 
files whose sizes range from 80MB to 280MB (1 to 3 row groups). Without this 
patch, on a 16-worker cluster the job had 5023 tasks and took 102s. With this 
patch, the job had 2893 tasks and took 64s. It is still not as good as using 
one mapper per file (1824 tasks, 42s), but it is much better than our master.
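
A rough back-of-the-envelope model of the per-file task counts (assuming each file is split independently into splitSize-long pieces, and ignoring Hadoop's ~10% slop on the last split):

```scala
// Number of splits a single file produces at a given split size (ceiling
// division; the sizes below are illustrative, not measured).
def numSplits(fileSize: Long, splitSize: Long): Long =
  (fileSize + splitSize - 1) / splitSize

val mb = 1024L * 1024L
// A 280MB file: 9 tasks at a 32MB split size, but only 3 once the min split
// size is raised to a 128MB row group -- and each of those 3 reads data.
val tasksBefore = numSplits(280 * mb, 32 * mb)
val tasksAfter  = numSplits(280 * mb, 128 * mb)
```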

Author: Yin Huai <yh...@databricks.com>

Closes #8346 from yhuai/parquetMinSplit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3355090
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3355090
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3355090

Branch: refs/heads/master
Commit: e3355090d4030daffed5efb0959bf1d724c13c13
Parents: f5b028e
Author: Yin Huai <yh...@databricks.com>
Authored: Fri Aug 21 14:30:00 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Aug 21 14:30:00 2015 -0700

----------------------------------------------------------------------
 .../datasources/parquet/ParquetRelation.scala   | 41 +++++++++++++++++++-
 1 file changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e3355090/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 68169d4..bbf682a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
 import scala.util.{Failure, Try}
 
 import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -281,12 +282,18 @@ private[sql] class ParquetRelation(
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
     val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
 
+    // Parquet row group size. We will use this value as the value for
+    // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
+    // of these flags are smaller than the parquet row group size.
+    val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)
+
     // Create the function to set variable Parquet confs at both driver and executor side.
     val initLocalJobFuncOpt =
       ParquetRelation.initializeLocalJobFunc(
         requiredColumns,
         filters,
         dataSchema,
+        parquetBlockSize,
         useMetadataCache,
         parquetFilterPushDown,
         assumeBinaryIsString,
@@ -294,7 +301,8 @@ private[sql] class ParquetRelation(
         followParquetFormatSpec) _
 
     // Create the function to set input paths at the driver side.
-    val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles) _
+    val setInputPaths =
+      ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _
 
     Utils.withDummyCallSite(sqlContext.sparkContext) {
       new SqlNewHadoopRDD(
@@ -482,11 +490,35 @@ private[sql] object ParquetRelation extends Logging {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"
 
+  /**
+   * If parquet's block size (row group size) setting is larger than the min split size,
+   * we use parquet's block size setting as the min split size. Otherwise, we will create
+   * tasks processing nothing (because a split does not cover the starting point of a
+   * parquet block). See https://issues.apache.org/jira/browse/SPARK-10143 for more information.
+   */
+  private def overrideMinSplitSize(parquetBlockSize: Long, conf: Configuration): Unit = {
+    val minSplitSize =
+      math.max(
+        conf.getLong("mapred.min.split.size", 0L),
+        conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L))
+    if (parquetBlockSize > minSplitSize) {
+      val message =
+        s"Parquet's block size (row group size) is larger than " +
+          s"mapred.min.split.size/mapreduce.input.fileinputformat.split.minsize. Setting " +
+          s"mapred.min.split.size and mapreduce.input.fileinputformat.split.minsize to " +
+          s"$parquetBlockSize."
+      logDebug(message)
+      conf.set("mapred.min.split.size", parquetBlockSize.toString)
+      conf.set("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize.toString)
+    }
+  }
+
   /** This closure sets various Parquet configurations at both driver side and executor side. */
   private[parquet] def initializeLocalJobFunc(
       requiredColumns: Array[String],
       filters: Array[Filter],
       dataSchema: StructType,
+      parquetBlockSize: Long,
       useMetadataCache: Boolean,
       parquetFilterPushDown: Boolean,
       assumeBinaryIsString: Boolean,
@@ -522,16 +554,21 @@ private[sql] object ParquetRelation extends Logging {
     conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
     conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
     conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec)
+
+    overrideMinSplitSize(parquetBlockSize, conf)
   }
 
   /** This closure sets input paths at the driver side. */
   private[parquet] def initializeDriverSideJobFunc(
-      inputFiles: Array[FileStatus])(job: Job): Unit = {
+      inputFiles: Array[FileStatus],
+      parquetBlockSize: Long)(job: Job): Unit = {
     // We side the input paths at the driver side.
     logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
     if (inputFiles.nonEmpty) {
       FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
     }
+
+    overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
   }
 
   private[parquet] def readSchema(

