[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-11-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21868


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-17 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210890027
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
 .intConf
 .createWithDefault(4096)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
+.doc("For columnar file format (e.g., Parquet), it's possible that 
only few (not all) " +
+  "columns are needed. So, it's better to make sure that the total 
size of the selected " +
+  "columns is about 128 MB "
+)
+.booleanConf
+.createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+.doc("Set the default size of struct column")
+.intConf
+.createWithDefault(StringType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
--- End diff --

@HyukjinKwon  @viirya  Setting spark.sql.files.maxPartitionBytes explicitly 
does work. For you or other advanced users, it is convenient to set a bigger 
value of maxPartitionBytes.

But for ad-hoc queries, the selected columns differ from query to query, and 
it is inconvenient or even impossible for users to set a different 
maxPartitionBytes for every query.

And for general users (non-advanced users), it is not easy to calculate a 
proper value of maxPartitionBytes.

In many big companies, there may be only one or a few teams that are familiar 
with the internals of Spark, and they maintain the Spark cluster. The other 
teams are general users of Spark who care more about their business, such as 
data warehouse build-up and recommendation algorithms. This feature tries to 
handle the split size dynamically even when the users are not familiar with 
Spark.
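
For illustration, here is a rough sketch (with made-up column sizes, assuming 
a spark-shell session where `spark` is the active SparkSession) of the 
per-query arithmetic a general user would otherwise have to do by hand:

    // Hypothetical numbers: rows store ~500 bytes across all columns, but the
    // query touches only two columns worth ~25 bytes.
    val totalRowSize      = 500
    val selectedRowSize   = 25
    val targetReadPerTask = 128L * 1024 * 1024   // the 128 MB the user really wants

    // Scale the split size so that the *selected* data per task is ~128 MB.
    val manualMaxPartitionBytes = targetReadPerTask * totalRowSize / selectedRowSize
    spark.conf.set("spark.sql.files.maxPartitionBytes", manualMaxPartitionBytes)

Doing this for every ad-hoc query is exactly the burden this feature is meant 
to remove.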


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-17 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210887543
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
 .intConf
 .createWithDefault(4096)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
+.doc("For columnar file format (e.g., Parquet), it's possible that 
only few (not all) " +
+  "columns are needed. So, it's better to make sure that the total 
size of the selected " +
+  "columns is about 128 MB "
--- End diff --

I updated the description just now. Please review it again. Thanks a lot, 
@HyukjinKwon.


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-17 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210887308
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
 .intConf
 .createWithDefault(4096)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
+.doc("For columnar file format (e.g., Parquet), it's possible that 
only few (not all) " +
--- End diff --

I updated it accordingly


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-17 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210886442
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -25,17 +25,16 @@ import java.util.zip.Deflater
 import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.util.matching.Regex
-
--- End diff --

@viirya Ok, I added it back


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-17 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210876154
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
 .intConf
 .createWithDefault(4096)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
--- End diff --

@HyukjinKwon  Yes, setting spark.sql.files.maxPartitionBytes explicitly does 
work. But for ad-hoc queries, the selected columns differ from query to query, 
and it is inconvenient or even impossible for users to set a different 
maxPartitionBytes for every query. And for general users (non-advanced users), 
it is not easy to calculate a proper value of maxPartitionBytes. 


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-17 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210871055
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -425,12 +426,44 @@ case class FileSourceScanExec(
   fsRelation: HadoopFsRelation): RDD[InternalRow] = {
 val defaultMaxSplitBytes =
   fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-val openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+var openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
 val defaultParallelism = 
fsRelation.sparkSession.sparkContext.defaultParallelism
 val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + 
openCostInBytes)).sum
 val bytesPerCore = totalBytes / defaultParallelism
 
-val maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+var maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+
+
if(fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
+  (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
+fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
+  if (relation.dataSchema.map(_.dataType).forall(dataType =>
+dataType.isInstanceOf[CalendarIntervalType] || 
dataType.isInstanceOf[StructType]
+  || dataType.isInstanceOf[MapType] || 
dataType.isInstanceOf[NullType]
+  || dataType.isInstanceOf[AtomicType] || 
dataType.isInstanceOf[ArrayType])) {
+
+def getTypeLength(dataType: DataType): Int = {
+  if (dataType.isInstanceOf[StructType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+  } else if (dataType.isInstanceOf[ArrayType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+  } else if (dataType.isInstanceOf[MapType]) {
+fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+  } else {
+dataType.defaultSize
+  }
+}
+
+val selectedColumnSize = 
requiredSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+val totalColumnSize = 
relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
--- End diff --

@HyukjinKwon  I agree that the estimation is rough, especially for complex 
types. For atomic types it works better, and at least it takes column pruning 
into consideration. 


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-16 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210799950
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -425,12 +426,44 @@ case class FileSourceScanExec(
   fsRelation: HadoopFsRelation): RDD[InternalRow] = {
 val defaultMaxSplitBytes =
   fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-val openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+var openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
 val defaultParallelism = 
fsRelation.sparkSession.sparkContext.defaultParallelism
 val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + 
openCostInBytes)).sum
 val bytesPerCore = totalBytes / defaultParallelism
 
-val maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+var maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+
+
if(fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
+  (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
+fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
+  if (relation.dataSchema.map(_.dataType).forall(dataType =>
+dataType.isInstanceOf[CalendarIntervalType] || 
dataType.isInstanceOf[StructType]
+  || dataType.isInstanceOf[MapType] || 
dataType.isInstanceOf[NullType]
+  || dataType.isInstanceOf[AtomicType] || 
dataType.isInstanceOf[ArrayType])) {
+
+def getTypeLength(dataType: DataType): Int = {
+  if (dataType.isInstanceOf[StructType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+  } else if (dataType.isInstanceOf[ArrayType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+  } else if (dataType.isInstanceOf[MapType]) {
+fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+  } else {
+dataType.defaultSize
+  }
+}
+
+val selectedColumnSize = 
requiredSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+val totalColumnSize = 
relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
--- End diff --

I think his point is that the estimation is super rough, which I agree with. 
I am less sure whether we should go ahead or not, partly for this reason as 
well.


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-16 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210799970
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
 .intConf
 .createWithDefault(4096)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
+.doc("For columnar file format (e.g., Parquet), it's possible that 
only few (not all) " +
+  "columns are needed. So, it's better to make sure that the total 
size of the selected " +
+  "columns is about 128 MB "
+)
+.booleanConf
+.createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+.doc("Set the default size of struct column")
+.intConf
+.createWithDefault(StringType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
--- End diff --

Yeah, I was thinking that.


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-16 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210799891
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
 .intConf
 .createWithDefault(4096)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
+.doc("For columnar file format (e.g., Parquet), it's possible that 
only few (not all) " +
+  "columns are needed. So, it's better to make sure that the total 
size of the selected " +
+  "columns is about 128 MB "
+)
+.booleanConf
+.createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+.doc("Set the default size of struct column")
+.intConf
+.createWithDefault(StringType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
--- End diff --

I wouldn't do this. It makes things more complicated, and I would just set a 
bigger number for `maxPartitionBytes`.
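
For example, an advanced user could simply do something like the following 
one-off override before the query (the 512 MB value is just an illustration):

    // One-off override for a query that prunes most of the columns.
    spark.conf.set("spark.sql.files.maxPartitionBytes", 512L * 1024 * 1024)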


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-16 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210799770
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
 .intConf
 .createWithDefault(4096)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
+.doc("For columnar file format (e.g., Parquet), it's possible that 
only few (not all) " +
+  "columns are needed. So, it's better to make sure that the total 
size of the selected " +
+  "columns is about 128 MB "
--- End diff --

This does not really describe what the configuration actually does.


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-16 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210799731
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
 .intConf
 .createWithDefault(4096)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
+.doc("For columnar file format (e.g., Parquet), it's possible that 
only few (not all) " +
--- End diff --

`it's`: I would avoid the abbreviation.


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-16 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210799600
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
 .intConf
 .createWithDefault(4096)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
--- End diff --

This configuration doesn't look specific to Parquet anymore.


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-16 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210793717
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -425,12 +426,44 @@ case class FileSourceScanExec(
   fsRelation: HadoopFsRelation): RDD[InternalRow] = {
 val defaultMaxSplitBytes =
   fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-val openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+var openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
 val defaultParallelism = 
fsRelation.sparkSession.sparkContext.defaultParallelism
 val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + 
openCostInBytes)).sum
 val bytesPerCore = totalBytes / defaultParallelism
 
-val maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+var maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+
+
if(fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
+  (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
+fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
+  if (relation.dataSchema.map(_.dataType).forall(dataType =>
+dataType.isInstanceOf[CalendarIntervalType] || 
dataType.isInstanceOf[StructType]
+  || dataType.isInstanceOf[MapType] || 
dataType.isInstanceOf[NullType]
+  || dataType.isInstanceOf[AtomicType] || 
dataType.isInstanceOf[ArrayType])) {
+
+def getTypeLength(dataType: DataType): Int = {
+  if (dataType.isInstanceOf[StructType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+  } else if (dataType.isInstanceOf[ArrayType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+  } else if (dataType.isInstanceOf[MapType]) {
+fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+  } else {
+dataType.defaultSize
+  }
+}
+
+val selectedColumnSize = 
requiredSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+val totalColumnSize = 
relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
--- End diff --

@gatorsmile  The goal of this change is not to make it easier for users to set 
the partition size. Instead, when the user sets a partition size, this change 
tries its best to make the actual read size close to that value. Without this 
change, when the user sets the partition size to 128MB, the actual read size 
may be 1MB or even smaller because of column pruning.
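
As a rough illustration with made-up sizes:

    // A 40-column table whose rows take ~520 bytes in total, while the two
    // selected columns take only ~4 bytes after column pruning.
    val maxPartitionBytes = 128L * 1024 * 1024   // what the user asked for
    val selectedFraction  = 4.0 / 520.0          // fraction actually read
    val actualReadPerTask = (maxPartitionBytes * selectedFraction).toLong
    // actualReadPerTask is roughly 1 MB -- far from the intended 128 MB.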


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-16 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210785081
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +458,29 @@ object SQLConf {
 .intConf
 .createWithDefault(4096)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
+.doc("For columnar file format (e.g., Parquet), it's possible that 
only few (not all) " +
+  "columns are needed. So, it's better to make sure that the total 
size of the selected " +
+  "columns is about 128 MB "
+)
+.booleanConf
+.createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+.doc("Set the default size of struct column")
+.intConf
+.createWithDefault(StringType.defaultSize)
--- End diff --

And these configs assume that different storage formats use the same size?


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-16 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210779310
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +458,29 @@ object SQLConf {
 .intConf
 .createWithDefault(4096)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
+.doc("For columnar file format (e.g., Parquet), it's possible that 
only few (not all) " +
+  "columns are needed. So, it's better to make sure that the total 
size of the selected " +
+  "columns is about 128 MB "
+)
+.booleanConf
+.createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+.doc("Set the default size of struct column")
+.intConf
+.createWithDefault(StringType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
+.doc("Set the default size of map column")
+.intConf
+.createWithDefault(StringType.defaultSize)
+
+  val PARQUET_ARRAY_LENGTH = buildConf("spark.sql.parquet.array.length")
+.doc("Set the default size of array column")
+.intConf
+.createWithDefault(StringType.defaultSize)
--- End diff --

This feature introduces quite a few configs; my concern is that it is hard for 
end users to set them.


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-16 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210765335
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -25,17 +25,16 @@ import java.util.zip.Deflater
 import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.util.matching.Regex
-
--- End diff --

Please don't remove these blank lines. Can you revert this?


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-16 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210494497
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -425,12 +426,44 @@ case class FileSourceScanExec(
   fsRelation: HadoopFsRelation): RDD[InternalRow] = {
 val defaultMaxSplitBytes =
   fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-val openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+var openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
 val defaultParallelism = 
fsRelation.sparkSession.sparkContext.defaultParallelism
 val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + 
openCostInBytes)).sum
 val bytesPerCore = totalBytes / defaultParallelism
 
-val maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+var maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+
+
if(fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
+  (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
+fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
+  if (relation.dataSchema.map(_.dataType).forall(dataType =>
+dataType.isInstanceOf[CalendarIntervalType] || 
dataType.isInstanceOf[StructType]
+  || dataType.isInstanceOf[MapType] || 
dataType.isInstanceOf[NullType]
+  || dataType.isInstanceOf[AtomicType] || 
dataType.isInstanceOf[ArrayType])) {
+
+def getTypeLength(dataType: DataType): Int = {
+  if (dataType.isInstanceOf[StructType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+  } else if (dataType.isInstanceOf[ArrayType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+  } else if (dataType.isInstanceOf[MapType]) {
+fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+  } else {
+dataType.defaultSize
+  }
+}
+
+val selectedColumnSize = 
requiredSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+val totalColumnSize = 
relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
--- End diff --

The type-based estimation is very rough, and it is still hard for end users to 
decide the initial sizes.


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-15 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210456342
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -401,12 +399,41 @@ case class FileSourceScanExec(
   fsRelation: HadoopFsRelation): RDD[InternalRow] = {
 val defaultMaxSplitBytes =
   fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-val openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+var openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
 val defaultParallelism = 
fsRelation.sparkSession.sparkContext.defaultParallelism
 val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + 
openCostInBytes)).sum
 val bytesPerCore = totalBytes / defaultParallelism
 
-val maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+var maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+if(fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+  
fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) {
+  if (relation.dataSchema.map(_.dataType).forall(dataType =>
+dataType.isInstanceOf[CalendarIntervalType] || 
dataType.isInstanceOf[StructType]
+  || dataType.isInstanceOf[MapType] || 
dataType.isInstanceOf[NullType]
+  || dataType.isInstanceOf[AtomicType] || 
dataType.isInstanceOf[ArrayType])) {
+
+def getTypeLength (dataType : DataType) : Int = {
+  if (dataType.isInstanceOf[StructType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+  } else if (dataType.isInstanceOf[ArrayType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+  } else if (dataType.isInstanceOf[MapType]) {
+fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+  } else {
+dataType.defaultSize
+  }
+}
+
+val selectedColumnSize = 
requiredSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+val totalColumnSize = 
relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+val multiplier = totalColumnSize / selectedColumnSize
--- End diff --

@viirya  Now it also supports ORC. Please help review it.


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-26 Thread habren
GitHub user habren reopened a pull request:

https://github.com/apache/spark/pull/21868

[SPARK-24906][SQL] Adaptively enlarge split / partition size for Parq…

Please refer to https://issues.apache.org/jira/browse/SPARK-24906 for more 
detail and test

For columnar file formats such as Parquet, when Spark SQL reads a table, each 
split will be 128 MB by default since spark.sql.files.maxPartitionBytes 
defaults to 128 MB. Even when the user sets it to a large value, such as 
512 MB, a task may read only a few MB or even hundreds of KB, because the 
table (Parquet) may consist of dozens of columns while the SQL only needs a 
few of them, and Spark will prune the unnecessary columns.

In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
adaptively.

For example, suppose there are 40 columns, 20 of type integer and another 20 
of type long. When a query reads one integer column and one long column, 
maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20.
With this optimization, the number of tasks will be smaller and the job will 
run faster. More importantly, for a very large cluster (more than 10 thousand 
nodes), it will relieve the RM's scheduling pressure.

Here is the test.

The table named test2 has more than 40 columns and there is more than 5 TB of 
data each hour.

When we issue a very simple query

` select count(device_id) from test2 where date=20180708 and hour='23'`

there are 72176 tasks and the duration of the job is 4.8 minutes.

Most tasks last less than 1 second and read less than 1.5 MB of data.

After the optimization, there are only 1615 tasks and the job lasts only 30 
seconds. It is almost 10 times faster.

The median read size is 44.2 MB.

https://issues.apache.org/jira/browse/SPARK-24906


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/habren/spark SPARK-24906

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21868.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21868


commit e34aaa2fc0c1ebf87028d834ea5e9a61bc026bc6
Author: Jason Guo 
Date:   2018-07-25T02:18:22Z

[SPARK-24906][SQL] Adaptively enlarge split / partition size for Parquet 
scan




---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-26 Thread habren
Github user habren closed the pull request at:

https://github.com/apache/spark/pull/21868


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-26 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r205356861
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -401,12 +399,41 @@ case class FileSourceScanExec(
   fsRelation: HadoopFsRelation): RDD[InternalRow] = {
 val defaultMaxSplitBytes =
   fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-val openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+var openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
 val defaultParallelism = 
fsRelation.sparkSession.sparkContext.defaultParallelism
 val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + 
openCostInBytes)).sum
 val bytesPerCore = totalBytes / defaultParallelism
 
-val maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+var maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+if(fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+  
fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) {
+  if (relation.dataSchema.map(_.dataType).forall(dataType =>
+dataType.isInstanceOf[CalendarIntervalType] || 
dataType.isInstanceOf[StructType]
+  || dataType.isInstanceOf[MapType] || 
dataType.isInstanceOf[NullType]
+  || dataType.isInstanceOf[AtomicType] || 
dataType.isInstanceOf[ArrayType])) {
+
+def getTypeLength (dataType : DataType) : Int = {
+  if (dataType.isInstanceOf[StructType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+  } else if (dataType.isInstanceOf[ArrayType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+  } else if (dataType.isInstanceOf[MapType]) {
+fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+  } else {
+dataType.defaultSize
+  }
+}
+
+val selectedColumnSize = 
requiredSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+val totalColumnSize = 
relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+val multiplier = totalColumnSize / selectedColumnSize
--- End diff --

@viirya  As defined in getTypeLength, users can configure the complex types' 
lengths according to their data statistics, and the length of an atomic type 
is determined by its defaultSize. So the multiplier is the ratio of the total 
length of all columns to the total length of the selected columns.

    def getTypeLength(dataType: DataType): Int = {
      if (dataType.isInstanceOf[StructType]) {
        fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
      } else if (dataType.isInstanceOf[ArrayType]) {
        fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
      } else if (dataType.isInstanceOf[MapType]) {
        fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
      } else {
        dataType.defaultSize
      }
    }


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-25 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r205288000
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -401,12 +399,41 @@ case class FileSourceScanExec(
   fsRelation: HadoopFsRelation): RDD[InternalRow] = {
 val defaultMaxSplitBytes =
   fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-val openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+var openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
 val defaultParallelism = 
fsRelation.sparkSession.sparkContext.defaultParallelism
 val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + 
openCostInBytes)).sum
 val bytesPerCore = totalBytes / defaultParallelism
 
-val maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+var maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+if(fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+  
fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) {
+  if (relation.dataSchema.map(_.dataType).forall(dataType =>
+dataType.isInstanceOf[CalendarIntervalType] || 
dataType.isInstanceOf[StructType]
+  || dataType.isInstanceOf[MapType] || 
dataType.isInstanceOf[NullType]
+  || dataType.isInstanceOf[AtomicType] || 
dataType.isInstanceOf[ArrayType])) {
+
+def getTypeLength (dataType : DataType) : Int = {
+  if (dataType.isInstanceOf[StructType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+  } else if (dataType.isInstanceOf[ArrayType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+  } else if (dataType.isInstanceOf[MapType]) {
+fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+  } else {
+dataType.defaultSize
+  }
+}
+
+val selectedColumnSize = 
requiredSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+val totalColumnSize = 
relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+val multiplier = totalColumnSize / selectedColumnSize
--- End diff --

There are many data types: CalendarIntervalType, StructType, MapType, 
NullType, UserDefinedType, AtomicType (TimestampType, StringType, 
HiveStringType, BooleanType, DateType, BinaryType, NumericType), ObjectType 
and ArrayType. For an atomic type, the size is fixed to its defaultSize. For 
complex types such as StructType, MapType and ArrayType, the size is variable, 
so I made it configurable with a default value. With the data type sizes, the 
multiplier is not just the ratio of the number of selected columns to the 
number of all columns, but the ratio of the total size of all columns to the 
total size of the selected columns.
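
As a toy calculation of the difference this makes (the 256-byte struct length 
is an arbitrary value a user might configure):

    // Two IntegerType columns (4 bytes each, from defaultSize) plus one
    // StructType column configured to 256 bytes via spark.sql.parquet.struct.length.
    val structLength         = 256
    val totalColumnSize      = 4 + 4 + structLength                  // 264
    val selectedColumnSize   = 4                                     // one int column selected
    val sizeBasedMultiplier  = totalColumnSize / selectedColumnSize  // 66
    val countBasedMultiplier = 3 / 1                                 // only 3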


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-25 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r205287123
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -381,6 +381,26 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
+.doc("For columnar file format (e.g., Parquet), it's possible that 
only few (not all) " +
+  "columns are needed. So, it's better to make sure that the total 
size of the selected " +
+  "columns is about 128 MB "
+)
+.booleanConf
+.createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+.intConf
+.createWithDefault(StructType.defaultConcreteType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
+.intConf
+.createWithDefault(MapType.defaultConcreteType.defaultSize)
+
+  val PARQUET_ARRAY_LENGTH = buildConf("spark.sql.parquet.array.length")
+.intConf
+.createWithDefault(ArrayType.defaultConcreteType.defaultSize)
--- End diff --

Thanks for your comments. I set the default value to StringType.defaultSize 
(8). It is just a default; users should configure it according to their real 
data.
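
For instance, with the config names introduced by this patch (the values below 
are arbitrary placeholders that should come from real data statistics), a user 
could set, in a spark-shell session:

    // Tell the estimator roughly how many bytes per row these complex-typed
    // columns occupy in this particular data set.
    spark.conf.set("spark.sql.parquet.struct.length", 1024L)
    spark.conf.set("spark.sql.parquet.map.length", 512L)
    spark.conf.set("spark.sql.parquet.array.length", 256L)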


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r205278858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -401,12 +399,41 @@ case class FileSourceScanExec(
   fsRelation: HadoopFsRelation): RDD[InternalRow] = {
 val defaultMaxSplitBytes =
   fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-val openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+var openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
 val defaultParallelism = 
fsRelation.sparkSession.sparkContext.defaultParallelism
 val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + 
openCostInBytes)).sum
 val bytesPerCore = totalBytes / defaultParallelism
 
-val maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+var maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+if(fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+  
fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) {
+  if (relation.dataSchema.map(_.dataType).forall(dataType =>
+dataType.isInstanceOf[CalendarIntervalType] || 
dataType.isInstanceOf[StructType]
+  || dataType.isInstanceOf[MapType] || 
dataType.isInstanceOf[NullType]
+  || dataType.isInstanceOf[AtomicType] || 
dataType.isInstanceOf[ArrayType])) {
+
+def getTypeLength (dataType : DataType) : Int = {
+  if (dataType.isInstanceOf[StructType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+  } else if (dataType.isInstanceOf[ArrayType]) {
+
fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+  } else if (dataType.isInstanceOf[MapType]) {
+fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+  } else {
+dataType.defaultSize
+  }
+}
+
+val selectedColumnSize = 
requiredSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+val totalColumnSize = 
relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+val multiplier = totalColumnSize / selectedColumnSize
--- End diff --

It seems that here you only get the ratio of the number of selected columns to 
the number of total columns. The actual type sizes are not taken into 
consideration.


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r205278202
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -381,6 +381,26 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = 
buildConf("spark.sql.parquet.adaptiveFileSplit")
+.doc("For columnar file format (e.g., Parquet), it's possible that 
only few (not all) " +
+  "columns are needed. So, it's better to make sure that the total 
size of the selected " +
+  "columns is about 128 MB "
+)
+.booleanConf
+.createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+.intConf
+.createWithDefault(StructType.defaultConcreteType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
+.intConf
+.createWithDefault(MapType.defaultConcreteType.defaultSize)
+
+  val PARQUET_ARRAY_LENGTH = buildConf("spark.sql.parquet.array.length")
+.intConf
+.createWithDefault(ArrayType.defaultConcreteType.defaultSize)
--- End diff --

`ArrayType.defaultConcreteType` is `ArrayType(NullType, containsNull = 
true)`. I don't think you will get a reasonable number using this.
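
A quick way to see why (a sketch; the exact number depends on the Spark 
version):

    import org.apache.spark.sql.types._

    // This is what ArrayType.defaultConcreteType resolves to in the patch.
    val arrayDefault = ArrayType(NullType, containsNull = true)
    // Its defaultSize is derived from the NullType element, i.e. a tiny
    // constant that says nothing about real array columns.
    println(arrayDefault.defaultSize)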


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-24 Thread habren
GitHub user habren opened a pull request:

https://github.com/apache/spark/pull/21868

[SPARK-24906][SQL] Adaptively enlarge split / partition size for Parq…

Please refer to https://issues.apache.org/jira/browse/SPARK-24906 for more 
detail and test

For columnar file formats such as Parquet, when Spark SQL reads a table, each 
split will be 128 MB by default since spark.sql.files.maxPartitionBytes 
defaults to 128 MB. Even when the user sets it to a large value, such as 
512 MB, a task may read only a few MB or even hundreds of KB, because the 
table (Parquet) may consist of dozens of columns while the SQL only needs a 
few of them, and Spark will prune the unnecessary columns.

In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
adaptively.

For example, suppose there are 40 columns, 20 of type integer and another 20 
of type long. When a query reads one integer column and one long column, 
maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20.

With this optimization, the number of tasks will be smaller and the job will 
run faster. More importantly, for a very large cluster (more than 10 thousand 
nodes), it will relieve the RM's scheduling pressure.

Here is the test.

The table named test2 has more than 40 columns and there is more than 5 TB of 
data each hour.

When we issue a very simple query

` select count(device_id) from test2 where date=20180708 and hour='23'`

there are 72176 tasks and the duration of the job is 4.8 minutes.

Most tasks last less than 1 second and read less than 1.5 MB of data.

After the optimization, there are only 1615 tasks and the job lasts only 30 
seconds. It is almost 10 times faster.

The median read size is 44.2 MB.
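
To reproduce, the new flag has to be enabled before running the query above; a 
sketch, assuming a spark-shell session:

    // Enable the adaptive split enlargement introduced by this patch.
    spark.conf.set("spark.sql.parquet.adaptiveFileSplit", true)
    spark.sql("select count(device_id) from test2 where date=20180708 and hour='23'").show()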

https://issues.apache.org/jira/browse/SPARK-24906


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/habren/spark SPARK-24906

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21868.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21868


commit 9ff34525e346e6e1cbe4b12fc6f972a163fd920e
Author: 郭俊 
Date:   2018-07-25T02:07:38Z

[SPARK-24906][SQL] Adaptively enlarge split / partition size for Parquet 
scan




---
