Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/11509#discussion_r55204271 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala --- @@ -167,22 +117,63 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { throw new IOException(s"Illegal schema for libsvm data, schema=${dataSchema}") } } + override def inferSchema( + sqlContext: SQLContext, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + Some( + StructType( + StructField("label", DoubleType, nullable = false) :: + StructField("features", new VectorUDT(), nullable = false) :: Nil)) + } - override def createRelation( + override def prepareWrite( sqlContext: SQLContext, - paths: Array[String], - dataSchema: Option[StructType], - partitionColumns: Option[StructType], - parameters: Map[String, String]): HadoopFsRelation = { - val path = if (paths.length == 1) paths(0) - else if (paths.isEmpty) throw new IOException("No input path specified for libsvm data") - else throw new IOException("Multiple input paths are not supported for libsvm data") - if (partitionColumns.isDefined && !partitionColumns.get.isEmpty) { - throw new IOException("Partition is not supported for libsvm data") + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + new OutputWriterFactory { + override def newInstance( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + if (bucketId.isDefined) { sys.error("LibSVM doesn't support bucketing") } + new LibSVMOutputWriter(path, dataSchema, context) + } + } + } + + override def buildInternalScan( + sqlContext: SQLContext, + dataSchema: StructType, + requiredColumns: Array[String], + filters: Array[Filter], + bucketSet: Option[BitSet], + inputFiles: Array[FileStatus], + broadcastedConf: Broadcast[SerializableConfiguration], + options: Map[String, String]): RDD[InternalRow] = { + // TODO: This does not handle cases where column pruning has been performed. + + verifySchema(dataSchema) --- End diff -- should we verify schema earlier? i.e. in `prepareWrite`
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org