[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22152#discussion_r212183703 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -69,10 +70,17 @@ private[sql] object JsonInferSchema { }.reduceOption(typeMerger).toIterator } -// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because -// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have -// active SparkSession and `SQLConf.get` may point to the wrong configs. -val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger) +// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running +// the fold functions in the scheduler event loop thread. +val existingConf = SQLConf.get +var rootType: DataType = StructType(Nil) +val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger) +val mergeResult = (index: Int, taskResult: DataType) => { + rootType = SQLConf.withExistingConf(existingConf) { --- End diff -- The same question was in my mind. Thanks for the clarification. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22152#discussion_r211860311 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -69,10 +70,17 @@ private[sql] object JsonInferSchema { }.reduceOption(typeMerger).toIterator } -// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because -// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have -// active SparkSession and `SQLConf.get` may point to the wrong configs. -val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger) +// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running +// the fold functions in the scheduler event loop thread. +val existingConf = SQLConf.get +var rootType: DataType = StructType(Nil) +val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger) +val mergeResult = (index: Int, taskResult: DataType) => { + rootType = SQLConf.withExistingConf(existingConf) { --- End diff -- yes, makes sense, thanks.
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22152
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22152#discussion_r211815985 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -69,10 +70,17 @@ private[sql] object JsonInferSchema { }.reduceOption(typeMerger).toIterator } -// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because -// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have -// active SparkSession and `SQLConf.get` may point to the wrong configs. -val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger) +// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running +// the fold functions in the scheduler event loop thread. +val existingConf = SQLConf.get +var rootType: DataType = StructType(Nil) +val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger) +val mergeResult = (index: Int, taskResult: DataType) => { + rootType = SQLConf.withExistingConf(existingConf) { --- End diff -- the schema can be very complex (e.g. very wide and deep schema).
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22152#discussion_r211626457 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2528,4 +2529,27 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(aggPlusFilter1, aggPlusFilter2.collect()) } } + + test("SPARK-25159: json schema inference should only trigger one job") { +withTempPath { path => + // This test is to prove that the `JsonInferSchema` does not use `RDD#toLocalIterator` which + // triggers one Spark job per RDD partition. + Seq(1 -> "a", 2 -> "b").toDF("i", "p") +// The data set has 2 partitions, so Spark will write at least 2 json files. +// Use a non-splittable compression (gzip), to make sure the json scan RDD has at lease 2 --- End diff -- nit: `at least`
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22152#discussion_r211621168 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -69,10 +70,17 @@ private[sql] object JsonInferSchema { }.reduceOption(typeMerger).toIterator } -// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because -// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have -// active SparkSession and `SQLConf.get` may point to the wrong configs. -val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger) +// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running +// the fold functions in the scheduler event loop thread. +val existingConf = SQLConf.get +var rootType: DataType = StructType(Nil) +val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger) +val mergeResult = (index: Int, taskResult: DataType) => { + rootType = SQLConf.withExistingConf(existingConf) { --- End diff -- it would contain one result per partition, do you think this is enough to cause GC problems?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22152#discussion_r211607005 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -69,10 +70,17 @@ private[sql] object JsonInferSchema { }.reduceOption(typeMerger).toIterator } -// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because -// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have -// active SparkSession and `SQLConf.get` may point to the wrong configs. -val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger) +// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running +// the fold functions in the scheduler event loop thread. +val existingConf = SQLConf.get +var rootType: DataType = StructType(Nil) +val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger) +val mergeResult = (index: Int, taskResult: DataType) => { + rootType = SQLConf.withExistingConf(existingConf) { --- End diff -- ah good point!
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22152#discussion_r211599957 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -69,10 +70,17 @@ private[sql] object JsonInferSchema { }.reduceOption(typeMerger).toIterator } -// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because -// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have -// active SparkSession and `SQLConf.get` may point to the wrong configs. -val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger) +// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running +// the fold functions in the scheduler event loop thread. +val existingConf = SQLConf.get +var rootType: DataType = StructType(Nil) +val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger) --- End diff -- Yeah, agreed.
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22152#discussion_r211584402 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -116,16 +129,24 @@ object SQLConf { if (TaskContext.get != null) { new ReadOnlySQLConf(TaskContext.get()) } else { - if (Utils.isTesting && SparkContext.getActive.isDefined) { + val isSchedulerEventLoopThread = SparkContext.getActive +.map(_.dagScheduler.eventProcessLoop.eventThread) +.exists(_.getId == Thread.currentThread().getId) + if (isSchedulerEventLoopThread) { // DAGScheduler event loop thread does not have an active SparkSession, the `confGetter` -// will return `fallbackConf` which is unexpected. Here we prevent it from happening. -val schedulerEventLoopThread = - SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread -if (schedulerEventLoopThread.getId == Thread.currentThread().getId) { +// will return `fallbackConf` which is unexpected. Here we requires the caller to get the --- End diff -- nit: we require
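The guard in this hunk boils down to a thread-identity comparison. A minimal, Spark-free sketch of that check (the `ThreadGuardSketch` object, its `demo` helper, and the spawned `Runnable` are hypothetical stand-ins for the DAGScheduler's event loop thread; only the id comparison itself mirrors the diff):

```scala
object ThreadGuardSketch {
  // Mirrors the check in the diff:
  //   eventThread.getId == Thread.currentThread().getId
  def isCurrentThread(candidate: Thread): Boolean =
    candidate.getId == Thread.currentThread().getId

  // Returns (check evaluated on the spawned thread, check evaluated on the caller's thread).
  def demo(): (Boolean, Boolean) = {
    var seenInside = false
    var loopThread: Thread = null
    loopThread = new Thread(new Runnable {
      // Inside the loop thread the ids match, so a guard like the one in
      // `SQLConf.get` can detect it is running on the event loop.
      override def run(): Unit = { seenInside = isCurrentThread(loopThread) }
    })
    loopThread.start()
    loopThread.join() // join() makes the write to seenInside visible here
    (seenInside, isCurrentThread(loopThread))
  }
}
```

The check is true only on the spawned thread itself, which is how the new `SQLConf.get` decides whether it is being evaluated on the scheduler event loop rather than guessing from `Utils.isTesting`.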
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22152#discussion_r211585144 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -69,10 +70,17 @@ private[sql] object JsonInferSchema { }.reduceOption(typeMerger).toIterator } -// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because -// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have -// active SparkSession and `SQLConf.get` may point to the wrong configs. -val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger) +// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running +// the fold functions in the scheduler event loop thread. +val existingConf = SQLConf.get +var rootType: DataType = StructType(Nil) +val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger) +val mergeResult = (index: Int, taskResult: DataType) => { + rootType = SQLConf.withExistingConf(existingConf) { --- End diff -- just a question, wouldn't: ``` val partitionsResult = json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition) partitionsResult.fold(StructType(Nil))(typeMerger) ``` do the same without requiring these changes?
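The trade-off in this exchange can be made concrete with a small, Spark-free sketch. Toy `Int` values stand in for inferred schemas and a plain array stands in for what `runJob` would return; the names below are illustrative, not Spark API. Both routes compute the same answer; the difference is that the collect-then-fold route keeps one result per partition alive on the driver at once, while merging inside a result handler keeps only the single accumulated value, which matters when each result is a very wide and deep schema.

```scala
object MergeStrategies {
  // Toy merger over Int; in the PR this role is played by typeMerger over DataType.
  def merge(a: Int, b: Int): Int = math.max(a, b)

  // Alternative floated in the thread: runJob returns an Array holding one
  // result per partition, all resident on the driver, folded afterwards.
  def viaCollectedArray(perPartition: Array[Int]): Int =
    perPartition.fold(0)(merge)

  // The patch's approach: merge each task result into the accumulator as it
  // arrives, so the driver only ever holds the single merged value.
  def viaMergeResult(perPartition: Array[Int]): Int = {
    var acc = 0
    perPartition.foreach(r => acc = merge(acc, r))
    acc
  }
}
```

For small partition counts the two are equivalent in practice; the incremental merge only pays off when per-partition results are large.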
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22152#discussion_r211496317 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -69,10 +70,17 @@ private[sql] object JsonInferSchema { }.reduceOption(typeMerger).toIterator } -// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because -// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have -// active SparkSession and `SQLConf.get` may point to the wrong configs. -val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger) +// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running +// the fold functions in the scheduler event loop thread. +val existingConf = SQLConf.get +var rootType: DataType = StructType(Nil) +val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger) --- End diff -- This closure is defined by us and I don't think we leak the outer reference here. If we do, it's a bug and we should fix it.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22152#discussion_r211189624 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -69,10 +70,17 @@ private[sql] object JsonInferSchema { }.reduceOption(typeMerger).toIterator } -// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because -// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have -// active SparkSession and `SQLConf.get` may point to the wrong configs. -val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger) +// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running +// the fold functions in the scheduler event loop thread. +val existingConf = SQLConf.get +var rootType: DataType = StructType(Nil) +val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger) --- End diff -- Need to do `sc.clean(typeMerger)` manually here?
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/22152 [SPARK-25159][SQL] json schema inference should only trigger one job ## What changes were proposed in this pull request? This fixes a perf regression caused by https://github.com/apache/spark/pull/21376 . We should not use `RDD#toLocalIterator`, which triggers one Spark job per RDD partition. This is very bad for RDDs with a lot of small partitions. To fix it, this PR introduces a way to access SQLConf in the scheduler event loop thread, so that we don't need to use `RDD#toLocalIterator` anymore in `JsonInferSchema`. ## How was this patch tested? a new test You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark conf Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22152.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22152 commit cf13d71cb1b23ad6e5ad4644df8c591bfb7a00f9 Author: Wenchen Fan Date: 2018-08-17T04:30:31Z allow accessing SQLConf in the scheduler event loop thread
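The per-partition fold plus driver-side merge that the PR diff introduces can be sketched without a Spark dependency. This is a toy model, not the real `JsonInferSchema` code: `DataType`, `NullType` (standing in for `StructType(Nil)` as the fold's zero), `typeMerger`, and `inferRoot` are all simplified stand-ins, and the loop over `partitions` simulates what `SparkContext.runJob` does when it hands each task result to a `mergeResult` handler.

```scala
// Simplified stand-ins for Spark's DataType hierarchy (illustration only;
// these do not match org.apache.spark.sql.types exactly).
sealed trait DataType
case object NullType   extends DataType // plays the role of StructType(Nil), the fold's zero
case object LongType   extends DataType
case object DoubleType extends DataType
case object StringType extends DataType

object FoldSketch {
  // Toy merger: the zero is neutral, equal types are kept, Long widens to
  // Double, and anything else falls back to String.
  def typeMerger(a: DataType, b: DataType): DataType = (a, b) match {
    case (NullType, t) => t
    case (t, NullType) => t
    case _ if a == b   => a
    case (LongType, DoubleType) | (DoubleType, LongType) => DoubleType
    case _ => StringType
  }

  def inferRoot(partitions: Seq[Seq[DataType]]): DataType = {
    // Mirrors the patch: fold the types within each partition (executor side)...
    val foldPartition = (iter: Iterator[DataType]) =>
      iter.fold(NullType: DataType)(typeMerger)
    // ...then merge each task result into one accumulated value as it arrives,
    // like the `mergeResult` handler passed to `SparkContext.runJob`.
    var rootType: DataType = NullType
    for (partition <- partitions) {
      rootType = typeMerger(rootType, foldPartition(partition.iterator))
    }
    rootType
  }
}
```

With partitions `Seq(Seq(LongType, LongType), Seq(DoubleType), Seq.empty)` this yields `DoubleType` in a single pass over the partitions, whereas the old `toLocalIterator` route would have launched one Spark job per partition to feed the same driver-side fold.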