[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiangrui Meng resolved SPARK-3573.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.2.0

Issue resolved by pull request 3070
[https://github.com/apache/spark/pull/3070]

> Dataset
> -------
>
>                 Key: SPARK-3573
>                 URL: https://issues.apache.org/jira/browse/SPARK-3573
>             Project: Spark
>          Issue Type: Sub-task
>          Components: MLlib
>            Reporter: Xiangrui Meng
>            Assignee: Xiangrui Meng
>            Priority: Critical
>             Fix For: 1.2.0
>
>
> This JIRA is for discussion of the ML dataset, essentially a SchemaRDD with
> extra ML-specific metadata embedded in its schema.
> .Sample code
> Suppose we have training events stored on HDFS and user/ad features in Hive,
> and we want to assemble features for training and then apply a decision tree.
> The proposed pipeline with dataset looks like the following (needs more
> refinement):
> {code}
> sqlContext.jsonFile("/path/to/training/events", 0.01).registerTempTable("event")
> val training = sqlContext.sql("""
>   SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId, event.action AS label,
>          user.gender AS userGender, user.country AS userCountry, user.features AS userFeatures,
>          ad.targetGender AS targetGender
>     FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = ad.id;""").cache()
> val indexer = new Indexer()
> val interactor = new Interactor()
> val fvAssembler = new FeatureVectorAssembler()
> val treeClassifier = new DecisionTreeClassifier()
> val paramMap = new ParamMap()
>   .put(indexer.features, Map("userCountryIndex" -> "userCountry"))
>   .put(indexer.sortByFrequency, true)
>   .put(interactor.features, Map("genderMatch" -> Array("userGender", "targetGender")))
>   .put(fvAssembler.features, Map("features" -> Array("genderMatch", "userCountryIndex", "userFeatures")))
>   .put(fvAssembler.dense, true)
>   .put(treeClassifier.maxDepth, 4) // By default, classifier recognizes "features" and "label" columns.
> val pipeline = Pipeline.create(indexer, interactor, fvAssembler, treeClassifier)
> val model = pipeline.fit(training, paramMap)
> sqlContext.jsonFile("/path/to/events", 0.01).registerTempTable("event")
> val test = sqlContext.sql("""
>   SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
>          user.gender AS userGender, user.country AS userCountry, user.features AS userFeatures,
>          ad.targetGender AS targetGender
>     FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = ad.id;""")
> val prediction = model.transform(test).select('eventId, 'prediction)
> {code}
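The fit/transform flow in the sample code can be illustrated without Spark. The following is only a minimal sketch under loud assumptions: every type here (`Row`, `Dataset`, `Param`, `ParamMap`, `Stage`, `Transformer`, `Estimator`, `Indexer`, `MatchFlag`, `Pipeline`) is a hypothetical stand-in written for this example, not the API that was merged for this issue. In particular, `MatchFlag` is an invented placeholder for an interaction-style stage, and a dataset is modeled as plain in-memory maps rather than a SchemaRDD.

```scala
// Hypothetical, Spark-free model of the proposed pipeline idea.
// None of these types are the actual Spark API.
object PipelineSketch {
  // A dataset is modeled as rows of column name -> value.
  type Row = Map[String, Any]
  type Dataset = Seq[Row]

  // A typed parameter key owned by one stage (compared by reference,
  // so two stages may both call their parameter "features").
  final class Param[T](val name: String)

  // Immutable parameter map; `put` returns a new map.
  final class ParamMap(values: Map[Any, Any] = Map.empty) {
    def put[T](param: Param[T], value: T): ParamMap =
      new ParamMap(values.updated(param, value))
    def get[T](param: Param[T]): Option[T] =
      values.get(param).map(_.asInstanceOf[T])
  }

  sealed trait Stage
  trait Transformer extends Stage {
    def transform(data: Dataset, params: ParamMap): Dataset
  }
  trait Estimator extends Stage {
    def fit(data: Dataset, params: ParamMap): Transformer
  }

  // Learns a value -> index mapping per configured column.
  // The most frequent value gets index 0; ties are broken by string order.
  // Values unseen during fit are not handled in this sketch.
  final class Indexer extends Estimator {
    // output column -> input column
    val features = new Param[Map[String, String]]("features")
    def fit(data: Dataset, params: ParamMap): Transformer = {
      val columns = params.get(features).getOrElse(Map.empty[String, String])
      val indexes: Map[String, Map[Any, Int]] = columns.map { case (out, in) =>
        val byFrequency = data.map(_(in)).groupBy(identity).toSeq
          .sortBy { case (value, rows) => (-rows.size, value.toString) }
          .map(_._1)
        out -> byFrequency.zipWithIndex.toMap
      }
      new Transformer {
        def transform(d: Dataset, p: ParamMap): Dataset =
          d.map(row => row ++ columns.map { case (out, in) => out -> indexes(out)(row(in)) })
      }
    }
  }

  // Invented placeholder stage: flags rows whose input columns all hold the same value.
  final class MatchFlag extends Transformer {
    // output column -> input columns
    val features = new Param[Map[String, Seq[String]]]("features")
    def transform(data: Dataset, params: ParamMap): Dataset = {
      val columns = params.get(features).getOrElse(Map.empty[String, Seq[String]])
      data.map(row => row ++ columns.map { case (out, ins) =>
        out -> (ins.map(row).distinct.size == 1)
      })
    }
  }

  // Fits stages in order, feeding each fitted stage's output to the next stage.
  final class Pipeline(stages: Seq[Stage]) extends Estimator {
    def fit(data: Dataset, params: ParamMap): Transformer = {
      val (_, fitted) = stages.foldLeft((data, Vector.empty[Transformer])) {
        case ((current, done), stage) =>
          val transformer = stage match {
            case e: Estimator   => e.fit(current, params)
            case t: Transformer => t
          }
          (transformer.transform(current, params), done :+ transformer)
      }
      new Transformer {
        def transform(d: Dataset, p: ParamMap): Dataset =
          fitted.foldLeft(d)((acc, t) => t.transform(acc, p))
      }
    }
  }

  // Small usage example on made-up rows.
  def demo(): Dataset = {
    val indexer = new Indexer()
    val matcher = new MatchFlag()
    val paramMap = new ParamMap()
      .put(indexer.features, Map("userCountryIndex" -> "userCountry"))
      .put(matcher.features, Map("genderMatch" -> Seq("userGender", "targetGender")))
    val training: Dataset = Seq(
      Map("userCountry" -> "US", "userGender" -> "F", "targetGender" -> "F"),
      Map("userCountry" -> "US", "userGender" -> "M", "targetGender" -> "F"),
      Map("userCountry" -> "FR", "userGender" -> "M", "targetGender" -> "M"))
    val model = new Pipeline(Seq(indexer, matcher)).fit(training, paramMap)
    model.transform(training, paramMap)
  }
}
```

Keying the parameter map by the `Param` instance rather than by name is one way to let a single map configure several stages at once, which appears to be the intent of `indexer.features` and `interactor.features` in the quoted sample; whether the real design does the same is not stated in this message.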