GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11509#discussion_r55204271
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala ---
    @@ -167,22 +117,63 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
            throw new IOException(s"Illegal schema for libsvm data, schema=${dataSchema}")
         }
       }
    +  override def inferSchema(
    +      sqlContext: SQLContext,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Option[StructType] = {
    +    Some(
    +      StructType(
    +        StructField("label", DoubleType, nullable = false) ::
    +        StructField("features", new VectorUDT(), nullable = false) :: Nil))
    +  }
     
    -  override def createRelation(
    +  override def prepareWrite(
           sqlContext: SQLContext,
    -      paths: Array[String],
    -      dataSchema: Option[StructType],
    -      partitionColumns: Option[StructType],
    -      parameters: Map[String, String]): HadoopFsRelation = {
    -    val path = if (paths.length == 1) paths(0)
    -      else if (paths.isEmpty) throw new IOException("No input path specified for libsvm data")
    -      else throw new IOException("Multiple input paths are not supported for libsvm data")
    -    if (partitionColumns.isDefined && !partitionColumns.get.isEmpty) {
    -      throw new IOException("Partition is not supported for libsvm data")
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +    new OutputWriterFactory {
    +      override def newInstance(
    +          path: String,
    +          bucketId: Option[Int],
    +          dataSchema: StructType,
    +          context: TaskAttemptContext): OutputWriter = {
    +        if (bucketId.isDefined) { sys.error("LibSVM doesn't support bucketing") }
    +        new LibSVMOutputWriter(path, dataSchema, context)
    +      }
    +    }
    +  }
    +
    +  override def buildInternalScan(
    +      sqlContext: SQLContext,
    +      dataSchema: StructType,
    +      requiredColumns: Array[String],
    +      filters: Array[Filter],
    +      bucketSet: Option[BitSet],
    +      inputFiles: Array[FileStatus],
    +      broadcastedConf: Broadcast[SerializableConfiguration],
    +      options: Map[String, String]): RDD[InternalRow] = {
    +    // TODO: This does not handle cases where column pruning has been performed.
    +
    +    verifySchema(dataSchema)
    --- End diff --
    
    Should we verify the schema earlier, i.e. in `prepareWrite`?
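    
    For reference, a rough sketch of what that could look like (purely illustrative, not the PR's code; it assumes `verifySchema` is the existing helper shown above that throws an `IOException` on an illegal schema), so a bad schema fails before any `OutputWriter` is created:
    
      override def prepareWrite(
          sqlContext: SQLContext,
          job: Job,
          options: Map[String, String],
          dataSchema: StructType): OutputWriterFactory = {
        // Hypothetical: fail fast on an illegal libsvm schema before any writers
        // are created, instead of (or in addition to) the verifySchema call in
        // buildInternalScan.
        verifySchema(dataSchema)
        new OutputWriterFactory {
          override def newInstance(
              path: String,
              bucketId: Option[Int],
              dataSchema: StructType,
              context: TaskAttemptContext): OutputWriter = {
            if (bucketId.isDefined) { sys.error("LibSVM doesn't support bucketing") }
            new LibSVMOutputWriter(path, dataSchema, context)
          }
        }
      }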

