Repository: spark Updated Branches: refs/heads/master c246b95dd -> ed362008f
[SPARK-4437] update doc for WholeCombineFileRecordReader update doc for WholeCombineFileRecordReader Author: Davies Liu <dav...@databricks.com> Author: Josh Rosen <joshro...@databricks.com> Closes #3301 from davies/fix_doc and squashes the following commits: 1d7422f [Davies Liu] Merge pull request #2 from JoshRosen/whole-text-file-cleanup dc3d21a [Josh Rosen] More genericization in ConfigurableCombineFileRecordReader. 95d13eb [Davies Liu] address comment bf800b9 [Davies Liu] update doc for WholeCombineFileRecordReader Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed362008 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed362008 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed362008 Branch: refs/heads/master Commit: ed362008f0a317729f8404e86e57d8a6ceb60f21 Parents: c246b95 Author: Davies Liu <dav...@databricks.com> Authored: Tue Dec 16 11:19:36 2014 -0800 Committer: Josh Rosen <joshro...@databricks.com> Committed: Tue Dec 16 11:19:36 2014 -0800 ---------------------------------------------------------------------- .../spark/input/WholeTextFileInputFormat.scala | 12 ++---- .../spark/input/WholeTextFileRecordReader.scala | 43 ++++++++++---------- 2 files changed, 25 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ed362008/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index d3601cc..aaef7c7 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -19,7 +19,6 @@ package org.apache.spark.input import scala.collection.JavaConversions._ -import org.apache.hadoop.conf.{Configuration, Configurable} import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.InputSplit import org.apache.hadoop.mapreduce.JobContext @@ -38,18 +37,13 @@ private[spark] class WholeTextFileInputFormat override protected def isSplitable(context: JobContext, file: Path): Boolean = false - private var conf: Configuration = _ - def setConf(c: Configuration) { - conf = c - } - def getConf: Configuration = conf - override def createRecordReader( split: InputSplit, context: TaskAttemptContext): RecordReader[String, String] = { - val reader = new WholeCombineFileRecordReader(split, context) - reader.setConf(conf) + val reader = + new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader]) + reader.setConf(getConf) reader } http://git-wip-us.apache.org/repos/asf/spark/blob/ed362008/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala index 6d59b24..1b1131b 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala @@ -17,7 +17,7 @@ package org.apache.spark.input -import org.apache.hadoop.conf.{Configuration, Configurable} +import org.apache.hadoop.conf.{Configuration, Configurable => HConfigurable} import com.google.common.io.{ByteStreams, Closeables} import org.apache.hadoop.io.Text @@ -27,6 +27,18 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecor import org.apache.hadoop.mapreduce.RecordReader import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface. + */ +private[spark] trait Configurable extends HConfigurable { + private var conf: Configuration = _ + def setConf(c: Configuration) { + conf = c + } + def getConf: Configuration = conf +} + /** * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file * out in a key-value pair, where the key is the file path and the value is the entire content of @@ -38,12 +50,6 @@ private[spark] class WholeTextFileRecordReader( index: Integer) extends RecordReader[String, String] with Configurable { - private var conf: Configuration = _ - def setConf(c: Configuration) { - conf = c - } - def getConf: Configuration = conf - private[this] val path = split.getPath(index) private[this] val fs = path.getFileSystem(context.getConfiguration) @@ -87,29 +93,24 @@ private[spark] class WholeTextFileRecordReader( /** - * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file - * out in a key-value pair, where the key is the file path and the value is the entire content of - * the file. + * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]] + * that can pass Hadoop Configuration to [[org.apache.hadoop.conf.Configurable Configurable]] + * RecordReaders. */ -private[spark] class WholeCombineFileRecordReader( +private[spark] class ConfigurableCombineFileRecordReader[K, V]( split: InputSplit, - context: TaskAttemptContext) - extends CombineFileRecordReader[String, String]( + context: TaskAttemptContext, + recordReaderClass: Class[_ <: RecordReader[K, V] with HConfigurable]) + extends CombineFileRecordReader[K, V]( split.asInstanceOf[CombineFileSplit], context, - classOf[WholeTextFileRecordReader] + recordReaderClass ) with Configurable { - private var conf: Configuration = _ - def setConf(c: Configuration) { - conf = c - } - def getConf: Configuration = conf - override def initNextRecordReader(): Boolean = { val r = super.initNextRecordReader() if (r) { - this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf) + this.curReader.asInstanceOf[HConfigurable].setConf(getConf) } r } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org