Sean, thank you!
Finally, I got this to work, although it is a bit ugly: I set the metadata of the DataFrame's "label" column manually.

    import org.apache.spark.ml.attribute._
    import org.apache.spark.sql.types._

    val df = training.toDF()
    val schema = df.schema
    val rowRDD = df.rdd

    // Merge nominal-attribute metadata with two values ("0", "1") into the
    // field's existing metadata, so DecisionTreeClassifier can read the
    // number of classes from the label column.
    def enrich(m: Metadata): Metadata = {
      val na = NominalAttribute.defaultAttr.withValues("0", "1")
      na.toMetadata(m)
    }

    // Rebuild the schema, replacing only the metadata of the "label" field.
    val newSchema = StructType(schema.map(f =>
      if (f.name == "label") f.copy(metadata = enrich(f.metadata)) else f))

    val model = pipeline.fit(sqlContext.createDataFrame(rowRDD, newSchema))

Thanks!
- Terry

On Mon, Sep 7, 2015 at 4:24 PM, Sean Owen <so...@cloudera.com> wrote:
> Hm, off the top of my head I don't know. I haven't looked at this
> aspect in a while, strangely. It's an attribute in the metadata of the
> field. I assume there's a method for setting this metadata when you
> construct the input data.
>
> On Sun, Sep 6, 2015 at 10:41 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> > Sean,
> >
> > Do you know how to tell the decision tree that the "label" is binary,
> > or how to set some attributes on the DataFrame to carry the number of
> > classes?
> >
> > Thanks!
> > - Terry
> >
> > On Sun, Sep 6, 2015 at 5:23 PM, Sean Owen <so...@cloudera.com> wrote:
> >> The error suggests that the type is not a binary or nominal attribute,
> >> though. I think that's the missing step. A double-valued column need
> >> not be one of these attribute types.
> >>
> >> On Sun, Sep 6, 2015 at 10:14 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> >> > Hi, Owen,
> >> >
> >> > The DataFrame "training" comes from an RDD of a case class,
> >> > RDD[LabeledDocument], where the case class is defined as:
> >> > case class LabeledDocument(id: Long, text: String, label: Double)
> >> >
> >> > So there is already a default "label" column of type double.
> >> >
> >> > I already tried setting the label column for the decision tree
> >> > explicitly:
> >> > val lr = new DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
> >> > It raised the same error.
> >> >
> >> > I also tried changing "label" to an "int" type, but that also reported
> >> > an error, with the stack below. I have no idea how to make this work.
> >> >
> >> > java.lang.IllegalArgumentException: requirement failed: Column label
> >> > must be of type DoubleType but was actually IntegerType.
> >> >     at scala.Predef$.require(Predef.scala:233)
> >> >     at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
> >> >     at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
> >> >     at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
> >> >     at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
> >> >     at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >> >     at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >> >     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> >> >     at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
> >> >     at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
> >> >     at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
> >> >     at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
> >> >     at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)
> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:62)
> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
> >> >     at $iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
> >> >     at $iwC$$iwC$$iwC.<init>(<console>:72)
> >> >     at $iwC$$iwC.<init>(<console>:74)
> >> >     at $iwC.<init>(<console>:76)
> >> >     at <init>(<console>:78)
> >> >     at .<init>(<console>:82)
> >> >     at .<clinit>(<console>)
> >> >     at .<init>(<console>:7)
> >> >     at .<clinit>(<console>)
> >> >     at $print(<console>)
> >> >
> >> > Thanks!
> >> > - Terry
> >> >
> >> > On Sun, Sep 6, 2015 at 4:53 PM, Sean Owen <so...@cloudera.com> wrote:
> >> >> I think somewhere along the line you've not specified your label
> >> >> column -- it's defaulting to "label" and it does not recognize it, or
> >> >> at least not as a binary or nominal attribute.
> >> >>
> >> >> On Sun, Sep 6, 2015 at 5:47 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> >> >> > Hi, experts,
> >> >> >
> >> >> > I followed the Spark ML pipeline guide to test DecisionTreeClassifier
> >> >> > in spark-shell with Spark 1.4.1, but I always get an error like the
> >> >> > following. Do you have any idea how to fix this?
> >> >> >
> >> >> > The error stack:
> >> >> > java.lang.IllegalArgumentException: DecisionTreeClassifier was given
> >> >> > input with invalid label column label, without the number of classes
> >> >> > specified. See StringIndexer.
> >> >> >     at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:71)
> >> >> >     at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:41)
> >> >> >     at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
> >> >> >     at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
> >> >> >     at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:133)
> >> >> >     at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:129)
> >> >> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> >> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> >> >     at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:42)
> >> >> >     at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:43)
> >> >> >     at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:129)
> >> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
> >> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
> >> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
> >> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
> >> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
> >> >> >     at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
> >> >> >     at $iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
> >> >> >     at $iwC$$iwC$$iwC.<init>(<console>:59)
> >> >> >     at $iwC$$iwC.<init>(<console>:61)
> >> >> >     at $iwC.<init>(<console>:63)
> >> >> >     at <init>(<console>:65)
> >> >> >     at .<init>(<console>:69)
> >> >> >     at .<clinit>(<console>)
> >> >> >     at .<init>(<console>:7)
> >> >> >     at .<clinit>(<console>)
> >> >> >     at $print(<console>)
> >> >> >
> >> >> > The executed code is:
> >> >> > import org.apache.spark.ml.Pipeline
> >> >> > import org.apache.spark.ml.classification.DecisionTreeClassifier
> >> >> > import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
> >> >> >
> >> >> > // Labeled and unlabeled instance types.
> >> >> > // Spark SQL can infer schema from case classes.
> >> >> > case class LabeledDocument(id: Long, text: String, label: Double)
> >> >> > case class Document(id: Long, text: String)
> >> >> >
> >> >> > // Prepare training documents, which are labeled.
> >> >> > val training = sc.parallelize(Seq(
> >> >> >   LabeledDocument(0L, "a b c d e spark", 1.0),
> >> >> >   LabeledDocument(1L, "b d", 0.0),
> >> >> >   LabeledDocument(2L, "spark f g h", 1.0),
> >> >> >   LabeledDocument(3L, "hadoop mapreduce", 0.0)))
> >> >> >
> >> >> > // Configure an ML pipeline, which consists of three stages:
> >> >> > // tokenizer, hashingTF, and lr.
> >> >> > val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
> >> >> > val hashingTF = new HashingTF().setNumFeatures(1000)
> >> >> >   .setInputCol(tokenizer.getOutputCol).setOutputCol("features")
> >> >> > val lr = new DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini")
> >> >> > val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
> >> >> >
> >> >> > // The error is raised by the following line.
> >> >> > val model = pipeline.fit(training.toDF)
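A more compact variant of the metadata fix from the top of this thread might look like the sketch below. It assumes Spark 1.4.x with spark-mllib on the classpath and runs as a standalone app instead of in spark-shell; `NominalLabelSketch` and `withNominalLabel` are hypothetical names. Rather than rebuilding the DataFrame from its RDD with a new schema, it re-selects the columns and attaches the same `NominalAttribute` metadata to "label" through `Column.as(alias, metadata)`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.attribute.{Attribute, NominalAttribute}
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.{DataFrame, SQLContext}

// Same case class as in the original post; top-level so toDF() can infer its schema.
case class LabeledDocument(id: Long, text: String, label: Double)

object NominalLabelSketch {

  // Hypothetical helper: re-selects every column of `df`, attaching
  // NominalAttribute metadata (one value per class name) to `labelCol`.
  // DecisionTreeClassifier reads the number of classes from this metadata.
  def withNominalLabel(df: DataFrame, labelCol: String, classes: Seq[String]): DataFrame = {
    val meta = NominalAttribute.defaultAttr
      .withValues(classes.head, classes.tail: _*)
      .toMetadata()
    val cols = df.columns.map { c =>
      if (c == labelCol) df(c).as(c, meta) else df(c)
    }
    df.select(cols: _*)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("nominal-label-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val training = sc.parallelize(Seq(
      LabeledDocument(0L, "a b c d e spark", 1.0),
      LabeledDocument(1L, "b d", 0.0),
      LabeledDocument(2L, "spark f g h", 1.0),
      LabeledDocument(3L, "hadoop mapreduce", 0.0))).toDF()

    // The labels 0.0 and 1.0 are already class indices, so the class
    // names "0" and "1" line up with them.
    val labeled = withNominalLabel(training, "label", Seq("0", "1"))

    // Read the attribute back from the schema to confirm it was attached.
    val attr = Attribute.fromStructField(labeled.schema("label"))
    println(s"label is nominal: ${attr.isNominal}")

    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val dt = new DecisionTreeClassifier()
      .setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
    val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, dt))

    val model = pipeline.fit(labeled)
    model.transform(labeled).select("id", "label", "prediction").show()

    sc.stop()
  }
}
```

As the error message itself suggests, another option is to add a `StringIndexer` stage (for example `new StringIndexer().setInputCol("label").setOutputCol("indexedLabel")`) and point the classifier at it with `setLabelCol("indexedLabel")`; the indexer writes the nominal metadata itself. As far as I know, in 1.4 it casts a numeric label column to strings first, and it assigns indices by label frequency, so the indices need not match the original 0.0/1.0 values.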