Repository: spark
Updated Branches:
  refs/heads/branch-1.5 f75c64b0c -> 94b2f5b32


[SPARK-9743] [SQL] Fixes JSONRelation refreshing

PR #7696 added two `HadoopFsRelation.refresh()` calls ([this][1] and [this][2])
in `DataSourceStrategy` to make the test case `InsertSuite.save directly to
the path of a JSON table` pass. However, this forces every `HadoopFsRelation`
table scan to do a refresh, which can be very expensive for tables with a
large number of partitions.

The reason the original test case fails without the `refresh()` calls is that
the old JSON relation builds its base RDD directly from the input paths, while
`HadoopFsRelation` scans are based on cached `FileStatus`es of leaf files.
With the old JSON relation, we could create a temporary table based on a path,
write data to that path, and then read the newly written data without
refreshing the table. This is no longer true for `HadoopFsRelation`.
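
For reference, a minimal sketch of that usage pattern against the Spark 1.5
`SQLContext` API (the app name, path, table name, and schema below are
illustrative, not the exact ones used by `InsertSuite`):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(
  new SparkConf().setAppName("json-refresh-sketch").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Seed a directory with some JSON data (illustrative path).
val path = "/tmp/json-refresh-sketch"
sqlContext.range(1, 11)
  .selectExpr("CAST(id AS INT) AS a", "CAST(id AS STRING) AS b")
  .write.mode("overwrite").json(path)

// Register a temporary table backed directly by that path.
sqlContext.sql(
  s"""CREATE TEMPORARY TABLE jsonTable (a int, b string)
     |USING org.apache.spark.sql.json.DefaultSource
     |OPTIONS (path '$path')
   """.stripMargin)

// Write new data straight to the backing path, bypassing the table itself.
sqlContext.range(11, 21)
  .selectExpr("CAST(id AS INT) AS a", "CAST(id AS STRING) AS b")
  .write.mode("overwrite").json(path)

// The old JSON relation re-listed the input paths on every scan, so this query
// picked up the freshly written files without an explicit refresh;
// HadoopFsRelation works off its cached leaf FileStatuses, so it needs a
// refresh first.
sqlContext.sql("SELECT a, b FROM jsonTable").show()
```

Without a refresh somewhere in the scan path, the final query runs against the
stale file listing, which is why the `refresh()` calls were added in the first
place.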

This PR removes those two expensive refresh calls and instead moves the refresh
into `JSONRelation`, as shown in the condensed sketch below (the full diff
follows). We may want to update the `HadoopFsRelation` interface to provide
better support for this use case.
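
In essence, `JSONRelation` now overrides the previously `final` `buildScan` and
refreshes its cached file listing right before delegating to
`HadoopFsRelation` (reproduced, with added comments, from the diff below):

```scala
override private[sql] def buildScan(
    requiredColumns: Array[String],
    filters: Array[Filter],
    inputPaths: Array[String],
    broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
  // Re-list leaf files so data written directly to the path is visible,
  // but only when a JSON table is actually scanned.
  refresh()
  super.buildScan(requiredColumns, filters, inputPaths, broadcastedConf)
}
```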

[1]: 
https://github.com/apache/spark/blob/ebfd91c542aaead343cb154277fcf9114382fee7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L63
[2]: 
https://github.com/apache/spark/blob/ebfd91c542aaead343cb154277fcf9114382fee7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L91

Author: Cheng Lian <l...@databricks.com>

Closes #8035 from liancheng/spark-9743/fix-json-relation-refreshing and 
squashes the following commits:

ec1957d [Cheng Lian] Fixes JSONRelation refreshing

(cherry picked from commit e3fef0f9e17b1766a3869cb80ce7e4cd521cb7b6)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94b2f5b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94b2f5b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94b2f5b3

Branch: refs/heads/branch-1.5
Commit: 94b2f5b3213553278ead376c24e63f019a18e793
Parents: f75c64b
Author: Cheng Lian <l...@databricks.com>
Authored: Mon Aug 10 09:07:08 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Aug 10 09:07:20 2015 -0700

----------------------------------------------------------------------
 .../datasources/DataSourceStrategy.scala         |  2 --
 .../org/apache/spark/sql/json/JSONRelation.scala | 19 +++++++++++++++----
 .../apache/spark/sql/sources/interfaces.scala    |  2 +-
 .../apache/spark/sql/sources/InsertSuite.scala   | 10 +++++-----
 4 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/94b2f5b3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 5b5fa8c..78a4acd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -60,7 +60,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Scanning partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
         if t.partitionSpec.partitionColumns.nonEmpty =>
-      t.refresh()
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
 
       logInfo {
@@ -88,7 +87,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
-      t.refresh()
       // See buildPartitionedTableScan for the reason that we need to create a shard
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf

http://git-wip-us.apache.org/repos/asf/spark/blob/94b2f5b3/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index b34a272..5bb9e62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -22,20 +22,22 @@ import java.io.CharArrayWriter
 import com.fasterxml.jackson.core.JsonFactory
 import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.{Text, LongWritable, NullWritable}
+import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
-import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, Job}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
+
 import org.apache.spark.Logging
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionSpec
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.util.SerializableConfiguration
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
 
@@ -108,6 +110,15 @@ private[sql] class JSONRelation(
     jsonSchema
   }
 
+  override private[sql] def buildScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputPaths: Array[String],
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
+    refresh()
+    super.buildScan(requiredColumns, filters, inputPaths, broadcastedConf)
+  }
+
   override def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],

http://git-wip-us.apache.org/repos/asf/spark/blob/94b2f5b3/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 4aafec0..6bcabba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -555,7 +555,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     })
   }
 
-  private[sql] final def buildScan(
+  private[sql] def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
       inputPaths: Array[String],

http://git-wip-us.apache.org/repos/asf/spark/blob/94b2f5b3/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 39d18d7..cdbfaf6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -32,9 +32,9 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
 
   var path: File = null
 
-  override def beforeAll: Unit = {
+  override def beforeAll(): Unit = {
     path = Utils.createTempDir()
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
     caseInsensitiveContext.read.json(rdd).registerTempTable("jt")
     sql(
       s"""
@@ -46,7 +46,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
       """.stripMargin)
   }
 
-  override def afterAll: Unit = {
+  override def afterAll(): Unit = {
     caseInsensitiveContext.dropTempTable("jsonTable")
     caseInsensitiveContext.dropTempTable("jt")
     Utils.deleteRecursively(path)
@@ -110,7 +110,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
     )
 
     // Writing the table to less part files.
-    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 5)
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 5)
     caseInsensitiveContext.read.json(rdd1).registerTempTable("jt1")
     sql(
       s"""
@@ -122,7 +122,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
     )
 
     // Writing the table to more part files.
-    val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 10)
+    val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 10)
     caseInsensitiveContext.read.json(rdd2).registerTempTable("jt2")
     sql(
       s"""

