[jira] [Commented] (SPARK-3573) Dataset
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14194311#comment-14194311 ]

Apache Spark commented on SPARK-3573:
-------------------------------------

User 'mengxr' has created a pull request for this issue:
https://github.com/apache/spark/pull/3070

Dataset
-------

Key: SPARK-3573
URL: https://issues.apache.org/jira/browse/SPARK-3573
Project: Spark
Issue Type: Sub-task
Components: MLlib
Reporter: Xiangrui Meng
Assignee: Xiangrui Meng
Priority: Critical

This JIRA is for discussion of the ML dataset: essentially a SchemaRDD with extra ML-specific metadata embedded in its schema.

Sample code: suppose we have training events stored on HDFS and user/ad features in Hive, and we want to assemble features for training and then apply a decision tree. The proposed pipeline with dataset looks like the following (needs more refinement):

{code}
sqlContext.jsonFile("/path/to/training/events", 0.01).registerTempTable("event")
val training = sqlContext.sql("""
  SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
         event.action AS label,
         user.gender AS userGender, user.country AS userCountry,
         user.features AS userFeatures, ad.targetGender AS targetGender
  FROM event JOIN user ON event.userId = user.id
             JOIN ad ON event.adId = ad.id""").cache()

val indexer = new Indexer()
val interactor = new Interactor()
val fvAssembler = new FeatureVectorAssembler()
val treeClassifier = new DecisionTreeClassifier()

val paramMap = new ParamMap()
  .put(indexer.features, Map("userCountryIndex" -> "userCountry"))
  .put(indexer.sortByFrequency, true)
  .put(interactor.features, Map("genderMatch" -> Array("userGender", "targetGender")))
  .put(fvAssembler.features, Map("features" -> Array("genderMatch", "userCountryIndex", "userFeatures")))
  .put(fvAssembler.dense, true)
  .put(treeClassifier.maxDepth, 4) // By default, the classifier recognizes the "features" and "label" columns.

val pipeline = Pipeline.create(indexer, interactor, fvAssembler, treeClassifier)
val model = pipeline.fit(training, paramMap)

sqlContext.jsonFile("/path/to/events", 0.01).registerTempTable("event")
val test = sqlContext.sql("""
  SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
         user.gender AS userGender, user.country AS userCountry,
         user.features AS userFeatures, ad.targetGender AS targetGender
  FROM event JOIN user ON event.userId = user.id
             JOIN ad ON event.adId = ad.id""")
val prediction = model.transform(test).select('eventId, 'prediction)
{code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
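The "ML-specific metadata embedded in its schema" idea can be sketched in plain Scala. The names below (ColumnMeta, Column, Schema) are illustrative only, not the proposed SchemaRDD API; the point is that each column carries a (name, type) pair plus optional ML metadata such as categorical levels:

```scala
// Hypothetical sketch of a schema with per-column ML metadata.
// These types are assumptions for illustration, not the actual API.
case class ColumnMeta(mlType: String, categories: Seq[String] = Seq.empty)

case class Column(name: String, dataType: String, meta: Option[ColumnMeta] = None)

case class Schema(columns: Seq[Column]) {
  // Look up a column by name.
  def column(name: String): Option[Column] = columns.find(_.name == name)
  // Columns a learning algorithm could consume directly as numeric features.
  def numericColumns: Seq[String] =
    columns.collect { case c if c.dataType == "double" => c.name }
}

val schema = Schema(Seq(
  Column("label", "double"),
  Column("userCountry", "string",
    meta = Some(ColumnMeta("categorical", Seq("US", "CA", "FR")))),
  Column("userFeatures", "vector")
))
```

A transformer like the Indexer above could then read the "categorical" annotation instead of requiring the user to re-declare column roles at every pipeline stage.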
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14188919#comment-14188919 ]

Evan Sparks commented on SPARK-3573:
------------------------------------

This comment originally appeared on the PR associated with this feature (https://github.com/apache/spark/pull/2919):

I've looked at the code here, and it basically seems reasonable. One high-level concern I have is around the programming pattern that this encourages: complex nesting of otherwise simple structure, which may make it difficult to program against Datasets for sufficiently complicated applications.

A 'dataset' is now a collection of Row, where we have the guarantee that all rows in a Dataset conform to the same schema. A schema is a list of (name, type) pairs which describe the attributes available in the dataset. This seems like a good thing to me, and is pretty much what we described in MLI (and how conventional databases have been structured forever). So far, so good.

The concern I have is that we are now encouraging these attributes to be complex types. For example, where I might have had

{code}
val x = Schema(('a', classOf[String]), ('b', classOf[Double]), ..., ('z', classOf[Double]))
{code}

this would become

{code}
val x = Schema(('a', classOf[String]), ('bGroup', classOf[Vector]), ..., ('zGroup', classOf[Vector]))
{code}

So, great, my schema now has these vector things in it, which I can create separately, pass around, etc. This clearly has its merits:
1) Features are grouped together logically based on the process that creates them.
2) Managing one short schema where each record is comprised of a few large objects (say, 4 vectors, each of length 1000) is probably easier than managing a really big schema comprised of lots of small objects (say, 4000 doubles).

But there are some major drawbacks:
1) Why stop at one level of nesting? Why not have Vector[Vector]?
2) How do learning algorithms, like SVM or PCA, deal with these Datasets? Is there an implicit conversion that flattens these things to RDD[LabeledPoint]? Do we want to guarantee these semantics?
3) Manipulating and subsetting nested schemas like this might be tricky. Where before I might be able to write

{code}
val x: Dataset = input.select(Seq(0, 1, 2, 4, 180, 181, 1000, 1001, 1002))
{code}

now I might have to write

{code}
val groupSelections = Seq(Seq(0, 1, 2, 4), Seq(0, 1), Seq(0, 1, 2))
val x: Dataset = groupSelections.zip(input.columns).map { case (gs, col) => col(gs) }
{code}

Ignoring the raw syntax and semantics of how you might actually map an operation over the columns of a Dataset and get back a well-structured dataset, I think this makes two conflicting points:
1) In the first example, presumably all the work goes into figuring out what the subset of features you want is in this really wide feature space.
2) In the second example, there's a lot of gymnastics that goes into subsetting feature groups. I think it's clear that working with lots of feature groups might get unreasonable pretty quickly.

If we look at R or pandas/scikit-learn as examples of projects that have (arguably quite successfully) dealt with these interface issues, there is one basic pattern: learning algorithms expect big tables of numbers as input. Even here, there are some important differences. For example, in scikit-learn, categorical features aren't supported directly by most learning algorithms. Instead, users are responsible for getting data from "table with heterogeneously typed columns" to "table of numbers" with something like OneHotEncoder and other feature transformers. In R, on the other hand, categorical features are (sometimes frustratingly) first-class citizens by virtue of the "factor" data type, which is essentially an enum. Most out-of-the-box learning algorithms (like glm()) accept data frames with categorical inputs and handle them sensibly, implicitly one-hot encoding (or creating dummy variables, if you prefer) the categorical features.

While I have a slight preference for representing things as big flat tables, I would be fine coding it either way, but I wanted to raise the issue for discussion here before the interfaces are set in stone.
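The "group selection gymnastics" above can be made concrete with plain Scala collections (the Dataset/column API in the comment is hypothetical, so this models a row simply as a sequence of feature groups, each a Seq[Double]):

```scala
// Sketch of subsetting nested feature groups, as described in the comment.
// A Row here is a sequence of feature groups; selecting a sub-schema means
// selecting indices *within each group*, not one flat index list.
type Row = Seq[Seq[Double]]

def selectWithinGroups(rows: Seq[Row], groupSelections: Seq[Seq[Int]]): Seq[Row] =
  rows.map { row =>
    // Pair each group with its index selection and keep only those indices.
    row.zip(groupSelections).map { case (group, idxs) => idxs.map(group) }
  }

val rows: Seq[Row] = Seq(
  Seq(Seq(1.0, 2.0, 3.0), Seq(4.0, 5.0)),
  Seq(Seq(6.0, 7.0, 8.0), Seq(9.0, 10.0))
)
// Keep features 0 and 2 of the first group, feature 1 of the second.
val selected = selectWithinGroups(rows, Seq(Seq(0, 2), Seq(1)))
```

Compared with a single flat `select(Seq(0, 1, 2, ...))`, the caller now has to know both the group layout and the per-group indices, which is the bookkeeping cost the comment is pointing at.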
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14188980#comment-14188980 ]

Joseph K. Bradley commented on SPARK-3573:
------------------------------------------

[~sparks] Trying to simplify things, am I right that the main question is: _Should ML data instances/examples/rows be flat vectors or have structure?_

Breaking this down:
(1) Should we allow structure?
(2) Should we encourage flatness or structure, and how?
(3) How does a Dataset used in a full ML pipeline resemble/differ from a Dataset used by a specific ML algorithm?

My thoughts:
(1) We should allow structure. For general (complicated) pipelines, it will be important to provide structure to make it easy to select groups of features.
(2) We should encourage flatness where possible; e.g., unigram features from a document should be stored as a Vector instead of a bunch of Doubles in the Schema. We should encourage structure where meaningful; e.g., the output of a learning algorithm should be appended as a new column (a new element in the Schema) by default, rather than being appended to a big Vector of features.
(3) As in my comment for (2), a Dataset for a full pipeline should have structure where meaningful. However, I agree that most common ML algorithms expect flat Vectors of features. There needs to be an easy way to select relevant features and transform them to a Vector, LabeledPoint, etc. Having structured Datasets in the pipeline should be useful for selecting relevant features. To transform the selection, it will be important to provide helper methods for mushing the data into Vectors or other common formats.

The big challenge in my mind is (2): figuring out default behavior and perhaps column naming/selection conventions which make it easy to select subsets of features (or even have an implicit selection if possible).

What do you think?
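The helper methods for "mushing the data into Vectors" mentioned above might look like the following plain-Scala sketch (the Cell/Scalar/Vec types and assembleFeatures are illustrative assumptions, not a proposed API): selected columns, whether scalar or vector-valued, are flattened into one flat feature array per row.

```scala
// Hypothetical cell types: a column value is either a scalar or a vector.
sealed trait Cell
case class Scalar(v: Double) extends Cell
case class Vec(vs: Array[Double]) extends Cell

// Flatten the chosen columns of one row into a single feature array,
// expanding vector-valued columns in place.
def assembleFeatures(row: Map[String, Cell], cols: Seq[String]): Array[Double] =
  cols.flatMap { name =>
    row(name) match {
      case Scalar(v) => Array(v)
      case Vec(vs)   => vs
    }
  }.toArray

val row = Map(
  "genderMatch"      -> Scalar(1.0),
  "userCountryIndex" -> Scalar(3.0),
  "userFeatures"     -> Vec(Array(0.5, 0.25))
)
val features = assembleFeatures(row, Seq("genderMatch", "userCountryIndex", "userFeatures"))
```

This is the shape of the FeatureVectorAssembler step in the sample pipeline: structured columns stay structured through the pipeline, and flattening happens once, just before the learning algorithm.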
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14183173#comment-14183173 ]

Sandy Ryza commented on SPARK-3573:
-----------------------------------

Is this still targeted for 1.2?
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14183488#comment-14183488 ]

Xiangrui Meng commented on SPARK-3573:
--------------------------------------

Yes, both the metadata PR and the UDT PR are out.
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14156129#comment-14156129 ]

Patrick Wendell commented on SPARK-3573:
----------------------------------------

I think people are hung up on the term SQL - SchemaRDD is designed to simply represent richer types on top of the core RDD API. In fact, we thought originally of naming the package "schema" instead of "sql" for exactly this reason. SchemaRDD is in the sql/core package right now, but we could pull the public interface of a SchemaRDD into another package in the future (and maybe we'd drop exposing anything about the logical plan here).

I'd like to see a common representation of typed data be used across both SQL and MLlib and, longer term, other libraries as well. I don't see an insurmountable semantic gap between an R-style data frame and a relational table. In fact, if you look across other projects today, almost all of them are trying to unify these types of data representations. So I'd support seeing whether we can enhance or extend SchemaRDD to better support numeric data sets. And if we find there is just too large a gap here, then we could look at implementing a second dataset abstraction. If nothing else, this is a test of whether SchemaRDD is sufficiently extensible to be useful in contexts beyond SQL (which was its original design).
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14141063#comment-14141063 ]

Sandy Ryza commented on SPARK-3573:
-----------------------------------

Currently SchemaRDD does depend on Catalyst. Are you thinking we'd take that out? I wasn't thinking about any specific drawbacks, unless SQL might need to depend on MLlib as well.

I guess I'm thinking about it more from the perspective of what mental model we expect users to have when dealing with Datasets. SchemaRDD brings along baggage like LogicalPlans - do users need to understand what those are? SQL and ML types sometimes line up, sometimes have fuzzy relationships, and sometimes can't be translated. How does the mapping get defined? What stops someone from annotating a String column as numeric?
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14141271#comment-14141271 ]

Xiangrui Meng commented on SPARK-3573:
--------------------------------------

[~sandyr] SQL/Streaming/GraphX provide computation frameworks, while MLlib is for building machine learning pipelines. It is natural to leverage the tools we have built inside Spark. For example, we included streaming machine learning algorithms in 1.1, and we plan to implement LDA using GraphX. I'm not worried about MLlib depending on SQL. MLlib can provide UDFs related to machine learning. It will be an extension to SQL, but SQL doesn't depend on MLlib.

There are not many types in ML. One thing we want to add is Vector, and its transformations are supported by MLlib's transformers. With weak types, we cannot prevent users from declaring a string column as numeric, but errors will be generated at runtime.

[~epahomov] If we are talking about a single machine learning algorithm, label, feature, and perhaps weight should be sufficient. However, for a data pipeline, we need more flexible operations. I think we should make it easier for users to construct such a pipeline.

Libraries like R and pandas support data frames, which are very similar to SchemaRDD, while the latter also provides an execution plan. Do we need an execution plan? Maybe not in the first stage, but we definitely need it for future optimization. For training we use label/features, and for prediction we need id/features. Spark SQL can figure out the columns needed and optimize access if the underlying storage is in columnar format.

One useful thing we can try is to write down some sample code that constructs a pipeline with a couple of components and re-applies the pipeline to test data, then look at the code as users and see whether it is simple to use. At the beginning, I tried to define Instance similar to Weka (https://github.com/mengxr/spark-ml/blob/master/doc/instance.md), but it doesn't work well for those pipelines.
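A Weka-style Instance, as mentioned in the comment above, might look like the following sketch (a guess for illustration, not the contents of the linked doc). It shows why a per-record label/features bundle fits a single algorithm well but is awkward for pipelines: every intermediate column a transformer produces has to be packed back into each record's feature array.

```scala
// Hypothetical Weka-style representation: one label plus one flat feature
// vector per record. Names and shapes here are assumptions.
case class Instance(label: Double, features: Array[Double])

// Appending a derived column (e.g. a transformer's output) forces rebuilding
// every feature array; a schema'd dataset would instead just add a column.
def appendFeature(data: Seq[Instance], extra: Seq[Double]): Seq[Instance] =
  data.zip(extra).map { case (inst, x) =>
    inst.copy(features = inst.features :+ x)
  }

val data = Seq(Instance(1.0, Array(0.1, 0.2)), Instance(0.0, Array(0.3, 0.4)))
val widened = appendFeature(data, Seq(0.9, 0.8))
```

With this shape there is also nowhere to keep named intermediate columns (userCountryIndex, genderMatch, ...) for later selection, which is the flexibility the schema-based Dataset proposal is after.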
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14139958#comment-14139958 ]

Sandy Ryza commented on SPARK-3573:
-----------------------------------

Currently SchemaRDD lives inside SQL. Would we move it to core if we plan to use it in components that aren't related to SQL?
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14140017#comment-14140017 ]

Patrick Wendell commented on SPARK-3573:
----------------------------------------

[~sandyr] This is a good question. I'm not sure how easy it would be to decouple SchemaRDD from the other things inside sql/core. It definitely doesn't need to depend on Catalyst or on Hive, but it might need to depend on the entire sql core. I've been thinking about whether it is bad to have a growing number of cross-dependencies among the projects. Do you see specific drawbacks if that becomes the case?