Sean,

Thank you!

I finally got this to work, although it is a bit ugly: manually setting the
metadata of the dataframe.

import org.apache.spark.ml.attribute._
import org.apache.spark.sql.types._

val df = training.toDF()
val schema = df.schema
val rowRDD = df.rdd

// Merge a nominal ("0"/"1") attribute into the existing column metadata,
// so DecisionTreeClassifier can read the number of classes from it.
def enrich(m: Metadata): Metadata = {
  val na = NominalAttribute.defaultAttr.withValues("0", "1")
  na.toMetadata(m)
}

// Rebuild the schema with the enriched metadata on the "label" field only.
val newSchema = StructType(schema.map(f =>
  if (f.name == "label") f.copy(metadata = enrich(f.metadata)) else f))
val model = pipeline.fit(sqlContext.createDataFrame(rowRDD, newSchema))
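
For reference, a possibly cleaner variant that attaches the attribute
without rebuilding the DataFrame from its RDD (untested on my side, and
it assumes Column.as(alias, metadata) is available in your Spark version):

import org.apache.spark.ml.attribute.NominalAttribute

// Attach the binary label attribute directly to the column via select
val labelMeta = NominalAttribute.defaultAttr.withValues("0", "1").toMetadata()
val withMeta = df.select(df("id"), df("text"), df("label").as("label", labelMeta))
val model2 = pipeline.fit(withMeta)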

Thanks!
- Terry

On Mon, Sep 7, 2015 at 4:24 PM, Sean Owen <so...@cloudera.com> wrote:

> Hm, off the top of my head I don't know. I haven't looked at this
> aspect in a while, strangely. It's an attribute in the metadata of the
> field. I assume there's a method for setting this metadata when you
> construct the input data.
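>
> Perhaps something along these lines, to at least check what the field
> carries (an untested sketch on my part):
>
> import org.apache.spark.ml.attribute.Attribute
> // Read the attribute back off the DataFrame schema
> val attr = Attribute.fromStructField(df.schema("label"))
> println(attr)  // should show a nominal/binary attribute once metadata is set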
>
> On Sun, Sep 6, 2015 at 10:41 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> > Sean
> >
> > Do you know how to tell the decision tree that the "label" column is binary,
> > or how to set attributes on the dataframe that carry the number of classes?
> >
> > Thanks!
> > - Terry
> >
> > On Sun, Sep 6, 2015 at 5:23 PM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> The error suggests that the type is not a binary or nominal attribute
> >> though. I think that's the missing step. A double-valued column need
> >> not be one of these attribute types.
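> >>
> >> The StringIndexer that the error message points at would add such an
> >> attribute for you. A rough, untested sketch (depending on the version,
> >> StringIndexer may only accept string input, so the label may need a
> >> cast first):
> >>
> >> import org.apache.spark.ml.feature.StringIndexer
> >> // Index the raw label into a column whose metadata carries the
> >> // number of classes, and point the tree at that column
> >> val indexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel")
> >> val dt = new DecisionTreeClassifier().setLabelCol("indexedLabel")
> >> val pipeline = new Pipeline().setStages(Array(indexer, tokenizer, hashingTF, dt))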
> >>
> >> On Sun, Sep 6, 2015 at 10:14 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> >> > Hi, Owen,
> >> >
> >> > The dataframe "training" comes from an RDD of a case class,
> >> > RDD[LabeledDocument], where the case class is defined as:
> >> >
> >> > case class LabeledDocument(id: Long, text: String, label: Double)
> >> >
> >> > So there is already a default "label" column of "double" type.
> >> >
> >> > I already tried to set the label column for the decision tree like this:
> >> >
> >> > val lr = new DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
> >> >
> >> > It raised the same error.
> >> >
> >> > I also tried changing the "label" column to "int" type, but that failed
> >> > too, with the following stack; I have no idea how to make this work.
> >> >
> >> > java.lang.IllegalArgumentException: requirement failed: Column label must be of type DoubleType but was actually IntegerType.
> >> >         at scala.Predef$.require(Predef.scala:233)
> >> >         at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
> >> >         at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
> >> >         at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
> >> >         at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
> >> >         at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >> >         at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >> >         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> >> >         at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
> >> >         at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
> >> >         at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
> >> >         at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
> >> >         at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)
> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:62)
> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
> >> >         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
> >> >         at $iwC$$iwC$$iwC.<init>(<console>:72)
> >> >         at $iwC$$iwC.<init>(<console>:74)
> >> >         at $iwC.<init>(<console>:76)
> >> >         at <init>(<console>:78)
> >> >         at .<init>(<console>:82)
> >> >         at .<clinit>(<console>)
> >> >         at .<init>(<console>:7)
> >> >         at .<clinit>(<console>)
> >> >         at $print(<console>)
> >> >
> >> > Thanks!
> >> > - Terry
> >> >
> >> > On Sun, Sep 6, 2015 at 4:53 PM, Sean Owen <so...@cloudera.com> wrote:
> >> >>
> >> >> I think somewhere along the line you've not specified your label
> >> >> column -- it's defaulting to "label", and the classifier does not
> >> >> recognize it, or at least not as a binary or nominal attribute.
> >> >>
> >> >> On Sun, Sep 6, 2015 at 5:47 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> >> >> > Hi, Experts,
> >> >> >
> >> >> > I followed the Spark ML pipeline guide to test DecisionTreeClassifier
> >> >> > in the spark shell with Spark 1.4.1, but I always hit an error like
> >> >> > the following. Do you have any idea how to fix this?
> >> >> >
> >> >> > The error stack:
> >> >> > java.lang.IllegalArgumentException: DecisionTreeClassifier was given input with invalid label column label, without the number of classes specified. See StringIndexer.
> >> >> >         at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:71)
> >> >> >         at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:41)
> >> >> >         at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
> >> >> >         at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
> >> >> >         at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:133)
> >> >> >         at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:129)
> >> >> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> >> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> >> >         at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:42)
> >> >> >         at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:43)
> >> >> >         at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:129)
> >> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
> >> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
> >> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
> >> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
> >> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
> >> >> >         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
> >> >> >         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
> >> >> >         at $iwC$$iwC$$iwC.<init>(<console>:59)
> >> >> >         at $iwC$$iwC.<init>(<console>:61)
> >> >> >         at $iwC.<init>(<console>:63)
> >> >> >         at <init>(<console>:65)
> >> >> >         at .<init>(<console>:69)
> >> >> >         at .<clinit>(<console>)
> >> >> >         at .<init>(<console>:7)
> >> >> >         at .<clinit>(<console>)
> >> >> >         at $print(<console>)
> >> >> >
> >> >> > The code executed is:
> >> >> >
> >> >> > // Labeled and unlabeled instance types.
> >> >> > // Spark SQL can infer schema from case classes.
> >> >> > case class LabeledDocument(id: Long, text: String, label: Double)
> >> >> > case class Document(id: Long, text: String)
> >> >> >
> >> >> > // Prepare training documents, which are labeled.
> >> >> > val training = sc.parallelize(Seq(
> >> >> >   LabeledDocument(0L, "a b c d e spark", 1.0),
> >> >> >   LabeledDocument(1L, "b d", 0.0),
> >> >> >   LabeledDocument(2L, "spark f g h", 1.0),
> >> >> >   LabeledDocument(3L, "hadoop mapreduce", 0.0)))
> >> >> >
> >> >> > // Configure an ML pipeline of three stages: tokenizer, hashingTF, and lr.
> >> >> > val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
> >> >> > val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features")
> >> >> > val lr = new DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini")
> >> >> > val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
> >> >> >
> >> >> > // The error is raised by the following line
> >> >> > val model = pipeline.fit(training.toDF)
> >> >> >
> >> >> >
> >> >
> >> >
> >
> >
>
