Repository: spark Updated Branches: refs/heads/master 7320f9bd1 -> 816f359cf
[SPARK-14114][SQL] implement buildReader for text data source ## What changes were proposed in this pull request? This PR implements buildReader for text data source and enables it in the new data source code path. ## How was this patch tested? Existing tests. Author: Wenchen Fan <wenc...@databricks.com> Closes #11934 from cloud-fan/text. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/816f359c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/816f359c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/816f359c Branch: refs/heads/master Commit: 816f359cf043ef719a0bc7df0506a3a830fff70d Parents: 7320f9b Author: Wenchen Fan <wenc...@databricks.com> Authored: Wed Mar 30 17:32:53 2016 +0800 Committer: Cheng Lian <l...@databricks.com> Committed: Wed Mar 30 17:32:53 2016 +0800 ---------------------------------------------------------------------- .../datasources/FileSourceStrategy.scala | 3 ++- .../datasources/text/DefaultSource.scala | 28 +++++++++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/816f359c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 20fda95..4448796 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -59,7 +59,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { if (files.fileFormat.toString == "TestFileFormat" || 
files.fileFormat.isInstanceOf[parquet.DefaultSource] || files.fileFormat.toString == "ORC" || - files.fileFormat.isInstanceOf[json.DefaultSource]) && + files.fileFormat.isInstanceOf[json.DefaultSource] || + files.fileFormat.isInstanceOf[text.DefaultSource]) && files.sqlContext.conf.useFileScan => // Filters on this relation fall into four categories based on where we can use them to avoid // reading unneeded data: http://git-wip-us.apache.org/repos/asf/spark/blob/816f359c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index 5cfc9e9..d6ab5fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.text +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.{LongWritable, NullWritable, Text} import org.apache.hadoop.mapred.{JobConf, TextInputFormat} @@ -30,7 +31,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} -import org.apache.spark.sql.execution.datasources.CompressionCodecs +import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.SerializableConfiguration @@ -125,6 +126,31 @@ class 
DefaultSource extends FileFormat with DataSourceRegister { } } } + + override def buildReader( + sqlContext: SQLContext, + partitionSchema: StructType, + dataSchema: StructType, + filters: Seq[Filter], + options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = { + val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + val broadcastedConf = + sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf)) + + file => { + val unsafeRow = new UnsafeRow(1) + val bufferHolder = new BufferHolder(unsafeRow) + val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1) + + new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line => + // Writes to an UnsafeRow directly + bufferHolder.reset() + unsafeRowWriter.write(0, line.getBytes, 0, line.getLength) + unsafeRow.setTotalSize(bufferHolder.totalSize()) + unsafeRow + } + } + } } class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org