spark git commit: [SPARK-10143] [SQL] Use parquet's block size (row group size) setting as the min split size if necessary.

2015-08-21 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master f5b028ed2 -> e3355090d


[SPARK-10143] [SQL] Use parquet's block size (row group size) setting as the 
min split size if necessary.

https://issues.apache.org/jira/browse/SPARK-10143

With this PR, we set the min split size to Parquet's block size (row group
size) configured in the conf whenever the min split size is smaller. This way,
we avoid having too many tasks, and even useless tasks, when reading Parquet
data.
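
To make the mechanism concrete, here is a minimal sketch of the idea (the
helper name and exact condition are illustrative, not the actual patch): read
Parquet's row group size from the Hadoop conf and raise the min split size
settings if they are smaller, so a split never covers just a fraction of a row
group. The real change lives in `ParquetRelation` (see the diff below).
```
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetOutputFormat

// Sketch only: if the configured min split sizes are smaller than Parquet's
// row group size (parquet.block.size), raise them so FileInputFormat does not
// cut a row group into several splits.
def bumpMinSplitSize(conf: Configuration): Unit = {
  val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(conf)
  val minSplitSizeNew = conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L)
  val minSplitSizeOld = conf.getLong("mapred.min.split.size", 0L)
  if (parquetBlockSize > minSplitSizeNew || parquetBlockSize > minSplitSizeOld) {
    conf.setLong("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize)
    conf.setLong("mapred.min.split.size", parquetBlockSize)
  }
}
```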

I tested it locally. The table I used is 343MB and lives on my local FS.
Because I did not set any min/max split size, the default split size was 32MB
and the map stage had 11 tasks, but only three of those tasks actually read
data. With my PR, the map stage had only three tasks. Here is the difference.

Without this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399179/8587dba6-4765-11e5-9189-7ebba52a2b6d.png)

With this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399185/a4735d74-4765-11e5-8848-1f1e361a6b4b.png)

Even if the block size setting does not match the actual block size of the
Parquet file, I think it is still generally good to use Parquet's block size
setting when the min split size is smaller than this block size.

Tested it on a cluster using
```
val count = sqlContext.table("""store_sales""").groupBy().count()
  .queryExecution.executedPlan(3).execute().count
```
Basically, it reads zero columns of table `store_sales`. My table has 1824
Parquet files whose sizes range from 80MB to 280MB (1 to 3 row groups). Without
this patch, on a 16-worker cluster, the job had 5023 tasks and took 102s. With
this patch, the job had 2893 tasks and took 64s. It is still not as good as
using one mapper per file (1824 tasks and 42s), but it is much better than the
current master.
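
For context on where these task counts come from, Hadoop's `FileInputFormat`
picks the split size as `max(minSplitSize, min(maxSplitSize, blockSize))`, so
raising the min split size to the row group size directly shrinks the number
of splits (and tasks) per file. A rough sketch of the arithmetic, ignoring
Hadoop's small slop factor and using approximate numbers from the local test
above:
```
// Hadoop's FileInputFormat computes the split size as
//   max(minSplitSize, min(maxSplitSize, fsBlockSize)).
def computeSplitSize(minSplitSize: Long, maxSplitSize: Long, fsBlockSize: Long): Long =
  math.max(minSplitSize, math.min(maxSplitSize, fsBlockSize))

// Rough split count per file: ceil(fileSize / splitSize).
def numSplits(fileSize: Long, splitSize: Long): Long =
  (fileSize + splitSize - 1) / splitSize

val mb = 1024L * 1024L
// 343MB local file, 32MB FS block size, no effective min split size:
numSplits(343 * mb, computeSplitSize(1L, Long.MaxValue, 32 * mb))       // 11 splits
// Same file with the min split size raised to a 128MB row group size:
numSplits(343 * mb, computeSplitSize(128 * mb, Long.MaxValue, 32 * mb)) // 3 splits
```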

Author: Yin Huai 

Closes #8346 from yhuai/parquetMinSplit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3355090
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3355090
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3355090

Branch: refs/heads/master
Commit: e3355090d4030daffed5efb0959bf1d724c13c13
Parents: f5b028e
Author: Yin Huai 
Authored: Fri Aug 21 14:30:00 2015 -0700
Committer: Yin Huai 
Committed: Fri Aug 21 14:30:00 2015 -0700

--
 .../datasources/parquet/ParquetRelation.scala   | 41 +++-
 1 file changed, 39 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e3355090/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 68169d4..bbf682a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
 import scala.util.{Failure, Try}
 
 import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -281,12 +282,18 @@ private[sql] class ParquetRelation(
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
     val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
 
+    // Parquet row group size. We will use this value as the value for
+    // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
+    // of these flags are smaller than the parquet row group size.
+    val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)
+
     // Create the function to set variable Parquet confs at both driver and executor side.
     val initLocalJobFuncOpt =
       ParquetRelation.initializeLocalJobFunc(
         requiredColumns,
         filters,
         dataSchema,
+        parquetBlockSize,
         useMetadataCache,
         parquetFilterPushDown,
         assumeBinaryIsString,
@@ -294,7 +301,8 @@
         followParquetFormatSpec) _
 
     // Create the function to set input paths at the driver side.
-    val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles) _
+    val setInputPaths =
+      ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _
 
     Utils.withDummyCallSite(sqlContext.sparkContext) {
       new SqlNewHadoopRDD(
@@ -482,11 +490,35 @@ private[sql] object ParquetRelation extends Logging {
   // internally

spark git commit: [SPARK-10143] [SQL] Use parquet's block size (row group size) setting as the min split size if necessary.

2015-08-21 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 e7db8761b -> 14c8c0c0d


[SPARK-10143] [SQL] Use parquet's block size (row group size) setting as the 
min split size if necessary.

https://issues.apache.org/jira/browse/SPARK-10143

With this PR, we set the min split size to Parquet's block size (row group
size) configured in the conf whenever the min split size is smaller. This way,
we avoid having too many tasks, and even useless tasks, when reading Parquet
data.

I tested it locally. The table I used is 343MB and lives on my local FS.
Because I did not set any min/max split size, the default split size was 32MB
and the map stage had 11 tasks, but only three of those tasks actually read
data. With my PR, the map stage had only three tasks. Here is the difference.

Without this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399179/8587dba6-4765-11e5-9189-7ebba52a2b6d.png)

With this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399185/a4735d74-4765-11e5-8848-1f1e361a6b4b.png)

Even if the block size setting does not match the actual block size of the
Parquet file, I think it is still generally good to use Parquet's block size
setting when the min split size is smaller than this block size.

Tested it on a cluster using
```
val count = sqlContext.table("""store_sales""").groupBy().count()
  .queryExecution.executedPlan(3).execute().count
```
Basically, it reads zero columns of table `store_sales`. My table has 1824
Parquet files whose sizes range from 80MB to 280MB (1 to 3 row groups). Without
this patch, on a 16-worker cluster, the job had 5023 tasks and took 102s. With
this patch, the job had 2893 tasks and took 64s. It is still not as good as
using one mapper per file (1824 tasks and 42s), but it is much better than the
current master.

Author: Yin Huai 

Closes #8346 from yhuai/parquetMinSplit.

(cherry picked from commit e3355090d4030daffed5efb0959bf1d724c13c13)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14c8c0c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14c8c0c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14c8c0c0

Branch: refs/heads/branch-1.5
Commit: 14c8c0c0da1184c587f0d5ab60f1d56feaa588e4
Parents: e7db876
Author: Yin Huai 
Authored: Fri Aug 21 14:30:00 2015 -0700
Committer: Yin Huai 
Committed: Fri Aug 21 14:30:12 2015 -0700

--
 .../datasources/parquet/ParquetRelation.scala   | 41 +++-
 1 file changed, 39 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14c8c0c0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 68169d4..bbf682a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
 import scala.util.{Failure, Try}
 
 import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -281,12 +282,18 @@ private[sql] class ParquetRelation(
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
     val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
 
+    // Parquet row group size. We will use this value as the value for
+    // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
+    // of these flags are smaller than the parquet row group size.
+    val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)
+
     // Create the function to set variable Parquet confs at both driver and executor side.
     val initLocalJobFuncOpt =
       ParquetRelation.initializeLocalJobFunc(
         requiredColumns,
         filters,
         dataSchema,
+        parquetBlockSize,
         useMetadataCache,
         parquetFilterPushDown,
         assumeBinaryIsString,
@@ -294,7 +301,8 @@
         followParquetFormatSpec) _
 
     // Create the function to set input paths at the driver side.
-    val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles) _
+    val setInputPaths =
+      ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _
 
     Utils.withDummyCallSite(sqlContext.sparkContext) {
       new SqlNewHadoopRDD(