[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14141271#comment-14141271 ]

Xiangrui Meng commented on SPARK-3573:
--------------------------------------

[~sandyr] SQL/Streaming/GraphX provide computation frameworks, while MLlib is 
for building machine learning pipelines. It is natural to leverage the tools 
we have built inside Spark. For example, we included streaming machine 
learning algorithms in 1.1, and we plan to implement LDA using GraphX. I'm 
not worried about MLlib depending on SQL. MLlib can provide machine-learning-
related UDFs as an extension to SQL, while SQL itself doesn't depend on 
MLlib. There are not many types in ML. One thing we want to add is Vector, 
whose transformations are supported by MLlib's transformers. With weak 
typing, we cannot prevent users from declaring a string column as numeric, 
but errors will be raised at runtime.
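
A minimal sketch of what such a UDF extension could look like, assuming the 
registerFunction-style UDF registration Spark SQL gained in 1.1 (the table 
and column names here are made up):

{code}
// Hypothetical ML-related UDF: squared L2 norm of an array-valued column.
// SQL itself stays unaware of MLlib; the UDF is just an extension point.
sqlContext.registerFunction("normSq", (v: Seq[Double]) => v.map(x => x * x).sum)
sqlContext.sql("SELECT id, normSq(features) AS featuresNormSq FROM points")
{code}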

[~epahomov] If we are talking about a single machine learning algorithm, 
label, features, and perhaps weight should be sufficient. However, for a data 
pipeline, we need more flexible operations. I think we should make it easier 
for users to construct such a pipeline. Libraries like R and Pandas support 
data frames, which are very similar to SchemaRDD, except that the latter also 
carries an execution plan. Do we need an execution plan? Maybe not in the 
first stage, but we definitely need it for future optimization. For training, 
we use label/features, and for prediction, we need id/features. Spark SQL can 
figure out which columns are needed and optimize the scan if the underlying 
storage is in columnar format.
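
For example, each stage would only select the columns it needs (a sketch, 
reusing the column names from the sample code below and assuming the 
assembled "features" column):

{code}
// Training touches only label/features; prediction only id/features.
// With columnar storage (e.g. Parquet), Spark SQL can prune the rest.
val forTraining   = training.select('label, 'features)
val forPrediction = test.select('eventId, 'features)
{code}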

One useful thing we can try is to write down some sample code that constructs 
a pipeline with a couple of components and re-applies the pipeline to test 
data, then look at the code from a user's perspective and see whether it is 
simple to use. At the beginning, I tried to define Instance similar to Weka 
(https://github.com/mengxr/spark-ml/blob/master/doc/instance.md), but it 
doesn't address such pipelines well.
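
For reference, a rough guess at that Weka-style Instance (the exact 
definition is in the linked doc; this is only an approximation):

{code}
// A fixed record type: every row is forced into label/features/weight.
case class Instance(label: Double, features: Array[Double], weight: Double = 1.0)
// Such a fixed schema cannot carry extra pipeline columns (ids, raw
// categorical fields, intermediate features), which is where a SchemaRDD
// with named columns fits better.
{code}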

> Dataset
> -------
>
>                 Key: SPARK-3573
>                 URL: https://issues.apache.org/jira/browse/SPARK-3573
>             Project: Spark
>          Issue Type: Sub-task
>          Components: MLlib
>            Reporter: Xiangrui Meng
>            Assignee: Xiangrui Meng
>            Priority: Critical
>
> This JIRA is for discussion of the ML dataset, essentially a SchemaRDD with 
> extra ML-specific metadata embedded in its schema.
> .Sample code
> Suppose we have training events stored on HDFS and user/ad features in Hive, 
> and we want to assemble features for training and then apply a decision tree.
> The proposed pipeline with the dataset looks like the following (needs more 
> refinement):
> {code}
> // Load training events and join with user/ad features from Hive.
> sqlContext.jsonFile("/path/to/training/events", 0.01).registerTempTable("event")
> val training = sqlContext.sql("""
>   SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
>          event.action AS label,
>          user.gender AS userGender, user.country AS userCountry,
>          user.features AS userFeatures,
>          ad.targetGender AS targetGender
>     FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = ad.id;""").cache()
> val indexer = new Indexer()
> val interactor = new Interactor()
> val fvAssembler = new FeatureVectorAssembler()
> val treeClassifier = new DecisionTreeClassifier()
> val paramMap = new ParamMap()
>   .put(indexer.features, Map("userCountryIndex" -> "userCountry"))
>   .put(indexer.sortByFrequency, true)
>   .put(interactor.features, Map("genderMatch" -> Array("userGender", "targetGender")))
>   .put(fvAssembler.features, Map("features" -> Array("genderMatch", "userCountryIndex", "userFeatures")))
>   .put(fvAssembler.dense, true)
>   .put(treeClassifier.maxDepth, 4) // By default, the classifier recognizes the "features" and "label" columns.
> val pipeline = Pipeline.create(indexer, interactor, fvAssembler, treeClassifier)
> val model = pipeline.fit(training, paramMap)
> // Re-apply the fitted pipeline to test data.
> sqlContext.jsonFile("/path/to/events", 0.01).registerTempTable("event")
> val test = sqlContext.sql("""
>   SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
>          user.gender AS userGender, user.country AS userCountry,
>          user.features AS userFeatures,
>          ad.targetGender AS targetGender
>     FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = ad.id;""")
> val prediction = model.transform(test).select('eventId, 'prediction)
> {code}


