Repository: spark
Updated Branches:
  refs/heads/master 9f990fa3f -> 37f3be5d2


[SPARK-16159][SQL] Move RDD creation logic from FileSourceStrategy.apply

## What changes were proposed in this pull request?
We currently embed the RDD creation (partitioning) logic in FileSourceStrategy.apply, which makes 
the function very long. This is a small refactoring that moves that logic into its own functions, 
createBucketedReadRDD and createNonBucketedReadRDD. Eventually we should be able to move the 
partitioning logic into a physical operator, rather than doing it during physical planning.
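
To make the shape of the change concrete, here is a minimal, self-contained sketch using hypothetical stand-in types (FileBlock, ScanPlan, planScan) rather than Spark's real classes; the actual helpers added by this patch are createBucketedReadRDD and createNonBucketedReadRDD in the diff below.

// Hedged sketch only: stand-in types, not Spark's HadoopFsRelation / RDD[InternalRow].
object FileSourceStrategySketch {
  final case class FileBlock(path: String, length: Long)
  final case class ScanPlan(partitions: Seq[Seq[FileBlock]])

  // Planning now only chooses a helper; the partition-building logic lives in the helpers.
  def planScan(blocks: Seq[FileBlock], numBuckets: Option[Int]): ScanPlan = numBuckets match {
    case Some(n) => createBucketedRead(blocks, n)
    case None    => createNonBucketedRead(blocks)
  }

  // Stand-in: one output partition per bucket id (derived here from a hash of the path).
  private def createBucketedRead(blocks: Seq[FileBlock], numBuckets: Int): ScanPlan =
    ScanPlan(Seq.tabulate(numBuckets) { id =>
      blocks.filter(b => math.abs(b.path.hashCode) % numBuckets == id)
    })

  // Stand-in: one partition per file; the real helper bin-packs blocks by size.
  private def createNonBucketedRead(blocks: Seq[FileBlock]): ScanPlan =
    ScanPlan(blocks.map(Seq(_)))
}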

## How was this patch tested?
This is a simple code move.

Author: Reynold Xin <r...@databricks.com>

Closes #13862 from rxin/SPARK-16159.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37f3be5d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37f3be5d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37f3be5d

Branch: refs/heads/master
Commit: 37f3be5d29192db0a54f6c4699237b149bd0ecae
Parents: 9f990fa
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Jun 22 18:19:07 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jun 22 18:19:07 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/FileScanRDD.scala |  26 +-
 .../datasources/FileSourceStrategy.scala        | 240 +++++++++++--------
 2 files changed, 154 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/37f3be5d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index f7f68b1..1443057 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -27,9 +27,14 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch
 
 /**
- * A single file that should be read, along with partition column values that
- * need to be prepended to each row.  The reading should start at the first
- * valid record found after `start`.
+ * A part (i.e. "block") of a single file that should be read, along with partition column values
+ * that need to be prepended to each row.
+ *
+ * @param partitionValues value of partition columns to be prepended to each row.
+ * @param filePath path of the file to read
+ * @param start the beginning offset (in bytes) of the block.
+ * @param length number of bytes to read.
+ * @param locations locality information (list of nodes that have the data).
  */
 case class PartitionedFile(
     partitionValues: InternalRow,
@@ -43,13 +48,14 @@ case class PartitionedFile(
 }
 
 /**
- * A collection of files that should be read as a single task possibly from multiple partitioned
- * directories.
- *
- * TODO: This currently does not take locality information about the files into account.
+ * A collection of file blocks that should be read as a single task
+ * (possibly from multiple partitioned directories).
  */
 case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends RDDPartition
 
+/**
+ * An RDD that scans a list of file partitions.
+ */
 class FileScanRDD(
     @transient private val sparkSession: SparkSession,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
@@ -88,8 +94,8 @@ class FileScanRDD(
       private[this] var currentFile: PartitionedFile = null
       private[this] var currentIterator: Iterator[Object] = null
 
-      def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
-      def next() = {
+      def hasNext: Boolean = (currentIterator != null && currentIterator.hasNext) || nextIterator()
+      def next(): Object = {
         val nextElement = currentIterator.next()
         // TODO: we should have a better separation of row based and batch based scan, so that we
         // don't need to run this `if` for every record.
@@ -120,7 +126,7 @@ class FileScanRDD(
         }
       }
 
-      override def close() = {
+      override def close(): Unit = {
         updateBytesRead()
         updateBytesReadWithFileSize()
         InputFileNameHolder.unsetInputFileName()

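To make the updated Scaladoc above concrete: a PartitionedFile describes one byte range ("block") of a file plus the partition column values to prepend to every row, and a FilePartition groups the blocks that a single task reads. The sketch below is hedged and self-contained: the case classes are simplified copies (a Map stands in for InternalRow, a Seq[String] for the locality array), and the path and sizes are made up.

// Simplified stand-ins for the case classes in FileScanRDD.scala above.
case class PartitionedFile(
    partitionValues: Map[String, String],  // stand-in for InternalRow
    filePath: String,
    start: Long,
    length: Long,
    locations: Seq[String] = Nil)          // stand-in for the locality array

case class FilePartition(index: Int, files: Seq[PartitionedFile])

object FileScanExample extends App {
  // A hypothetical 300 MB file under a date=2016-06-22 partition directory,
  // split into 128 MB blocks; the partition value is prepended to each row at read time.
  val fileLen = 300L * 1024 * 1024
  val maxSplitBytes = 128L * 1024 * 1024
  val blocks = (0L until fileLen by maxSplitBytes).map { offset =>
    PartitionedFile(
      partitionValues = Map("date" -> "2016-06-22"),
      filePath = "hdfs://ns/warehouse/t/date=2016-06-22/part-00000",  // made-up path
      start = offset,
      length = math.min(maxSplitBytes, fileLen - offset))
  }

  // One read task may cover blocks from several files and directories.
  val partition = FilePartition(index = 0, files = blocks)
  partition.files.foreach(f => println(s"${f.filePath} [${f.start}, ${f.start + f.length})"))
}
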
http://git-wip-us.apache.org/repos/asf/spark/blob/37f3be5d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 13a86bf..04f166f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -22,8 +22,9 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -57,7 +58,7 @@ import org.apache.spark.sql.execution.SparkPlan
 private[sql] object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(projects, filters,
-      l @ LogicalRelation(files: HadoopFsRelation, _, table)) =>
+      l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table)) =>
      // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
       //  - partition keys only - used to prune directories to read
@@ -77,14 +78,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       }
 
       val partitionColumns =
-        l.resolve(files.partitionSchema, files.sparkSession.sessionState.analyzer.resolver)
+        l.resolve(
+          fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
       val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
         ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
       val dataColumns =
-        l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
+        l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
 
       // Partition keys are not available in the statistics of the files.
       val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
@@ -93,7 +95,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val afterScanFilters = filterSet -- partitionKeyFilters
       logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")
 
-      val selectedPartitions = files.location.listFiles(partitionKeyFilters.toSeq)
+      val selectedPartitions = fsRelation.location.listFiles(partitionKeyFilters.toSeq)
 
       val filterAttributes = AttributeSet(afterScanFilters)
       val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
@@ -109,113 +111,35 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
       logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
-      val readFile = files.fileFormat.buildReaderWithPartitionValues(
-        sparkSession = files.sparkSession,
-        dataSchema = files.dataSchema,
-        partitionSchema = files.partitionSchema,
-        requiredSchema = prunedDataSchema,
-        filters = pushedDownFilters,
-        options = files.options,
-        hadoopConf = files.sparkSession.sessionState.newHadoopConfWithOptions(files.options))
-
-      val plannedPartitions = files.bucketSpec match {
-        case Some(bucketing) if files.sparkSession.sessionState.conf.bucketingEnabled =>
-          logInfo(s"Planning with ${bucketing.numBuckets} buckets")
-          val bucketed =
-            selectedPartitions.flatMap { p =>
-              p.files.map { f =>
-                val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
-                PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
-              }
-            }.groupBy { f =>
-              BucketingUtils
-                .getBucketId(new Path(f.filePath).getName)
-                .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
-            }
-
-          (0 until bucketing.numBuckets).map { bucketId =>
-            FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
-          }
-
+      val readFile: (PartitionedFile) => Iterator[InternalRow] =
+        fsRelation.fileFormat.buildReaderWithPartitionValues(
+          sparkSession = fsRelation.sparkSession,
+          dataSchema = fsRelation.dataSchema,
+          partitionSchema = fsRelation.partitionSchema,
+          requiredSchema = prunedDataSchema,
+          filters = pushedDownFilters,
+          options = fsRelation.options,
+          hadoopConf =
+            fsRelation.sparkSession.sessionState.newHadoopConfWithOptions(fsRelation.options))
+
+      val rdd = fsRelation.bucketSpec match {
+        case Some(bucketing) if fsRelation.sparkSession.sessionState.conf.bucketingEnabled =>
+          createBucketedReadRDD(bucketing, readFile, selectedPartitions, fsRelation)
         case _ =>
-          val defaultMaxSplitBytes = files.sparkSession.sessionState.conf.filesMaxPartitionBytes
-          val openCostInBytes = files.sparkSession.sessionState.conf.filesOpenCostInBytes
-          val defaultParallelism = files.sparkSession.sparkContext.defaultParallelism
-          val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
-          val bytesPerCore = totalBytes / defaultParallelism
-          val maxSplitBytes = Math.min(defaultMaxSplitBytes,
-            Math.max(openCostInBytes, bytesPerCore))
-          logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
-            s"open cost is considered as scanning $openCostInBytes bytes.")
-
-          val splitFiles = selectedPartitions.flatMap { partition =>
-            partition.files.flatMap { file =>
-              val blockLocations = getBlockLocations(file)
-              if (files.fileFormat.isSplitable(files.sparkSession, files.options, file.getPath)) {
-                (0L until file.getLen by maxSplitBytes).map { offset =>
-                  val remaining = file.getLen - offset
-                  val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
-                  val hosts = getBlockHosts(blockLocations, offset, size)
-                  PartitionedFile(
-                    partition.values, file.getPath.toUri.toString, offset, size, hosts)
-                }
-              } else {
-                val hosts = getBlockHosts(blockLocations, 0, file.getLen)
-                Seq(PartitionedFile(
-                  partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
-              }
-            }
-          }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-
-          val partitions = new ArrayBuffer[FilePartition]
-          val currentFiles = new ArrayBuffer[PartitionedFile]
-          var currentSize = 0L
-
-          /** Add the given file to the current partition. */
-          def addFile(file: PartitionedFile): Unit = {
-            currentSize += file.length + openCostInBytes
-            currentFiles.append(file)
-          }
-
-          /** Close the current partition and move to the next. */
-          def closePartition(): Unit = {
-            if (currentFiles.nonEmpty) {
-              val newPartition =
-                FilePartition(
-                  partitions.size,
-                  currentFiles.toArray.toSeq) // Copy to a new Array.
-              partitions.append(newPartition)
-            }
-            currentFiles.clear()
-            currentSize = 0
-          }
-
-          // Assign files to partitions using "First Fit Decreasing" (FFD)
-          // TODO: consider adding a slop factor here?
-          splitFiles.foreach { file =>
-            if (currentSize + file.length > maxSplitBytes) {
-              closePartition()
-            }
-            addFile(file)
-          }
-          closePartition()
-          partitions
+          createNonBucketedReadRDD(readFile, selectedPartitions, fsRelation)
       }
 
       val meta = Map(
-        "Format" -> files.fileFormat.toString,
+        "Format" -> fsRelation.fileFormat.toString,
         "ReadSchema" -> prunedDataSchema.simpleString,
         PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
-        INPUT_PATHS -> files.location.paths.mkString(", "))
+        INPUT_PATHS -> fsRelation.location.paths.mkString(", "))
 
       val scan =
         DataSourceScanExec.create(
           readDataColumns ++ partitionColumns,
-          new FileScanRDD(
-            files.sparkSession,
-            readFile,
-            plannedPartitions),
-          files,
+          rdd,
+          fsRelation,
           meta,
           table)
 
@@ -232,6 +156,118 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
     case _ => Nil
   }
 
+  /**
+   * Create an RDD for bucketed reads.
+   * The non-bucketed variant of this function is [[createNonBucketedReadRDD]].
+   *
+   * The algorithm is pretty simple: each RDD partition being returned should include all the files
+   * with the same bucket id from all the given Hive partitions.
+   *
+   * @param bucketSpec the bucketing spec.
+   * @param readFile a function to read each (part of a) file.
+   * @param selectedPartitions Hive-style partitions that are part of the read.
+   * @param fsRelation [[HadoopFsRelation]] associated with the read.
+   */
+  private def createBucketedReadRDD(
+      bucketSpec: BucketSpec,
+      readFile: (PartitionedFile) => Iterator[InternalRow],
+      selectedPartitions: Seq[Partition],
+      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
+    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
+    val bucketed =
+      selectedPartitions.flatMap { p =>
+        p.files.map { f =>
+          val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
+          PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
+        }
+      }.groupBy { f =>
+        BucketingUtils
+          .getBucketId(new Path(f.filePath).getName)
+          .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
+      }
+
+    val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+      FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
+    }
+
+    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+  }
+
+  /**
+   * Create an RDD for non-bucketed reads.
+   * The bucketed variant of this function is [[createBucketedReadRDD]].
+   *
+   * @param readFile a function to read each (part of a) file.
+   * @param selectedPartitions Hive-style partitions that are part of the read.
+   * @param fsRelation [[HadoopFsRelation]] associated with the read.
+   */
+  private def createNonBucketedReadRDD(
+      readFile: (PartitionedFile) => Iterator[InternalRow],
+      selectedPartitions: Seq[Partition],
+      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
+    val defaultMaxSplitBytes =
+      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
+    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+
+    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+      s"open cost is considered as scanning $openCostInBytes bytes.")
+
+    val splitFiles = selectedPartitions.flatMap { partition =>
+      partition.files.flatMap { file =>
+        val blockLocations = getBlockLocations(file)
+        if (fsRelation.fileFormat.isSplitable(
+            fsRelation.sparkSession, fsRelation.options, file.getPath)) {
+          (0L until file.getLen by maxSplitBytes).map { offset =>
+            val remaining = file.getLen - offset
+            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
+            val hosts = getBlockHosts(blockLocations, offset, size)
+            PartitionedFile(
+              partition.values, file.getPath.toUri.toString, offset, size, hosts)
+          }
+        } else {
+          val hosts = getBlockHosts(blockLocations, 0, file.getLen)
+          Seq(PartitionedFile(
+            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
+        }
+      }
+    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+    val partitions = new ArrayBuffer[FilePartition]
+    val currentFiles = new ArrayBuffer[PartitionedFile]
+    var currentSize = 0L
+
+    /** Close the current partition and move to the next. */
+    def closePartition(): Unit = {
+      if (currentFiles.nonEmpty) {
+        val newPartition =
+          FilePartition(
+            partitions.size,
+            currentFiles.toArray.toSeq) // Copy to a new Array.
+        partitions.append(newPartition)
+      }
+      currentFiles.clear()
+      currentSize = 0
+    }
+
+    // Assign files to partitions using "First Fit Decreasing" (FFD)
+    // TODO: consider adding a slop factor here?
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
+      }
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles.append(file)
+    }
+    closePartition()
+
+    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+  }
+
   private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
     case f: LocatedFileStatus => f.getBlockLocations
     case f => Array.empty[BlockLocation]

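As a rough illustration of the bucketed path (createBucketedReadRDD above): output partition i collects every file whose bucket id is i, across all selected Hive partitions, so a single task reads bucket i of the whole table. The sketch below mimics that grouping with plain collections; the bucket-id parsing is a simplified stand-in for BucketingUtils.getBucketId, and the file names and sizes are made up.

// Hedged sketch of the grouping done by createBucketedReadRDD, with stand-in types.
object BucketedGroupingSketch extends App {
  final case class FileBlock(path: String, length: Long)

  // Hypothetical bucketed files from two Hive partitions; the trailing digits stand in
  // for the bucket id that BucketingUtils parses out of the real file name.
  val files = Seq(
    FileBlock("date=2016-06-21/part-00000_00000", 10),  // bucket 0
    FileBlock("date=2016-06-21/part-00000_00001", 12),  // bucket 1
    FileBlock("date=2016-06-22/part-00000_00000", 11),  // bucket 0
    FileBlock("date=2016-06-22/part-00000_00001",  9))  // bucket 1

  val numBuckets = 4
  def bucketId(f: FileBlock): Int = f.path.takeRight(5).toInt  // simplified parser

  val byBucket: Map[Int, Seq[FileBlock]] = files.groupBy(bucketId)

  // Exactly numBuckets output partitions, even for buckets that have no file yet.
  val filePartitions = Seq.tabulate(numBuckets)(id => id -> byBucket.getOrElse(id, Nil))
  filePartitions.foreach { case (id, fs) => println(s"bucket $id -> ${fs.map(_.path)}") }
}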

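For the non-bucketed path (createNonBucketedReadRDD above), the split size is maxSplitBytes = min(filesMaxPartitionBytes, max(filesOpenCostInBytes, totalBytes / defaultParallelism)), and the resulting splits are packed largest-first, closing a partition once the next split would no longer fit. Below is a hedged, self-contained numeric sketch; the configuration values, file sizes, and parallelism are example numbers, not authoritative Spark settings.

// Hedged sketch of the split-size formula and the greedy largest-first packing.
object BinPackingSketch extends App {
  val defaultMaxSplitBytes = 128L * 1024 * 1024  // example value for filesMaxPartitionBytes
  val openCostInBytes      = 4L * 1024 * 1024    // example value for filesOpenCostInBytes
  val defaultParallelism   = 8
  val fileSizes            = Seq(300L, 40L, 20L, 10L).map(_ * 1024 * 1024)  // hypothetical files

  val totalBytes    = fileSizes.map(_ + openCostInBytes).sum
  val bytesPerCore  = totalBytes / defaultParallelism
  val maxSplitBytes = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  println(s"maxSplitBytes = $maxSplitBytes bytes")

  // Split each (splitable) file into blocks of at most maxSplitBytes, largest first.
  val splits = fileSizes.flatMap { len =>
    (0L until len by maxSplitBytes).map(offset => math.min(maxSplitBytes, len - offset))
  }.sortBy(-_)

  // Close the current partition whenever adding the next split would exceed maxSplitBytes.
  val partitions  = scala.collection.mutable.ArrayBuffer.empty[Seq[Long]]
  var current     = Seq.empty[Long]
  var currentSize = 0L
  splits.foreach { size =>
    if (currentSize + size > maxSplitBytes && current.nonEmpty) {
      partitions += current
      current = Seq.empty
      currentSize = 0L
    }
    current :+= size
    currentSize += size + openCostInBytes  // each block also pays the file-open cost
  }
  if (current.nonEmpty) partitions += current

  partitions.zipWithIndex.foreach { case (p, i) =>
    println(s"partition $i: ${p.map(_ / 1024 / 1024).mkString(" + ")} MB")
  }
}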