Repository: spark
Updated Branches:
  refs/heads/master c980e20cf -> e2ec018e3

[SPARK-9285] [SQL] Fixes Row/InternalRow conversion for HadoopFsRelation

This is a follow-up of #7626. It fixes `Row`/`InternalRow` conversion for
data sources extending `HadoopFsRelation` with `needConversion` being `true`.

Author: Cheng Lian <l...@databricks.com>

Closes #7649 from liancheng/spark-9285-conversion-fix and squashes the
following commits:

036a50c [Cheng Lian] Addresses PR comment
f6d7c6a [Cheng Lian] Fixes Row/InternalRow conversion for HadoopFsRelation

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2ec018e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2ec018e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2ec018e

Branch: refs/heads/master
Commit: e2ec018e37cb699077b5fa2bd662f2055cb42296
Parents: c980e20
Author: Cheng Lian <l...@databricks.com>
Authored: Sat Jul 25 11:42:49 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sat Jul 25 11:42:49 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/sources/interfaces.scala   | 23 +++++++++++++++++---
 .../SimpleTextHadoopFsRelationSuite.scala       |  5 -----
 2 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
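The heart of the change is an extra Catalyst-to-Scala conversion applied
when `needConversion` is true. As a minimal standalone sketch of what
`CatalystTypeConverters.createToScalaConverter` does to a single row (this
snippet is not part of the commit, it leans on Spark's internal Catalyst
API as of this branch, and the object name is made up for illustration):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object ConversionSketch {
  def main(args: Array[String]): Unit = {
    // Schema of the required columns, mirroring `requiredSchema` in `buildScan` below.
    val requiredSchema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))

    // An `InternalRow` holds Catalyst values, e.g. `UTF8String` rather than
    // `java.lang.String`, so after PR #7626 it cannot simply be handed out as a `Row`.
    val internal: InternalRow = InternalRow(1, UTF8String.fromString("foo"))

    // The converter rebuilds external Scala values field by field.
    val toScala = CatalystTypeConverters.createToScalaConverter(requiredSchema)
    val external = toScala(internal).asInstanceOf[Row]

    println(external.getInt(0))    // 1
    println(external.getString(1)) // foo, now a plain java.lang.String
  }
}

This per-row conversion is exactly the overhead that the TODO added in the
patch below hopes to eliminate later.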
http://git-wip-us.apache.org/repos/asf/spark/blob/e2ec018e/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 119bac7..7126145 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.RDDConversions
@@ -593,6 +593,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
    *
    * @since 1.4.0
    */
+  // TODO Tries to eliminate the extra Catalyst-to-Scala conversion when `needConversion` is true
+  //
+  // PR #7626 separated `Row` and `InternalRow` completely. One of the consequences is that we can
+  // no longer treat an `InternalRow` containing Catalyst values as a `Row`. Thus we have to
+  // introduce another row value conversion for data sources whose `needConversion` is true.
   def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = {
     // Yeah, to workaround serialization...
     val dataSchema = this.dataSchema
@@ -611,14 +616,26 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     } else {
       rdd.asInstanceOf[RDD[InternalRow]]
     }
+
     converted.mapPartitions { rows =>
       val buildProjection = if (codegenEnabled) {
         GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
       } else {
         () => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes)
       }
-      val mutableProjection = buildProjection()
-      rows.map(r => mutableProjection(r))
+
+      val projectedRows = {
+        val mutableProjection = buildProjection()
+        rows.map(r => mutableProjection(r))
+      }
+
+      if (needConversion) {
+        val requiredSchema = StructType(requiredColumns.map(dataSchema(_)))
+        val toScala = CatalystTypeConverters.createToScalaConverter(requiredSchema)
+        projectedRows.map(toScala(_).asInstanceOf[Row])
+      } else {
+        projectedRows
+      }
     }.asInstanceOf[RDD[Row]]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e2ec018e/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index d761909..e8975e5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -22,10 +22,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
-/*
-This is commented out due a bug in the data source API (SPARK-9291).
-
-
 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
   override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
 
@@ -54,4 +50,3 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
     }
   }
 }
-*/
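For context on the flag itself: `needConversion` is declared on
`BaseRelation` and defaults to `true`, meaning the source hands Spark rows
of external Scala/Java values that must be translated to and from the
Catalyst representation. A minimal hypothetical relation (class name and
data invented for illustration, not taken from the patch):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical toy relation, not part of the commit.
class ToyRelation(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  override def schema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType)))

  // `true` (also the default) says `buildScan` emits external values
  // (Int, String, ...); sources that emit Catalyst values instead
  // override this to `false` and skip the conversions entirely.
  override def needConversion: Boolean = true

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row(1, "a"), Row(2, "b")))
}

With the fix applied, `HadoopFsRelation` subclasses that keep
`needConversion` set to `true` (such as the `SimpleTextSource` exercised by
the re-enabled suite above) get their projected `InternalRow`s mapped back
to Scala `Row`s instead of being unsafely cast.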