Repository: spark
Updated Branches:
  refs/heads/master 6451cf927 -> 9ea0d5e32


[SPARK-15983][SQL] Removes FileFormat.prepareRead

## What changes were proposed in this pull request?

The interface method `FileFormat.prepareRead()` was added in #12088 to handle a 
special case in the LibSVM data source.

However, the semantics of this method aren't intuitive: it returns a modified 
version of the data source options map. Since the LibSVM case can be handled 
just as easily with schema metadata inside `inferSchema`, we can remove this 
method to keep the `FileFormat` interface clean.
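
For reference, a minimal sketch of the metadata-based approach (illustrative only: the identifiers mirror the patch below, `DoubleType` stands in for the features column type because `VectorUDT` is only accessible inside Spark's own packages, and the feature count is a made-up placeholder):

```scala
import org.apache.spark.sql.types._

// Sketch: inferSchema records the (possibly inferred) feature count as column
// metadata instead of returning a rewritten options map from prepareRead.
val numFeatures = 780  // assumption: taken from options or inferred from the data files

val featuresMetadata = new MetadataBuilder()
  .putLong("numFeatures", numFeatures)
  .build()

val schema = StructType(
  StructField("label", DoubleType, nullable = false) ::
  StructField("features", DoubleType, nullable = false, featuresMetadata) :: Nil)

// Later (e.g. in buildReader) the value is recovered from the data schema,
// so no enriched options map has to be threaded through DataSource.
val recovered = schema("features").metadata.getLong("numFeatures").toInt
assert(recovered == numFeatures)
```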

## How was this patch tested?

Existing tests.

Author: Cheng Lian <l...@databricks.com>

Closes #13698 from liancheng/remove-prepare-read.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ea0d5e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ea0d5e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ea0d5e3

Branch: refs/heads/master
Commit: 9ea0d5e326e08b914aa46f1eec8795688a61bf74
Parents: 6451cf9
Author: Cheng Lian <l...@databricks.com>
Authored: Thu Jun 16 10:24:29 2016 -0700
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Jun 16 10:24:29 2016 -0700

----------------------------------------------------------------------
 .../spark/ml/source/libsvm/LibSVMRelation.scala | 33 ++++++++++----------
 .../sql/execution/datasources/DataSource.scala  |  5 +--
 .../datasources/fileSourceInterfaces.scala      | 11 -------
 3 files changed, 18 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9ea0d5e3/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 62e09d2..4988dd6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -120,9 +120,12 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
   override def toString: String = "LibSVM"
 
   private def verifySchema(dataSchema: StructType): Unit = {
-    if (dataSchema.size != 2 ||
-      (!dataSchema(0).dataType.sameType(DataTypes.DoubleType)
-        || !dataSchema(1).dataType.sameType(new VectorUDT()))) {
+    if (
+      dataSchema.size != 2 ||
+        !dataSchema(0).dataType.sameType(DataTypes.DoubleType) ||
+        !dataSchema(1).dataType.sameType(new VectorUDT()) ||
+        !(dataSchema(1).metadata.getLong("numFeatures").toInt > 0)
+    ) {
       throw new IOException(s"Illegal schema for libsvm data, schema=$dataSchema")
     }
   }
@@ -131,17 +134,8 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    Some(
-      StructType(
-        StructField("label", DoubleType, nullable = false) ::
-        StructField("features", new VectorUDT(), nullable = false) :: Nil))
-  }
-
-  override def prepareRead(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      files: Seq[FileStatus]): Map[String, String] = {
-    val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse {
+    val numFeatures: Int = options.get("numFeatures").map(_.toInt).filter(_ > 0).getOrElse {
+      // Infers number of features if the user doesn't specify (a valid) one.
       val dataFiles = files.filterNot(_.getPath.getName startsWith "_")
       val path = if (dataFiles.length == 1) {
         dataFiles.head.getPath.toUri.toString
@@ -156,7 +150,14 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
       MLUtils.computeNumFeatures(parsed)
     }
 
-    new CaseInsensitiveMap(options + ("numFeatures" -> numFeatures.toString))
+    val featuresMetadata = new MetadataBuilder()
+      .putLong("numFeatures", numFeatures)
+      .build()
+
+    Some(
+      StructType(
+        StructField("label", DoubleType, nullable = false) ::
+        StructField("features", new VectorUDT(), nullable = false, 
featuresMetadata) :: Nil))
   }
 
   override def prepareWrite(
@@ -185,7 +186,7 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
     verifySchema(dataSchema)
-    val numFeatures = options("numFeatures").toInt
+    val numFeatures = dataSchema("features").metadata.getLong("numFeatures").toInt
     assert(numFeatures > 0)
 
     val sparse = options.getOrElse("vectorType", "sparse") == "sparse"

http://git-wip-us.apache.org/repos/asf/spark/blob/9ea0d5e3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d327302..7f3683f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -386,9 +386,6 @@ case class DataSource(
               "It must be specified manually")
         }
 
-        val enrichedOptions =
-          format.prepareRead(sparkSession, caseInsensitiveOptions, fileCatalog.allFiles())
-
         HadoopFsRelation(
           sparkSession,
           fileCatalog,
@@ -396,7 +393,7 @@ case class DataSource(
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
-          enrichedOptions)
+          caseInsensitiveOptions)
 
       case _ =>
         throw new AnalysisException(

http://git-wip-us.apache.org/repos/asf/spark/blob/9ea0d5e3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 641c5cb..4ac555b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -26,7 +26,6 @@ import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompres
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
-import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
@@ -187,15 +186,6 @@ trait FileFormat {
       files: Seq[FileStatus]): Option[StructType]
 
   /**
-   * Prepares a read job and returns a potentially updated data source option [[Map]]. This method
-   * can be useful for collecting necessary global information for scanning input data.
-   */
-  def prepareRead(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      files: Seq[FileStatus]): Map[String, String] = options
-
-  /**
    * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation can
    * be put here.  For example, user defined output committer can be configured here
    * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
@@ -454,7 +444,6 @@ private[sql] object HadoopFsRelation extends Logging {
     logInfo(s"Listing leaf files and directories in parallel under: 
${paths.mkString(", ")}")
 
     val sparkContext = sparkSession.sparkContext
-    val sqlConf = sparkSession.sessionState.conf
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
 

