Repository: spark
Updated Branches:
  refs/heads/master c980e20cf -> e2ec018e3


[SPARK-9285] [SQL] Fixes Row/InternalRow conversion for HadoopFsRelation

This is a follow-up of #7626. It fixes `Row`/`InternalRow` conversion for data 
sources that extend `HadoopFsRelation` and have `needConversion` set to `true`.
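
For context, `needConversion` is defined on `BaseRelation` (which `HadoopFsRelation` extends) and defaults to `true`; it tells Spark SQL that the source produces rows holding external Scala/Java values rather than Catalyst internal values, so an extra conversion step is required. Below is a minimal sketch of such a source, using the simpler `TableScan` interface purely for illustration; the class name and data are hypothetical and not part of this patch:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, TableScan}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // Hypothetical source that emits external Scala values (String, Int), so it
    // relies on Spark SQL to convert them into Catalyst's internal representation.
    class SimpleRelation(override val sqlContext: SQLContext)
      extends BaseRelation with TableScan {

      override def schema: StructType =
        StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)

      // true is already the default on BaseRelation; spelled out here for emphasis.
      override def needConversion: Boolean = true

      override def buildScan(): RDD[Row] =
        sqlContext.sparkContext.parallelize(Seq(Row("alice", 30), Row("bob", 25)))
    }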

Author: Cheng Lian <l...@databricks.com>

Closes #7649 from liancheng/spark-9285-conversion-fix and squashes the 
following commits:

036a50c [Cheng Lian] Addresses PR comment
f6d7c6a [Cheng Lian] Fixes Row/InternalRow conversion for HadoopFsRelation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2ec018e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2ec018e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2ec018e

Branch: refs/heads/master
Commit: e2ec018e37cb699077b5fa2bd662f2055cb42296
Parents: c980e20
Author: Cheng Lian <l...@databricks.com>
Authored: Sat Jul 25 11:42:49 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sat Jul 25 11:42:49 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/sources/interfaces.scala   | 23 +++++++++++++++++---
 .../SimpleTextHadoopFsRelationSuite.scala       |  5 -----
 2 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2ec018e/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 119bac7..7126145 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.RDDConversions
@@ -593,6 +593,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
    *
    * @since 1.4.0
    */
+  // TODO Tries to eliminate the extra Catalyst-to-Scala conversion when `needConversion` is true
+  //
+  // PR #7626 separated `Row` and `InternalRow` completely.  One of the consequences is that we can
+  // no longer treat an `InternalRow` containing Catalyst values as a `Row`.  Thus we have to
+  // introduce another row value conversion for data sources whose `needConversion` is true.
   def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = {
     // Yeah, to workaround serialization...
     val dataSchema = this.dataSchema
@@ -611,14 +616,26 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       } else {
         rdd.asInstanceOf[RDD[InternalRow]]
       }
+
     converted.mapPartitions { rows =>
       val buildProjection = if (codegenEnabled) {
         GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
       } else {
         () => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes)
       }
-      val mutableProjection = buildProjection()
-      rows.map(r => mutableProjection(r))
+
+      val projectedRows = {
+        val mutableProjection = buildProjection()
+        rows.map(r => mutableProjection(r))
+      }
+
+      if (needConversion) {
+        val requiredSchema = StructType(requiredColumns.map(dataSchema(_)))
+        val toScala = CatalystTypeConverters.createToScalaConverter(requiredSchema)
+        projectedRows.map(toScala(_).asInstanceOf[Row])
+      } else {
+        projectedRows
+      }
     }.asInstanceOf[RDD[Row]]
   }
 

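The crux of the fix is the `needConversion` branch added above: after projection the rows are still `InternalRow`s holding Catalyst values, and since #7626 those can no longer be handed out as external `Row`s. `CatalystTypeConverters.createToScalaConverter` builds a converter function from a `DataType`; applied to a `StructType`, it maps an `InternalRow` to a `Row`. A standalone sketch of that conversion step follows; the schema and values are illustrative only, and `CatalystTypeConverters` is an internal Catalyst helper, so treat this as running inside Spark's own sql packages:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    // Catalyst schema of the required columns (illustrative).
    val requiredSchema = StructType(
      StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)

    // An InternalRow carries Catalyst values, e.g. UTF8String instead of String.
    val internal: InternalRow = InternalRow(UTF8String.fromString("alice"), 30)

    // createToScalaConverter returns an Any => Any function for the given type;
    // for a StructType it converts an InternalRow into an external Row.
    val toScala = CatalystTypeConverters.createToScalaConverter(requiredSchema)
    val external = toScala(internal).asInstanceOf[Row]   // Row("alice", 30)
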
http://git-wip-us.apache.org/repos/asf/spark/blob/e2ec018e/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index d761909..e8975e5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -22,10 +22,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
-/*
-This is commented out due a bug in the data source API (SPARK-9291).
-
-
 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
   override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
 
@@ -54,4 +50,3 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
     }
   }
 }
-*/

