[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-17 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210890027
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
    .intConf
    .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
+      "columns are needed. So, it's better to make sure that the total size of the selected " +
+      "columns is about 128 MB "
+    )
+    .booleanConf
+    .createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+    .doc("Set the default size of struct column")
+    .intConf
+    .createWithDefault(StringType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
--- End diff --

@HyukjinKwon  @viirya  Setting spark.sql.files.maxPartitionBytes explicitly does work. For you or other advanced users, it's convenient to set a bigger maxPartitionBytes.

But for ad-hoc queries, the selected columns differ from query to query, and it's not convenient, or even impossible, for users to set a different maxPartitionBytes for each query.

And for general users (non-advanced users), it's not easy to calculate a proper value of maxPartitionBytes.

In many big companies, there may be only one or a few teams that are familiar with the details of Spark, and they maintain the Spark cluster. The other teams are general users of Spark and care more about their business, such as data warehouse build-up and recommendation algorithms. This feature tries to handle it dynamically even when users are not familiar with Spark.
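For reference, a minimal sketch of the two options discussed in this thread, assuming an active SparkSession named `spark` (the adaptive config name is the one proposed in this PR; the values are illustrative):

    // Option 1: the manual workaround, bump the split size per query.
    spark.conf.set("spark.sql.files.maxPartitionBytes", 512L * 1024 * 1024)  // 512 MB

    // Option 2: the proposal here, let Spark enlarge splits based on the selected columns.
    spark.conf.set("spark.sql.parquet.adaptiveFileSplit", "true")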


---




[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...

2018-08-17 Thread habren
Github user habren commented on the issue:

https://github.com/apache/spark/pull/21868
  
@HyukjinKwon Yes, this is to handle it dynamically.
For ad-hoc queries, the selected columns differ from query to query, and it's not convenient, or even impossible, for users to set a different maxPartitionBytes for each query.
And for general users (non-advanced users), it's not easy to pick a proper value for maxPartitionBytes.
So this change makes it easier.

> BTW, just for clarification, you can set the bigger number to spark.sql.files.maxPartitionBytes explicitly and that resolved your issue. This one is to handle it dynamically, right?


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-17 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210887543
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
    .intConf
    .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
+      "columns are needed. So, it's better to make sure that the total size of the selected " +
+      "columns is about 128 MB "
--- End diff --

I updated the description just now. Please review it again. Thanks a lot @HyukjinKwon 


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-17 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210887308
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
    .intConf
    .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
--- End diff --

I updated it accordingly


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-17 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210886442
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -25,17 +25,16 @@ import java.util.zip.Deflater
 import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.util.matching.Regex
-
--- End diff --

@viirya Ok, I added it back


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-17 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210876154
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -459,6 +460,29 @@ object SQLConf {
    .intConf
    .createWithDefault(4096)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
--- End diff --

@HyukjinKwon  Yes, setting spark.sql.files.maxPartitionBytes explicitly does work. But for ad-hoc queries, the selected columns differ from query to query, and it's not convenient, or even impossible, for users to set a different maxPartitionBytes for each query. And for general users (non-advanced users), it's not easy to calculate a proper value of maxPartitionBytes. 


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-17 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210871055
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -425,12 +426,44 @@ case class FileSourceScanExec(
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes =
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
 
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+
+    if (fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
+      (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
+        fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
+      if (relation.dataSchema.map(_.dataType).forall(dataType =>
+        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+        def getTypeLength(dataType: DataType): Int = {
+          if (dataType.isInstanceOf[StructType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+          } else if (dataType.isInstanceOf[ArrayType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+          } else if (dataType.isInstanceOf[MapType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+          } else {
+            dataType.defaultSize
+          }
+        }
+
+        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
--- End diff --

@HyukjinKwon  I agree that the estimation is rough, especially for complex types. For AtomicType it works better. And at least it takes column pruning into consideration. 


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-16 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210793717
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -425,12 +426,44 @@ case class FileSourceScanExec(
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes =
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
 
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+
+    if (fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
+      (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
+        fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
+      if (relation.dataSchema.map(_.dataType).forall(dataType =>
+        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+        def getTypeLength(dataType: DataType): Int = {
+          if (dataType.isInstanceOf[StructType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+          } else if (dataType.isInstanceOf[ArrayType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+          } else if (dataType.isInstanceOf[MapType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+          } else {
+            dataType.defaultSize
+          }
+        }
+
+        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
--- End diff --

@gatorsmile  The goal of this change is not to make it easier for users to set the partition size. Instead, once the user has set the partition size, this change tries its best to make the amount of data actually read close to the value the user set. Without this change, when the user sets the partition size to 128 MB, the actual read size may be 1 MB or even smaller because of column pruning.
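To make the point concrete, here is a back-of-the-envelope sketch with hypothetical numbers (40 equally sized columns, 3 of them selected); the variable names are illustrative, not from the patch:

    // What a 128 MB split shrinks to after column pruning (hypothetical schema).
    val maxPartitionBytes = 128L * 1024 * 1024   // configured split size
    val totalColumns      = 40
    val selectedColumns   = 3
    val actualReadBytes   = maxPartitionBytes * selectedColumns / totalColumns
    // roughly 9.6 MB actually read per task, far below the 128 MB the user asked for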


---




[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...

2018-08-15 Thread habren
Github user habren commented on the issue:

https://github.com/apache/spark/pull/21868
  
Hi @HyukjinKwon  I moved the change to the master branch just now. Please help review it.


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-08-15 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r210456342
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -401,12 +399,41 @@ case class FileSourceScanExec(
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes =
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
 
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+      fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) {
+      if (relation.dataSchema.map(_.dataType).forall(dataType =>
+        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+        def getTypeLength(dataType: DataType): Int = {
+          if (dataType.isInstanceOf[StructType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+          } else if (dataType.isInstanceOf[ArrayType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+          } else if (dataType.isInstanceOf[MapType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+          } else {
+            dataType.defaultSize
+          }
+        }
+
+        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val multiplier = totalColumnSize / selectedColumnSize
--- End diff --

@viirya  Now it also supports ORC. Please help review it.


---




[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...

2018-08-10 Thread habren
Github user habren commented on the issue:

https://github.com/apache/spark/pull/21868
  
@HyukjinKwon  Thanks for your comments. I will submit it to master soon


---




[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...

2018-08-08 Thread habren
Github user habren commented on the issue:

https://github.com/apache/spark/pull/21868
  
@maropu  Thanks for your comments. ORC can also benefit from this change since ORC is also a columnar file format. Do you think I should add ORC support by changing the check

`fsRelation.fileFormat.isInstanceOf[ParquetSource]`

to

`fsRelation.fileFormat.isInstanceOf[ParquetSource] || fsRelation.fileFormat.isInstanceOf[OrcFileFormat]`?


---




[GitHub] spark pull request #22018: [SPARK-25038][SQL] Get block location in parallel

2018-08-08 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/22018#discussion_r208788059
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging {
 val missingFiles = mutable.ArrayBuffer.empty[String]
 val filteredLeafStatuses = allLeafStatuses.filterNot(
   status => shouldFilterOut(status.getPath.getName))
-val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
+val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap {
--- End diff --

Thanks @maropu for your comments. I updated the title and description. Let me explain the difference between this change and the existing parallel partition discovery. The existing mechanism discovers different partitions in parallel, while this change fetches the block locations of the files within a single partition in parallel. When there are only a few partitions and each contains tens of thousands of files, parallel partition discovery doesn't help, and this change accelerates that case.
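For illustration, a minimal sketch of the idea (not the PR's exact code): resolving block locations with a Scala parallel collection. The helper name `resolveInParallel` and the parameters `fs` and `statuses` are hypothetical, used only for this example.

    import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus}

    // Each getFileBlockLocations call is a NameNode RPC, so issuing them from a
    // parallel collection hides the per-file latency.
    def resolveInParallel(fs: FileSystem, statuses: Seq[FileStatus]): Seq[LocatedFileStatus] = {
      statuses.par.map { f =>
        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
        new LocatedFileStatus(f, locations)
      }.seq
    }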


---




[GitHub] spark pull request #22018: [SPARK-25038][SQL] Get block location in parallel

2018-08-08 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/22018#discussion_r208787523
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging {
 val missingFiles = mutable.ArrayBuffer.empty[String]
 val filteredLeafStatuses = allLeafStatuses.filterNot(
   status => shouldFilterOut(status.getPath.getName))
-val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
+val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap {
   case f: LocatedFileStatus =>
--- End diff --

Thanks. The comment was updated


---




[GitHub] spark pull request #22018: [SPARK-25038][SQL] Accelerate Spark Plan generati...

2018-08-08 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/22018#discussion_r208784609
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging {
 val missingFiles = mutable.ArrayBuffer.empty[String]
 val filteredLeafStatuses = allLeafStatuses.filterNot(
   status => shouldFilterOut(status.getPath.getName))
-val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
+val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap {
--- End diff --

Thanks @viirya for the feedback. Yes, this method can be called on executors, as shown below. Do you think it's not thread-safe?
Each partition has its own hadoopConf and therefore its own fs, and nothing is shared in this method.

sparkContext
  .parallelize(serializedPaths, numParallelism)
  .mapPartitions { pathStrings =>
    val hadoopConf = serializableConfiguration.value
    pathStrings.map(new Path(_)).toSeq.map { path =>
      (path, listLeafFiles(path, hadoopConf, filter, None))
    }.iterator
  }.map { case (path, statuses) =>





---




[GitHub] spark issue #22018: [SPARK-25038][SQL] Accelerate Spark Plan generation when...

2018-08-08 Thread habren
Github user habren commented on the issue:

https://github.com/apache/spark/pull/22018
  
Hi Takeshi Yamamuro, Hyukjin Kwon, and @viirya, can you take a look at this patch?


---




[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...

2018-08-08 Thread habren
Github user habren commented on the issue:

https://github.com/apache/spark/pull/21868
  
Hi @maropu and @viirya, do you agree with the basic idea that we should take column pruning into consideration when splitting the input files?


---




[GitHub] spark pull request #22018: [SPARK-25038][SQL] Accelerate Spark Plan generati...

2018-08-06 Thread habren
GitHub user habren opened a pull request:

https://github.com/apache/spark/pull/22018

[SPARK-25038][SQL] Accelerate Spark Plan generation when Spark SQL re…

https://issues.apache.org/jira/browse/SPARK-25038

When Spark SQL reads a large amount of data, it takes a long time (more than 10 minutes) to generate the physical plan and then the ActiveJob.

Example:

There is a table partitioned by date and hour, with more than 13 TB of data per hour and 185 TB per day. Even when we issue a very simple SQL query, it takes a long time to generate the ActiveJob.

The SQL statement is

select count(device_id) from test_tbl where date=20180731 and hour='21';

Before the optimization, it takes 2 minutes and 9 seconds to generate the job: the SQL is issued at 2018-08-07 09:07:41, but the job is submitted at 2018-08-07 09:09:53, which is 2 minutes and 9 seconds later than the SQL issue time.

After the optimization, it takes only 4 seconds to generate the job: the SQL is issued at 2018-08-07 09:20:15, and the job is submitted at 2018-08-07 09:20:19, which is 4 seconds later than the SQL issue time.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/habren/spark SPARK-25038

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22018.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22018


commit 2bb5924e04eba5accfe58a4fbae094d46cc36488
Author: Jason Guo 
Date:   2018-08-07T03:13:03Z

[SPARK-25038][SQL] Accelerate Spark Plan generation when Spark SQL read 
large amount of data




---




[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...

2018-07-30 Thread habren
Github user habren commented on the issue:

https://github.com/apache/spark/pull/21868
  
Hi @maropu and @viirya, do you agree with the basic idea that we should take column pruning into consideration when splitting the input files?


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-26 Thread habren
GitHub user habren reopened a pull request:

https://github.com/apache/spark/pull/21868

[SPARK-24906][SQL] Adaptively enlarge split / partition size for Parq…

Please refer to https://issues.apache.org/jira/browse/SPARK-24906 for more 
detail and test

For a columnar file format such as Parquet, when Spark SQL reads a table, each split will be 128 MB by default since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even when the user sets it to a large value, such as 512 MB, a task may read only a few MB or even hundreds of KB, because the table may consist of dozens of columns while the SQL query needs only a few of them, and Spark prunes the unnecessary columns.

In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes adaptively.

For example, suppose there are 40 columns, 20 of type integer and 20 of type long. When a query touches one integer column and one long column, maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20.
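A minimal sketch of that multiplier idea in Scala (illustrative variable names, not the patch's exact code):

    // Scale the configured split size by the ratio of the full row width
    // to the width of the selected columns (numbers from the example above).
    val totalColumnSize    = 20 * 4 + 20 * 8   // 20 int + 20 long columns
    val selectedColumnSize = 4 + 8             // one int + one long column selected
    val multiplier         = totalColumnSize / selectedColumnSize   // = 20
    val maxSplitBytes      = 128L * 1024 * 1024 * multiplier        // 128 MB becomes 2560 MB of file span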

With this optimization, the number of tasks will be smaller and the job will run faster. More importantly, for a very large cluster (more than 10 thousand nodes), it relieves the RM's scheduling pressure.

Here is the test.

The table named test2 has more than 40 columns and holds more than 5 TB of data per hour.

When we issue a very simple query

` select count(device_id) from test2 where date=20180708 and hour='23'`

there are 72176 tasks and the job takes 4.8 minutes. Most tasks last less than 1 second and read less than 1.5 MB of data.

After the optimization, there are only 1615 tasks and the job takes only 30 seconds; it is almost 10 times faster. The median read size is 44.2 MB.

https://issues.apache.org/jira/browse/SPARK-24906


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/habren/spark SPARK-24906

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21868.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21868


commit e34aaa2fc0c1ebf87028d834ea5e9a61bc026bc6
Author: Jason Guo 
Date:   2018-07-25T02:18:22Z

[SPARK-24906][SQL] Adaptively enlarge split / partition size for Parquet 
scan




---




[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...

2018-07-26 Thread habren
Github user habren commented on the issue:

https://github.com/apache/spark/pull/21868
  
@maropu If I understand correctly, your concern is about how to calculate


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-26 Thread habren
Github user habren closed the pull request at:

https://github.com/apache/spark/pull/21868


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-26 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r205356861
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -401,12 +399,41 @@ case class FileSourceScanExec(
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes =
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
 
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+      fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) {
+      if (relation.dataSchema.map(_.dataType).forall(dataType =>
+        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+        def getTypeLength(dataType: DataType): Int = {
+          if (dataType.isInstanceOf[StructType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+          } else if (dataType.isInstanceOf[ArrayType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+          } else if (dataType.isInstanceOf[MapType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+          } else {
+            dataType.defaultSize
+          }
+        }
+
+        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val multiplier = totalColumnSize / selectedColumnSize
--- End diff --

@viirya  As defined in getTypeLength, users can configure the lengths of the complex types according to their data statistics, and the length of an AtomicType is its defaultSize. So the multiplier is the ratio of the total length of all columns to the total length of the selected columns.

def getTypeLength(dataType: DataType): Int = {
  if (dataType.isInstanceOf[StructType]) {
    fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
  } else if (dataType.isInstanceOf[ArrayType]) {
    fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
  } else if (dataType.isInstanceOf[MapType]) {
    fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
  } else {
    dataType.defaultSize
  }
}
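
For instance, a hypothetical worked example of that ratio, with the struct length tuned through the config added in this PR (schema and numbers are illustrative):

    // Schema: (id: Int, ts: Long, payload: Struct); only `id` is selected.
    val structLength       = 64                    // e.g. spark.sql.parquet.struct.length tuned to 64
    val totalColumnSize    = 4 + 8 + structLength  // IntegerType + LongType + StructType
    val selectedColumnSize = 4                     // only the Int column is read
    val multiplier         = totalColumnSize / selectedColumnSize   // = 19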


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-25 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r205288000
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -401,12 +399,41 @@ case class FileSourceScanExec(
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes =
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
 
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+      fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) {
+      if (relation.dataSchema.map(_.dataType).forall(dataType =>
+        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+        def getTypeLength(dataType: DataType): Int = {
+          if (dataType.isInstanceOf[StructType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+          } else if (dataType.isInstanceOf[ArrayType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+          } else if (dataType.isInstanceOf[MapType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+          } else {
+            dataType.defaultSize
+          }
+        }
+
+        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val multiplier = totalColumnSize / selectedColumnSize
--- End diff --

There are many data types: CalendarIntervalType, StructType, MapType, NullType, UserDefinedType, AtomicType (TimestampType, StringType, HiveStringType, BooleanType, DateType, BinaryType, NumericType), ObjectType, and ArrayType. For an AtomicType, the size is fixed to its defaultSize. For complex types, such as StructType, MapType, and ArrayType, the size varies, so I made it configurable with a default value. With these per-type sizes, the multiplier is not just the ratio of the number of columns, but the ratio of the total size of all columns to the total size of the selected columns.


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-25 Thread habren
Github user habren commented on a diff in the pull request:

https://github.com/apache/spark/pull/21868#discussion_r205287123
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -381,6 +381,26 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

+  val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
+    .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
+      "columns are needed. So, it's better to make sure that the total size of the selected " +
+      "columns is about 128 MB "
+    )
+    .booleanConf
+    .createWithDefault(false)
+
+  val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
+    .intConf
+    .createWithDefault(StructType.defaultConcreteType.defaultSize)
+
+  val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
+    .intConf
+    .createWithDefault(MapType.defaultConcreteType.defaultSize)
+
+  val PARQUET_ARRAY_LENGTH = buildConf("spark.sql.parquet.array.length")
+    .intConf
+    .createWithDefault(ArrayType.defaultConcreteType.defaultSize)
--- End diff --

Thanks for your comments. I set the default value to StringType.defaultSize (8). It's just a default; users should configure it according to their real data.
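For example, a hypothetical tuning based on observed data statistics, assuming an active SparkSession named `spark` (the config names are the ones added in this PR; the values are illustrative):

    // Tell the size estimator roughly how wide the complex-type columns are on average.
    spark.conf.set("spark.sql.parquet.struct.length", 256L)
    spark.conf.set("spark.sql.parquet.array.length", 128L)
    spark.conf.set("spark.sql.parquet.map.length", 128L)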


---




[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

2018-07-24 Thread habren
GitHub user habren opened a pull request:

https://github.com/apache/spark/pull/21868

[SPARK-24906][SQL] Adaptively enlarge split / partition size for Parq…

Please refer to https://issues.apache.org/jira/browse/SPARK-24906 for more 
detail and test

For a columnar file format such as Parquet, when Spark SQL reads a table, each split will be 128 MB by default since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even when the user sets it to a large value, such as 512 MB, a task may read only a few MB or even hundreds of KB, because the table may consist of dozens of columns while the SQL query needs only a few of them, and Spark prunes the unnecessary columns.

In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes adaptively.

For example, suppose there are 40 columns, 20 of type integer and 20 of type long. When a query touches one integer column and one long column, maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20.

With this optimization, the number of tasks will be smaller and the job will run faster. More importantly, for a very large cluster (more than 10 thousand nodes), it relieves the RM's scheduling pressure.

Here is the test.

The table named test2 has more than 40 columns and holds more than 5 TB of data per hour.

When we issue a very simple query

` select count(device_id) from test2 where date=20180708 and hour='23'`

there are 72176 tasks and the job takes 4.8 minutes. Most tasks last less than 1 second and read less than 1.5 MB of data.

After the optimization, there are only 1615 tasks and the job takes only 30 seconds; it is almost 10 times faster. The median read size is 44.2 MB.

https://issues.apache.org/jira/browse/SPARK-24906


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/habren/spark SPARK-24906

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21868.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21868


commit 9ff34525e346e6e1cbe4b12fc6f972a163fd920e
Author: 郭俊 
Date:   2018-07-25T02:07:38Z

[SPARK-24906][SQL] Adaptively enlarge split / partition size for Parquet 
scan




---
