[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22152#discussion_r212183703
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala ---
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
   }.reduceOption(typeMerger).toIterator
 }
 
-// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-// active SparkSession and `SQLConf.get` may point to the wrong configs.
-val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+// the fold functions in the scheduler event loop thread.
+val existingConf = SQLConf.get
+var rootType: DataType = StructType(Nil)
+val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
+val mergeResult = (index: Int, taskResult: DataType) => {
+  rootType = SQLConf.withExistingConf(existingConf) {
--- End diff --

The same question was on my mind. Thanks for the clarification.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-22 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22152#discussion_r211860311
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala ---
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
   }.reduceOption(typeMerger).toIterator
 }
 
-// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-// active SparkSession and `SQLConf.get` may point to the wrong configs.
-val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+// the fold functions in the scheduler event loop thread.
+val existingConf = SQLConf.get
+var rootType: DataType = StructType(Nil)
+val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
+val mergeResult = (index: Int, taskResult: DataType) => {
+  rootType = SQLConf.withExistingConf(existingConf) {
--- End diff --

Yes, that makes sense, thanks.





[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22152





[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22152#discussion_r211815985
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala ---
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
   }.reduceOption(typeMerger).toIterator
 }
 
-// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-// active SparkSession and `SQLConf.get` may point to the wrong configs.
-val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+// the fold functions in the scheduler event loop thread.
+val existingConf = SQLConf.get
+var rootType: DataType = StructType(Nil)
+val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
+val mergeResult = (index: Int, taskResult: DataType) => {
+  rootType = SQLConf.withExistingConf(existingConf) {
--- End diff --

The schema can be very complex (e.g. a very wide and deep schema).





[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-21 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22152#discussion_r211626457
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -2528,4 +2529,27 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   checkAnswer(aggPlusFilter1, aggPlusFilter2.collect())
 }
   }
+
+  test("SPARK-25159: json schema inference should only trigger one job") {
+withTempPath { path =>
+  // This test is to prove that the `JsonInferSchema` does not use `RDD#toLocalIterator` which
+  // triggers one Spark job per RDD partition.
+  Seq(1 -> "a", 2 -> "b").toDF("i", "p")
+// The data set has 2 partitions, so Spark will write at least 2 json files.
+// Use a non-splittable compression (gzip), to make sure the json scan RDD has at lease 2
--- End diff --

nit: `at least`





[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-21 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22152#discussion_r211621168
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala ---
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
   }.reduceOption(typeMerger).toIterator
 }
 
-// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-// active SparkSession and `SQLConf.get` may point to the wrong configs.
-val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+// the fold functions in the scheduler event loop thread.
+val existingConf = SQLConf.get
+var rootType: DataType = StructType(Nil)
+val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
+val mergeResult = (index: Int, taskResult: DataType) => {
+  rootType = SQLConf.withExistingConf(existingConf) {
--- End diff --

It would contain one result per partition. Do you think this is enough to cause GC problems?





[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22152#discussion_r211607005
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala ---
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
   }.reduceOption(typeMerger).toIterator
 }
 
-// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-// active SparkSession and `SQLConf.get` may point to the wrong configs.
-val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+// the fold functions in the scheduler event loop thread.
+val existingConf = SQLConf.get
+var rootType: DataType = StructType(Nil)
+val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
+val mergeResult = (index: Int, taskResult: DataType) => {
+  rootType = SQLConf.withExistingConf(existingConf) {
--- End diff --

ah good point!





[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-21 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22152#discussion_r211599957
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala ---
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
   }.reduceOption(typeMerger).toIterator
 }
 
-// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-// active SparkSession and `SQLConf.get` may point to the wrong configs.
-val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+// the fold functions in the scheduler event loop thread.
+val existingConf = SQLConf.get
+var rootType: DataType = StructType(Nil)
+val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
--- End diff --

Yeah, agreed.





[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-21 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22152#discussion_r211584402
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -116,16 +129,24 @@ object SQLConf {
 if (TaskContext.get != null) {
   new ReadOnlySQLConf(TaskContext.get())
 } else {
-  if (Utils.isTesting && SparkContext.getActive.isDefined) {
+  val isSchedulerEventLoopThread = SparkContext.getActive
+.map(_.dagScheduler.eventProcessLoop.eventThread)
+.exists(_.getId == Thread.currentThread().getId)
+  if (isSchedulerEventLoopThread) {
 // DAGScheduler event loop thread does not have an active SparkSession, the `confGetter`
-// will return `fallbackConf` which is unexpected. Here we prevent it from happening.
-val schedulerEventLoopThread =
-  SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread
-if (schedulerEventLoopThread.getId == Thread.currentThread().getId) {
+// will return `fallbackConf` which is unexpected. Here we requires the caller to get the
--- End diff --

nit: we require
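
For readers following the `SQLConf` change above, the idea of capturing a conf on the calling thread and installing it on another thread can be sketched in plain Scala. This is only an illustration under stated assumptions: `MiniConf`, `Conf`, and the fallback behaviour are hypothetical stand-ins, not Spark's `SQLConf`, and the real implementation may differ (for example in what happens when no conf has been installed).

```scala
// Hypothetical, simplified stand-in for the pattern; this is NOT Spark's SQLConf.
object MiniConf {
  final case class Conf(settings: Map[String, String])

  // Used when the current thread has no conf installed (assumption of this sketch).
  private val fallback = Conf(Map.empty)

  // Per-thread override, unset by default.
  private val existing = new ThreadLocal[Option[Conf]] {
    override def initialValue(): Option[Conf] = None
  }

  // Returns the conf installed for the current thread, or the fallback.
  def get: Conf = existing.get().getOrElse(fallback)

  // Installs `conf` for the current thread while `body` runs,
  // then restores whatever was installed before.
  def withExistingConf[T](conf: Conf)(body: => T): T = {
    val previous = existing.get()
    existing.set(Some(conf))
    try body finally existing.set(previous)
  }
}
```

The caller captures its conf once (as `existingConf = SQLConf.get` does in the `JsonInferSchema` diff) and wraps any code that runs on a different thread in `withExistingConf`, so that lookups inside the block see the captured value instead of the fallback.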





[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-21 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22152#discussion_r211585144
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala ---
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
   }.reduceOption(typeMerger).toIterator
 }
 
-// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-// active SparkSession and `SQLConf.get` may point to the wrong configs.
-val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+// the fold functions in the scheduler event loop thread.
+val existingConf = SQLConf.get
+var rootType: DataType = StructType(Nil)
+val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
+val mergeResult = (index: Int, taskResult: DataType) => {
+  rootType = SQLConf.withExistingConf(existingConf) {
--- End diff --

Just a question, wouldn't:
```
val partitionsResult = json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition)
partitionsResult.fold(StructType(Nil))(typeMerger)
```
do the same without requiring these changes?
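
The two options being compared here (collect one result per partition and fold afterwards, versus merge each partition's result as it arrives) can be sketched without Spark. Everything below is a hypothetical stand-in: `runJobLike` only imitates the shape of a job with a per-partition function and a result handler, `Schema` is a plain set of field names rather than a `DataType`, and `merge` plays the role of `typeMerger`.

```scala
// Plain-Scala illustration only; none of these names are Spark APIs.
object FoldLikeJobSketch {
  // Stand-in for an inferred schema: just a set of field names.
  type Schema = Set[String]

  // Stand-in for `typeMerger`: the union of two field sets.
  val merge: (Schema, Schema) => Schema = _ union _

  // Stand-in for `foldPartition`: reduce one partition to a single schema.
  val foldPartition: Iterator[Schema] => Schema =
    _.fold(Set.empty[String])(merge)

  // Imitates a job with a result handler: each partition is processed once
  // and its result is handed to `handler` together with the partition index.
  def runJobLike[T, U](partitions: Seq[Seq[T]], func: Iterator[T] => U)(
      handler: (Int, U) => Unit): Unit =
    partitions.zipWithIndex.foreach { case (p, i) => handler(i, func(p.iterator)) }

  // Option 1: keep one result per partition, then fold them at the end.
  def collectThenFold(partitions: Seq[Seq[Schema]]): Schema = {
    val results = scala.collection.mutable.ArrayBuffer.empty[Schema]
    runJobLike(partitions, foldPartition)((_, r) => results += r)
    results.fold(Set.empty[String])(merge)
  }

  // Option 2: merge each result into a running value as soon as it arrives,
  // so only one merged schema is retained at a time.
  def mergeIncrementally(partitions: Seq[Seq[Schema]]): Schema = {
    var root: Schema = Set.empty
    runJobLike(partitions, foldPartition)((_, r) => root = merge(root, r))
    root
  }
}
```

Both functions return the same merged schema. The difference is how much is held at once: the first keeps one result per partition until the final fold, which is the memory concern raised elsewhere in this thread for very wide and deep schemas, while the second keeps only the running result.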





[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22152#discussion_r211496317
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala ---
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
   }.reduceOption(typeMerger).toIterator
 }
 
-// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-// active SparkSession and `SQLConf.get` may point to the wrong configs.
-val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+// the fold functions in the scheduler event loop thread.
+val existingConf = SQLConf.get
+var rootType: DataType = StructType(Nil)
+val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
--- End diff --

This closure is defined by us and I don't think we leak an outer reference here. If we do, it's a bug and we should fix it.





[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-20 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22152#discussion_r211189624
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala ---
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
   }.reduceOption(typeMerger).toIterator
 }
 
-// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-// active SparkSession and `SQLConf.get` may point to the wrong configs.
-val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+// the fold functions in the scheduler event loop thread.
+val existingConf = SQLConf.get
+var rootType: DataType = StructType(Nil)
+val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
--- End diff --

Do we need to call `sc.clean(typeMerger)` manually here?





[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-20 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/22152

[SPARK-25159][SQL] json schema inference should only trigger one job

## What changes were proposed in this pull request?

This fixes a performance regression caused by https://github.com/apache/spark/pull/21376.

We should not use `RDD#toLocalIterator`, which triggers one Spark job per RDD partition. This is very costly for RDDs with a lot of small partitions.

To fix it, this PR introduces a way to access SQLConf in the scheduler event loop thread, so that we no longer need to use `RDD#toLocalIterator` in `JsonInferSchema`.

## How was this patch tested?

a new test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark conf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22152.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22152


commit cf13d71cb1b23ad6e5ad4644df8c591bfb7a00f9
Author: Wenchen Fan 
Date:   2018-08-17T04:30:31Z

allow accessing SQLConf in the scheduler event loop thread



