Re: [EXTERNAL] - Re: Problem with the ML ALS algorithm

2019-06-26 Thread Nick Pentreath
Generally I would say 10s is a bit low, while a few 100s+ starts to make
sense. Of course it depends a lot on the specific use case, item catalogue
etc, user experience / platform, etc.

On Wed, Jun 26, 2019 at 3:57 PM Steve Pruitt  wrote:

> I should have mentioned this is a synthetic dataset I create using some
> likelihood distributions of the rating values.  I am only experimenting /
> learning.  In practice though, the list of items is likely to be at least
> in the 10’s if not 100’s.  Are even this item numbers to low?
>
>
>
> Thanks.
>
>
>
> -S
>
>
>
> *From:* Nick Pentreath 
> *Sent:* Wednesday, June 26, 2019 9:09 AM
> *To:* user@spark.apache.org
> *Subject:* Re: [EXTERNAL] - Re: Problem with the ML ALS algorithm
>
>
>
> If the number of items is indeed 4, then another issue is the rank of the
> factors defaults to 10. Setting the "rank" parameter < 4 will help.
>
>
>
> However, if you only have 4 items, then I would propose that using ALS (or
> any recommendation model in fact) is not really necessary. There is not
> really enough information as well as sparsity, to make collaborative
> filtering useful. And you could simply recommend all items a user has not
> rated and the result would be the same essentially.
>
>
>
>
>
> On Wed, Jun 26, 2019 at 3:03 PM Steve Pruitt  wrote:
>
> Number of users is 1055
>
> Number of items is 4
>
> Ratings values are either 120, 20, 0
>
>
>
>
>
> *From:* Nick Pentreath 
> *Sent:* Wednesday, June 26, 2019 6:03 AM
> *To:* user@spark.apache.org
> *Subject:* [EXTERNAL] - Re: Problem with the ML ALS algorithm
>
>
>
> This means that the matrix that ALS is trying to factor is not positive
> definite. Try increasing regParam (try 0.1, 1.0 for example).
>
>
>
> What does the data look like? e.g. number of users, number of items,
> number of ratings, etc?
>
>
>
> On Wed, Jun 26, 2019 at 12:06 AM Steve Pruitt 
> wrote:
>
> I get an inexplicable exception when trying to build an ALSModel with the
> implicit set to true.  I can’t find any help online.
>
>
>
> Thanks in advance.
>
>
>
> My code is:
>
>
>
> ALS als = new ALS()
>
> .setMaxIter(5)
>
> .setRegParam(0.01)
>
> .setUserCol("customer")
>
> .setItemCol("item")
>
> .setImplicitPrefs(true)
>
> .setRatingCol("rating");
>
> ALSModel model = als.fit(training);
>
>
>
> The exception is:
>
> org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6
> because A is not positive definite. Is A derived from a singular matrix
> (e.g. collinear column values)?
>
> at
> org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65)
> ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
>
> at
> org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41)
> ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
>
> at
> org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747)
> ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
>
>


Re: [EXTERNAL] - Re: Problem with the ML ALS algorithm

2019-06-26 Thread Nick Pentreath
If the number of items is indeed 4, then another issue is the rank of the
factors defaults to 10. Setting the "rank" parameter < 4 will help.

However, if you only have 4 items, then I would propose that using ALS (or
any recommendation model in fact) is not really necessary. There is not
really enough information as well as sparsity, to make collaborative
filtering useful. And you could simply recommend all items a user has not
rated and the result would be the same essentially.


On Wed, Jun 26, 2019 at 3:03 PM Steve Pruitt  wrote:

> Number of users is 1055
>
> Number of items is 4
>
> Ratings values are either 120, 20, 0
>
>
>
>
>
> *From:* Nick Pentreath 
> *Sent:* Wednesday, June 26, 2019 6:03 AM
> *To:* user@spark.apache.org
> *Subject:* [EXTERNAL] - Re: Problem with the ML ALS algorithm
>
>
>
> This means that the matrix that ALS is trying to factor is not positive
> definite. Try increasing regParam (try 0.1, 1.0 for example).
>
>
>
> What does the data look like? e.g. number of users, number of items,
> number of ratings, etc?
>
>
>
> On Wed, Jun 26, 2019 at 12:06 AM Steve Pruitt 
> wrote:
>
> I get an inexplicable exception when trying to build an ALSModel with the
> implicit set to true.  I can’t find any help online.
>
>
>
> Thanks in advance.
>
>
>
> My code is:
>
>
>
> ALS als = new ALS()
>
> .setMaxIter(5)
>
> .setRegParam(0.01)
>
> .setUserCol("customer")
>
> .setItemCol("item")
>
> .setImplicitPrefs(true)
>
> .setRatingCol("rating");
>
> ALSModel model = als.fit(training);
>
>
>
> The exception is:
>
> org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6
> because A is not positive definite. Is A derived from a singular matrix
> (e.g. collinear column values)?
>
> at
> org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65)
> ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
>
> at
> org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41)
> ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
>
> at
> org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747)
> ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
>
>


Re: How to use StringIndexer for multiple input /output columns in Spark Java

2018-05-15 Thread Nick Pentreath
Multi column support for StringIndexer didn’t make it into Spark 2.3.0

The PR is still in progress I think - should be available in 2.4.0

On Mon, 14 May 2018 at 22:32, Mina Aslani  wrote:

> Please take a look at the api doc:
> https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/StringIndexer.html
>
> On Mon, May 14, 2018 at 4:30 PM, Mina Aslani  wrote:
>
>> Hi,
>>
>> There is no SetInputCols/SetOutputCols for StringIndexer in Spark java.
>> How multiple input/output columns can be specified then?
>>
>> Regards,
>> Mina
>>
>
>


Re: A naive ML question

2018-04-29 Thread Nick Pentreath
One potential approach could be to construct a transition matrix showing
the probability of moving from each state to another state. This can be
visualized with a “heat map” encoding (I think matshow in numpy/matplotlib
does this).

On Sat, 28 Apr 2018 at 21:34, kant kodali  wrote:

> Hi,
>
> I mean a transaction goes typically goes through different states like
> STARTED, PENDING, CANCELLED, COMPLETED, SETTLED etc...
>
> Thanks,
> kant
>
> On Sat, Apr 28, 2018 at 4:11 AM, Jörn Franke  wrote:
>
>> What do you mean by “how it evolved over time” ? A transaction describes
>> basically an action at a certain point of time. Do you mean how a financial
>> product evolved over time given a set of a transactions?
>>
>> > On 28. Apr 2018, at 12:46, kant kodali  wrote:
>> >
>> > Hi All,
>> >
>> > I have a bunch of financial transactional data and I was wondering if
>> there is any ML model that can give me a graph structure for this data?
>> other words, show how a transaction had evolved over time?
>> >
>> > Any suggestions or references would help.
>> >
>> > Thanks!
>> >
>>
>
>


Re: StringIndexer with high cardinality huge data

2018-04-10 Thread Nick Pentreath
Also check out FeatureHasher in Spark 2.3.0 which is designed to handle
this use case in a more natural way than HashingTF (and handles multiple
columns at once).



On Tue, 10 Apr 2018 at 16:00, Filipp Zhinkin 
wrote:

> Hi Shahab,
>
> do you actually need to have a few columns with such a huge amount of
> categories whose value depends on original value's frequency?
>
> If no, then you may use value's hash code as a category or combine all
> columns into a single vector using HashingTF.
>
> Regards,
> Filipp.
>
> On Tue, Apr 10, 2018 at 4:01 PM, Shahab Yunus 
> wrote:
> > Is the StringIndexer keeps all the mapped label to indices in the memory
> of
> > the driver machine? It seems to be unless I am missing something.
> >
> > What if our data that needs to be indexed is huge and columns to be
> indexed
> > are high cardinality (or with lots of categories) and more than one such
> > column need to be indexed? Meaning it wouldn't fit in memory.
> >
> > Thanks.
> >
> > Regards,
> > Shahab
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark MLlib: Should I call .cache before fitting a model?

2018-02-27 Thread Nick Pentreath
Currently, fit for many (most I think) models will cache the input data.
For LogisticRegression this is definitely the case, so you won't get any
benefit from caching it yourself.

On Tue, 27 Feb 2018 at 21:25 Gevorg Hari  wrote:

> Imagine that I am training a Spark MLlib model as follows:
>
> val traingData = loadTrainingData(...)val logisticRegression = new 
> LogisticRegression()
>
> traingData.cacheval logisticRegressionModel = 
> logisticRegression.fit(trainingData)
>
> Does the call traingData.cache improve performances at training time or
> is it not needed?
>
> Does the .fit(...) method for a ML algorithm call cache/unpersist
> internally?
>
>


Re: Reverse MinMaxScaler in SparkML

2018-01-29 Thread Nick Pentreath
This would be interesting and a good addition I think.

It bears some thought about the API though. One approach is to have an
"inverseTransform" method similar to sklearn.

The other approach is to "formalize" something like StringIndexerModel ->
IndexToString. Here, the inverse transformer is a standalone transformer.
It could be returned from a "getInverseTransformer" method, for example.

The former approach is simpler, but cannot be used in pipelines (which work
on "fit" / "transform"). The latter approach is more cumbersome, but fits
better into pipelines.

So it depends on the use cases - i.e. how common is it to use the inverse
transform function within a pipeline (for StringIndexer <-> IndexToString
it is quite common to get back the labels, while for other transformers it
may or may not be).

On Mon, 8 Jan 2018 at 11:10 Tomasz Dudek 
wrote:

> Hello,
>
> since the similar question on StackOverflow remains unanswered (
> https://stackoverflow.com/questions/46092114/is-there-no-inverse-transform-method-for-a-scaler-like-minmaxscaler-in-spark
> ) and perhaps there is a solution that I am not aware of, I'll ask:
>
> After traning MinMaxScaler(or similar scaler) is there any built-in way to
> revert the process? What I mean is to transform the scaled data back to its
> original form. SKlearn has a dedicated method inverse_transform that does
> exactly that.
>
> I can, of course, get the originalMin/originalMax Vectors from the
> MinMaxScalerModel and then map the values myself but it would be nice to
> have it built-in.
>
> Yours,
> Tomasz
>
>


Re: Has there been any explanation on the performance degradation between spark.ml and Mllib?

2018-01-21 Thread Nick Pentreath
At least one of their comparisons is flawed.

The Spark ML version of linear regression (*note* they use linear
regression and not logistic regression, it is not clear why) uses L-BFGS as
the solver, not SGD (as MLLIB uses). Hence it is typically going to be
slower. However, it should in most cases converge to a better solution.
MLLIB doesn't offer an L-BFGS version for linear regression, but it does
for logistic regression.

In my view a more sensible comparison would be between LogReg with L-BFGS
between ML and MLLIB. These should be close to identical since now the
MLLIB version actually wraps the ML version.

They also don't show any results for algorithm performance (accuracy, AUC
etc). The better comparison to make is the run-time to achieve the same AUC
(for example). SGD may be fast, but it may result in a significantly poorer
solution relative to say L-BFGS.

Note that the "withSGD" algorithms are deprecated in MLLIB partly to move
users to ML, but also partly because their performance in terms of accuracy
is relatively poor and the amount of tuning required (e.g. learning rates)
is high.

They say:

The time difference between Spark MLlib and Spark ML can be explained by
internally transforming the dataset from DataFrame to RDD in order to use
the same implementation of the algorithm present in MLlib.

but this is not true for the LR example.

For the feature selection example, it is probably mostly due to the
conversion, but even then the difference seems larger than what I would
expect. It would be worth investigating their implementation to see if
there are other potential underlying causes.


On Sun, 21 Jan 2018 at 23:49 Stephen Boesch  wrote:

> While MLLib performed favorably vs Flink it *also *performed favorably vs
> spark.ml ..  and by an *order of magnitude*.  The following is one of the
> tables - it is for Logistic Regression.  At that time spark.ML did not yet
> support SVM
>
> From:
> https://bdataanalytics.biomedcentral.com/articles/10.1186/s41044-016-0020-2
>
>
>
> Table 3
>
> LR learning time in seconds
>
> Dataset
>
> Spark MLlib
>
> Spark ML
>
> Flink
>
> ECBDL14-10
>
> 3
>
> 26
>
> 181
>
> ECBDL14-30
>
> 5
>
> 63
>
> 815
>
> ECBDL14-50
>
> 6
>
> 173
>
> 1314
>
> ECBDL14-75
>
> 8
>
> 260
>
> 1878
>
> ECBDL14-100
>
> 12
>
> 415
>
> 2566
>
> The DataFrame based API (spark.ml) is even slower vs the RDD (mllib) than
> had been anticipated - yet the latter has been shutdown for several
> versions of Spark already.  What is the thought process behind that
> decision : *performance matters! *Is there visibility into a meaningful
> narrowing of that gap?
>


Re: [ML] Allow CrossValidation ParamGrid on SVMWithSGD

2018-01-19 Thread Nick Pentreath
SVMWithSGD sits in the older "mllib" package and is not compatible directly
with the DataFrame API. I suppose one could write a ML-API wrapper around
it.

However, there is LinearSVC in Spark 2.2.x:
http://spark.apache.org/docs/latest/ml-classification-regression.html#linear-support-vector-machine

You should use that instead I would say.

On Fri, 19 Jan 2018 at 13:59 Tomasz Dudek 
wrote:

> Hello,
>
> is there any way to use CrossValidation's ParamGrid with SVMWithSGD?
>
> usually, when e.g. using RandomForest you can specify a lot of parameters,
> to automatise the param grid search (when used with CrossValidation)
>
> val algorithm = new RandomForestClassifier()
> val paramGrid = { new ParamGridBuilder()
>   .addGrid(algorithm.impurity, Array("gini", "entropy"))
>   .addGrid(algorithm.maxDepth, Array(3, 5, 10))
>   .addGrid(algorithm.numTrees, Array(2, 3, 5, 15, 50))
>   .addGrid(algorithm.minInfoGain, Array(0.01, 0.001))
>   .addGrid(algorithm.minInstancesPerNode, Array(10, 50, 500))
>   .build()
> }
>
> with SGDWIthSGD however, the parameters are inside GradientDescent. You
> can explicitly tune the params, either by using SGDWithSGD's constructor or
> by calling setters here:
>
> val algorithm = new SVMWithSGD()
> algorithm.optimizer.setMiniBatchFraction(256)
>   .setNumIterations(200)
>   .setRegParam(0.01)
>
> those two ways however restrict me from using ParamGridBuilder correctly.
>
> There are no such things as algorithm.optimizer.numIterations or
> algorithm.optimizer.regParam, only setters(and ParamGrid requires Params,
> not setters)
>
> I could of course create each SVM model manually, create one huge Pipeline
> with each model saving its result to different column and then manually
> decide which performed the best. It requires a lot of coding and so far
> CrossValidation's ParamGrid did that job for me instead.
>
> Am I missing something? Is it WIP or is there any hack to do that?
>
> Yours,
> Tomasz
>


Re: does "Deep Learning Pipelines" scale out linearly?

2017-11-22 Thread Nick Pentreath
For that package specifically it’s best to see if they have a mailing list
and if not perhaps ask on github issues.

Having said that perhaps the folks involved in that package will reply here
too.

On Wed, 22 Nov 2017 at 20:03, Andy Davidson 
wrote:

> I am starting a new deep learning project currently we do all of our work
> on a single machine using a combination of Keras and Tensor flow.
> https://databricks.github.io/spark-deep-learning/site/index.html looks
> very promising. Any idea how performance is likely to improve as I add
> machines to my my cluster?
>
> Kind regards
>
> Andy
>
>
> P.s. Is user@spark.apache.org the best place to ask questions about this
> package?
>
>
>


Re: StringIndexer on several columns in a DataFrame with Scala

2017-10-30 Thread Nick Pentreath
For now, you must follow this approach of constructing a pipeline
consisting of a StringIndexer for each categorical column. See
https://issues.apache.org/jira/browse/SPARK-11215 for the related JIRA to
allow multiple columns for StringIndexer, which is being worked on
currently.

The reason you're seeing a NPE is:

var indexers: Array[StringIndexer] = null

and then you're trying to append an element to something that is null.

Try this instead:

var indexers: Array[StringIndexer] = Array()


But even better is a more functional approach:

val indexers = featureCol.map { colName =>

  new StringIndexer().setInputCol(colName).setOutpucol(colName + "_indexed")

}


On Fri, 27 Oct 2017 at 22:29 Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Hi All,
>
> There are several categorical columns in my dataset as follows:
> [image: grafik.png]
>
> How can I transform values in each (categorical) columns into numeric
> using StringIndexer so that the resulting DataFrame can be feed into
> VectorAssembler to generate a feature vector?
>
> A naive approach that I can try using StringIndexer for each categorical
> column. But that sounds hilarious, I know.
> A possible workaround
> in
> PySpark is combining several StringIndexer on a list and use a Pipeline
> to execute them all as follows:
>
> from pyspark.ml import Pipelinefrom pyspark.ml.feature import StringIndexer
> indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) 
> for column in list(set(df.columns)-set(['date'])) ]
> pipeline = Pipeline(stages=indexers)
> df_r = pipeline.fit(df).transform(df)
> df_r.show()
>
> How I can do the same in Scala? I tried the following:
>
> val featureCol = trainingDF.columns
> var indexers: Array[StringIndexer] = null
>
> for (colName <- featureCol) {
>   val index = new StringIndexer()
> .setInputCol(colName)
> .setOutputCol(colName + "_indexed")
> //.fit(trainDF)
>   indexers = indexers :+ index
> }
>
>  val pipeline = new Pipeline()
> .setStages(indexers)
> val newDF = pipeline.fit(trainingDF).transform(trainingDF)
> newDF.show()
>
> However, I am experiencing NullPointerException at
>
> for (colName <- featureCol)
>
> I am sure, I am doing something wrong. Any suggestion?
>
>
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>


Re: How to run MLlib's word2vec in CBOW mode?

2017-09-28 Thread Nick Pentreath
MLlib currently doesn't support CBOW - there is an open PR for it (see
https://issues.apache.org/jira/browse/SPARK-20372).

On Thu, 28 Sep 2017 at 09:56 pun  wrote:

> Hello,
> My understanding is that word2vec can be ran in two modes:
>
>- continuous bag-of-words (CBOW) (order of words does not matter)
>- continuous skip-gram (order of words matters)
>
> I would like to run the *CBOW* implementation from Spark's MLlib, but it
> is not clear to me from the documentation and their example how to do it.
> This is the example listed on their page. From:
> https://spark.apache.org/docs/2.1.0/mllib-feature-extraction.html#example
>
> import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
>
> val input = sc.textFile("data/mllib/sample_lda_data.txt").map(line => 
> line.split(" ").toSeq)
>
> val word2vec = new Word2Vec()
>
> val model = word2vec.fit(input)
>
> val synonyms = model.findSynonyms("1", 5)
>
> for((synonym, cosineSimilarity) <- synonyms) {
>   println(s"$synonym $cosineSimilarity")
> }
>
> *My questions:*
>
>- Which of the two modes does this example use?
>- Do you know how I can run the model in the CBOW mode?
>
> Thanks in advance!
> --
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: isCached

2017-09-01 Thread Nick Pentreath
No unfortunately not - as i recall storageLevel accesses some private
methods to get the result.

On Fri, 1 Sep 2017 at 17:55, Nathan Kronenfeld
<nkronenfeld@uncharted.software> wrote:

> Ah, in 2.1.0.
>
> I'm in 2.0.1 at the moment... is there any way that works that far back?
>
> On Fri, Sep 1, 2017 at 11:46 AM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
>> Dataset does have storageLevel. So you can use isCached = (storageLevel
>> != StorageLevel.NONE) as a test.
>>
>> Arguably isCached could be added to dataset too, shouldn't be a
>> controversial change.
>>
>> On Fri, 1 Sep 2017 at 17:31, Nathan Kronenfeld
>> <nkronenfeld@uncharted.software> wrote:
>>
>>> I'm currently porting some of our code from RDDs to Datasets.
>>>
>>> With RDDs it's pretty easy to figure out if they are cached or not.
>>>
>>> I notice that the catalog has a function for determining this on
>>> Datasets too, but it's private[sql].  Is there any reason for it not to be
>>> public?  Is there any way at the moment to determine if a dataset is cached
>>> or not?
>>>
>>> Thanks in advance
>>>-Nathan Kronenfeld
>>>
>>
>


Re: isCached

2017-09-01 Thread Nick Pentreath
Dataset does have storageLevel. So you can use isCached = (storageLevel !=
StorageLevel.NONE) as a test.

Arguably isCached could be added to dataset too, shouldn't be a
controversial change.

On Fri, 1 Sep 2017 at 17:31, Nathan Kronenfeld
 wrote:

> I'm currently porting some of our code from RDDs to Datasets.
>
> With RDDs it's pretty easy to figure out if they are cached or not.
>
> I notice that the catalog has a function for determining this on Datasets
> too, but it's private[sql].  Is there any reason for it not to be public?
> Is there any way at the moment to determine if a dataset is cached or not?
>
> Thanks in advance
>-Nathan Kronenfeld
>


Re: Updates on migration guides

2017-08-30 Thread Nick Pentreath
MLlib has tried quite hard to ensure the migration guide is up to date for
each release. I think generally we catch all breaking and most major
behavior changes

On Wed, 30 Aug 2017 at 17:02, Dongjoon Hyun  wrote:

> +1
>
> On Wed, Aug 30, 2017 at 7:54 AM, Xiao Li  wrote:
>
>> Hi, Devs,
>>
>> Many questions from the open source community are actually caused by the
>> behavior changes we made in each release. So far, the migration guides
>> (e.g.,
>> https://spark.apache.org/docs/latest/sql-programming-guide.html#migration-guide)
>> were not being properly updated. In the last few releases, multiple
>> behavior changes are not documented in migration guides and even release
>> notes. I propose to do the document updates in the same PRs that introduce
>> the behavior changes. If the contributors can't make it, the committers who
>> merge the PRs need to do it instead. We also can create a dedicated page
>> for migration guides of all the components. Hopefully, this can assist the
>> migration efforts.
>>
>> Thanks,
>>
>> Xiao Li
>>
>
>


Re: Setting initial weights of ml.classification.LogisticRegression similar to mllib.classification.LogisticRegressionWithLBFGS

2017-07-20 Thread Nick Pentreath
weightCol sets the weight for each individual row of data (training
example). It does not set the initial coefficients.

On Thu, 20 Jul 2017 at 10:22 Aseem Bansal <asmbans...@gmail.com> wrote:

> Hi
>
> I had asked about this somewhere else too and was told that weightCol
> method does that
>
> On Thu, Jul 20, 2017 at 12:50 PM, Nick Pentreath <nick.pentre...@gmail.com
> > wrote:
>
>> Currently it's not supported, but is on the roadmap: see
>> https://issues.apache.org/jira/browse/SPARK-13025
>>
>> The most recent attempt is to start with simple linear regression, as
>> here: https://issues.apache.org/jira/browse/SPARK-21386
>>
>>
>> On Thu, 20 Jul 2017 at 08:36 Aseem Bansal <asmbans...@gmail.com> wrote:
>>
>>> We were able to set initial weights on
>>> https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
>>>
>>> How can we set the initial weights on
>>> https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.ml.classification.LogisticRegression
>>> similar to above
>>>
>>> Trying to migrate some code from mllib version to the ml version
>>>
>>
>


Re: Setting initial weights of ml.classification.LogisticRegression similar to mllib.classification.LogisticRegressionWithLBFGS

2017-07-20 Thread Nick Pentreath
Currently it's not supported, but is on the roadmap: see
https://issues.apache.org/jira/browse/SPARK-13025

The most recent attempt is to start with simple linear regression, as here:
https://issues.apache.org/jira/browse/SPARK-21386

On Thu, 20 Jul 2017 at 08:36 Aseem Bansal  wrote:

> We were able to set initial weights on
> https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
>
> How can we set the initial weights on
> https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.ml.classification.LogisticRegression
> similar to above
>
> Trying to migrate some code from mllib version to the ml version
>


Re: Regarding Logistic Regression changes in Spark 2.2.0

2017-07-19 Thread Nick Pentreath
L-BFGS is the default optimization method since the initial ML package
implementation. The OWLQN variant is used only when L1 regularization is
specified (via the elasticNetParam). 2.2 adds the box constraints
(optimized using the LBFGS-B variant).

So no, no upgrade is required to use L-BFGS - if you've been using ML's
LogisticRegression estimator, you've been using the L-BFGS optimizer
already.

On Wed, 19 Jul 2017 at 16:09 Aseem Bansal  wrote:

> Hi
>
> I was reading the API of Spark 2.2.0 and noticed a change compared to 2.1.0
>
> Compared to
> https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.ml.classification.LogisticRegression
> the 2.2.0 docs at
> https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.ml.classification.LogisticRegression
> mention that "This class supports fitting traditional logistic regression
> model by LBFGS/OWLQN and bound (box) constrained logistic regression model
> by LBFGSB."
>
> I went through the release notes and found that bound box constrain was
> added in 2.2. I wanted to know whether LBFGS was the default in Spark
> 2.1.0. If not, can we use LBFGS in Spark 2.1.0 or do we have to upgrade to
> 2.2.0?
>


Re: Spark 2.1.1: A bug in org.apache.spark.ml.linalg.* when using VectorAssembler.scala

2017-07-13 Thread Nick Pentreath
There are Vector classes under ml.linalg package - And VectorAssembler and
other feature transformers all work with ml.linalg vectors.

If you try to use mllib.linalg vectors instead you will get an error as the
user defined type for SQL is not correct


On Thu, 13 Jul 2017 at 11:23,  wrote:

> Dear Developers:
>
> Here is a bug in org.apache.spark.ml.linalg.*:
> Class Vector, Vectors are not included in org.apache.spark.ml.linalg.*,
> but they are used in VectorAssembler.scala as follows:
>
> *import *org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
>
> Therefore, bug was reported when I was using VectorAssembler.
>
> Since org.apache.spark.mllib.linalg.* contains the class {Vector,
> Vectors, VectorUDT}, I rewrote VectorAssembler.scala as
> XVectorAssembler.scala by mainly changing "*import 
> *org.apache.spark.*ml*.linalg.{Vector,
> Vectors, VectorUDT}" to
> "*import *org.apache.spark.*mllib*.linalg.{Vector, Vectors, VectorUDT}"
>
> But bug occured as follows:
>
> " Column v must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
> but was actually org.apache.spark.mllib.linalg.VectorUDT@f71b0bce "
>
> Would you please help fix the bug?
>
> Thank you very much!
>
> Best regards
> --xiongjunjie


Re: [PySpark]: How to store NumPy array into single DataFrame cell efficiently

2017-06-28 Thread Nick Pentreath
You will need to use PySpark vectors to store in a DataFrame. They can be
created from Numpy arrays as follows:

from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([("src1", "pkey1", 1, Vectors.dense(np.array([0,
1, 2])))])


On Wed, 28 Jun 2017 at 12:23 Judit Planas  wrote:

> Dear all,
>
> I am trying to store a NumPy array (loaded from an HDF5 dataset) into one
> cell of a DataFrame, but I am having problems.
>
> In short, my data layout is similar to a database, where I have a few
> columns with metadata (source of information, primary key, etc.) and the
> last column contains a NumPy array (each array will have hundreds to
> thousands of elements):
> ++---+-+---+
> | src| PrimaryKey| SecondaryKey|   Data|
> ++---+-+---+
> |  "src1"|"pkey1"|1| np.array([0., 1., 2.])|
> |  "src2"|"pkey1"|2| np.array([0., 1., 2.])|
> ++---+-+---+
>
> In my case, it is important to keep the NumPy array as it is (no
> transformation into Python list, etc.) because later on I will compute some
> statistics on the array, like the average of values. In addition, I expect
> to have thousands of rows (NumPy arrays), so I think trying to explode each
> array will generate too much duplicated metadata.
>
> I have successfully been able to load the data that I want into an RDD
> using the NumPy array object as it is. But I would like to use the
> DataFrame structure to leverage from the SQL functions.
>
> What I have been able to achieve so far is to store the raw data of the NP
> array doing the following:
> 1. Get the raw data of the NP array by calling "tobytes()" [
> https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.tobytes.html
> ]
> 2. Use "BinaryType" in the DF schema for the NP array
> 3. Call "np.frombuffer()" whenever I want to get the NP array back [
> https://docs.scipy.org/doc/numpy/reference/generated/numpy.frombuffer.html
> ]
>
> However, I feel this process is not optimal and it consumes a lot of
> worker memory. For example, if my data size is around 3 GB:
> - Loading all data into a DF and calling "cache()" method (within the same
> line) produces around 3 GB of memory consumed on the worker nodes.
> - However, loading all data into an RDD and calling "cache()" method
> (within the same line) produces around 500 MB of consumed on the worker
> nodes.
>
> From this, I understand that my data is highly compressible, so using an
> RDD is more memory-efficient than the DF ('spark.rdd.compress' is set to
> 'True' by default).
>
> In addition, what I see when I run queries on the data is that, in
> general, the RDD computes the query in less time than the DF. My hypothesis
> here is the following: since data must be exchanged between worker nodes in
> order to perform the queries, the RDD takes less time because data is
> compressed, so communication between workers takes less time.
>
> To summarize, I would like to use the DF structure due to its advantages
> (optimized scheduler, SQL support, etc.), but what I see from real
> performance measurements is that RDDs are much more efficient in my case
> (both execution time and memory consumption). So, I wonder if there is a
> better way to store NP arrays into a DF so that I can benefit from their
> advantages but at the same time they show the same good performance as RDDs.
>
> Regarding the environment, my Spark version is 2.0.1 with Python 3.5.2,
> but I am not restricted to use these versions. I am not tuning any special
> variable (using default values).
>
> Thanks in advance, and please, let me know if I forgot to mention any
> detail or you need further information.
>
> Kind regards,
> Judit
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Question about mllib.recommendation.ALS

2017-06-08 Thread Nick Pentreath
Spark 2.2 will support the recommend-all methods in ML.

Also, both ML and MLLIB performance has been greatly improved for the
recommend-all methods.

Perhaps you could check out the current RC of Spark 2.2 or master branch to
try it out?

N

On Thu, 8 Jun 2017 at 17:18, Sahib Aulakh [Search] ­ <
sahibaul...@coupang.com> wrote:

> Many thanks for your response. I already figured out the details with some
> help from another forum.
>
>
>1. I was trying to predict ratings for all users and all products.
>This is inefficient and now I am trying to reduce the number of required
>predictions.
>2. There is a nice example buried in Spark source code which points
>out the usage of ML side ALS.
>
> Regards.
> Sahib Aulakh.
>
> On Wed, Jun 7, 2017 at 8:17 PM, Ryan  wrote:
>
>> 1. could you give job, stage & task status from Spark UI? I found it
>> extremely useful for performance tuning.
>>
>> 2. use modele.transform for predictions. Usually we have a pipeline for
>> preparing training data, and use the same pipeline to transform data you
>> want to predict could give us the prediction column.
>>
>> On Thu, Jun 1, 2017 at 7:48 AM, Sahib Aulakh [Search] ­ <
>> sahibaul...@coupang.com> wrote:
>>
>>> Hello:
>>>
>>> I am training the ALS model for recommendations. I have about 200m
>>> ratings from about 10m users and 3m products. I have a small cluster with
>>> 48 cores and 120gb cluster-wide memory.
>>>
>>> My code is very similar to the example code
>>>
>>> spark/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
>>> code.
>>>
>>> I have a couple of questions:
>>>
>>>
>>>1. All steps up to model training runs reasonably fast. Model
>>>training is under 10 minutes for rank 20. However, the
>>>model.recommendProductsForUsers step is either slow or just does not work
>>>as the code just seems to hang at this point. I have tried user and 
>>> product
>>>blocks sizes of -1 and 20, 40, etc, played with executor memory size, 
>>> etc.
>>>Can someone shed some light here as to what could be wrong?
>>>2. Also, is there any example code for the ml.recommendation.ALS
>>>algorithm? I can figure out how to train the model but I don't understand
>>>(from the documentation) how to perform predictions?
>>>
>>> Thanks for any information you can provide.
>>> Sahib Aulakh.
>>>
>>>
>>> --
>>> Sahib Aulakh
>>> Sr. Principal Engineer
>>>
>>
>>
>
>
> --
> Sahib Aulakh
> Sr. Principal Engineer
>


Re: spark ML Recommender program

2017-05-18 Thread Nick Pentreath
It sounds like this may be the same as
https://issues.apache.org/jira/browse/SPARK-20402

On Thu, 18 May 2017 at 08:16 Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> Could you try setting the checkpoint interval for ALS (try 3, 5 say) and
> see what the effect is?
>
>
>
> On Thu, 18 May 2017 at 07:32 Mark Vervuurt <m.a.vervu...@gmail.com> wrote:
>
>> If you are running locally try increasing driver memory to for example 4G
>> en executor memory to 3G.
>> Regards, Mark
>>
>> On 18 May 2017, at 05:15, Arun <arunbm...@gmail.com> wrote:
>>
>> hi
>>
>> I am writing spark ML Movie Recomender program on Intelij on windows10
>> Dataset is 2MB with 10 datapoints, My Laptop has 8gb Memory
>>
>> When I set number of iteration 10 works fine
>> When I set number of Iteration 20 I get StackOverFlow error..
>> Whats the solution?..
>>
>> thanks
>>
>>
>>
>>
>>
>> Sent from Samsung tablet
>>
>>
>>
>>
>>
>>


Re: spark ML Recommender program

2017-05-18 Thread Nick Pentreath
Could you try setting the checkpoint interval for ALS (try 3, 5 say) and
see what the effect is?



On Thu, 18 May 2017 at 07:32 Mark Vervuurt  wrote:

> If you are running locally try increasing driver memory to for example 4G
> en executor memory to 3G.
> Regards, Mark
>
> On 18 May 2017, at 05:15, Arun  wrote:
>
> hi
>
> I am writing spark ML Movie Recomender program on Intelij on windows10
> Dataset is 2MB with 10 datapoints, My Laptop has 8gb Memory
>
> When I set number of iteration 10 works fine
> When I set number of Iteration 20 I get StackOverFlow error..
> Whats the solution?..
>
> thanks
>
>
>
>
>
> Sent from Samsung tablet
>
>
>
>
>
>


Re: ElasticSearch Spark error

2017-05-15 Thread Nick Pentreath
It may be best to ask on the elasticsearch-Hadoop github project

On Mon, 15 May 2017 at 13:19, nayan sharma  wrote:

> Hi All,
>
> *ERROR:-*
>
> *Caused by: org.apache.spark.util.TaskCompletionListenerException:
> Connection error (check network and/or proxy settings)- all nodes failed;
> tried [[10.0.1.8*:9200, 10.0.1.**:9200, 10.0.1.***:9200]]*
>
> I am getting this error while trying to show the dataframe.
>
> df.count =5190767 and df.printSchema both are working fine.
> It has 329 columns.
>
> Do any one have any idea regarding this.Please help me to fix this.
>
>
> Thanks,
> Nayan
>
>
>
>


Re: pyspark vector

2017-04-25 Thread Nick Pentreath
Well the 3 in this case is the size of the sparse vector. This equates to
the number of features, which for CountVectorizer (I assume that's what
you're using) is also vocab size (number of unique terms).

On Tue, 25 Apr 2017 at 04:06 Peyman Mohajerian  wrote:

> setVocabSize
>
>
> On Mon, Apr 24, 2017 at 5:36 PM, Zeming Yu  wrote:
>
>> Hi all,
>>
>> Beginner question:
>>
>> what does the 3 mean in the (3,[0,1,2],[1.0,1.0,1.0])?
>>
>> https://spark.apache.org/docs/2.1.0/ml-features.html
>>
>>  id | texts   | vector
>> |-|---
>>  0  | Array("a", "b", "c")| (3,[0,1,2],[1.0,1.0,1.0])
>>  1  | Array("a", "b", "b", "c", "a")  | (3,[0,1,2],[2.0,2.0,1.0])
>>
>>
>


Re: How to convert Spark MLlib vector to ML Vector?

2017-04-09 Thread Nick Pentreath
Why not use the RandomForest from Spark ML?

On Sun, 9 Apr 2017 at 16:01, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> I have already posted this question to the StackOverflow
> .
> However, not getting any response from someone else. I'm trying to use
> RandomForest algorithm for the classification after applying the PCA
> technique since the dataset is pretty high-dimensional. Here's my source
> code:
>
> import org.apache.spark.mllib.util.MLUtils
> import org.apache.spark.mllib.tree.RandomForest
> import org.apache.spark.mllib.tree.model.RandomForestModel
> import org.apache.spark.mllib.regression.LabeledPoint
> import org.apache.spark.ml.linalg.{Vectors, VectorUDT}
> import org.apache.spark.sql._
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.SparkSession
>
> import org.apache.spark.ml.feature.PCA
> import org.apache.spark.rdd.RDD
>
> object PCAExample {
>   def main(args: Array[String]): Unit = {
> val spark = SparkSession
>   .builder
>   .master("local[*]")
>   .config("spark.sql.warehouse.dir", "E:/Exp/")
>   .appName(s"OneVsRestExample")
>   .getOrCreate()
>
> val dataset = MLUtils.loadLibSVMFile(spark.sparkContext, "data/mnist.bz2")
>
> val splits = dataset.randomSplit(Array(0.7, 0.3), seed = 12345L)
> val (trainingData, testData) = (splits(0), splits(1))
>
> val sqlContext = new SQLContext(spark.sparkContext)
> import sqlContext.implicits._
> val trainingDF = trainingData.toDF("label", "features")
>
> val pca = new PCA()
>   .setInputCol("features")
>   .setOutputCol("pcaFeatures")
>   .setK(100)
>   .fit(trainingDF)
>
> val pcaTrainingData = pca.transform(trainingDF)
> //pcaTrainingData.show()
>
> val labeled = pca.transform(trainingDF).rdd.map(row => LabeledPoint(
>   row.getAs[Double]("label"),
>   row.getAs[org.apache.spark.mllib.linalg.Vector]("pcaFeatures")))
>
> //val labeled = pca.transform(trainingDF).rdd.map(row => 
> LabeledPoint(row.getAs[Double]("label"),
> //  
> Vector.fromML(row.getAs[org.apache.spark.ml.linalg.SparseVector]("features"
>
> val numClasses = 10
> val categoricalFeaturesInfo = Map[Int, Int]()
> val numTrees = 10 // Use more in practice.
> val featureSubsetStrategy = "auto" // Let the algorithm choose.
> val impurity = "gini"
> val maxDepth = 20
> val maxBins = 32
>
> val model = RandomForest.trainClassifier(labeled, numClasses, 
> categoricalFeaturesInfo,
>   numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
>   }
> }
>
> However, I'm getting the following error:
>
> *Exception in thread "main" java.lang.IllegalArgumentException:
> requirement failed: Column features must be of type
> org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually
> org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.*
>
> What am I doing wrong in my code?  Actually, I'm getting the above
> exception in this line:
>
> val pca = new PCA()
>   .setInputCol("features")
>   .setOutputCol("pcaFeatures")
>   .setK(100)
>   .fit(trainingDF) /// GETTING EXCEPTION HERE
>
> Please, someone, help me to solve the problem.
>
>
>
>
>
> Kind regards,
> *Md. Rezaul Karim*
>


Re: Spark 2.1 ml library scalability

2017-04-07 Thread Nick Pentreath
It's true that CrossValidator is not parallel currently - see
https://issues.apache.org/jira/browse/SPARK-19357 and feel free to help
review.

On Fri, 7 Apr 2017 at 14:18 Aseem Bansal <asmbans...@gmail.com> wrote:

>
>- Limited the data to 100,000 records.
>- 6 categorical feature which go through imputation, string indexing,
>one hot encoding. The maximum classes for the feature is 100. As data is
>imputated it becomes dense.
>- 1 numerical feature.
>- Training Logistic Regression through CrossValidation with grid to
>optimize its regularization parameter over the values 0.0001, 0.001, 0.005,
>0.01, 0.05, 0.1
>- Using spark's launcher api to launch it on a yarn cluster in Amazon
>AWS.
>
> I was thinking that as CrossValidator is finding the best parameters it
> should be able to run them independently. That sounds like something which
> could be ran in parallel.
>
>
> On Fri, Apr 7, 2017 at 5:20 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> What is the size of training data (number examples, number features)?
> Dense or sparse features? How many classes?
>
> What commands are you using to submit your job via spark-submit?
>
> On Fri, 7 Apr 2017 at 13:12 Aseem Bansal <asmbans...@gmail.com> wrote:
>
> When using spark ml's LogisticRegression, RandomForest, CrossValidator
> etc. do we need to give any consideration while coding in making it scale
> with more CPUs or does it scale automatically?
>
> I am reading some data from S3, using a pipeline to train a model. I am
> running the job on a spark cluster with 36 cores and 60GB RAM and I cannot
> see much usage. It is running but I was expecting spark to use all RAM
> available and make it faster. So that's why I was thinking whether we need
> to take something particular in consideration or wrong expectations?
>
>
>


Re: Spark 2.1 ml library scalability

2017-04-07 Thread Nick Pentreath
What is the size of training data (number examples, number features)? Dense
or sparse features? How many classes?

What commands are you using to submit your job via spark-submit?

On Fri, 7 Apr 2017 at 13:12 Aseem Bansal  wrote:

> When using spark ml's LogisticRegression, RandomForest, CrossValidator
> etc. do we need to give any consideration while coding in making it scale
> with more CPUs or does it scale automatically?
>
> I am reading some data from S3, using a pipeline to train a model. I am
> running the job on a spark cluster with 36 cores and 60GB RAM and I cannot
> see much usage. It is running but I was expecting spark to use all RAM
> available and make it faster. So that's why I was thinking whether we need
> to take something particular in consideration or wrong expectations?
>


Re: Collaborative filtering steps in spark

2017-03-29 Thread Nick Pentreath
No, it does a random initialization. It does use a slightly different
approach from pure normal random - it chooses non-negative draws which
results in very slightly better results empirically.

In practice I'm not sure if the average rating approach will make a big
difference (it's been a long while since I read the paper!)

Sean put the absolute value init stuff in originally if I recall so may
have more context.

Though in fact looking at the code now, I see the comment still says that,
but I'm not convinced the code actually does it:

/**
 * Initializes factors randomly given the in-link blocks.
 *
 * @param inBlocks in-link blocks
 * @param rank rank
 * @return initialized factor blocks
 */
private def initialize[ID](
inBlocks: RDD[(Int, InBlock[ID])],
rank: Int,
seed: Long): RDD[(Int, FactorBlock)] = {
  // Choose a unit vector uniformly at random from the unit sphere, but from the
  // "first quadrant" where all elements are nonnegative. This can be
done by choosing
  // elements distributed as Normal(0,1) and taking the absolute
value, and then normalizing.
  // This appears to create factorizations that have a slightly better
reconstruction
  // (<1%) compared picking elements uniformly at random in [0,1].
  inBlocks.map { case (srcBlockId, inBlock) =>
val random = new XORShiftRandom(byteswap64(seed ^ srcBlockId))
val factors = Array.fill(inBlock.srcIds.length) {
  val factor = Array.fill(rank)(random.nextGaussian().toFloat)
  val nrm = blas.snrm2(rank, factor, 1)
  blas.sscal(rank, 1.0f / nrm, factor, 1)
  factor
}
(srcBlockId, factors)
  }
}


factor is ~ N(0, 1) and then scaled by the L2 norm, but it looks to me the
abs value is never taken before scaling which is what the comment
indicates...


On Mon, 27 Mar 2017 at 00:55 chris snow  wrote:

> In the paper “Large-Scale Parallel Collaborative Filtering for the
> Netflix Prize”, the following steps are described for ALS:
>
> Step 1 Initialize matrix M by assigning the average rating for that
> movie as the first row, and
> small random numbers for the remaining entries.
> Step 2 Fix M, Solve U by minimizing the objective function (the sum of
> squared errors);
> Step 3 Fix U, solve M by minimizing the objective function similarly;
> Step 4 Repeat Steps 2 and 3 until a stopping criterion is satisfied.
>
> Does spark take the average rating for the movie as the first row?
> I've looked through the source code, but I can't see the average
> rating being calculated for the movie.
>
> Many thanks,
>
> Chris
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Collaborative Filtering - scaling of the regularization parameter

2017-03-23 Thread Nick Pentreath
I usually advocate a JIRA even for small stuff but for doc only change like
this it's ok to submit a PR directly with [MINOR] in title.


On Thu, 23 Mar 2017 at 06:55, chris snow <chsnow...@gmail.com> wrote:

> Thanks Nick.  If this will help other users, I'll create a JIRA and
> send a patch.
>
> On 23 March 2017 at 13:49, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
> > Yup, that is true and a reasonable clarification of the doc.
> >
> > On Thu, 23 Mar 2017 at 00:03 chris snow <chsnow...@gmail.com> wrote:
> >>
> >> The documentation for collaborative filtering is as follows:
> >>
> >> ===
> >> Scaling of the regularization parameter
> >>
> >> Since v1.1, we scale the regularization parameter lambda in solving
> >> each least squares problem by the number of ratings the user generated
> >> in updating user factors, or the number of ratings the product
> >> received in updating product factors.
> >> ===
> >>
> >> I find this description confusing, probably because I lack a detailed
> >> understanding of ALS.   The wording suggest that the number of ratings
> >> change ("generated", "received") during solving the least squares.
> >>
> >> This is how I think I should be interpreting the description:
> >>
> >> ===
> >> Since v1.1, we scale the regularization parameter lambda when solving
> >> each least squares problem.  When updating the user factors, we scale
> >> the regularization parameter by the total number of ratings from the
> >> user.  Similarly, when updating the product factors, we scale the
> >> regularization parameter by the total number of ratings for the
> >> product.
> >> ===
> >>
> >> Have I understood this correctly?
> >>
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Collaborative Filtering - scaling of the regularization parameter

2017-03-23 Thread Nick Pentreath
Yup, that is true and a reasonable clarification of the doc.

On Thu, 23 Mar 2017 at 00:03 chris snow  wrote:

> The documentation for collaborative filtering is as follows:
>
> ===
> Scaling of the regularization parameter
>
> Since v1.1, we scale the regularization parameter lambda in solving
> each least squares problem by the number of ratings the user generated
> in updating user factors, or the number of ratings the product
> received in updating product factors.
> ===
>
> I find this description confusing, probably because I lack a detailed
> understanding of ALS.   The wording suggest that the number of ratings
> change ("generated", "received") during solving the least squares.
>
> This is how I think I should be interpreting the description:
>
> ===
> Since v1.1, we scale the regularization parameter lambda when solving
> each least squares problem.  When updating the user factors, we scale
> the regularization parameter by the total number of ratings from the
> user.  Similarly, when updating the product factors, we scale the
> regularization parameter by the total number of ratings for the
> product.
> ===
>
> Have I understood this correctly?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Contributing to Spark

2017-03-19 Thread Nick Pentreath
If you have experience and interest in Python then PySpark is a good area
to look into.

Yes, adding things like tests & documentation is a good starting point.
Start out relatively small and go from there. Adding new wrappers to python
for ML is useful for slightly larger tasks.




On Mon, 20 Mar 2017 at 02:39, Sam Elamin  wrote:

> Hi All,
>
> I would like to start contributing to Spark if possible, its an amazing
> technology and I would love to get involved
>
>
> The contributing page  states
> this "consult the list of starter tasks in JIRA, or ask the
> user@spark.apache.org mailing list."
>
>
> Can anyone guide me on where is best to start? What are these "starter
> tasks"?
>
> I was thinking adding tests would be a good place to begin when dealing
> with any new code base, perhaps to Pyspark since Scala seems a bit more
> stable
>
>
> Also - if at all possible - I would really appreciate if any of the
> contributors or PMC members would be willing to mentor or guide me in this.
> Any help would be greatly appreciated!
>
>
> Regards
> Sam
>
>
>


Re: Check if dataframe is empty

2017-03-07 Thread Nick Pentreath
I believe take on an empty dataset will return an empty Array rather than
throw an exception.

df.take(1).isEmpty should work

On Tue, 7 Mar 2017 at 07:42, Deepak Sharma  wrote:

> If the df is empty , the .take would return
> java.util.NoSuchElementException.
> This can be done as below:
> df.rdd.isEmpty
>
>
> On Tue, Mar 7, 2017 at 9:33 AM,  wrote:
>
> Dataframe.take(1) is faster.
>
>
>
> *From:* ashaita...@nz.imshealth.com [mailto:ashaita...@nz.imshealth.com]
> *Sent:* Tuesday, March 07, 2017 9:22 AM
> *To:* user@spark.apache.org
> *Subject:* Check if dataframe is empty
>
>
>
> Hello!
>
>
>
> I am pretty sure that I am asking something which has been already asked
> lots of times. However, I cannot find the question in the mailing list
> archive.
>
>
>
> The question is – I need to check whether dataframe is empty or not. I
> receive a dataframe from 3rd party library and this dataframe can be
> potentially empty, but also can be really huge – millions of rows. Thus, I
> want to avoid of doing some logic in case the dataframe is empty. How can I
> efficiently check it?
>
>
>
> Right now I am doing it in the following way:
>
>
>
> *private def *isEmpty(df: Option[DataFrame]): Boolean = {
>   df.isEmpty || (df.isDefined && df.get.limit(1).*rdd*.isEmpty())
> }
>
>
>
> But the performance is really slow for big dataframes. I would be grateful
> for any suggestions.
>
>
>
> Thank you in advance.
>
>
>
>
> Best regards,
>
>
>
> Artem
>
>
> --
>
> ** IMPORTANT--PLEASE READ 
> This electronic message, including its attachments, is CONFIDENTIAL and may
> contain PROPRIETARY or LEGALLY PRIVILEGED or PROTECTED information and is
> intended for the authorized recipient of the sender. If you are not the
> intended recipient, you are hereby notified that any use, disclosure,
> copying, or distribution of this message or any of the information included
> in it is unauthorized and strictly prohibited. If you have received this
> message in error, please immediately notify the sender by reply e-mail and
> permanently delete this message and its attachments, along with any copies
> thereof, from all locations received (e.g., computer, mobile device, etc.).
> Thank you.
> 
>
> --
>
> This message is for the designated recipient only and may contain
> privileged, proprietary, or otherwise confidential information. If you have
> received it in error, please notify the sender immediately and delete the
> original. Any other use of the e-mail by you is prohibited. Where allowed
> by local law, electronic communications with Accenture and its affiliates,
> including e-mail and instant messaging (including content), may be scanned
> by our systems for the purposes of information security and assessment of
> internal compliance with Accenture policy.
>
> __
>
> www.accenture.com
>
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: Practical configuration to run LSH in Spark 2.1.0

2017-02-22 Thread Nick Pentreath
And to be clear, are you doing a self-join for approx similarity? Or
joining to another dataset?



On Thu, 23 Feb 2017 at 02:01, nguyen duc Tuan <newvalu...@gmail.com> wrote:

> Hi Seth,
> Here's the parameters that I used in my experiments.
> - Number of executors: 16
> - Executor's memories: vary from 1G -> 2G -> 3G
> - Number of cores per executor: 1-> 2
> - Driver's memory:  1G -> 2G -> 3G
> - The similar threshold: 0.6
> MinHash:
> - number of hash tables: 2
> SignedRandomProjection:
> - Number of hash tables: 2
>
> 2017-02-23 0:13 GMT+07:00 Seth Hendrickson <seth.hendrickso...@gmail.com>:
>
> I'm looking into this a bit further, thanks for bringing it up! Right now
> the LSH implementation only uses OR-amplification. The practical
> consequence of this is that it will select too many candidates when doing
> approximate near neighbor search and approximate similarity join. When we
> add AND-amplification I think it will become significantly more usable. In
> the meantime, I will also investigate scalability issues.
>
> Can you please provide every parameter you used? It will be very helfpul
> :) For instance, the similarity threshold, the number of hash tables, the
> bucket width, etc...
>
> Thanks!
>
> On Mon, Feb 13, 2017 at 3:21 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> The original Uber authors provided this performance test result:
> https://docs.google.com/document/d/19BXg-67U83NVB3M0I84HVBVg3baAVaESD_mrg_-vLro
>
> This was for MinHash only though, so it's not clear about what the
> scalability is for the other metric types.
>
> The SignRandomProjectionLSH is not yet in Spark master (see
> https://issues.apache.org/jira/browse/SPARK-18082). It could be there are
> some implementation details that would make a difference here.
>
> By the way, what is the join threshold you use in approx join?
>
> Could you perhaps create a JIRA ticket with the details in order to track
> this?
>
>
> On Sun, 12 Feb 2017 at 22:52 nguyen duc Tuan <newvalu...@gmail.com> wrote:
>
> After all, I switched back to LSH implementation that I used before (
> https://github.com/karlhigley/spark-neighbors ). I can run on my dataset
> now. If someone has any suggestion, please tell me.
> Thanks.
>
> 2017-02-12 9:25 GMT+07:00 nguyen duc Tuan <newvalu...@gmail.com>:
>
> Hi Timur,
> 1) Our data is transformed to dataset of Vector already.
> 2) If I use RandomSignProjectLSH, the job dies after I call
> approximateSimilarJoin. I tried to use Minhash instead, the job is still
> slow. I don't thinks the problem is related to the GC. The time for GC is
> small compare with the time for computation. Here is some screenshots of my
> job.
> Thanks
>
> 2017-02-12 8:01 GMT+07:00 Timur Shenkao <t...@timshenkao.su>:
>
> Hello,
>
> 1) Are you sure that your data is "clean"?  No unexpected missing values?
> No strings in unusual encoding? No additional or missing columns ?
> 2) How long does your job run? What about garbage collector parameters?
> Have you checked what happens with jconsole / jvisualvm ?
>
> Sincerely yours, Timur
>
> On Sat, Feb 11, 2017 at 12:52 AM, nguyen duc Tuan <newvalu...@gmail.com>
> wrote:
>
> Hi Nick,
> Because we use *RandomSignProjectionLSH*, there is only one parameter for
> LSH is the number of hashes. I try with small number of hashes (2) but the
> error is still happens. And it happens when I call similarity join. After
> transformation, the size of  dataset is about 4G.
>
> 2017-02-11 3:07 GMT+07:00 Nick Pentreath <nick.pentre...@gmail.com>:
>
> What other params are you using for the lsh transformer?
>
> Are the issues occurring during transform or during the similarity join?
>
>
> On Fri, 10 Feb 2017 at 05:46, nguyen duc Tuan <newvalu...@gmail.com>
> wrote:
>
> hi Das,
> In general, I will apply them to larger datasets, so I want to use LSH,
> which is more scaleable than the approaches as you suggested. Have you
> tried LSH in Spark 2.1.0 before ? If yes, how do you set the
> parameters/configuration to make it work ?
> Thanks.
>
> 2017-02-10 19:21 GMT+07:00 Debasish Das <debasish.da...@gmail.com>:
>
> If it is 7m rows and 700k features (or say 1m features) brute force row
> similarity will run fine as well...check out spark-4823...you can compare
> quality with approximate variant...
> On Feb 9, 2017 2:55 AM, "nguyen duc Tuan" <newvalu...@gmail.com> wrote:
>
> Hi everyone,
> Since spark 2.1.0 introduces LSH (
> http://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing),
> we want to use LSH to find approximately nearest neighbors. Basica

Re: Practical configuration to run LSH in Spark 2.1.0

2017-02-13 Thread Nick Pentreath
The original Uber authors provided this performance test result:
https://docs.google.com/document/d/19BXg-67U83NVB3M0I84HVBVg3baAVaESD_mrg_-vLro

This was for MinHash only though, so it's not clear about what the
scalability is for the other metric types.

The SignRandomProjectionLSH is not yet in Spark master (see
https://issues.apache.org/jira/browse/SPARK-18082). It could be there are
some implementation details that would make a difference here.

By the way, what is the join threshold you use in approx join?

Could you perhaps create a JIRA ticket with the details in order to track
this?


On Sun, 12 Feb 2017 at 22:52 nguyen duc Tuan <newvalu...@gmail.com> wrote:

> After all, I switched back to LSH implementation that I used before (
> https://github.com/karlhigley/spark-neighbors ). I can run on my dataset
> now. If someone has any suggestion, please tell me.
> Thanks.
>
> 2017-02-12 9:25 GMT+07:00 nguyen duc Tuan <newvalu...@gmail.com>:
>
> Hi Timur,
> 1) Our data is transformed to dataset of Vector already.
> 2) If I use RandomSignProjectLSH, the job dies after I call
> approximateSimilarJoin. I tried to use Minhash instead, the job is still
> slow. I don't thinks the problem is related to the GC. The time for GC is
> small compare with the time for computation. Here is some screenshots of my
> job.
> Thanks
>
> 2017-02-12 8:01 GMT+07:00 Timur Shenkao <t...@timshenkao.su>:
>
> Hello,
>
> 1) Are you sure that your data is "clean"?  No unexpected missing values?
> No strings in unusual encoding? No additional or missing columns ?
> 2) How long does your job run? What about garbage collector parameters?
> Have you checked what happens with jconsole / jvisualvm ?
>
> Sincerely yours, Timur
>
> On Sat, Feb 11, 2017 at 12:52 AM, nguyen duc Tuan <newvalu...@gmail.com>
> wrote:
>
> Hi Nick,
> Because we use *RandomSignProjectionLSH*, there is only one parameter for
> LSH is the number of hashes. I try with small number of hashes (2) but the
> error is still happens. And it happens when I call similarity join. After
> transformation, the size of  dataset is about 4G.
>
> 2017-02-11 3:07 GMT+07:00 Nick Pentreath <nick.pentre...@gmail.com>:
>
> What other params are you using for the lsh transformer?
>
> Are the issues occurring during transform or during the similarity join?
>
>
> On Fri, 10 Feb 2017 at 05:46, nguyen duc Tuan <newvalu...@gmail.com>
> wrote:
>
> hi Das,
> In general, I will apply them to larger datasets, so I want to use LSH,
> which is more scaleable than the approaches as you suggested. Have you
> tried LSH in Spark 2.1.0 before ? If yes, how do you set the
> parameters/configuration to make it work ?
> Thanks.
>
> 2017-02-10 19:21 GMT+07:00 Debasish Das <debasish.da...@gmail.com>:
>
> If it is 7m rows and 700k features (or say 1m features) brute force row
> similarity will run fine as well...check out spark-4823...you can compare
> quality with approximate variant...
> On Feb 9, 2017 2:55 AM, "nguyen duc Tuan" <newvalu...@gmail.com> wrote:
>
> Hi everyone,
> Since spark 2.1.0 introduces LSH (
> http://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing),
> we want to use LSH to find approximately nearest neighbors. Basically, We
> have dataset with about 7M rows. we want to use cosine distance to meassure
> the similarity between items, so we use *RandomSignProjectionLSH* (
> https://gist.github.com/tuan3w/c968e56ea8ef135096eeedb08af097db) instead
> of *BucketedRandomProjectionLSH*. I try to tune some configurations such
> as serialization, memory fraction, executor memory (~6G), number of
> executors ( ~20), memory overhead ..., but nothing works. I often get error
> "java.lang.OutOfMemoryError: Java heap space" while running. I know that
> this implementation is done by engineer at Uber but I don't know right
> configurations,.. to run the algorithm at scale. Do they need very big
> memory to run it?
>
> Any help would be appreciated.
> Thanks
>
>
>
>
>
>
>


Re: Practical configuration to run LSH in Spark 2.1.0

2017-02-10 Thread Nick Pentreath
What other params are you using for the lsh transformer?

Are the issues occurring during transform or during the similarity join?


On Fri, 10 Feb 2017 at 05:46, nguyen duc Tuan  wrote:

> hi Das,
> In general, I will apply them to larger datasets, so I want to use LSH,
> which is more scaleable than the approaches as you suggested. Have you
> tried LSH in Spark 2.1.0 before ? If yes, how do you set the
> parameters/configuration to make it work ?
> Thanks.
>
> 2017-02-10 19:21 GMT+07:00 Debasish Das :
>
> If it is 7m rows and 700k features (or say 1m features) brute force row
> similarity will run fine as well...check out spark-4823...you can compare
> quality with approximate variant...
> On Feb 9, 2017 2:55 AM, "nguyen duc Tuan"  wrote:
>
> Hi everyone,
> Since spark 2.1.0 introduces LSH (
> http://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing),
> we want to use LSH to find approximately nearest neighbors. Basically, We
> have dataset with about 7M rows. we want to use cosine distance to meassure
> the similarity between items, so we use *RandomSignProjectionLSH* (
> https://gist.github.com/tuan3w/c968e56ea8ef135096eeedb08af097db) instead
> of *BucketedRandomProjectionLSH*. I try to tune some configurations such
> as serialization, memory fraction, executor memory (~6G), number of
> executors ( ~20), memory overhead ..., but nothing works. I often get error
> "java.lang.OutOfMemoryError: Java heap space" while running. I know that
> this implementation is done by engineer at Uber but I don't know right
> configurations,.. to run the algorithm at scale. Do they need very big
> memory to run it?
>
> Any help would be appreciated.
> Thanks
>
>
>


Re: ML PIC

2017-01-16 Thread Nick Pentreath
The JIRA for this is here: https://issues.apache.org/jira/browse/SPARK-15784

There is a PR open already for it, which still needs to be reviewed.



On Wed, 21 Dec 2016 at 18:01 Robert Hamilton <robert_b_hamil...@icloud.com>
wrote:

> Thank you Nick that is good to know.
>
> Would this have some opportunity for newbs (like me) to volunteer some
> time?
>
> Sent from my iPhone
>
> On Dec 21, 2016, at 9:08 AM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> It is part of the general feature parity roadmap. I can't recall offhand
> any blocker reasons it's just resources
> On Wed, 21 Dec 2016 at 17:05, Robert Hamilton <
> robert_b_hamil...@icloud.com> wrote:
>
> Hi all.  Is it on the roadmap to have an
> Spark.ml.clustering.PowerIterationClustering? Are there technical reasons
> that there is currently only an .mllib version?
>
>
> Sent from my iPhone
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: ML PIC

2016-12-21 Thread Nick Pentreath
It is part of the general feature parity roadmap. I can't recall offhand
any blocker reasons it's just resources
On Wed, 21 Dec 2016 at 17:05, Robert Hamilton 
wrote:

> Hi all.  Is it on the roadmap to have an
> Spark.ml.clustering.PowerIterationClustering? Are there technical reasons
> that there is currently only an .mllib version?
>
>
> Sent from my iPhone
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Issue in using DenseVector in RowMatrix, error could be due to ml and mllib package changes

2016-12-08 Thread Nick Pentreath
Yes most likely due to hashing tf returns ml vectors while you need mllib
vectors for row matrix.

I'd recommend using the vector conversion utils (I think in
mllib.linalg.Vectors but I'm on mobile right now so can't recall exactly).
There are until methods for converting single vectors as well as vector
rows of DF. Check the mllib user guide for 2.0 for details.
On Fri, 9 Dec 2016 at 04:42, satyajit vegesna 
wrote:

> Hi All,
>
> PFB code.
>
>
> import org.apache.spark.ml.feature.{HashingTF, IDF}
> import org.apache.spark.ml.linalg.SparseVector
> import org.apache.spark.mllib.linalg.distributed.RowMatrix
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.{SparkConf, SparkContext}
>
> /**
>   * Created by satyajit on 12/7/16.
>   */
> object DIMSUMusingtf extends App {
>
>   val conf = new SparkConf()
> .setMaster("local[1]")
> .setAppName("testColsim")
>   val sc = new SparkContext(conf)
>   val spark = SparkSession
> .builder
> .appName("testColSim").getOrCreate()
>
>   import org.apache.spark.ml.feature.Tokenizer
>
>   val sentenceData = spark.createDataFrame(Seq(
> (0, "Hi I heard about Spark"),
> (0, "I wish Java could use case classes"),
> (1, "Logistic regression models are neat")
>   )).toDF("label", "sentence")
>
>   val tokenizer = new 
> Tokenizer().setInputCol("sentence").setOutputCol("words")
>
>   val wordsData = tokenizer.transform(sentenceData)
>
>
>   val hashingTF = new HashingTF()
> .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
>
>   val featurizedData = hashingTF.transform(wordsData)
>
>
>   val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
>   val idfModel = idf.fit(featurizedData)
>   val rescaledData = idfModel.transform(featurizedData)
>   rescaledData.show()
>   rescaledData.select("features", "label").take(3).foreach(println)
>   val check = rescaledData.select("features")
>
>   val row = check.rdd.map(row => row.getAs[SparseVector]("features"))
>
>   val mat = new RowMatrix(row) //i am basically trying to use Dense.vector as 
> a direct input to
>
> rowMatrix, but i get an error that RowMatrix Cannot resolve constructor
>
>   row.foreach(println)
> }
>
> Any help would be appreciated.
>
> Regards,
> Satyajit.
>
>
>
>


Re: how to print auc & prc for GBTClassifier, which is okay for RandomForestClassifier

2016-11-28 Thread Nick Pentreath
This is because currently GBTClassifier doesn't extend the
ClassificationModel abstract class, which in turn has the rawPredictionCol
and related methods for generating that column.

I'm actually not sure off hand whether this was because the GBT
implementation could not produce the raw prediction value, or due to
waiting for future multi-class support before implementing all the
classifier methods.


On Sun, 27 Nov 2016 at 19:52 Zhiliang Zhu 
wrote:

>
> Hi All,
>
> I need to print auc and prc for GBTClassifier model, it seems okay for
> RandomForestClassifier but not GBTClassifier, though rawPrediction column
> is neither in original data.
>
> the codes are :
>
> ..
> // Set up Pipeline
> val stages = new mutable.ArrayBuffer[PipelineStage]()
>
> val labelColName = if (algo == "GBTClassification") "indexedLabel"
> else "label"
> if (algo == "GBTClassification") {
>   val labelIndexer = new StringIndexer()
> .setInputCol("label")
> .setOutputCol(labelColName)
>   stages += labelIndexer
> }
>
> val rawFeatureSize =
> data.select("rawFeatures").first().toString().split(",").length;
> var indices : Array[Int] = new Array[Int](rawFeatureSize);
> for (i <- 0 until rawFeatureSize) {
> indices(i) = i;
> }
> val featuresSlicer = new VectorSlicer()
>   .setInputCol("rawFeatures")
>   .setOutputCol("features")
>   .setIndices(indices)
> stages += featuresSlicer
>
> val dt = algo match {
>
> // THE PROBLEM IS HERE:
>
> //GBTClassifier will not work, error is that field rawPrediction is not
> there, which appeared in the last line of code as pipeline.fit(data)
> //however, the similar codes are okay for RandomForestClassifier
> //in fact, rawPrediction column seems not in original data, but generated
> in BinaryClassificationEvaluator pipelineModel by auto
>
>   case "GBTClassification" =>
> new GBTClassifier()
>   .setFeaturesCol("features")
>   .setLabelCol(labelColName)
>   .setLabelCol(labelColName)
>   case _ => throw new IllegalArgumentException("Algo ${params.algo}
> not supported.")
> }
>
> val grid = new ParamGridBuilder()
>   .addGrid(dt.maxDepth, Array(1))
>   .addGrid(dt.subsamplingRate, Array(0.5))
>   .build()
> val cv = new CrossValidator()
>   .setEstimator(dt)
>   .setEstimatorParamMaps(grid)
>   .setEvaluator((new BinaryClassificationEvaluator))
>   .setNumFolds(6)
> stages += cv
>
> val pipeline = new Pipeline().setStages(stages.toArray)
>
> // Fit the Pipeline
> val pipelineModel = pipeline.fit(data)
> 
>
> Thanks in advance ~~
>
> Zhiliang
>
>
>


Re: scala.MatchError while doing BinaryClassificationMetrics

2016-11-14 Thread Nick Pentreath
Typically you pass in the result of a model transform to the evaluator.

So:
val model = estimator.fit(data)
val auc = evaluator.evaluate(model.transform(testData)

Check Scala API docs for some details:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

On Mon, 14 Nov 2016 at 20:02 Bhaarat Sharma <bhaara...@gmail.com> wrote:

Can you please suggest how I can use BinaryClassificationEvaluator? I tried:

scala> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

scala>  val evaluator = new BinaryClassificationEvaluator()
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator =
binEval_0d57372b7579

Try 1:

scala> evaluator.evaluate(testScoreAndLabel.rdd)
:105: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(Double, Double)]
 required: org.apache.spark.sql.Dataset[_]
   evaluator.evaluate(testScoreAndLabel.rdd)

Try 2:

scala> evaluator.evaluate(testScoreAndLabel)
java.lang.IllegalArgumentException: Field "rawPrediction" does not exist.
  at
org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:228)

Try 3:

scala>
evaluator.evaluate(testScoreAndLabel.select("Label","ModelProbability"))
org.apache.spark.sql.AnalysisException: cannot resolve '`Label`' given
input columns: [_1, _2];
  at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)


On Mon, Nov 14, 2016 at 1:44 PM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

DataFrame.rdd returns an RDD[Row]. You'll need to use map to extract the
doubles from the test score and label DF.

But you may prefer to just use spark.ml evaluators, which work with
DataFrames. Try BinaryClassificationEvaluator.

On Mon, 14 Nov 2016 at 19:30, Bhaarat Sharma <bhaara...@gmail.com> wrote:

I am getting scala.MatchError in the code below. I'm not able to see why
this would be happening. I am using Spark 2.0.1

scala> testResults.columns
res538: Array[String] = Array(TopicVector, subject_id, hadm_id,
isElective, isNewborn, isUrgent, isEmergency, isMale, isFemale,
oasis_score, sapsii_score, sofa_score, age, hosp_death, test,
ModelFeatures, Label, rawPrediction, ModelProbability,
ModelPrediction)

scala> testResults.select("Label","ModelProbability").take(1)
res542: Array[org.apache.spark.sql.Row] =
Array([0.0,[0.737304818744076,0.262695181255924]])

scala> val testScoreAndLabel = testResults.
 | select("Label","ModelProbability").
 | map { case Row(l:Double, p:Vector) => (p(1), l) }
testScoreAndLabel: org.apache.spark.sql.Dataset[(Double, Double)] =
[_1: double, _2: double]

scala> testScoreAndLabel
res539: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: double,
_2: double]

scala> testScoreAndLabel.columns
res540: Array[String] = Array(_1, _2)

scala> val testMetrics = new BinaryClassificationMetrics(testScoreAndLabel.rdd)
testMetrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
= org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@36e780d1

The code below gives the error

val auROC = testMetrics.areaUnderROC() //this line gives the error

Caused by: scala.MatchError:
[0.0,[0.7316583497453766,0.2683416502546234]] (of class
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)


Re: scala.MatchError while doing BinaryClassificationMetrics

2016-11-14 Thread Nick Pentreath
DataFrame.rdd returns an RDD[Row]. You'll need to use map to extract the
doubles from the test score and label DF.

But you may prefer to just use spark.ml evaluators, which work with
DataFrames. Try BinaryClassificationEvaluator.

On Mon, 14 Nov 2016 at 19:30, Bhaarat Sharma  wrote:

> I am getting scala.MatchError in the code below. I'm not able to see why
> this would be happening. I am using Spark 2.0.1
>
> scala> testResults.columns
> res538: Array[String] = Array(TopicVector, subject_id, hadm_id, isElective, 
> isNewborn, isUrgent, isEmergency, isMale, isFemale, oasis_score, 
> sapsii_score, sofa_score, age, hosp_death, test, ModelFeatures, Label, 
> rawPrediction, ModelProbability, ModelPrediction)
>
> scala> testResults.select("Label","ModelProbability").take(1)
> res542: Array[org.apache.spark.sql.Row] = 
> Array([0.0,[0.737304818744076,0.262695181255924]])
>
> scala> val testScoreAndLabel = testResults.
>  | select("Label","ModelProbability").
>  | map { case Row(l:Double, p:Vector) => (p(1), l) }
> testScoreAndLabel: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: 
> double, _2: double]
>
> scala> testScoreAndLabel
> res539: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: double, _2: 
> double]
>
> scala> testScoreAndLabel.columns
> res540: Array[String] = Array(_1, _2)
>
> scala> val testMetrics = new 
> BinaryClassificationMetrics(testScoreAndLabel.rdd)
> testMetrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = 
> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@36e780d1
>
> The code below gives the error
>
> val auROC = testMetrics.areaUnderROC() //this line gives the error
>
> Caused by: scala.MatchError: [0.0,[0.7316583497453766,0.2683416502546234]] 
> (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
>
>


Re: Nearest neighbour search

2016-11-14 Thread Nick Pentreath
LSH-based NN search and similarity join should be out in Spark 2.1 -
there's a little work being done still to clear up the APIs and some
functionality.

Check out https://issues.apache.org/jira/browse/SPARK-5992

On Mon, 14 Nov 2016 at 16:12, Kevin Mellott 
wrote:

> You may be able to benefit from Soundcloud's open source implementation,
> either as a solution or as a reference implementation.
>
> https://github.com/soundcloud/cosine-lsh-join-spark
>
> Thanks,
> Kevin
>
> On Sun, Nov 13, 2016 at 2:07 PM, Meeraj Kunnumpurath <
> mee...@servicesymphony.com> wrote:
>
> That was a bit of a brute force search, so I changed the code to use a UDF
> to create the dot product between the two IDF vectors, and do a sort on the
> new column.
>
> package com.ss.ml.clustering
>
> import org.apache.spark.sql.{DataFrame, SparkSession}
> import org.apache.spark.sql.functions._
> import org.apache.spark.ml.feature.{IDF, Tokenizer, HashingTF}
> import org.apache.spark.ml.linalg.Vector
>
> object ClusteringBasics extends App {
>
>   val spark = SparkSession.builder().appName("Clustering 
> Basics").master("local").getOrCreate()
>   import spark.implicits._
>
>   val df = spark.read.option("header", "false").csv("data")
>
>   val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
>   val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
>   val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")
>
>   val df1 = tf.transform(tk.transform(df))
>   val idfs = idf.fit(df1).transform(df1)
>
>   val nn = nearestNeighbour("", 
> idfs)
>   println(nn)
>
>   def nearestNeighbour(uri: String, ds: DataFrame) : String = {
> val tfIdfSrc = ds.filter(s"_c0 == 
> '$uri'").take(1)(0).getAs[Vector]("tf-idf")
> def dorProduct(vectorA: Vector) = {
>   var dp = 0.0
>   var index = vectorA.size - 1
>   for (i <- 0 to index) {
> dp += vectorA(i) * tfIdfSrc(i)
>   }
>   dp
> }
> val dpUdf = udf((v1: Vector, v2: Vector) => dorProduct(v1))
> ds.filter(s"_c0 != '$uri'").withColumn("dp", 
> dpUdf('tf-idf)).sort("dp").take(1)(0).getString(1)
>   }
>
> }
>
>
> However, that is generating the exception below,
>
> Exception in thread "main" java.lang.RuntimeException: Unsupported literal
> type class org.apache.spark.ml.feature.IDF idf_e49381a285dd
> at
> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
> at org.apache.spark.sql.functions$.lit(functions.scala:101)
> at org.apache.spark.sql.Column.$minus(Column.scala:672)
> at
> com.ss.ml.clustering.ClusteringBasics$.nearestNeighbour(ClusteringBasics.scala:36)
> at
> com.ss.ml.clustering.ClusteringBasics$.delayedEndpoint$com$ss$ml$clustering$ClusteringBasics$1(ClusteringBasics.scala:22)
> at
> com.ss.ml.clustering.ClusteringBasics$delayedInit$body.apply(ClusteringBasics.scala:8)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> at scala.App$class.main(App.scala:76)
> at com.ss.ml.clustering.ClusteringBasics$.main(ClusteringBasics.scala:8)
> at com.ss.ml.clustering.ClusteringBasics.main(ClusteringBasics.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> On Sun, Nov 13, 2016 at 10:56 PM, Meeraj Kunnumpurath <
> mee...@servicesymphony.com> wrote:
>
> This is what I have done, is there a better way of doing this?
>
>   val df = spark.read.option("header", "false").csv("data")
>
>
>   val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
>
>   val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
>
>   val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")
>
>
>   val df1 = tf.transform(tk.transform(df))
>
>   val idfs = idf.fit(df1).transform(df1)
>
>
>   println(nearestNeighbour("http://dbpedia.org/resource/Barack_Obama;,
> idfs))
>
>
>   def nearestNeighbour(uri: String, ds: DataFrame) : String = {
>
> var res : Row = null
>
> var metric : Double = 0
>
> val tfIdfSrc = ds.filter(s"_c0 ==
> '$uri'").take(1)(0).getAs[Vector]("tf-idf")
>
> ds.filter("_c0 != '" + uri + "'").foreach { r =>
>
>   val tfIdfDst = r.getAs[Vector]("tf-idf")
>
>   val dp = dorProduct(tfIdfSrc, tfIdfDst)
>
>   if (dp > metric) {
>
> res = r
>
> metric = dp
>
>   }
>
> }
>
> return res.getAs[String]("_c1")
>
>   }
>

Re: Finding a Spark Equivalent for Pandas' get_dummies

2016-11-11 Thread Nick Pentreath
For now OHE supports a single column. So you have to have 1000 OHE in a
pipeline. However you can add them programatically so it is not too bad. If
the cardinality of each feature is quite low, it should be workable.

After that user VectorAssembler to stitch the vectors together (which
accepts multiple input columns).

The other approach is - if your features are all categorical - to encode
the features as "feature_name=feature_value" strings. This can
unfortunately only be done with RDD ops since a UDF can't accept multiple
columns as input at this time. You can create a new column with all the
feature name/value pairs as a list of strings ["feature_1=foo",
"feature_2=bar", ...]. Then use CountVectorizer to create your binary
vectors. This basically works like the DictVectorizer in scikit-learn.



On Fri, 11 Nov 2016 at 20:33 nsharkey  wrote:

> I have a dataset that I need to convert some of the the variables to dummy
> variables. The get_dummies function in Pandas works perfectly on smaller
> datasets but since it collects I'll always be bottlenecked by the master
> node.
>
> I've looked at Spark's OHE feature and while that will work in theory I
> have over a thousand variables I need to convert so I don't want to have to
> do 1000+ OHE. My project is pretty simple in scope: read in a raw CSV,
> convert the categorical variables into dummy variables, then save the
> transformed data back to CSV. That is why I'm so interested in get_dummies
> but it's not scalable enough for my data size (500-600GB per file).
>
> Thanks in advance.
>
> Nick
>
> --
> View this message in context: Finding a Spark Equivalent for Pandas'
> get_dummies
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: ALS.trainImplicit block sizes

2016-10-21 Thread Nick Pentreath
Oh also you mention 20 partitions. Is that how many you have? How many
ratings?

It may be worth trying to reparation to larger number of partitions.

On Fri, 21 Oct 2016 at 17:04, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> I wonder if you can try with setting different blocks for user and item?
> Are you able to try 2.0 or use Scala for setting it in 1.6?
>
> You want your item blocks to be a lot less than user blocks. Items maybe
> 5-10, users perhaps 250-500?
>
> Do you have many "power items" that are connected to almost every user? Or
> vice versa?
>
> On Fri, 21 Oct 2016 at 16:46, Nikhil Mishra <nikhilmishra8...@gmail.com>
> wrote:
>
> Yes, that's what I tried initially. The default value is pretty low -
> something like 20. Default depends on the number of partitions in the
> ratings RDD. It was going out of memory with the default size too.
>
> On Fri, Oct 21, 2016 at 5:31 AM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> Did you try not setting the blocks parameter? It will then try to set it
> automatically for your data size.
> On Fri, 21 Oct 2016 at 09:16, Nikhil Mishra <nikhilmishra8...@gmail.com>
> wrote:
>
> I am using 105 nodes (1 master, 4 core and 100 task nodes). All are 7.5
> gig machines.
>
> On Fri, Oct 21, 2016 at 12:15 AM, Nick Pentreath <nick.pentre...@gmail.com
> > wrote:
>
> How many nodes are you using in the cluster?
>
>
>
> On Fri, 21 Oct 2016 at 08:58 Nikhil Mishra <nikhilmishra8...@gmail.com>
> wrote:
>
> Thanks Nick.
>
> So we do partition U x I matrix into BxB matrices, each of size around U/B
> and I/B. Is that correct? Do you know whether a single block of the matrix
> is represented in memory as a full matrix or as sparse matrix? I ask this
> because my job has been failing for block sizes which should have worked.
>
> I have U = 85 million users, I = 250,000 items and when I specify block
> size 5,000, I get out of memory error, even though I am setting
> --executor-memory as 7g (on a linux EC2 which has 7.5g memory). Assuming
> each block has 17000 users and 50 items, eve if the block is internally
> represented as a full matrix, it should still occupy around 50MB space.
>
> Increasing block size to 20,000 also results in the same. So there is
> something I don't understand about how this is working.
>
> BTW, I am trying to find 50 latent factors (rank = 50).
>
> Do you have some insights as to how I should tweak things to get this
> working?
>
> Thanks,
> Nik
>
> On Thu, Oct 20, 2016 at 11:43 PM, Nick Pentreath <nick.pentre...@gmail.com
> > wrote:
>
> The blocks params will set both user and item blocks.
>
> Spark 2.0 supports user and item blocks for PySpark:
> http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.recommendation
>
>
> On Fri, 21 Oct 2016 at 08:12 Nikhil Mishra <nikhilmishra8...@gmail.com>
> wrote:
>
> Hi,
>
> I have a question about the block size to be specified in
> ALS.trainImplicit() in pyspark (Spark 1.6.1). There is only one block size
> parameter to be specified. I want to know if that would result in
> partitioning both the users as well as the items axes.
>
> For example, I am using the following call to ALs.trainImplicit() in my
> code.
>
> ---
>
> RANK = 50
>
> ITERATIONS = 2
>
> BLOCKS = 1000
>
> ALPHA = 1.0
>
> model = ALS.trainImplicit(ratings, RANK, ITERATIONS, blocks=BLOCKS,
> alpha=ALPHA)
>
>
> 
>
> Will this partition the users x items matrix into BLOCKS x BLOCKS number
> of matrices or will it partition only the users axis thereby resulting in
> BLOCKS number of matrices, each with columns = total number of unique items?
>
> Thanks,
> Nik
>
>
>
>
>


Re: ALS.trainImplicit block sizes

2016-10-21 Thread Nick Pentreath
I wonder if you can try with setting different blocks for user and item?
Are you able to try 2.0 or use Scala for setting it in 1.6?

You want your item blocks to be a lot less than user blocks. Items maybe
5-10, users perhaps 250-500?

Do you have many "power items" that are connected to almost every user? Or
vice versa?

On Fri, 21 Oct 2016 at 16:46, Nikhil Mishra <nikhilmishra8...@gmail.com>
wrote:

> Yes, that's what I tried initially. The default value is pretty low -
> something like 20. Default depends on the number of partitions in the
> ratings RDD. It was going out of memory with the default size too.
>
> On Fri, Oct 21, 2016 at 5:31 AM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> Did you try not setting the blocks parameter? It will then try to set it
> automatically for your data size.
> On Fri, 21 Oct 2016 at 09:16, Nikhil Mishra <nikhilmishra8...@gmail.com>
> wrote:
>
> I am using 105 nodes (1 master, 4 core and 100 task nodes). All are 7.5
> gig machines.
>
> On Fri, Oct 21, 2016 at 12:15 AM, Nick Pentreath <nick.pentre...@gmail.com
> > wrote:
>
> How many nodes are you using in the cluster?
>
>
>
> On Fri, 21 Oct 2016 at 08:58 Nikhil Mishra <nikhilmishra8...@gmail.com>
> wrote:
>
> Thanks Nick.
>
> So we do partition U x I matrix into BxB matrices, each of size around U/B
> and I/B. Is that correct? Do you know whether a single block of the matrix
> is represented in memory as a full matrix or as sparse matrix? I ask this
> because my job has been failing for block sizes which should have worked.
>
> I have U = 85 million users, I = 250,000 items and when I specify block
> size 5,000, I get out of memory error, even though I am setting
> --executor-memory as 7g (on a linux EC2 which has 7.5g memory). Assuming
> each block has 17000 users and 50 items, eve if the block is internally
> represented as a full matrix, it should still occupy around 50MB space.
>
> Increasing block size to 20,000 also results in the same. So there is
> something I don't understand about how this is working.
>
> BTW, I am trying to find 50 latent factors (rank = 50).
>
> Do you have some insights as to how I should tweak things to get this
> working?
>
> Thanks,
> Nik
>
> On Thu, Oct 20, 2016 at 11:43 PM, Nick Pentreath <nick.pentre...@gmail.com
> > wrote:
>
> The blocks params will set both user and item blocks.
>
> Spark 2.0 supports user and item blocks for PySpark:
> http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.recommendation
>
>
> On Fri, 21 Oct 2016 at 08:12 Nikhil Mishra <nikhilmishra8...@gmail.com>
> wrote:
>
> Hi,
>
> I have a question about the block size to be specified in
> ALS.trainImplicit() in pyspark (Spark 1.6.1). There is only one block size
> parameter to be specified. I want to know if that would result in
> partitioning both the users as well as the items axes.
>
> For example, I am using the following call to ALs.trainImplicit() in my
> code.
>
> ---
>
> RANK = 50
>
> ITERATIONS = 2
>
> BLOCKS = 1000
>
> ALPHA = 1.0
>
> model = ALS.trainImplicit(ratings, RANK, ITERATIONS, blocks=BLOCKS,
> alpha=ALPHA)
>
>
> 
>
> Will this partition the users x items matrix into BLOCKS x BLOCKS number
> of matrices or will it partition only the users axis thereby resulting in
> BLOCKS number of matrices, each with columns = total number of unique items?
>
> Thanks,
> Nik
>
>
>
>
>


Re: ALS.trainImplicit block sizes

2016-10-21 Thread Nick Pentreath
How many nodes are you using in the cluster?



On Fri, 21 Oct 2016 at 08:58 Nikhil Mishra <nikhilmishra8...@gmail.com>
wrote:

> Thanks Nick.
>
> So we do partition U x I matrix into BxB matrices, each of size around U/B
> and I/B. Is that correct? Do you know whether a single block of the matrix
> is represented in memory as a full matrix or as sparse matrix? I ask this
> because my job has been failing for block sizes which should have worked.
>
> I have U = 85 million users, I = 250,000 items and when I specify block
> size 5,000, I get out of memory error, even though I am setting
> --executor-memory as 7g (on a linux EC2 which has 7.5g memory). Assuming
> each block has 17000 users and 50 items, eve if the block is internally
> represented as a full matrix, it should still occupy around 50MB space.
>
> Increasing block size to 20,000 also results in the same. So there is
> something I don't understand about how this is working.
>
> BTW, I am trying to find 50 latent factors (rank = 50).
>
> Do you have some insights as to how I should tweak things to get this
> working?
>
> Thanks,
> Nik
>
> On Thu, Oct 20, 2016 at 11:43 PM, Nick Pentreath <nick.pentre...@gmail.com
> > wrote:
>
> The blocks params will set both user and item blocks.
>
> Spark 2.0 supports user and item blocks for PySpark:
> http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.recommendation
>
>
> On Fri, 21 Oct 2016 at 08:12 Nikhil Mishra <nikhilmishra8...@gmail.com>
> wrote:
>
> Hi,
>
> I have a question about the block size to be specified in
> ALS.trainImplicit() in pyspark (Spark 1.6.1). There is only one block size
> parameter to be specified. I want to know if that would result in
> partitioning both the users as well as the items axes.
>
> For example, I am using the following call to ALs.trainImplicit() in my
> code.
>
> ---
>
> RANK = 50
>
> ITERATIONS = 2
>
> BLOCKS = 1000
>
> ALPHA = 1.0
>
> model = ALS.trainImplicit(ratings, RANK, ITERATIONS, blocks=BLOCKS,
> alpha=ALPHA)
>
>
> 
>
> Will this partition the users x items matrix into BLOCKS x BLOCKS number
> of matrices or will it partition only the users axis thereby resulting in
> BLOCKS number of matrices, each with columns = total number of unique items?
>
> Thanks,
> Nik
>
>
>


Re: [Spark ML] Using GBTClassifier in OneVsRest

2016-10-21 Thread Nick Pentreath
Currently no - GBT implements the predictors, not the classifier interface.
It might be possible to wrap it in a wrapper that extends the Classifier
trait.

Hopefully GBT will support multi-class at some point. But you can use
RandomForest which does support multi-class.

On Fri, 21 Oct 2016 at 02:12 ansari  wrote:

> It appears as if the inheritance hierarchy doesn't allow GBTClassifiers to
> be
> used as the binary classifier in a OneVsRest trainer. Is there a simple way
> to use gradient-boosted trees for multiclass (not binary) problems?
>
> Specifically, it complains that GBTClassifier doesn't inherit from
> Classifier[_, _, _].
>
> I'm using Spark 2.0.1:
>
> val gbt = new GBTClassifier()
>   .setLabelCol("indexedLabel")
>   .setFeaturesCol("features")
>   .setMaxIter(10)
>   .setMaxDepth(10)
>
> val ovr = new OneVsRest().
> setClassifier(gbt)
>
> fails saying
>
> error: type mismatch;
>  found   : org.apache.spark.ml.classification.GBTClassifier
>  required: org.apache.spark.ml.classification.Classifier[_, _, _]
>setClassifier(gbt)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ML-Using-GBTClassifier-in-OneVsRest-tp27933.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: ALS.trainImplicit block sizes

2016-10-21 Thread Nick Pentreath
The blocks params will set both user and item blocks.

Spark 2.0 supports user and item blocks for PySpark:
http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.recommendation

On Fri, 21 Oct 2016 at 08:12 Nikhil Mishra 
wrote:

> Hi,
>
> I have a question about the block size to be specified in
> ALS.trainImplicit() in pyspark (Spark 1.6.1). There is only one block size
> parameter to be specified. I want to know if that would result in
> partitioning both the users as well as the items axes.
>
> For example, I am using the following call to ALs.trainImplicit() in my
> code.
>
> ---
>
> RANK = 50
>
> ITERATIONS = 2
>
> BLOCKS = 1000
>
> ALPHA = 1.0
>
> model = ALS.trainImplicit(ratings, RANK, ITERATIONS, blocks=BLOCKS,
> alpha=ALPHA)
>
>
> 
>
> Will this partition the users x items matrix into BLOCKS x BLOCKS number
> of matrices or will it partition only the users axis thereby resulting in
> BLOCKS number of matrices, each with columns = total number of unique items?
>
> Thanks,
> Nik
>


Re: Making more features in Logistic Regression

2016-10-18 Thread Nick Pentreath
You can use the PolynomialExpansion in Spark ML (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.feature.PolynomialExpansion
)

On Tue, 18 Oct 2016 at 21:47 miro  wrote:

> Yes, I was thinking going down this road:
>
>
> http://scikit-learn.org/stable/modules/linear_model.html#polynomial-regression-extending-linear-models-with-basis-functions
>
> http://stats.stackexchange.com/questions/58739/polynomial-regression-using-scikit-learn
>
>
> But I’m not sure if spark actually has polynomial regression implemented
> (I couldn’t find it) - maybe SparkML gurus can help here?
>
> You could take a look also at scikit integration package with Spark (
> https://github.com/databricks/spark-sklearn).
>
> Hope it helped :)
>
> All the best,
> m.
>
>
>
> On 18 Oct 2016, at 20:36, aditya1702  wrote:
>
> 
>
> 
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Making-more-features-in-Logistic-Regression-tp27915p27918.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>


Re: can mllib Logistic Regression package handle 10 million sparse features?

2016-10-11 Thread Nick Pentreath
That's a good point about shuffle data compression. Still, it would be good
to benchmark the ideas behind https://github.com/apache/spark/pull/12761 I
think.

For many datasets, even within one partition the gradient sums etc can
remain very sparse. For example Criteo DAC data is extremely sparse - and
it has roughly 5% of active features per partition. However, you're correct
that as the coefficients (and intermediate stats counters) get aggregated
they will become more and more dense. But there is also the intermediate
memory overhead of the dense structures, though that comes into play in the
100s - 1000s millions feature range.

The situation in the PR above is actually different in that even the
coefficient vector itself is truly sparse (through some encoding they did
IRC). This is not an uncommon scenario however, as for high-dimensional
features users may want to use feature hashing which may result in actually
sparse coefficient vectors. With hashing often the feature dimension will
be chosen as power of 2 and higher (in some cases significantly) than the
true feature dimension to reduce collisions. So sparsity is critical here
for storage efficiency.

Your result for the final stage does seem to indicate something can be
improved - perhaps it is due to some level of fetch parallelism - so more
partitions may fetch more data in parallel? Because with just default
setting for `treeAggregate` I was seeing much faster times for the final
stage with 34 million feature dimension (though the final shuffle size
seems 50% of yours with 2x the features - this is with Spark 2.0.1, I
haven't tested out master yet with this data).

[image: Screen Shot 2016-10-11 at 12.03.55 PM.png]



On Fri, 7 Oct 2016 at 08:11 DB Tsai <dbt...@dbtsai.com> wrote:

> Hi Nick,
>
>
>
> I'm also working on the benchmark of liner models in Spark. :)
>
>
>
> One thing I saw is that for sparse features, 14 million features, with
>
> multi-depth aggregation, the final aggregation to the driver is
>
> extremely slow. See the attachment. The amount of data being exchanged
>
> between executor and executor is significantly larger than collecting
>
> the data into driver, but the time for collecting the data back to
>
> driver takes 4mins while the aggregation between executors only takes
>
> 20secs. Seems that the code path is different, and I suspect that
>
> there may be something in the spark core that we can optimize.
>
>
>
> Regrading using sparse data structure for aggregation, I'm not so sure
>
> how much this will improve the performance. Since after computing the
>
> gradient sum for all the data in one partitions, the vector will be no
>
> longer to be very sparse. Even it's sparse, after couple depth of
>
> aggregation, it will be very dense. Also, we perform the compression
>
> in the shuffle phase, so if there are many zeros, even it's in dense
>
> vector representation, the vector should take around the same size as
>
> sparse representation. I can be wrong since I never do a study on
>
> this, and I wonder how much performance we can gain in practice by
>
> using sparse vector for aggregating the gradients.
>
>
>
> Sincerely,
>
>
>
> DB Tsai
>
> --
>
> Web: https://www.dbtsai.com
>
> PGP Key ID: 0xAF08DF8D
>
>
>
>
>
> On Thu, Oct 6, 2016 at 4:09 AM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> > I'm currently working on various performance tests for large, sparse
> feature
>
> > spaces.
>
> >
>
> > For the Criteo DAC data - 45.8 million rows, 34.3 million features
>
> > (categorical, extremely sparse), the time per iteration for
>
> > ml.LogisticRegression is about 20-30s.
>
> >
>
> > This is with 4x worker nodes, 48 cores & 120GB RAM each. I haven't yet
> tuned
>
> > the tree aggregation depth. But the number of partitions can make a
>
> > difference - generally fewer is better since the cost is mostly
>
> > communication of the gradient (the gradient computation is < 10% of the
>
> > per-iteration time).
>
> >
>
> > Note that the current impl forces dense arrays for intermediate data
>
> > structures, increasing the communication cost significantly. See this PR
> for
>
> > info: https://github.com/apache/spark/pull/12761. Once sparse data
>
> > structures are supported for this, the linear models will be orders of
>
> > magnitude more scalable for sparse data.
>
> >
>
> >
>
> > On Wed, 5 Oct 2016 at 23:37 DB Tsai <dbt...@dbtsai.com> wrote:
>
> >>
>
> >> With the latest code in the current master, we're successfully

Re: why spark ml package doesn't contain svm algorithm

2016-09-27 Thread Nick Pentreath
There is a JIRA and PR for it -
https://issues.apache.org/jira/browse/SPARK-14709

On Tue, 27 Sep 2016 at 09:10 hxw黄祥为  wrote:

> I have found spark ml package have implement naivebayes algorithm and the
> source code is simple,.
>
> I am confusing why spark ml package doesn’t contain svm algorithm,it seems
> not very hard to do that.
>


Re: Spark MLlib ALS algorithm

2016-09-24 Thread Nick Pentreath
The scale factor was only to scale up the number of ratings in the dataset
for performance testing purposes, to illustrate the scalability of Spark
ALS.

It is not something you would normally do on your training dataset.
On Fri, 23 Sep 2016 at 20:07, Roshani Nagmote 
wrote:

> Hello,
>
> I was working on Spark MLlib ALS Matrix factorization algorithm and came
> across the following blog post:
>
>
> https://databricks.com/blog/2014/07/23/scalable-collaborative-filtering-with-spark-mllib.html
>
> Can anyone help me understanding what "s" scaling factor does and does it
> really give better performance? What's the significance of this?
> If we convert input data to scaledData with the help of "s", will it
> speedup the algorithm?
>
> Scaled data usage:
> *(For each user, we create pseudo-users that have the same ratings. That
> is, for every rating as (userId, productId, rating), we generate (userId+i,
> productId, rating) where 0 <= i < s and s is the scaling factor)*
>
> Also, this blogpost is for spark 1.1 and I am currently using 2.0
>
> Any help will be greatly appreciated.
>
> Thanks,
> Roshani
>


Re: Similar Items

2016-09-21 Thread Nick Pentreath
Sorry, the original repo: https://github.com/karlhigley/spark-neighbors

On Wed, 21 Sep 2016 at 13:09 Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> I should also point out another library I had not come across before :
> https://github.com/sethah/spark-neighbors
>
>
> On Tue, 20 Sep 2016 at 21:03 Kevin Mellott <kevin.r.mell...@gmail.com>
> wrote:
>
>> Using the Soundcloud implementation of LSH, I was able to process a 22K
>> product dataset in a mere 65 seconds! Thanks so much for the help!
>>
>> On Tue, Sep 20, 2016 at 1:15 PM, Kevin Mellott <kevin.r.mell...@gmail.com
>> > wrote:
>>
>>> Thanks Nick - those examples will help a ton!!
>>>
>>> On Tue, Sep 20, 2016 at 12:20 PM, Nick Pentreath <
>>> nick.pentre...@gmail.com> wrote:
>>>
>>>> A few options include:
>>>>
>>>> https://github.com/marufaytekin/lsh-spark - I've used this a bit and
>>>> it seems quite scalable too from what I've looked at.
>>>> https://github.com/soundcloud/cosine-lsh-join-spark - not used this
>>>> but looks like it should do exactly what you need.
>>>> https://github.com/mrsqueeze/*spark*-hash
>>>> <https://github.com/mrsqueeze/spark-hash>
>>>>
>>>>
>>>> On Tue, 20 Sep 2016 at 18:06 Kevin Mellott <kevin.r.mell...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks for the reply, Nick! I'm typically analyzing around 30-50K
>>>>> products at a time (as an isolated set of products). Within this set of
>>>>> products (which represents all products for a particular supplier), I am
>>>>> also analyzing each category separately. The largest categories typically
>>>>> have around 10K products.
>>>>>
>>>>> That being said, when calculating IDFs for the 10K product set we come
>>>>> out with roughly 12K unique tokens. In other words, our vectors are 12K
>>>>> columns wide (although they are being represented using SparseVectors). We
>>>>> have a step that is attempting to locate all documents that share the same
>>>>> tokens, and for those items we will calculate the cosine similarity.
>>>>> However, the part that attempts to identify documents with shared tokens 
>>>>> is
>>>>> the bottleneck.
>>>>>
>>>>> For this portion, we map our data down to the individual tokens
>>>>> contained by each document. For example:
>>>>>
>>>>> DocumentId   |   Description
>>>>>
>>>>> 
>>>>> 1   Easton Hockey Stick
>>>>> 2   Bauer Hockey Gloves
>>>>>
>>>>> In this case, we'd map to the following:
>>>>>
>>>>> (1, 'Easton')
>>>>> (1, 'Hockey')
>>>>> (1, 'Stick')
>>>>> (2, 'Bauer')
>>>>> (2, 'Hockey')
>>>>> (2, 'Gloves')
>>>>>
>>>>> Our goal is to aggregate this data as follows; however, our code that
>>>>> currently does this is does not perform well. In the realistic 12K product
>>>>> scenario, this resulted in 430K document/token tuples.
>>>>>
>>>>> ((1, 2), ['Hockey'])
>>>>>
>>>>> This then tells us that documents 1 and 2 need to be compared to one
>>>>> another (via cosine similarity) because they both contain the token
>>>>> 'hockey'. I will investigate the methods that you recommended to see if
>>>>> they may resolve our problem.
>>>>>
>>>>> Thanks,
>>>>> Kevin
>>>>>
>>>>> On Tue, Sep 20, 2016 at 1:45 AM, Nick Pentreath <
>>>>> nick.pentre...@gmail.com> wrote:
>>>>>
>>>>>> How many products do you have? How large are your vectors?
>>>>>>
>>>>>> It could be that SVD / LSA could be helpful. But if you have many
>>>>>> products then trying to compute all-pair similarity with brute force is 
>>>>>> not
>>>>>> going to be scalable. In this case you may want to investigate hashing
>>>>>> (LSH) techniques.
>>>>>>
>>>>>>
>>>>>> On Mon, 19 Sep 2016 at 22:49, Kevin Mellott <
>>>>>> kevin.r.mell...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'm trying to write a Spark application that will detect similar
>>>>>>> items (in this case products) based on their descriptions. I've got an 
>>>>>>> ML
>>>>>>> pipeline that transforms the product data to TF-IDF representation, 
>>>>>>> using
>>>>>>> the following components.
>>>>>>>
>>>>>>>- *RegexTokenizer* - strips out non-word characters, results in
>>>>>>>a list of tokens
>>>>>>>- *StopWordsRemover* - removes common "stop words", such as
>>>>>>>"the", "and", etc.
>>>>>>>- *HashingTF* - assigns a numeric "hash" to each token and
>>>>>>>calculates the term frequency
>>>>>>>- *IDF* - computes the inverse document frequency
>>>>>>>
>>>>>>> After this pipeline evaluates, I'm left with a SparseVector that
>>>>>>> represents the inverse document frequency of tokens for each product. 
>>>>>>> As a
>>>>>>> next step, I'd like to be able to compare each vector to one another, to
>>>>>>> detect similarities.
>>>>>>>
>>>>>>> Does anybody know of a straightforward way to do this in Spark? I
>>>>>>> tried creating a UDF (that used the Breeze linear algebra methods
>>>>>>> internally); however, that did not scale well.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Kevin
>>>>>>>
>>>>>>
>>>>>
>>>
>>


Re: Similar Items

2016-09-21 Thread Nick Pentreath
I should also point out another library I had not come across before :
https://github.com/sethah/spark-neighbors

On Tue, 20 Sep 2016 at 21:03 Kevin Mellott <kevin.r.mell...@gmail.com>
wrote:

> Using the Soundcloud implementation of LSH, I was able to process a 22K
> product dataset in a mere 65 seconds! Thanks so much for the help!
>
> On Tue, Sep 20, 2016 at 1:15 PM, Kevin Mellott <kevin.r.mell...@gmail.com>
> wrote:
>
>> Thanks Nick - those examples will help a ton!!
>>
>> On Tue, Sep 20, 2016 at 12:20 PM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> A few options include:
>>>
>>> https://github.com/marufaytekin/lsh-spark - I've used this a bit and it
>>> seems quite scalable too from what I've looked at.
>>> https://github.com/soundcloud/cosine-lsh-join-spark - not used this but
>>> looks like it should do exactly what you need.
>>> https://github.com/mrsqueeze/*spark*-hash
>>> <https://github.com/mrsqueeze/spark-hash>
>>>
>>>
>>> On Tue, 20 Sep 2016 at 18:06 Kevin Mellott <kevin.r.mell...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the reply, Nick! I'm typically analyzing around 30-50K
>>>> products at a time (as an isolated set of products). Within this set of
>>>> products (which represents all products for a particular supplier), I am
>>>> also analyzing each category separately. The largest categories typically
>>>> have around 10K products.
>>>>
>>>> That being said, when calculating IDFs for the 10K product set we come
>>>> out with roughly 12K unique tokens. In other words, our vectors are 12K
>>>> columns wide (although they are being represented using SparseVectors). We
>>>> have a step that is attempting to locate all documents that share the same
>>>> tokens, and for those items we will calculate the cosine similarity.
>>>> However, the part that attempts to identify documents with shared tokens is
>>>> the bottleneck.
>>>>
>>>> For this portion, we map our data down to the individual tokens
>>>> contained by each document. For example:
>>>>
>>>> DocumentId   |   Description
>>>>
>>>> 
>>>> 1   Easton Hockey Stick
>>>> 2   Bauer Hockey Gloves
>>>>
>>>> In this case, we'd map to the following:
>>>>
>>>> (1, 'Easton')
>>>> (1, 'Hockey')
>>>> (1, 'Stick')
>>>> (2, 'Bauer')
>>>> (2, 'Hockey')
>>>> (2, 'Gloves')
>>>>
>>>> Our goal is to aggregate this data as follows; however, our code that
>>>> currently does this is does not perform well. In the realistic 12K product
>>>> scenario, this resulted in 430K document/token tuples.
>>>>
>>>> ((1, 2), ['Hockey'])
>>>>
>>>> This then tells us that documents 1 and 2 need to be compared to one
>>>> another (via cosine similarity) because they both contain the token
>>>> 'hockey'. I will investigate the methods that you recommended to see if
>>>> they may resolve our problem.
>>>>
>>>> Thanks,
>>>> Kevin
>>>>
>>>> On Tue, Sep 20, 2016 at 1:45 AM, Nick Pentreath <
>>>> nick.pentre...@gmail.com> wrote:
>>>>
>>>>> How many products do you have? How large are your vectors?
>>>>>
>>>>> It could be that SVD / LSA could be helpful. But if you have many
>>>>> products then trying to compute all-pair similarity with brute force is 
>>>>> not
>>>>> going to be scalable. In this case you may want to investigate hashing
>>>>> (LSH) techniques.
>>>>>
>>>>>
>>>>> On Mon, 19 Sep 2016 at 22:49, Kevin Mellott <kevin.r.mell...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm trying to write a Spark application that will detect similar
>>>>>> items (in this case products) based on their descriptions. I've got an ML
>>>>>> pipeline that transforms the product data to TF-IDF representation, using
>>>>>> the following components.
>>>>>>
>>>>>>- *RegexTokenizer* - strips out non-word characters, results in a
>>>>>>list of tokens
>>>>>>- *StopWordsRemover* - removes common "stop words", such as
>>>>>>"the", "and", etc.
>>>>>>- *HashingTF* - assigns a numeric "hash" to each token and
>>>>>>calculates the term frequency
>>>>>>- *IDF* - computes the inverse document frequency
>>>>>>
>>>>>> After this pipeline evaluates, I'm left with a SparseVector that
>>>>>> represents the inverse document frequency of tokens for each product. As 
>>>>>> a
>>>>>> next step, I'd like to be able to compare each vector to one another, to
>>>>>> detect similarities.
>>>>>>
>>>>>> Does anybody know of a straightforward way to do this in Spark? I
>>>>>> tried creating a UDF (that used the Breeze linear algebra methods
>>>>>> internally); however, that did not scale well.
>>>>>>
>>>>>> Thanks,
>>>>>> Kevin
>>>>>>
>>>>>
>>>>
>>
>


Re: Similar Items

2016-09-20 Thread Nick Pentreath
A few options include:

https://github.com/marufaytekin/lsh-spark - I've used this a bit and it
seems quite scalable too from what I've looked at.
https://github.com/soundcloud/cosine-lsh-join-spark - not used this but
looks like it should do exactly what you need.
https://github.com/mrsqueeze/*spark*-hash
<https://github.com/mrsqueeze/spark-hash>


On Tue, 20 Sep 2016 at 18:06 Kevin Mellott <kevin.r.mell...@gmail.com>
wrote:

> Thanks for the reply, Nick! I'm typically analyzing around 30-50K products
> at a time (as an isolated set of products). Within this set of products
> (which represents all products for a particular supplier), I am also
> analyzing each category separately. The largest categories typically have
> around 10K products.
>
> That being said, when calculating IDFs for the 10K product set we come out
> with roughly 12K unique tokens. In other words, our vectors are 12K columns
> wide (although they are being represented using SparseVectors). We have a
> step that is attempting to locate all documents that share the same tokens,
> and for those items we will calculate the cosine similarity. However, the
> part that attempts to identify documents with shared tokens is the
> bottleneck.
>
> For this portion, we map our data down to the individual tokens contained
> by each document. For example:
>
> DocumentId   |   Description
>
> 
> 1   Easton Hockey Stick
> 2   Bauer Hockey Gloves
>
> In this case, we'd map to the following:
>
> (1, 'Easton')
> (1, 'Hockey')
> (1, 'Stick')
> (2, 'Bauer')
> (2, 'Hockey')
> (2, 'Gloves')
>
> Our goal is to aggregate this data as follows; however, our code that
> currently does this is does not perform well. In the realistic 12K product
> scenario, this resulted in 430K document/token tuples.
>
> ((1, 2), ['Hockey'])
>
> This then tells us that documents 1 and 2 need to be compared to one
> another (via cosine similarity) because they both contain the token
> 'hockey'. I will investigate the methods that you recommended to see if
> they may resolve our problem.
>
> Thanks,
> Kevin
>
> On Tue, Sep 20, 2016 at 1:45 AM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
>> How many products do you have? How large are your vectors?
>>
>> It could be that SVD / LSA could be helpful. But if you have many
>> products then trying to compute all-pair similarity with brute force is not
>> going to be scalable. In this case you may want to investigate hashing
>> (LSH) techniques.
>>
>>
>> On Mon, 19 Sep 2016 at 22:49, Kevin Mellott <kevin.r.mell...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to write a Spark application that will detect similar items
>>> (in this case products) based on their descriptions. I've got an ML
>>> pipeline that transforms the product data to TF-IDF representation, using
>>> the following components.
>>>
>>>- *RegexTokenizer* - strips out non-word characters, results in a
>>>list of tokens
>>>- *StopWordsRemover* - removes common "stop words", such as "the",
>>>"and", etc.
>>>- *HashingTF* - assigns a numeric "hash" to each token and
>>>calculates the term frequency
>>>- *IDF* - computes the inverse document frequency
>>>
>>> After this pipeline evaluates, I'm left with a SparseVector that
>>> represents the inverse document frequency of tokens for each product. As a
>>> next step, I'd like to be able to compare each vector to one another, to
>>> detect similarities.
>>>
>>> Does anybody know of a straightforward way to do this in Spark? I tried
>>> creating a UDF (that used the Breeze linear algebra methods internally);
>>> however, that did not scale well.
>>>
>>> Thanks,
>>> Kevin
>>>
>>
>


Re: Is RankingMetrics' NDCG implementation correct?

2016-09-20 Thread Nick Pentreath
(cc'ing dev list also)

I think a more general version of ranking metrics that allows arbitrary
relevance scores could be useful. Ranking metrics are applicable to other
settings like search or other learning-to-rank use cases, so it should be a
little more generic than pure recommender settings.

The one issue with the proposed implementation is that it is not compatible
with the existing cross-validators within a pipeline.

As I've mentioned on the linked JIRAs & PRs, one option is to create a
special set of cross-validators for recommenders, that address the issues
of (a) dataset splitting specific to recommender settings (user-based
stratified sampling, time-based etc) and (b) ranking-based evaluation.

The other option is to have the ALSModel itself capable of generating the
"ground-truth" set within the same dataframe output from "transform" (ie
predict top k) that can be fed into the cross-validator (with
RankingEvaluator) directly. That's the approach I took so far in
https://github.com/apache/spark/pull/12574.

Both options are valid and have their positives & negatives - open to
comments / suggestions.

On Tue, 20 Sep 2016 at 06:08 Jong Wook Kim  wrote:

> Thanks for the clarification and the relevant links. I overlooked the
> comments explicitly saying that the relevance is binary.
>
> I understand that the label is not a relevance, but I have been, and I
> think many people are using the label as relevance in the implicit feedback
> context where the user-provided exact label is not defined anyway. I think
> that's why RiVal 's using the term
> "preference" for both the label for MAE and the relevance for NDCG.
>
> At the same time, I see why Spark decided to assume the relevance is
> binary, in part to conform to the class RankingMetrics's constructor. I
> think it would be nice if the upcoming DataFrame-based RankingEvaluator can
> be optionally set a "relevance column" that has non-binary relevance
> values, otherwise defaulting to either 1.0 or the label column.
>
> My extended version of RankingMetrics is here:
> https://github.com/jongwook/spark-ranking-metrics . It has a test case
> checking that the numbers are same as RiVal's.
>
> Jong Wook
>
>
>
> On 19 September 2016 at 03:13, Sean Owen  wrote:
>
>> Yes, relevance is always 1. The label is not a relevance score so
>> don't think it's valid to use it as such.
>>
>> On Mon, Sep 19, 2016 at 4:42 AM, Jong Wook Kim  wrote:
>> > Hi,
>> >
>> > I'm trying to evaluate a recommendation model, and found that Spark and
>> > Rival give different results, and it seems that Rival's one is what
>> Kaggle
>> > defines:
>> https://gist.github.com/jongwook/5d4e78290eaef22cb69abbf68b52e597
>> >
>> > Am I using RankingMetrics in a wrong way, or is Spark's implementation
>> > incorrect?
>> >
>> > To my knowledge, NDCG should be dependent on the relevance (or
>> preference)
>> > values, but Spark's implementation seems not; it uses 1.0 where it
>> should be
>> > 2^(relevance) - 1, probably assuming that relevance is all 1.0? I also
>> tried
>> > tweaking, but its method to obtain the ideal DCG also seems wrong.
>> >
>> > Any feedback from MLlib developers would be appreciated. I made a
>> > modified/extended version of RankingMetrics that produces the identical
>> > numbers to Kaggle and Rival's results, and I'm wondering if it is
>> something
>> > appropriate to be added back to MLlib.
>> >
>> > Jong Wook
>>
>
>


Re: Similar Items

2016-09-20 Thread Nick Pentreath
How many products do you have? How large are your vectors?

It could be that SVD / LSA could be helpful. But if you have many products
then trying to compute all-pair similarity with brute force is not going to
be scalable. In this case you may want to investigate hashing (LSH)
techniques.


On Mon, 19 Sep 2016 at 22:49, Kevin Mellott 
wrote:

> Hi all,
>
> I'm trying to write a Spark application that will detect similar items (in
> this case products) based on their descriptions. I've got an ML pipeline
> that transforms the product data to TF-IDF representation, using the
> following components.
>
>- *RegexTokenizer* - strips out non-word characters, results in a list
>of tokens
>- *StopWordsRemover* - removes common "stop words", such as "the",
>"and", etc.
>- *HashingTF* - assigns a numeric "hash" to each token and calculates
>the term frequency
>- *IDF* - computes the inverse document frequency
>
> After this pipeline evaluates, I'm left with a SparseVector that
> represents the inverse document frequency of tokens for each product. As a
> next step, I'd like to be able to compare each vector to one another, to
> detect similarities.
>
> Does anybody know of a straightforward way to do this in Spark? I tried
> creating a UDF (that used the Breeze linear algebra methods internally);
> however, that did not scale well.
>
> Thanks,
> Kevin
>


Re: Issues while running MLlib matrix factorization ALS algorithm

2016-09-19 Thread Nick Pentreath
Try als.setCheckpointInterval (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.recommendation.ALS@setCheckpointInterval(checkpointInterval:Int):ALS.this.type
)

On Mon, 19 Sep 2016 at 20:01 Roshani Nagmote 
wrote:

> Hello Sean,
>
> Can you please tell me how to set checkpoint interval? I did set
> checkpointDir("hdfs:/") But if I want to reduce the default value of
> checkpoint interval which is 10. How should it be done?
>
> Sorry is its a very basic question. I am a novice in spark.
>
> Thanks,
> Roshani
>
> On Fri, Sep 16, 2016 at 11:14 AM, Roshani Nagmote <
> roshaninagmo...@gmail.com> wrote:
>
>> Hello,
>>
>> Thanks for your reply.
>>
>> Yes, Its netflix dataset. And when I get no space on device, my ‘/mnt’
>> directory gets filled up. I checked.
>>
>> /usr/lib/spark/bin/spark-submit --deploy-mode cluster --master yarn
>> --class org.apache.spark.examples.mllib.MovieLensALS --jars
>> /usr/lib/spark/examples/jars/scopt_2.11-3.3.0.jar
>> /usr/lib/spark/examples/jars/spark-examples_2.11-2.0.0.jar *--rank 32
>> --numIterations 100* --kryo s3://dataset_netflix
>>
>> When I run above command, I get following error
>>
>> Job aborted due to stage failure: Task 221 in stage 53.0 failed 4 times,
>> most recent failure: Lost task 221.3 in stage 53.0 (TID 9817, ):
>> java.io.FileNotFoundException:
>> /mnt/yarn/usercache/hadoop/appcache/application_1473786456609_0042/blockmgr-045c2dec-7765-4954-9c9a-c7452f7bd3b7/08/shuffle_168_221_0.data.b17d39a6-4d3c-4198-9e25-e19ca2b4d368
>> (No space left on device)
>>
>> I think I should not need to increase the space on device, as data is not
>> that big. So, is there any way, I can setup parameters so that it does not
>> use much disk space. I don’t know much about tuning parameters.
>>
>> It will be great if anyone can help me with this.
>>
>> Thanks,
>> Roshani
>>
>> On Sep 16, 2016, at 9:18 AM, Sean Owen  wrote:
>>
>>
>>
>>
>>
>


Re: Is RankingMetrics' NDCG implementation correct?

2016-09-19 Thread Nick Pentreath
The PR already exists for adding RankingEvaluator to ML -
https://github.com/apache/spark/pull/12461. I need to revive and review it.
DB, your review would be welcome too (and also on
https://github.com/apache/spark/issues/12574 which has implications for the
semantics of ranking metrics in the DataFrame style API).

Also see this discussion here -
https://github.com/apache/spark/pull/12461#discussion-diff-60469791 -
comment welcome.

N

On Mon, 19 Sep 2016 at 06:37 DB Tsai  wrote:

> Hi Jong,
>
> I think the definition from Kaggle is correct. I'm working on
> implementing ranking metrics in Spark ML now, but the timeline is
> unknown. Feel free to submit a PR for this in MLlib.
>
> Thanks.
>
> Sincerely,
>
> DB Tsai
> --
> Web: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
>
> On Sun, Sep 18, 2016 at 8:42 PM, Jong Wook Kim  wrote:
> > Hi,
> >
> > I'm trying to evaluate a recommendation model, and found that Spark and
> > Rival give different results, and it seems that Rival's one is what
> Kaggle
> > defines:
> https://gist.github.com/jongwook/5d4e78290eaef22cb69abbf68b52e597
> >
> > Am I using RankingMetrics in a wrong way, or is Spark's implementation
> > incorrect?
> >
> > To my knowledge, NDCG should be dependent on the relevance (or
> preference)
> > values, but Spark's implementation seems not; it uses 1.0 where it
> should be
> > 2^(relevance) - 1, probably assuming that relevance is all 1.0? I also
> tried
> > tweaking, but its method to obtain the ideal DCG also seems wrong.
> >
> > Any feedback from MLlib developers would be appreciated. I made a
> > modified/extended version of RankingMetrics that produces the identical
> > numbers to Kaggle and Rival's results, and I'm wondering if it is
> something
> > appropriate to be added back to MLlib.
> >
> > Jong Wook
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: weightCol doesn't seem to be handled properly in PySpark

2016-09-12 Thread Nick Pentreath
Could you create a JIRA ticket for it?

https://issues.apache.org/jira/browse/SPARK

On Thu, 8 Sep 2016 at 07:50 evanzamir  wrote:

> When I am trying to use LinearRegression, it seems that unless there is a
> column specified with weights, it will raise a py4j error. Seems odd
> because
> supposedly the default is weightCol=None, but when I specifically pass in
> weightCol=None to LinearRegression, I get this error.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/weightCol-doesn-t-seem-to-be-handled-properly-in-PySpark-tp27677.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to convert an ArrayType to DenseVector within DataFrame?

2016-09-08 Thread Nick Pentreath
You can use a udf like this:

Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
  /_/

Using Python version 2.7.12 (default, Jul  2 2016 17:43:17)
SparkSession available as 'spark'.

In [1]: from pyspark.ml.regression import LinearRegression

In [2]: from pyspark.sql.functions import udf

In [3]: from pyspark.ml.linalg import Vectors, VectorUDT

In [4]: df = spark.createDataFrame([(2.3, [1.0, 2.0, 3.0]), (6.5, [2.0,
5.0, 1.0]), (4.3, [7.0, 4.0, 2.0])], ["label", "array"])

In [5]: df.printSchema()
root
 |-- label: double (nullable = true)
 |-- array: array (nullable = true)
 ||-- element: double (containsNull = true)


In [6]: to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())

In [7]: data = df.select("label", to_vector("array").alias("features"))

In [8]: data.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)


In [9]: lr = LinearRegression()

In [10]: lr.fit(data).transform(data).show()

+-+-+--+
|label| features|prediction|
+-+-+--+
|  2.3|[1.0,2.0,3.0]|2.3003|
|  6.5|[2.0,5.0,1.0]|6.5036|
|  4.3|[7.0,4.0,2.0]| 4.297|
+-+-+--+


On Tue, 30 Aug 2016 at 20:45 evanzamir  wrote:

> I have a DataFrame with a column containing a list of numeric features to
> be
> used for a regression. When I run the regression, I get the following
> error:
>
> *pyspark.sql.utils.IllegalArgumentException: 'requirement failed: Column
> features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but
> was actually ArrayType(DoubleType,true).'
> *
> It would be nice if Spark could automatically convert the type, but
> assuming
> that isn't possible, what's the easiest way for me to do the conversion?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-convert-an-ArrayType-to-DenseVector-within-DataFrame-tp27625.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: I noticed LinearRegression sometimes produces negative R^2 values

2016-09-06 Thread Nick Pentreath
That does seem strange. Can you provide an example to reproduce?



On Tue, 6 Sep 2016 at 21:49 evanzamir  wrote:

> Am I misinterpreting what r2() in the LinearRegression Model summary means?
> By definition, R^2 should never be a negative number!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/I-noticed-LinearRegression-sometimes-produces-negative-R-2-values-tp27667.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark 2.0.0 - has anyone used spark ML to do predictions under 20ms?

2016-09-01 Thread Nick Pentreath
I should also point out that right now your only option is to code up your
own export functionality (or be able to read Spark's format in your serving
system), and translate that into the correct format for some other linear
algebra or ML library, and use that for serving.

On Thu, 1 Sep 2016 at 15:37 Nick Pentreath <nick.pentre...@gmail.com> wrote:

> Right now you are correct that Spark ML APIs do not support predicting on
> a single instance (whether Vector for the models or a Row for a pipeline).
>
> See https://issues.apache.org/jira/browse/SPARK-10413 and
> https://issues.apache.org/jira/browse/SPARK-16431 (duplicate) for some
> discussion.
>
> There may be movement in the short term to support the single Vector case.
> But anything for pipelines is not immediately on the horizon I'd say.
>
> N
>
>
> On Thu, 1 Sep 2016 at 15:21 Aseem Bansal <asmbans...@gmail.com> wrote:
>
>> I understand from a theoretical perspective that the model itself is not
>> distributed. Thus it can be used for making predictions for a vector or a
>> RDD. But speaking in terms of the APIs provided by spark 2.0.0 when I
>> create a model from a large data the recommended way is to use the ml
>> library for fit. I have the option of getting a
>> http://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/classification/NaiveBayesModel.html
>>  or wrapping it as
>> http://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/PipelineModel.html
>>
>> Both of these do not have any method which supports Vectors. How do I
>> bridge this gap in the API from my side? Is there anything in Spark's API
>> which I have missed? Or do I need to extract the parameters and use another
>> library for the predictions for a single row?
>>
>> On Thu, Sep 1, 2016 at 6:38 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> How the model is built isn't that related to how it scores things.
>>> Here we're just talking about scoring. NaiveBayesModel can score
>>> Vector which is not a distributed entity. That's what you want to use.
>>> You do not want to use a whole distributed operation to score one
>>> record. This isn't related to .ml vs .mllib APIs.
>>>
>>> On Thu, Sep 1, 2016 at 2:01 PM, Aseem Bansal <asmbans...@gmail.com>
>>> wrote:
>>> > I understand your point.
>>> >
>>> > Is there something like a bridge? Is it possible to convert the model
>>> > trained using Dataset (i.e. the distributed one) to the one which
>>> uses
>>> > vectors? In Spark 1.6 the mllib packages had everything as per vectors
>>> and
>>> > that should be faster as per my understanding. But in many spark blogs
>>> we
>>> > saw that spark is moving towards the ml package and mllib package will
>>> be
>>> > phased out. So how can someone train using huge data and then use it
>>> on a
>>> > row by row basis?
>>> >
>>> > Thanks for your inputs.
>>> >
>>> > On Thu, Sep 1, 2016 at 6:15 PM, Sean Owen <so...@cloudera.com> wrote:
>>> >>
>>> >> If you're trying to score a single example by way of an RDD or
>>> >> Dataset, then no it will never be that fast. It's a whole distributed
>>> >> operation, and while you might manage low latency for one job at a
>>> >> time, consider what will happen when hundreds of them are running at
>>> >> once. It's just huge overkill for scoring a single example (but,
>>> >> pretty fine for high-er latency, high throughput batch operations)
>>> >>
>>> >> However if you're scoring a Vector locally I can't imagine it's that
>>> >> slow. It does some linear algebra but it's not that complicated. Even
>>> >> something unoptimized should be fast.
>>> >>
>>> >> On Thu, Sep 1, 2016 at 1:37 PM, Aseem Bansal <asmbans...@gmail.com>
>>> wrote:
>>> >> > Hi
>>> >> >
>>> >> > Currently trying to use NaiveBayes to make predictions. But facing
>>> >> > issues
>>> >> > that doing the predictions takes order of few seconds. I tried with
>>> >> > other
>>> >> > model examples shipped with Spark but they also ran in minimum of
>>> 500 ms
>>> >> > when I used Scala API. With
>>> >> >
>>> >> > Has anyone used spark ML to do predictions for a single row under
>>> 20 ms?
>>> >> >
>>> >> > I am not doing premature optimization. The use case is that we are
>>> doing
>>> >> > real time predictions and we need results 20ms. Maximum 30ms. This
>>> is a
>>> >> > hard
>>> >> > limit for our use case.
>>> >
>>> >
>>>
>>
>>


Re: Spark 2.0.0 - has anyone used spark ML to do predictions under 20ms?

2016-09-01 Thread Nick Pentreath
Right now you are correct that Spark ML APIs do not support predicting on a
single instance (whether Vector for the models or a Row for a pipeline).

See https://issues.apache.org/jira/browse/SPARK-10413 and
https://issues.apache.org/jira/browse/SPARK-16431 (duplicate) for some
discussion.

There may be movement in the short term to support the single Vector case.
But anything for pipelines is not immediately on the horizon I'd say.

N

On Thu, 1 Sep 2016 at 15:21 Aseem Bansal  wrote:

> I understand from a theoretical perspective that the model itself is not
> distributed. Thus it can be used for making predictions for a vector or a
> RDD. But speaking in terms of the APIs provided by spark 2.0.0 when I
> create a model from a large data the recommended way is to use the ml
> library for fit. I have the option of getting a
> http://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/classification/NaiveBayesModel.html
>  or wrapping it as
> http://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/PipelineModel.html
>
> Both of these do not have any method which supports Vectors. How do I
> bridge this gap in the API from my side? Is there anything in Spark's API
> which I have missed? Or do I need to extract the parameters and use another
> library for the predictions for a single row?
>
> On Thu, Sep 1, 2016 at 6:38 PM, Sean Owen  wrote:
>
>> How the model is built isn't that related to how it scores things.
>> Here we're just talking about scoring. NaiveBayesModel can score
>> Vector which is not a distributed entity. That's what you want to use.
>> You do not want to use a whole distributed operation to score one
>> record. This isn't related to .ml vs .mllib APIs.
>>
>> On Thu, Sep 1, 2016 at 2:01 PM, Aseem Bansal 
>> wrote:
>> > I understand your point.
>> >
>> > Is there something like a bridge? Is it possible to convert the model
>> > trained using Dataset (i.e. the distributed one) to the one which
>> uses
>> > vectors? In Spark 1.6 the mllib packages had everything as per vectors
>> and
>> > that should be faster as per my understanding. But in many spark blogs
>> we
>> > saw that spark is moving towards the ml package and mllib package will
>> be
>> > phased out. So how can someone train using huge data and then use it on
>> a
>> > row by row basis?
>> >
>> > Thanks for your inputs.
>> >
>> > On Thu, Sep 1, 2016 at 6:15 PM, Sean Owen  wrote:
>> >>
>> >> If you're trying to score a single example by way of an RDD or
>> >> Dataset, then no it will never be that fast. It's a whole distributed
>> >> operation, and while you might manage low latency for one job at a
>> >> time, consider what will happen when hundreds of them are running at
>> >> once. It's just huge overkill for scoring a single example (but,
>> >> pretty fine for high-er latency, high throughput batch operations)
>> >>
>> >> However if you're scoring a Vector locally I can't imagine it's that
>> >> slow. It does some linear algebra but it's not that complicated. Even
>> >> something unoptimized should be fast.
>> >>
>> >> On Thu, Sep 1, 2016 at 1:37 PM, Aseem Bansal 
>> wrote:
>> >> > Hi
>> >> >
>> >> > Currently trying to use NaiveBayes to make predictions. But facing
>> >> > issues
>> >> > that doing the predictions takes order of few seconds. I tried with
>> >> > other
>> >> > model examples shipped with Spark but they also ran in minimum of
>> 500 ms
>> >> > when I used Scala API. With
>> >> >
>> >> > Has anyone used spark ML to do predictions for a single row under 20
>> ms?
>> >> >
>> >> > I am not doing premature optimization. The use case is that we are
>> doing
>> >> > real time predictions and we need results 20ms. Maximum 30ms. This
>> is a
>> >> > hard
>> >> > limit for our use case.
>> >
>> >
>>
>
>


Re: Equivalent of "predict" function from LogisticRegressionWithLBFGS in OneVsRest with LogisticRegression classifier (Spark 2.0)

2016-08-29 Thread Nick Pentreath
Try this:

val df = spark.createDataFrame(Seq(Vectors.dense(Array(10, 590, 190,
700))).map(Tuple1.apply)).toDF("features")

On Sun, 28 Aug 2016 at 11:06 yaroslav  wrote:

> Hi,
>
> We use such kind of logic for training our model
>
> val model = new LogisticRegressionWithLBFGS()
>   .setNumClasses(3)
>   .run(train)
>
> Next, during spark streaming, we load model and apply incoming data to this
> model to get specific class, for example:
>
>model.predict(Vectors.dense(10, 590, 190, 700))
>
> How we could achieve the same logic for OneVsRest classification:
>
> val classifier = new LogisticRegression()
>   .setMaxIter(10)
>   .setTol(1E-6)
>   .setFitIntercept(true)
>
> val ovr = new OneVsRest().setClassifier(classifier)
> val model = ovr.fit(train)
>
> How call "predict" for this model with vector Vectors.dense(10, 590, 190,
> 700) and get class ?
>
> We try play with this:
>
> val df = spark.createDataFrame(Array((10, 590, 190, 700)))
> val pr_class = model.transform(df)
>
> but get error.
>
> Thank you.
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Equivalent-of-predict-function-from-LogisticRegressionWithLBFGS-in-OneVsRest-with-LogisticRegression-tp27611.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Breaking down text String into Array elements

2016-08-23 Thread Nick Pentreath
> How about something like
>
> scala> val text = (1 to 10).map(i => (i.toString,
> random_string(chars.mkString(""), 10))).toArray
>
> text: Array[(String, String)] = Array((1,FBECDoOoAC), (2,wvAyZsMZnt),
> (3,KgnwObOFEG), (4,tAZPRodrgP), (5,uSgrqyZGuc), (6,ztrTmbkOhO),
> (7,qUbQsKtZWq), (8,JDokbiFzWy), (9,vNHgiHSuUM), (10,CmnFjlHnHx))
>
> scala> sc.parallelize(text).count
> res0: Long = 10
>
> By the way not sure exactly why you need the udf registration here?
>
>
> On Tue, 23 Aug 2016 at 20:12 Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi gents,
>>
>> Well I was trying to see whether I can create an array of elements. From
>> RDD to DF, register as TempTable and store it  as a Hive table
>>
>> import scala.util.Random
>> //
>> // UDF to create a random string of charlength characters
>> //
>> def random_string(chars: String, charlength: Int) : String = {
>>   val newKey = (1 to charlength).map(
>> x =>
>> {
>>   val index = Random.nextInt(chars.length)
>>   chars(index)
>> }
>>).mkString("")
>>return newKey
>> }
>> spark.udf.register("random_string", random_string(_:String, _:Int))
>> case class columns (col1: Int, col2: String)
>> val chars = ('a' to 'z') ++ ('A' to 'Z')
>> var text = ""
>> val comma = ","
>> val terminator = "))"
>> var random_char = ""
>> for (i  <- 1 to 10) {
>> random_char = random_string(chars.mkString(""), 10)
>> if (i < 10) {text = text + """(""" + i.toString +
>> """,""""+random_char+"""")"""+comma}
>>else {text = text + """(""" + i.toString +
>> """,""""+random_char+"""")"""}
>> }
>> println(text)
>> val df = sc.parallelize((Array(text)))
>>
>>
>> Unfortunately that only sees it as the text and interprets it as text.
>>
>> I can write is easily as a shell script with ${text} passed to Array and
>> it will work. I was wondering if I could do this in Spark/Scala with my
>> limited knowledge
>>
>> Cheers
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 23 August 2016 at 19:00, Nick Pentreath <nick.pentre...@gmail.com>
>> wrote:
>>
>>> what is "text"? i.e. what is the "val text = ..." definition?
>>>
>>> If text is a String itself then indeed sc.parallelize(Array(text)) is
>>> doing the correct thing in this case.
>>>
>>>
>>> On Tue, 23 Aug 2016 at 19:42 Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>>> I am sure someone know this :)
>>>>
>>>> Created a dynamic text string which has format
>>>>
>>>> scala> println(text)
>>>>
>>>> (1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")
>>>>
>>>> now if I do
>>>>
>>>> scala> val df =
>>>> sc.parallelize((Array((1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr"
>>>> df: org.apache.spark.rdd.RDD[(Int, String)] =
>>>> ParallelCollectionRDD[230] at parallelize at :39
>>>> scala> df.count
>>>> res157: Long = 10
>>>> It shows ten Array elements, which is correct.
>>>>
>>>> Now if I pass that text into Array it only sees one row
>>>>
>>>> scala> val df = sc.parallelize((Array(text)))
>>>> df: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[228] at
>>>> parallelize at :41
>>>> scala> df.count
>>>> res158: Long = 1
>>>>
>>>> Basically it sees it as one element of array
>>>>
>>>> scala> df.first
>>>> res165: String =
>>>> (1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")
>>>> Which is not what I want.
>>>>
>>>> Any ideas?
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> This works fine
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>
>>


Re: Breaking down text String into Array elements

2016-08-23 Thread Nick Pentreath
what is "text"? i.e. what is the "val text = ..." definition?

If text is a String itself then indeed sc.parallelize(Array(text)) is doing
the correct thing in this case.

On Tue, 23 Aug 2016 at 19:42 Mich Talebzadeh 
wrote:

> I am sure someone know this :)
>
> Created a dynamic text string which has format
>
> scala> println(text)
>
> (1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")
>
> now if I do
>
> scala> val df =
> sc.parallelize((Array((1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr"
> df: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[230]
> at parallelize at :39
> scala> df.count
> res157: Long = 10
> It shows ten Array elements, which is correct.
>
> Now if I pass that text into Array it only sees one row
>
> scala> val df = sc.parallelize((Array(text)))
> df: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[228] at
> parallelize at :41
> scala> df.count
> res158: Long = 1
>
> Basically it sees it as one element of array
>
> scala> df.first
> res165: String =
> (1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")
> Which is not what I want.
>
> Any ideas?
>
> Thanks
>
>
>
>
>
>
> This works fine
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Vector size mismatch in logistic regression - Spark ML 2.0

2016-08-22 Thread Nick Pentreath
I believe it may be because of this issue (
https://issues.apache.org/jira/browse/SPARK-13030). OHE is not an estimator
- hence in cases where the number of categories differ between train and
test, it's not usable in the current form.

It's tricky to work around, though one option is to use feature hashing
instead of the StringIndexer -> OHE combo (see
https://lists.apache.org/thread.html/a7e06426fd958665985d2c4218ea2f9bf9ba136ddefe83e1ad6f1727@%3Cuser.spark.apache.org%3E
for
some details).



On Mon, 22 Aug 2016 at 03:20 janardhan shetty 
wrote:

> Thanks Krishna for your response.
> Features in the training set has more categories than test set so when
> vectorAssembler is used these numbers are usually different and I believe
> it is as expected right ?
>
> Test dataset usually will not have so many categories in their features as
> Train is the belief here.
>
> On Sun, Aug 21, 2016 at 4:44 PM, Krishna Sankar 
> wrote:
>
>> Hi,
>>Just after I sent the mail, I realized that the error might be with
>> the training-dataset not the test-dataset.
>>
>>1. it might be that you are feeding the full Y vector for training.
>>2. Which could mean, you are using ~50-50 training-test split.
>>3. Take a good look at the code that does the data split and the
>>datasets where they are allocated to.
>>
>> Cheers
>> 
>>
>> On Sun, Aug 21, 2016 at 4:37 PM, Krishna Sankar 
>> wrote:
>>
>>> Hi,
>>>   Looks like the test-dataset has different sizes for X & Y. Possible
>>> steps:
>>>
>>>1. What is the test-data-size ?
>>>   - If it is 15,909, check the prediction variable vector - it is
>>>   now 29,471, should be 15,909
>>>   - If you expect it to be 29,471, then the X Matrix is not right.
>>>   2. It is also probable that the size of the test-data is
>>>something else. If so, check the data pipeline.
>>>3. If you print the count() of the various vectors, I think you can
>>>find the error.
>>>
>>> Cheers & Good Luck
>>> 
>>>
>>> On Sun, Aug 21, 2016 at 3:16 PM, janardhan shetty <
>>> janardhan...@gmail.com> wrote:
>>>
 Hi,

 I have built the logistic regression model using training-dataset.
 When I am predicting on a test-dataset, it is throwing the below error
 of size mismatch.

 Steps done:
 1. String indexers on categorical features.
 2. One hot encoding on these indexed features.

 Any help is appreciated to resolve this issue or is it a bug ?

 SparkException: *Job aborted due to stage failure: Task 0 in stage
 635.0 failed 1 times, most recent failure: Lost task 0.0 in stage 635.0
 (TID 19421, localhost): java.lang.IllegalArgumentException: requirement
 failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching
 sizes: x.size = 15909, y.size = 29471* at
 scala.Predef$.require(Predef.scala:224) at
 org.apache.spark.ml.linalg.BLAS$.dot(BLAS.scala:104) at
 org.apache.spark.ml.classification.LogisticRegressionModel$$anonfun$19.apply(LogisticRegression.scala:505)
 at 
 org.apache.spark.ml.classification.LogisticRegressionModel$$anonfun$19.apply(LogisticRegression.scala:504)
 at 
 org.apache.spark.ml.classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:594)
 at 
 org.apache.spark.ml.classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:484)
 at 
 org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:112)
 at 
 org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:111)
 at
 org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr137$(Unknown
 Source) at
 org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source) at
 org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

>>>
>>>
>>
>


Re: Model Persistence

2016-08-18 Thread Nick Pentreath
Model metadata (mostly parameter values) are usually tiny. The parquet data
is most often for model coefficients. So this depends on the size of your
model, i.e. Your feature dimension.

A high-dimensional linear model can be quite large - but still typically
easy to fit into main memory on a single node. A high-dimensional
multi-layer perceptron with many layers could be quite a lot larger. An ALS
model with millions of users  items could be quite huge.

On Thu, 18 Aug 2016 at 18:00, Rich Tarro  wrote:

> The following Databricks blog on Model Persistence states "Internally, we
> save the model metadata and parameters as JSON and the data as Parquet."
>
>
> https://databricks.com/blog/2016/05/31/apache-spark-2-0-preview-machine-learning-model-persistence.html
>
>
> What data associated with a model or Pipeline is actually saved (in
> Parquet format)?
>
> What factors determine how large the the saved model or pipeline will be?
>
> Thanks.
> Rich
>


Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-11 Thread Nick Pentreath
Ok, interesting. Would be interested to see how it compares.

By the way, the feature size you select for the hasher should be a power of
2 (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes
are evenly distributed (see the section on HashingTF under
http://spark.apache.org/docs/latest/ml-features.html#tf-idf).

On Thu, 11 Aug 2016 at 22:14 Ben Teeuwen <bteeu...@gmail.com> wrote:

> Thanks Nick, I played around with the hashing trick. When I set
> numFeatures to the amount of distinct values for the largest sparse
> feature, I ended up with half of them colliding. When raising the
> numFeatures to have less collisions, I soon ended up with the same memory
> problems as before. To be honest, I didn’t test the impact of having more
> or less collisions on the quality of the predictions, but tunnel visioned
> into getting it to work with the full sparsity.
>
> Before I worked in RDD land; zipWithIndex on rdd with distinct values +
> one entry ‘missing’ for missing values during predict, collectAsMap,
> broadcast the map, udf generating sparse vector, assembling the vectors
> manually). To move into dataframe land, I wrote:
>
> def getMappings(mode):
> mappings = defaultdict(dict)
> max_index = 0
> for c in cat_int[:]:# for every categorical variable
>
> logging.info("starting with {}".format(c))
> if mode == 'train':
> grouped = (df2
> .groupBy(c).count().orderBy('count', ascending = False)  #
> get counts, ordered from largest to smallest
> .selectExpr("*", "1 as n")  # prepare for window
> function summing up 1s before current row to create a RANK
> .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS
> BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS
> index".format(max_index))
> .drop('n') # drop the column with static 1 values used for
> the cumulative sum
> )
> logging.info("Got {} rows.".format(grouped.count()))
> grouped.show()
> logging.info('getting max')
> max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda
> r: r.t).first()  # update the max index so next categorical feature starts
> with it.
> logging.info("max_index has become: {}".format(max_index))
> logging.info('adding missing value, so we also train on this
> and prediction data missing it. ')
> schema = grouped.schema
> logging.info(schema)
> grouped = grouped.union(spark.createDataFrame([('missing', 0,
> max_index + 1)], schema))  # add index for missing value for values during
> predict that are unseen during training.
> max_index += 1
> saveto = "{}/{}".format(path, c)
> logging.info("Writing to: {}".format(saveto))
> grouped.write.parquet(saveto, mode = 'overwrite')
>
> elif mode == 'predict':
> loadfrom = "{}/{}".format(path, c)
> logging.info("Reading from: {}".format(loadfrom))
> grouped = spark.read.parquet(loadfrom)
>
> logging.info("Adding to dictionary")
> mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d:
> (d[c], d['index'])).collectAsMap()  # build up dictionary to be broadcasted
> later on, used for creating sparse vectors
> max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r:
> r.t).first()
>
> logging.info("Sanity check for indexes:")
> for c in cat_int[:]:
> logging.info("{} min: {} max: {}".format(c,
> min(mappings[c].values()), max(mappings[c].values(   # some logging to
> confirm the indexes.
> logging.info("Missing value = {}".format(mappings[c]['missing']))
> return max_index, mappings
>
> I’d love to see the StringIndexer + OneHotEncoder transformers cope with
> missing values during prediction; for now I’ll work with the hacked stuff
> above :).
> (.. and I should compare the performance with using the hashing trick.)
>
> Ben
>
>
> On Aug 4, 2016, at 3:44 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> Sure, I understand there are some issues with handling this missing value
> situation in StringIndexer currently. Your workaround is not ideal but I
> see that it is probably the only mechanism available currently to avoid the
> problem.
>
> But the OOM issues seem to be more about the feature cardinality (so the
> size of the hashmap to store the feature <-> index mappings).
>
> A nice property of feature hashing i

Re: Standardization with Sparse Vectors

2016-08-10 Thread Nick Pentreath
Ah right, got it. As you say for storage it helps significantly, but for
operations I suspect it puts one back in a "dense-like" position. Still,
for online / mini-batch algorithms it may still be feasible I guess.
On Wed, 10 Aug 2016 at 19:50, Sean Owen <so...@cloudera.com> wrote:

> All elements, I think. Imagine a sparse vector 1:3 3:7 which conceptually
> represents 0 3 0 7. Imagine it also has an offset stored which applies to
> all elements. If it is -2 then it now represents -2 1 -2 5, but this
> requires just one extra value to store. It only helps with storage of a
> shifted sparse vector; iterating still typically requires iterating all
> elements.
>
> Probably, where this would help, the caller can track this offset and even
> more efficiently apply this knowledge. I remember digging into this in how
> sparse covariance matrices are computed. It almost but not quite enabled an
> optimization.
>
>
> On Wed, Aug 10, 2016, 18:10 Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
>> Sean by 'offset' do you mean basically subtracting the mean but only from
>> the non-zero elements in each row?
>> On Wed, 10 Aug 2016 at 19:02, Sean Owen <so...@cloudera.com> wrote:
>>
>>> Yeah I had thought the same, that perhaps it's fine to let the
>>> StandardScaler proceed, if it's explicitly asked to center, rather
>>> than refuse to. It's not really much more rope to let a user hang
>>> herself with, and, blocks legitimate usages (we ran into this last
>>> week and couldn't use StandardScaler as a result).
>>>
>>> I'm personally supportive of the change and don't see a JIRA. I think
>>> you could at least make one.
>>>
>>> On Wed, Aug 10, 2016 at 5:57 PM, Tobi Bosede <ani.to...@gmail.com>
>>> wrote:
>>> > Thanks Sean, I agree with 100% that the math is math and dense vs
>>> sparse is
>>> > just a matter of representation. I was trying to convince a co-worker
>>> of
>>> > this to no avail. Sending this email was mainly a sanity check.
>>> >
>>> > I think having an offset would be a great idea, although I am not sure
>>> how
>>> > to implement this. However, if anything should be done to rectify this
>>> > issue, it should be done in the standardScaler, not vectorAssembler.
>>> There
>>> > should not be any forcing of vectorAssembler to produce only dense
>>> vectors
>>> > so as to avoid performance problems with data that does not fit in
>>> memory.
>>> > Furthermore, not every machine learning algo requires standardization.
>>> > Instead, standardScaler should have withmean=True as default and should
>>> > apply an offset if the vector is sparse, whereas there would be normal
>>> > subtraction if the vector is dense. This way the default behavior of
>>> > standardScaler will always be what is generally understood to be
>>> > standardization, as opposed to people thinking they are standardizing
>>> when
>>> > they actually are not.
>>> >
>>> > Can anyone confirm whether there is a jira already?
>>> >
>>> > On Wed, Aug 10, 2016 at 10:58 AM, Sean Owen <so...@cloudera.com>
>>> wrote:
>>> >>
>>> >> Dense vs sparse is just a question of representation, so doesn't make
>>> >> an operation on a vector more or less important as a result. You've
>>> >> identified the reason that subtracting the mean can be undesirable: a
>>> >> notionally billion-element sparse vector becomes too big to fit in
>>> >> memory at once.
>>> >>
>>> >> I know this came up as a problem recently (I think there's a JIRA?)
>>> >> because VectorAssembler will *sometimes* output a small dense vector
>>> >> and sometimes output a small sparse vector based on how many zeroes
>>> >> there are. But that's bad because then the StandardScaler can't
>>> >> process the output at all. You can work on this if you're interested;
>>> >> I think the proposal was to be able to force a dense representation
>>> >> only in VectorAssembler. I don't know if that's the nature of the
>>> >> problem you're hitting.
>>> >>
>>> >> It can be meaningful to only scale the dimension without centering it,
>>> >> but it's not the same thing, no. The math is the math.
>>> >>
>>> >> This has come up a few times -- it's necessary to center a sparse
>>> &g

Re: Spark2 SBT Assembly

2016-08-10 Thread Nick Pentreath
You're correct - Spark packaging has been shifted to not use the assembly
jar.

To build now use "build/sbt package"


On Wed, 10 Aug 2016 at 19:40, Efe Selcuk  wrote:

> Hi Spark folks,
>
> With Spark 1.6 the 'assembly' target for sbt would build a fat jar with
> all of the main Spark dependencies for building an application. Against
> Spark 2, that target is no longer building a spark assembly, just ones for
> e.g. Flume and Kafka.
>
> I'm not well versed with maven and sbt, so I don't know how to go about
> figuring this out.
>
> Is this intended? Or am I missing something?
>
> Thanks.
>


Re: Standardization with Sparse Vectors

2016-08-10 Thread Nick Pentreath
Sean by 'offset' do you mean basically subtracting the mean but only from
the non-zero elements in each row?
On Wed, 10 Aug 2016 at 19:02, Sean Owen  wrote:

> Yeah I had thought the same, that perhaps it's fine to let the
> StandardScaler proceed, if it's explicitly asked to center, rather
> than refuse to. It's not really much more rope to let a user hang
> herself with, and, blocks legitimate usages (we ran into this last
> week and couldn't use StandardScaler as a result).
>
> I'm personally supportive of the change and don't see a JIRA. I think
> you could at least make one.
>
> On Wed, Aug 10, 2016 at 5:57 PM, Tobi Bosede  wrote:
> > Thanks Sean, I agree with 100% that the math is math and dense vs sparse
> is
> > just a matter of representation. I was trying to convince a co-worker of
> > this to no avail. Sending this email was mainly a sanity check.
> >
> > I think having an offset would be a great idea, although I am not sure
> how
> > to implement this. However, if anything should be done to rectify this
> > issue, it should be done in the standardScaler, not vectorAssembler.
> There
> > should not be any forcing of vectorAssembler to produce only dense
> vectors
> > so as to avoid performance problems with data that does not fit in
> memory.
> > Furthermore, not every machine learning algo requires standardization.
> > Instead, standardScaler should have withmean=True as default and should
> > apply an offset if the vector is sparse, whereas there would be normal
> > subtraction if the vector is dense. This way the default behavior of
> > standardScaler will always be what is generally understood to be
> > standardization, as opposed to people thinking they are standardizing
> when
> > they actually are not.
> >
> > Can anyone confirm whether there is a jira already?
> >
> > On Wed, Aug 10, 2016 at 10:58 AM, Sean Owen  wrote:
> >>
> >> Dense vs sparse is just a question of representation, so doesn't make
> >> an operation on a vector more or less important as a result. You've
> >> identified the reason that subtracting the mean can be undesirable: a
> >> notionally billion-element sparse vector becomes too big to fit in
> >> memory at once.
> >>
> >> I know this came up as a problem recently (I think there's a JIRA?)
> >> because VectorAssembler will *sometimes* output a small dense vector
> >> and sometimes output a small sparse vector based on how many zeroes
> >> there are. But that's bad because then the StandardScaler can't
> >> process the output at all. You can work on this if you're interested;
> >> I think the proposal was to be able to force a dense representation
> >> only in VectorAssembler. I don't know if that's the nature of the
> >> problem you're hitting.
> >>
> >> It can be meaningful to only scale the dimension without centering it,
> >> but it's not the same thing, no. The math is the math.
> >>
> >> This has come up a few times -- it's necessary to center a sparse
> >> vector but prohibitive to do so. One idea I'd toyed with in the past
> >> was to let a sparse vector have an 'offset' value applied to all
> >> elements. That would let you shift all values while preserving a
> >> sparse representation. I'm not sure if it's worth implementing but
> >> would help this case.
> >>
> >>
> >>
> >>
> >> On Wed, Aug 10, 2016 at 4:41 PM, Tobi Bosede 
> wrote:
> >> > Hi everyone,
> >> >
> >> > I am doing some standardization using standardScaler on data from
> >> > VectorAssembler which is represented as sparse vectors. I plan to fit
> a
> >> > regularized model.  However, standardScaler does not allow the mean to
> >> > be
> >> > subtracted from sparse vectors. It will only divide by the standard
> >> > deviation, which I understand is to keep the vector sparse. Thus I am
> >> > trying
> >> > to convert my sparse vectors into dense vectors, but this may not be
> >> > worthwhile.
> >> >
> >> > So my questions are:
> >> > Is subtracting the mean during standardization only important when
> >> > working
> >> > with dense vectors? Does it not matter for sparse vectors? Is just
> >> > dividing
> >> > by the standard deviation with sparse vectors equivalent to also
> >> > dividing by
> >> > standard deviation w and subtracting mean with dense vectors?
> >> >
> >> > Thank you,
> >> > Tobi
> >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-04 Thread Nick Pentreath
prefix available
> at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2203)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> 16/08/04 10:34:18 WARN DFSClient: Error Recovery for block
> BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993414608 in
> pipeline 10.10.66.13:50010, 10.10.66.3:50010, 10.10.95.29:50010: bad
> datanode 10.10.66.13:50010
> 16/08/04 10:36:03 WARN DFSClient: Slow ReadProcessor read fields took
> 74146ms (threshold=3ms); ack: seqno: -2 status: SUCCESS status: SUCCESS
> status: ERROR downstreamAckTimeNanos: 0, targets: [10.10.66.3:50010,
> 10.10.66.1:50010, 10.10.95.29:50010]
> 16/08/04 10:36:03 WARN DFSClient: DFSOutputStream ResponseProcessor
> exception  for block
> BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993467488
> java.io.IOException: Bad response ERROR for block
> BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 from
> datanode 10.10.95.29:50010
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
> 16/08/04 10:36:03 WARN DFSClient: Error Recovery for block
> BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 in
> pipeline 10.10.66.3:50010, 10.10.66.1:50010, 10.10.95.29:50010: bad
> datanode 10.10.95.29:50010
> 16/08/04 10:40:48 WARN DFSClient: Slow ReadProcessor read fields took
> 60891ms (threshold=3ms); ack: seqno: -2 status:
>
> 
>
> After 40 minutes or so, with no activity in the application master, it
> dies.
>
> Ben
>
> On Aug 4, 2016, at 12:14 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> Hi Ben
>
> Perhaps with this size cardinality it is worth looking at feature hashing
> for your problem. Spark has the HashingTF transformer that works on a
> column of "sentences" (i.e. [string]).
>
> For categorical features you can hack it a little by converting your
> feature value into a ["feature_name=feature_value"] representation. Then
> HashingTF can be used as is. Note you can also just do ["feature_value"],
> but the former would allow you, with a bit of munging, to hash all your
> feature columns at the same time.
>
> The advantage is speed and bounded memory footprint. The disadvantages
> include (i) no way to reverse the mapping from feature_index ->
> feature_name; (ii) potential for hash collisions (can be helped a bit by
> increasing your feature vector size).
>
> Here is a minimal example:
>
> In [1]: from pyspark.ml.feature import StringIndexer, OneHotEncoder,
> HashingTF
> In [2]: from pyspark.sql.types import StringType, ArrayType
> In [3]: from pyspark.sql.functions import udf
>
> In [4]: df = spark.createDataFrame([(0, "foo"), (1, "bar"), (2, "foo"),
> (3, "baz")], ["id", "feature"])
>
> In [5]: to_array = udf(lambda s: ["feature=%s" % s],
> ArrayType(StringType()))
>
> In [6]: df = df.withColumn("features", to_array("feature"))
>
> In [7]: df.show()
> +---+---+-+
> | id|feature| features|
> +---+---+-+
> |  0|foo|[feature=foo]|
> |  1|bar|[feature=bar]|
> |  2|foo|[feature=foo]|
> |  3|baz|[feature=baz]|
> +---+---+-+
>
> In [8]: indexer = StringIndexer(inputCol="feature",
> outputCol="feature_index")
>
> In [9]: indexed = indexer.fit(df).transform(df)
>
> In [10]: encoder = OneHotEncoder(dropLast=False, inputCol="feature_index",
> outputCol="feature_vector")
>
> In [11]: encoded = encoder.transform(indexed)
>
> In [12]: encoded.show()
> +---+---+-+-+--+
> | id|feature| features|feature_index|feature_vector|
> +---+---+-+-+--+
> |  0|foo|[feature=foo]|  0.0| (3,[0],[1.0])|
> |  1|bar|[feature=bar]|  2.0| (3,[2],[1.0])|
> |  2|foo|[feature=foo]|  0.0| (3,[0],[1.0])|
> |  3|baz|[feature=baz]|  1.0| (3,[1],[1.0])|
> +---+---+-+-+--+
>
> In [22]: hasher = HashingTF(numFeatures=2**8, inputCol="features",
> outputCol="features_vector")
>
> In [23]: hashed = hasher.transform(df)
>
> In [24]: hashed.show()
> +---+---+-+-+
> | id|feature| features|  features_vector|
> +---+---+-+-+
> |  0|  

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-04 Thread Nick Pentreath
Hi Ben

Perhaps with this size cardinality it is worth looking at feature hashing
for your problem. Spark has the HashingTF transformer that works on a
column of "sentences" (i.e. [string]).

For categorical features you can hack it a little by converting your
feature value into a ["feature_name=feature_value"] representation. Then
HashingTF can be used as is. Note you can also just do ["feature_value"],
but the former would allow you, with a bit of munging, to hash all your
feature columns at the same time.

The advantage is speed and bounded memory footprint. The disadvantages
include (i) no way to reverse the mapping from feature_index ->
feature_name; (ii) potential for hash collisions (can be helped a bit by
increasing your feature vector size).

Here is a minimal example:

In [1]: from pyspark.ml.feature import StringIndexer, OneHotEncoder,
HashingTF
In [2]: from pyspark.sql.types import StringType, ArrayType
In [3]: from pyspark.sql.functions import udf

In [4]: df = spark.createDataFrame([(0, "foo"), (1, "bar"), (2, "foo"), (3,
"baz")], ["id", "feature"])

In [5]: to_array = udf(lambda s: ["feature=%s" % s],
ArrayType(StringType()))

In [6]: df = df.withColumn("features", to_array("feature"))

In [7]: df.show()
+---+---+-+
| id|feature| features|
+---+---+-+
|  0|foo|[feature=foo]|
|  1|bar|[feature=bar]|
|  2|foo|[feature=foo]|
|  3|baz|[feature=baz]|
+---+---+-+

In [8]: indexer = StringIndexer(inputCol="feature",
outputCol="feature_index")

In [9]: indexed = indexer.fit(df).transform(df)

In [10]: encoder = OneHotEncoder(dropLast=False, inputCol="feature_index",
outputCol="feature_vector")

In [11]: encoded = encoder.transform(indexed)

In [12]: encoded.show()
+---+---+-+-+--+
| id|feature| features|feature_index|feature_vector|
+---+---+-+-+--+
|  0|foo|[feature=foo]|  0.0| (3,[0],[1.0])|
|  1|bar|[feature=bar]|  2.0| (3,[2],[1.0])|
|  2|foo|[feature=foo]|  0.0| (3,[0],[1.0])|
|  3|baz|[feature=baz]|  1.0| (3,[1],[1.0])|
+---+---+-+-+--+

In [22]: hasher = HashingTF(numFeatures=2**8, inputCol="features",
outputCol="features_vector")

In [23]: hashed = hasher.transform(df)

In [24]: hashed.show()
+---+---+-+-+
| id|feature| features|  features_vector|
+---+---+-+-+
|  0|foo|[feature=foo]| (256,[59],[1.0])|
|  1|bar|[feature=bar]|(256,[219],[1.0])|
|  2|foo|[feature=foo]| (256,[59],[1.0])|
|  3|baz|[feature=baz]| (256,[38],[1.0])|
+---+---+-+-+

On Thu, 4 Aug 2016 at 10:07 Ben Teeuwen  wrote:

> I raised driver memory to 30G and maxresultsize to 25G, this time in
> pyspark.
>
> *Code run:*
>
> cat_int  = ['bigfeature']
>
> stagesIndex = []
> stagesOhe   = []
> for c in cat_int:
>   stagesIndex.append(StringIndexer(inputCol=c,
> outputCol="{}Index".format(c)))
>   stagesOhe.append(OneHotEncoder(dropLast= False, inputCol =
> "{}Index".format(c), outputCol = "{}OHE".format(c)))
>
> df2 = df
>
> for i in range(len(stagesIndex)):
>   logging.info("Starting with {}".format(cat_int[i]))
>   stagesIndex[i].fit(df2)
>   logging.info("Fitted. Now transforming:")
>   df2 = stagesIndex[i].fit(df2).transform(df2)
>   logging.info("Transformed. Now showing transformed:")
>   df2.show()
>   logging.info("OHE")
>   df2 = stagesOhe[i].transform(df2)
>   logging.info("Fitted. Now showing OHE:")
>   df2.show()
>
> *Now I get error:*
>
> 2016-08-04 08:53:44,839 INFO   Starting with bigfeature
> [57/7074]
> ukStringIndexer_442b8e11e3294de9b83a
> 2016-08-04 09:06:18,147 INFO   Fitted. Now transforming:
> 16/08/04 09:10:35 WARN BlockManagerMaster: Failed to remove shuffle 3 -
> Cannot receive any reply in 120 seconds. This timeout is controlled by
> spark.rpc.askTimeout
> org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120
> seconds. This timeout is controlled by spark.rpc.askTimeout
> at org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
> at scala.util.Try$.apply(Try.scala:192)
> at scala.util.Failure.recover(Try.scala:216)
> at
> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
> at
> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
> at 

Re: [MLlib] Term Frequency in TF-IDF seems incorrect

2016-08-02 Thread Nick Pentreath
Note that both HashingTF and CountVectorizer are usually used for creating
TF-IDF normalized vectors. The definition (
https://en.wikipedia.org/wiki/Tf%E2%80%93idf#Definition) of term frequency
in TF-IDF is actually the "number of times the term occurs in the document".

So it's perhaps a bit of a misnomer, but the implementation is correct.

On Tue, 2 Aug 2016 at 05:44 Yanbo Liang  wrote:

> Hi Hao,
>
> HashingTF directly apply a hash function (Murmurhash3) to the features to
> determine their column index. It excluded any thought about the term
> frequency or the length of the document. It does similar work compared with
> sklearn FeatureHasher. The result is increased speed and reduced memory
> usage, but it does not remember what the input features looked like and can
> not convert the output back to the original features. Actually we misnamed
> this transformer, it only does the work of feature hashing rather than
> computing hashing term frequency.
>
> CountVectorizer will select the top vocabSize words ordered by term
> frequency across the corpus to build the hash table of the features. So it
> will consume more memory than HashingTF. However, we can convert the output
> back to the original feature.
>
> Both of the transformers do not consider the length of each document. If
> you want to compute term frequency divided by the length of the document,
> you should write your own function based on transformers provided by MLlib.
>
> Thanks
> Yanbo
>
> 2016-08-01 15:29 GMT-07:00 Hao Ren :
>
>> When computing term frequency, we can use either HashTF or
>> CountVectorizer feature extractors.
>> However, both of them just use the number of times that a term appears in
>> a document.
>> It is not a true frequency. Acutally, it should be divided by the length
>> of the document.
>>
>> Is this a wanted feature ?
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>
>


Re: Spark ml.ALS question -- RegressionEvaluator .evaluate giving ~1.5 output for same train and predict data

2016-07-27 Thread Nick Pentreath
This is exactly the core problem in the linked issue - normally you would
use the TrainValidationSplit or CrossValidator to do hyper-parameter
selection using cross-validation. You could tune the factor size,
regularization parameter and alpha (for implicit preference data), for
example.

Because of the NaN issue you cannot use the cross-validators currently with
ALS. So you would have to do it yourself manually (dropping the NaNs from
the prediction results as Krishna says).



On Mon, 25 Jul 2016 at 11:40 Rohit Chaddha <rohitchaddha1...@gmail.com>
wrote:

> Hi Krishna,
>
> Great .. I had no idea about this.  I tried your suggestion by using
> na.drop() and got a rmse = 1.5794048211812495
> Any suggestions how this can be reduced and the model improved ?
>
> Regards,
> Rohit
>
> On Mon, Jul 25, 2016 at 4:12 AM, Krishna Sankar <ksanka...@gmail.com>
> wrote:
>
>> Thanks Nick. I also ran into this issue.
>> VG, One workaround is to drop the NaN from predictions (df.na.drop()) and
>> then use the dataset for the evaluator. In real life, probably detect the
>> NaN and recommend most popular on some window.
>> HTH.
>> Cheers
>> 
>>
>> On Sun, Jul 24, 2016 at 12:49 PM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> It seems likely that you're running into
>>> https://issues.apache.org/jira/browse/SPARK-14489 - this occurs when
>>> the test dataset in the train/test split contains users or items that were
>>> not in the training set. Hence the model doesn't have computed factors for
>>> those ids, and ALS 'transform' currently returns NaN for those ids. This in
>>> turn results in NaN for the evaluator result.
>>>
>>> I have a PR open on that issue that will hopefully address this soon.
>>>
>>>
>>> On Sun, 24 Jul 2016 at 17:49 VG <vlin...@gmail.com> wrote:
>>>
>>>> ping. Anyone has some suggestions/advice for me .
>>>> It will be really helpful.
>>>>
>>>> VG
>>>> On Sun, Jul 24, 2016 at 12:19 AM, VG <vlin...@gmail.com> wrote:
>>>>
>>>>> Sean,
>>>>>
>>>>> I did this just to test the model. When I do a split of my data as
>>>>> training to 80% and test to be 20%
>>>>>
>>>>> I get a Root-mean-square error = NaN
>>>>>
>>>>> So I am wondering where I might be going wrong
>>>>>
>>>>> Regards,
>>>>> VG
>>>>>
>>>>> On Sun, Jul 24, 2016 at 12:12 AM, Sean Owen <so...@cloudera.com>
>>>>> wrote:
>>>>>
>>>>>> No, that's certainly not to be expected. ALS works by computing a much
>>>>>> lower-rank representation of the input. It would not reproduce the
>>>>>> input exactly, and you don't want it to -- this would be seriously
>>>>>> overfit. This is why in general you don't evaluate a model on the
>>>>>> training set.
>>>>>>
>>>>>> On Sat, Jul 23, 2016 at 7:37 PM, VG <vlin...@gmail.com> wrote:
>>>>>> > I am trying to run ml.ALS to compute some recommendations.
>>>>>> >
>>>>>> > Just to test I am using the same dataset for training using
>>>>>> ALSModel and for
>>>>>> > predicting the results based on the model .
>>>>>> >
>>>>>> > When I evaluate the result using RegressionEvaluator I get a
>>>>>> > Root-mean-square error = 1.5544064263236066
>>>>>> >
>>>>>> > I thin this should be 0. Any suggestions what might be going wrong.
>>>>>> >
>>>>>> > Regards,
>>>>>> > Vipul
>>>>>>
>>>>>
>>>>>
>>


Re: Spark ml.ALS question -- RegressionEvaluator .evaluate giving ~1.5 output for same train and predict data

2016-07-24 Thread Nick Pentreath
Good suggestion Krishna

One issue is that this doesn't work with TrainValidationSplit or
CrossValidator for parameter tuning. Hence my solution in the PR which
makes it work with the cross-validators.

On Mon, 25 Jul 2016 at 00:42, Krishna Sankar <ksanka...@gmail.com> wrote:

> Thanks Nick. I also ran into this issue.
> VG, One workaround is to drop the NaN from predictions (df.na.drop()) and
> then use the dataset for the evaluator. In real life, probably detect the
> NaN and recommend most popular on some window.
> HTH.
> Cheers
> 
>
> On Sun, Jul 24, 2016 at 12:49 PM, Nick Pentreath <nick.pentre...@gmail.com
> > wrote:
>
>> It seems likely that you're running into
>> https://issues.apache.org/jira/browse/SPARK-14489 - this occurs when the
>> test dataset in the train/test split contains users or items that were not
>> in the training set. Hence the model doesn't have computed factors for
>> those ids, and ALS 'transform' currently returns NaN for those ids. This in
>> turn results in NaN for the evaluator result.
>>
>> I have a PR open on that issue that will hopefully address this soon.
>>
>>
>> On Sun, 24 Jul 2016 at 17:49 VG <vlin...@gmail.com> wrote:
>>
>>> ping. Anyone has some suggestions/advice for me .
>>> It will be really helpful.
>>>
>>> VG
>>> On Sun, Jul 24, 2016 at 12:19 AM, VG <vlin...@gmail.com> wrote:
>>>
>>>> Sean,
>>>>
>>>> I did this just to test the model. When I do a split of my data as
>>>> training to 80% and test to be 20%
>>>>
>>>> I get a Root-mean-square error = NaN
>>>>
>>>> So I am wondering where I might be going wrong
>>>>
>>>> Regards,
>>>> VG
>>>>
>>>> On Sun, Jul 24, 2016 at 12:12 AM, Sean Owen <so...@cloudera.com> wrote:
>>>>
>>>>> No, that's certainly not to be expected. ALS works by computing a much
>>>>> lower-rank representation of the input. It would not reproduce the
>>>>> input exactly, and you don't want it to -- this would be seriously
>>>>> overfit. This is why in general you don't evaluate a model on the
>>>>> training set.
>>>>>
>>>>> On Sat, Jul 23, 2016 at 7:37 PM, VG <vlin...@gmail.com> wrote:
>>>>> > I am trying to run ml.ALS to compute some recommendations.
>>>>> >
>>>>> > Just to test I am using the same dataset for training using ALSModel
>>>>> and for
>>>>> > predicting the results based on the model .
>>>>> >
>>>>> > When I evaluate the result using RegressionEvaluator I get a
>>>>> > Root-mean-square error = 1.5544064263236066
>>>>> >
>>>>> > I thin this should be 0. Any suggestions what might be going wrong.
>>>>> >
>>>>> > Regards,
>>>>> > Vipul
>>>>>
>>>>
>>>>
>


Re: Spark ml.ALS question -- RegressionEvaluator .evaluate giving ~1.5 output for same train and predict data

2016-07-24 Thread Nick Pentreath
It seems likely that you're running into
https://issues.apache.org/jira/browse/SPARK-14489 - this occurs when the
test dataset in the train/test split contains users or items that were not
in the training set. Hence the model doesn't have computed factors for
those ids, and ALS 'transform' currently returns NaN for those ids. This in
turn results in NaN for the evaluator result.

I have a PR open on that issue that will hopefully address this soon.


On Sun, 24 Jul 2016 at 17:49 VG  wrote:

> ping. Anyone has some suggestions/advice for me .
> It will be really helpful.
>
> VG
> On Sun, Jul 24, 2016 at 12:19 AM, VG  wrote:
>
>> Sean,
>>
>> I did this just to test the model. When I do a split of my data as
>> training to 80% and test to be 20%
>>
>> I get a Root-mean-square error = NaN
>>
>> So I am wondering where I might be going wrong
>>
>> Regards,
>> VG
>>
>> On Sun, Jul 24, 2016 at 12:12 AM, Sean Owen  wrote:
>>
>>> No, that's certainly not to be expected. ALS works by computing a much
>>> lower-rank representation of the input. It would not reproduce the
>>> input exactly, and you don't want it to -- this would be seriously
>>> overfit. This is why in general you don't evaluate a model on the
>>> training set.
>>>
>>> On Sat, Jul 23, 2016 at 7:37 PM, VG  wrote:
>>> > I am trying to run ml.ALS to compute some recommendations.
>>> >
>>> > Just to test I am using the same dataset for training using ALSModel
>>> and for
>>> > predicting the results based on the model .
>>> >
>>> > When I evaluate the result using RegressionEvaluator I get a
>>> > Root-mean-square error = 1.5544064263236066
>>> >
>>> > I thin this should be 0. Any suggestions what might be going wrong.
>>> >
>>> > Regards,
>>> > Vipul
>>>
>>
>>


Re: Deploying ML Pipeline Model

2016-07-05 Thread Nick Pentreath
It all depends on your latency requirements and volume. 100s of queries per
minute, with an acceptable latency of up to a few seconds? Yes, you could
use Spark for serving, especially if you're smart about caching results
(and I don't mean just Spark caching, but caching recommendation results
for example similar items etc).

However for many serving use cases using a Spark cluster is too much
overhead. Bear in mind real-world serving of many models (recommendations,
ad-serving, fraud etc) is one component of a complex workflow (e.g. one
page request in ad tech cases involves tens of requests and hops between
various ad servers and exchanges). That is why often the practical latency
bounds are < 100ms (or way, way tighter for ad serving for example).


On Fri, 1 Jul 2016 at 21:59 Saurabh Sardeshpande <saurabh...@gmail.com>
wrote:

> Hi Nick,
>
> Thanks for the answer. Do you think an implementation like the one in this
> article is infeasible in production for say, hundreds of queries per
> minute?
> https://www.codementor.io/spark/tutorial/building-a-web-service-with-apache-spark-flask-example-app-part2.
> The article uses Flask to define routes and Spark for evaluating requests.
>
> Regards,
> Saurabh
>
>
>
>
>
>
> On Fri, Jul 1, 2016 at 10:47 AM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
>> Generally there are 2 ways to use a trained pipeline model - (offline)
>> batch scoring, and real-time online scoring.
>>
>> For batch (or even "mini-batch" e.g. on Spark streaming data), then yes
>> certainly loading the model back in Spark and feeding new data through the
>> pipeline for prediction works just fine, and this is essentially what is
>> supported in 1.6 (and more or less full coverage in 2.0). For large batch
>> cases this can be quite efficient.
>>
>> However, usually for real-time use cases, the latency required is fairly
>> low - of the order of a few ms to a few 100ms for a request (some examples
>> include recommendations, ad-serving, fraud detection etc).
>>
>> In these cases, using Spark has 2 issues: (1) latency for prediction on
>> the pipeline, which is based on DataFrames and therefore distributed
>> execution, is usually fairly high "per request"; (2) this requires pulling
>> in all of Spark for your real-time serving layer (or running a full Spark
>> cluster), which is usually way too much overkill - all you really need for
>> serving is a bit of linear algebra and some basic transformations.
>>
>> So for now, unfortunately there is not much in the way of options for
>> exporting your pipelines and serving them outside of Spark - the
>> JPMML-based project mentioned on this thread is one option. The other
>> option at this point is to write your own export functionality and your own
>> serving layer.
>>
>> There is (very initial) movement towards improving the local serving
>> possibilities (see https://issues.apache.org/jira/browse/SPARK-13944 which
>> was the "first step" in this process).
>>
>> On Fri, 1 Jul 2016 at 19:24 Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi Rishabh,
>>>
>>> I've just today had similar conversation about how to do a ML Pipeline
>>> deployment and couldn't really answer this question and more because I
>>> don't really understand the use case.
>>>
>>> What would you expect from ML Pipeline model deployment? You can save
>>> your model to a file by model.write.overwrite.save("model_v1").
>>>
>>> model_v1
>>> |-- metadata
>>> |   |-- _SUCCESS
>>> |   `-- part-0
>>> `-- stages
>>> |-- 0_regexTok_b4265099cc1c
>>> |   `-- metadata
>>> |   |-- _SUCCESS
>>> |   `-- part-0
>>> |-- 1_hashingTF_8de997cf54ba
>>> |   `-- metadata
>>> |   |-- _SUCCESS
>>> |   `-- part-0
>>> `-- 2_linReg_3942a71d2c0e
>>> |-- data
>>> |   |-- _SUCCESS
>>> |   |-- _common_metadata
>>> |   |-- _metadata
>>> |   `--
>>> part-r-0-2096c55a-d654-42b2-90d3-5a310101cba5.gz.parquet
>>> `-- metadata
>>> |-- _SUCCESS
>>> `-- part-0
>>>
>>> 9 directories, 12 files
>>>
>>> What would you like to have outside SparkContext? What's wrong with
>>> using Spark? Just curious hoping to understand the use case better.
>>> Thanks.
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>>

Re: Deploying ML Pipeline Model

2016-07-05 Thread Nick Pentreath
Sean is correct - we now use jpmml-model (which is actually BSD 3-clause,
where old jpmml was A2L, but either work)

On Fri, 1 Jul 2016 at 21:40 Sean Owen <so...@cloudera.com> wrote:

> (The more core JPMML libs are Apache 2; OpenScoring is AGPL. We use
> JPMML in Spark and couldn't otherwise because the Affero license is
> not Apache compatible.)
>
> On Fri, Jul 1, 2016 at 8:16 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
> > I believe open-scoring is one of the well-known PMML serving frameworks
> in
> > Java land (https://github.com/jpmml/openscoring). One can also use the
> raw
> > https://github.com/jpmml/jpmml-evaluator for embedding in apps.
> >
> > (Note the license on both of these is AGPL - the older version of JPMML
> used
> > to be Apache2 if I recall correctly).
> >
>


Re: Deploying ML Pipeline Model

2016-07-01 Thread Nick Pentreath
I believe open-scoring is one of the well-known PMML serving frameworks in
Java land (https://github.com/jpmml/openscoring). One can also use the raw
https://github.com/jpmml/jpmml-evaluator for embedding in apps.

(Note the license on both of these is AGPL - the older version of JPMML
used to be Apache2 if I recall correctly).

On Fri, 1 Jul 2016 at 20:15 Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Nick,
>
> Thanks a lot for the exhaustive and prompt response! (In the meantime
> I watched a video about PMML to get a better understanding of the
> topic).
>
> What are the tools that could "consume" PMML exports (after running
> JPMML)? What tools would be the endpoint to deliver low-latency
> predictions by doing this "a bit of linear algebra and some basic
> transformations"?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Fri, Jul 1, 2016 at 6:47 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
> > Generally there are 2 ways to use a trained pipeline model - (offline)
> batch
> > scoring, and real-time online scoring.
> >
> > For batch (or even "mini-batch" e.g. on Spark streaming data), then yes
> > certainly loading the model back in Spark and feeding new data through
> the
> > pipeline for prediction works just fine, and this is essentially what is
> > supported in 1.6 (and more or less full coverage in 2.0). For large batch
> > cases this can be quite efficient.
> >
> > However, usually for real-time use cases, the latency required is fairly
> low
> > - of the order of a few ms to a few 100ms for a request (some examples
> > include recommendations, ad-serving, fraud detection etc).
> >
> > In these cases, using Spark has 2 issues: (1) latency for prediction on
> the
> > pipeline, which is based on DataFrames and therefore distributed
> execution,
> > is usually fairly high "per request"; (2) this requires pulling in all of
> > Spark for your real-time serving layer (or running a full Spark cluster),
> > which is usually way too much overkill - all you really need for serving
> is
> > a bit of linear algebra and some basic transformations.
> >
> > So for now, unfortunately there is not much in the way of options for
> > exporting your pipelines and serving them outside of Spark - the
> JPMML-based
> > project mentioned on this thread is one option. The other option at this
> > point is to write your own export functionality and your own serving
> layer.
> >
> > There is (very initial) movement towards improving the local serving
> > possibilities (see https://issues.apache.org/jira/browse/SPARK-13944
> which
> > was the "first step" in this process).
> >
> > On Fri, 1 Jul 2016 at 19:24 Jacek Laskowski <ja...@japila.pl> wrote:
> >>
> >> Hi Rishabh,
> >>
> >> I've just today had similar conversation about how to do a ML Pipeline
> >> deployment and couldn't really answer this question and more because I
> >> don't really understand the use case.
> >>
> >> What would you expect from ML Pipeline model deployment? You can save
> >> your model to a file by model.write.overwrite.save("model_v1").
> >>
> >> model_v1
> >> |-- metadata
> >> |   |-- _SUCCESS
> >> |   `-- part-0
> >> `-- stages
> >> |-- 0_regexTok_b4265099cc1c
> >> |   `-- metadata
> >> |   |-- _SUCCESS
> >> |   `-- part-0
> >> |-- 1_hashingTF_8de997cf54ba
> >> |   `-- metadata
> >> |   |-- _SUCCESS
> >> |   `-- part-0
> >> `-- 2_linReg_3942a71d2c0e
> >> |-- data
> >> |   |-- _SUCCESS
> >> |   |-- _common_metadata
> >> |   |-- _metadata
> >> |   `--
> >> part-r-0-2096c55a-d654-42b2-90d3-5a310101cba5.gz.parquet
> >> `-- metadata
> >> |-- _SUCCESS
> >> `-- part-0
> >>
> >> 9 directories, 12 files
> >>
> >> What would you like to have outside SparkContext? What's wrong with
> >> using Spark? Just curious hoping to understand the use case better.
> >> Thanks.
> >>
> >> Pozdrawiam,
> >> Jacek Laskowski
> >> 
> >> https://medium.com/@jaceklaskowski/
> >> Mastering Apache Spark http://bit.ly/mastering-apach

Re: Deploying ML Pipeline Model

2016-07-01 Thread Nick Pentreath
Generally there are 2 ways to use a trained pipeline model - (offline)
batch scoring, and real-time online scoring.

For batch (or even "mini-batch" e.g. on Spark streaming data), then yes
certainly loading the model back in Spark and feeding new data through the
pipeline for prediction works just fine, and this is essentially what is
supported in 1.6 (and more or less full coverage in 2.0). For large batch
cases this can be quite efficient.

However, usually for real-time use cases, the latency required is fairly
low - of the order of a few ms to a few 100ms for a request (some examples
include recommendations, ad-serving, fraud detection etc).

In these cases, using Spark has 2 issues: (1) latency for prediction on the
pipeline, which is based on DataFrames and therefore distributed execution,
is usually fairly high "per request"; (2) this requires pulling in all of
Spark for your real-time serving layer (or running a full Spark cluster),
which is usually way too much overkill - all you really need for serving is
a bit of linear algebra and some basic transformations.

So for now, unfortunately there is not much in the way of options for
exporting your pipelines and serving them outside of Spark - the
JPMML-based project mentioned on this thread is one option. The other
option at this point is to write your own export functionality and your own
serving layer.

There is (very initial) movement towards improving the local serving
possibilities (see https://issues.apache.org/jira/browse/SPARK-13944 which
was the "first step" in this process).

On Fri, 1 Jul 2016 at 19:24 Jacek Laskowski  wrote:

> Hi Rishabh,
>
> I've just today had similar conversation about how to do a ML Pipeline
> deployment and couldn't really answer this question and more because I
> don't really understand the use case.
>
> What would you expect from ML Pipeline model deployment? You can save
> your model to a file by model.write.overwrite.save("model_v1").
>
> model_v1
> |-- metadata
> |   |-- _SUCCESS
> |   `-- part-0
> `-- stages
> |-- 0_regexTok_b4265099cc1c
> |   `-- metadata
> |   |-- _SUCCESS
> |   `-- part-0
> |-- 1_hashingTF_8de997cf54ba
> |   `-- metadata
> |   |-- _SUCCESS
> |   `-- part-0
> `-- 2_linReg_3942a71d2c0e
> |-- data
> |   |-- _SUCCESS
> |   |-- _common_metadata
> |   |-- _metadata
> |   `--
> part-r-0-2096c55a-d654-42b2-90d3-5a310101cba5.gz.parquet
> `-- metadata
> |-- _SUCCESS
> `-- part-0
>
> 9 directories, 12 files
>
> What would you like to have outside SparkContext? What's wrong with
> using Spark? Just curious hoping to understand the use case better.
> Thanks.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Fri, Jul 1, 2016 at 12:54 PM, Rishabh Bhardwaj 
> wrote:
> > Hi All,
> >
> > I am looking for ways to deploy a ML Pipeline model in production .
> > Spark has already proved to be a one of the best framework for model
> > training and creation, but once the ml pipeline model is ready how can I
> > deploy it outside spark context ?
> > MLlib model has toPMML method but today Pipeline model can not be saved
> to
> > PMML. There are some frameworks like MLeap which are trying to abstract
> > Pipeline Model and provide ML Pipeline Model deployment outside spark
> > context,but currently they don't have most of the ml transformers and
> > estimators.
> > I am looking for related work going on this area.
> > Any pointers will be helpful.
> >
> > Thanks,
> > Rishabh.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Performance issue with spark ml model to make single predictions on server side

2016-06-24 Thread Nick Pentreath
Currently, spark-ml models and pipelines are only usable in Spark. This
means you must use Spark's machinery (and pull in all its dependencies) to
do model serving. Also currently there is no fast "predict" method for a
single Vector instance.

So for now, you are best off going with PMML, or exporting your model in
your own custom format, and re-loading it into your own custom format for
serving. You can also take a look at PredictionIO (https://prediction.io/)
for another serving option, or TensorFlow serving (
https://tensorflow.github.io/serving/).

On Thu, 23 Jun 2016 at 13:40 philippe v  wrote:

> Hello,
>
> I trained a linear regression model with spark-ml. I serialized the model
> pipeline with classical java serialization. Then I loaded it in a
> webservice
> to compute predictions.
>
> For each request recieved by the webservice I create a 1 row dataframe to
> compute that prediction.
>
> Probleme is that it take too much time
>
> Is there some good practices to do that kind of stuff ?
>
> I could export all model's coeffs with PMML and make computations in pure
> java but I keep it in last resort.
>
> Does any one have some hints to increase performances ?
>
> Philippe
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Performance-issue-with-spark-ml-model-to-make-single-predictions-on-server-side-tp27217.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark ml and PMML export

2016-06-23 Thread Nick Pentreath
Currently there is no way within Spark itself. You may want to check out
this issue (https://issues.apache.org/jira/browse/SPARK-11171) and here is
an external project working on it (https://github.com/jpmml/jpmml-sparkml),
that covers quite a number of transformers and models (but not all).


On Thu, 23 Jun 2016 at 02:37 jayantshekhar  wrote:

> I have the same question on Spark ML and PMML export as Philippe.
>
> Is there a way to export Spark ML generated models to PMML?
>
> Jayant
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ml-and-PMML-export-tp26773p27213.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Classpath hell and Elasticsearch 2.3.2...

2016-06-02 Thread Nick Pentreath
Fair enough.

However, if you take a look at the deployment guide (
http://spark.apache.org/docs/latest/submitting-applications.html#bundling-your-applications-dependencies)
you will see that the generally advised approach is to package your app
dependencies into a fat JAR and submit (possibly using the --jars option
too). This also means you specify the Scala and other library versions in
your project pom.xml or sbt file, avoiding having to manually decide which
artefact to include on your classpath  :)

On Thu, 2 Jun 2016 at 16:06 Kevin Burton <bur...@spinn3r.com> wrote:

> Yeah.. thanks Nick. Figured that out since your last email... I deleted
> the 2.10 by accident but then put 2+2 together.
>
> Got it working now.
>
> Still sticking to my story that it's somewhat complicated to setup :)
>
> Kevin
>
> On Thu, Jun 2, 2016 at 3:59 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
>> Which Scala version is Spark built against? I'd guess it's 2.10 since
>> you're using spark-1.6, and you're using the 2.11 jar for es-hadoop.
>>
>>
>> On Thu, 2 Jun 2016 at 15:50 Kevin Burton <bur...@spinn3r.com> wrote:
>>
>>> Thanks.
>>>
>>> I'm trying to run it in a standalone cluster with an existing / large
>>> 100 node ES install.
>>>
>>> I'm using the standard 1.6.1 -2.6 distribution with
>>> elasticsearch-hadoop-2.3.2...
>>>
>>> I *think* I'm only supposed to use the
>>> elasticsearch-spark_2.11-2.3.2.jar with it...
>>>
>>> but now I get the following exception:
>>>
>>>
>>> java.lang.NoSuchMethodError:
>>> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
>>> at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:52)
>>> at
>>> org.elasticsearch.spark.package$SparkRDDFunctions.saveToEs(package.scala:37)
>>> at
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:40)
>>> at
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
>>> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
>>> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
>>> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
>>> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
>>> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:55)
>>> at $iwC$$iwC$$iwC$$iwC$$iwC.(:57)
>>> at $iwC$$iwC$$iwC$$iwC.(:59)
>>> at $iwC$$iwC$$iwC.(:61)
>>> at $iwC$$iwC.(:63)
>>> at $iwC.(:65)
>>> at (:67)
>>> at .(:71)
>>> at .()
>>> at .(:7)
>>> at .()
>>> at $print()
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>> at
>>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>>> at
>>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
>>> at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>>> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>>> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>>> at
>>> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
>>> at
>>> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
>>> at
>>> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:875)
>>> at
>>> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
>>> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
>>> at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
>>> at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
>>> at org.apache.spark.repl.SparkILoop.org
>>> $apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
>>> at
>>> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
>>> at
>>> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
>>> at
>>> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
>>> at
>>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>>&g

Re: Classpath hell and Elasticsearch 2.3.2...

2016-06-02 Thread Nick Pentreath
Which Scala version is Spark built against? I'd guess it's 2.10 since
you're using spark-1.6, and you're using the 2.11 jar for es-hadoop.


On Thu, 2 Jun 2016 at 15:50 Kevin Burton <bur...@spinn3r.com> wrote:

> Thanks.
>
> I'm trying to run it in a standalone cluster with an existing / large 100
> node ES install.
>
> I'm using the standard 1.6.1 -2.6 distribution with
> elasticsearch-hadoop-2.3.2...
>
> I *think* I'm only supposed to use the
> elasticsearch-spark_2.11-2.3.2.jar with it...
>
> but now I get the following exception:
>
>
> java.lang.NoSuchMethodError:
> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
> at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:52)
> at
> org.elasticsearch.spark.package$SparkRDDFunctions.saveToEs(package.scala:37)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:40)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:55)
> at $iwC$$iwC$$iwC$$iwC$$iwC.(:57)
> at $iwC$$iwC$$iwC$$iwC.(:59)
> at $iwC$$iwC$$iwC.(:61)
> at $iwC$$iwC.(:63)
> at $iwC.(:65)
> at (:67)
> at .(:71)
> at .()
> at .(:7)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
> at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
> at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
> at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
> at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:875)
> at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
> at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
> at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
> at org.apache.spark.repl.SparkILoop.org
> $apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
> at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at org.apache.spark.repl.SparkILoop.org
> $apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
> at org.apache.spark.repl.Main$.main(Main.scala:31)
> at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> On Thu, Jun 2, 2016 at 3:45 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
>> Hey there
>>
>> When I used es-hadoop, I just pulled in the dependency into my pom.xml,
>> with spark as a "provided" dependency, and built a fat jar with assembly.
>>
>> Then with spark-submit use the --jars option to include your assembly jar
>> (IIRC I sometimes also needed to use --driver-classpath too, but perhaps
>> not with recent Spark versions).
>>
>>
>>
>> On Thu, 2 Jun 2016 at 15:

Re: Classpath hell and Elasticsearch 2.3.2...

2016-06-02 Thread Nick Pentreath
Hey there

When I used es-hadoop, I just pulled in the dependency into my pom.xml,
with spark as a "provided" dependency, and built a fat jar with assembly.

Then with spark-submit use the --jars option to include your assembly jar
(IIRC I sometimes also needed to use --driver-classpath too, but perhaps
not with recent Spark versions).



On Thu, 2 Jun 2016 at 15:34 Kevin Burton  wrote:

> I'm trying to get spark 1.6.1 to work with 2.3.2... needless to say it's
> not super easy.
>
> I wish there was an easier way to get this stuff to work.. Last time I
> tried to use spark more I was having similar problems with classpath setup
> and Cassandra.
>
> Seems a huge opportunity to make this easier for new developers.  This
> stuff isn't rocket science but it can (needlessly) waste a ton of time.
>
> ... anyway... I'm have since figured out I have to specific *specific*
> jars from the elasticsearch-hadoop distribution and use those.
>
> Right now I'm using :
>
>
> SPARK_CLASSPATH=/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-2.3.2.jar:/usr/share/elasticsearch-hadoop/lib/elasticsearch-spark_2.11-2.3.2.jar:/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-mr-2.3.2.jar:/usr/share/apache-spark/lib/*
>
> ... but I"m getting:
>
> java.lang.NoClassDefFoundError: Could not initialize class
> org.elasticsearch.hadoop.util.Version
> at
> org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:376)
> at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
> at
> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
> at
> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> ... but I think its caused by this:
>
> 16/06/03 00:26:48 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> localhost): java.lang.Error: Multiple ES-Hadoop versions detected in the
> classpath; please use only one
> jar:file:/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-2.3.2.jar
>
> jar:file:/usr/share/elasticsearch-hadoop/lib/elasticsearch-spark_2.11-2.3.2.jar
>
> jar:file:/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-mr-2.3.2.jar
>
> at org.elasticsearch.hadoop.util.Version.(Version.java:73)
> at
> org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:376)
> at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
> at
> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
> at
> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> .. still tracking this down but was wondering if there is someting obvious
> I'm dong wrong.  I'm going to take out elasticsearch-hadoop-2.3.2.jar and
> try again.
>
> Lots of trial and error here :-/
>
> Kevin
>
> --
>
> We’re hiring if you know of any awesome Java Devops or Linux Operations
> Engineers!
>
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> 
>
>


Re: Addign a new column to a dataframe (based on value of existing column)

2016-04-28 Thread Nick Pentreath
This should work:

scala> val df = Seq((25.0, "foo"), (30.0, "bar")).toDF("age", "name")
scala> df.withColumn("AgeInt", when(col("age") > 29.0, 1).otherwise(0)).show
+++--+
| age|name|AgeInt|
 +++--+
|25.0| foo| 0|
 |30.0| bar| 1|
 +++--+

On Thu, 28 Apr 2016 at 20:45 Marco Mistroni  wrote:

> HI all
>  i have a dataFrame with a column ("Age", type double) and i am trying to
> create a new
> column based on the value of the Age column, using Scala API
>
> this code keeps on complaining
>
> scala> df.withColumn("AgeInt", if (df("Age") > 29.0) lit(1) else lit(0))
> :28: error: type mismatch;
>  found   : org.apache.spark.sql.Column
>  required: Boolean
>   df.withColumn("AgeInt", if (df("Age") > 29.0) lit(1) else
> lit(0))
>
> any suggestions?
>
> kind regars
>  marco
>


Re: VectorAssembler handling null values

2016-04-19 Thread Nick Pentreath
Could you provide an example of what your input data looks like? Supporting
missing values in a sparse result vector makes sense.
On Tue, 19 Apr 2016 at 23:55, Andres Perez  wrote:

> Hi everyone. org.apache.spark.ml.feature.VectorAssembler currently cannot
> handle null values. This presents a problem for us as we wish to run a
> decision tree classifier on sometimes sparse data. Is there a particular
> reason VectorAssembler is implemented in this way, and can anyone recommend
> the best path for enabling VectorAssembler to build vectors for data that
> will contain empty values?
>
> Thanks!
>
> -Andres
>
>


Re: [ML] Training with bias

2016-04-12 Thread Nick Pentreath
Are you referring to fitting the intercept term? You can use
lr.setFitIntercept (though it is true by default):

scala> lr.explainParam(lr.fitIntercept)
res27: String = fitIntercept: whether to fit an intercept term (default:
true)

On Mon, 11 Apr 2016 at 21:59 Daniel Siegmann 
wrote:

> I'm trying to understand how I can add a bias when training in Spark. I
> have only a vague familiarity with this subject, so I hope this question
> will be clear enough.
>
> Using liblinear a bias can be set - if it's >= 0, there will be an
> additional weight appended in the model, and predicting with that model
> will automatically append a feature for the bias.
>
> Is there anything similar in Spark, such as for logistic regression? The
> closest thing I can find is MLUtils.appendBias, but this seems to require
> manual work on both the training and scoring side. I was hoping for
> something that would just be part of the model.
>
>
> ~Daniel Siegmann
>


Re: HashingTF "compatibility" across Python, Scala?

2016-04-12 Thread Nick Pentreath
I should point out that actually the "ml" version of HashingTF does call
into Java so that will be consistent across Python and Java.

It's the "mllib" version in PySpark that implements its own version using
Pythons "hash" function (while Java uses Object.hashCode).

On Thu, 7 Apr 2016 at 18:19 Nick Pentreath <nick.pentre...@gmail.com> wrote:

> You're right Sean, the implementation depends on hash code currently so
> may differ. I opened a JIRA (which duplicated this one -
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-10574
> which is the active JIRA), for using murmurhash3 which should then be
> consistent across platforms & langs (as well as more performant).
>
> It's also odd (legacy I think) that the Python version has its own
> implementation rather than calling into Java. That should also be changed
> probably.
> On Thu, 7 Apr 2016 at 17:59, Sean Owen <so...@cloudera.com> wrote:
>
>> Let's say I use HashingTF in my Pipeline to hash a string feature.
>> This is available in Python and Scala, but they hash strings to
>> different values since both use their respective runtime's native hash
>> implementation. This means that I create different feature vectors for
>> the same input. While I can load/store something like a
>> NaiveBayesModel across the two languages successfully, it seems like
>> the hashing part doesn't translate.
>>
>> Is that accurate, or, have I completely missed a way to get the same
>> hashing for the same input across languages?
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: MLlib ALS MatrixFactorizationModel.save fails consistently

2016-04-08 Thread Nick Pentreath
Could you post some stack trace info?

Generally, it can be problematic to run Spark within a web server framework
as often there are dependency conflict and threading issues. You might
prefer to run the model-building as a standalone app, or check out
https://github.com/spark-jobserver/spark-jobserver (either for triggering
spark jobs remotely from a web app, via HTTP, or for ideas on how to handle
SparkContext within web framework / akka).

On Fri, 8 Apr 2016 at 00:56 Colin Woodbury  wrote:

> Hi all,
>
> I've implemented most of a content recommendation system for a client.
> However, whenever I attempt to save a MatrixFactorizationModel I've
> trained, I see one of four outcomes:
>
> 1. Despite "save" being wrapped in a "try" block, I see a massive stack
> trace quoting some java.io classes. The Model isn't written.
> 2. Same as the above, but the Model *is* written. It's unusable however,
> as it's missing many of the files it should have, particularly in the
> "product" folder.
> 3. Same as the above, but sbt crashes completely.
> 4. No massive stack trace, and the Model seems to be written. Upon being
> loaded by another Spark context and fed a user ID, it claims the user isn't
> present in the Model.
>
> Case 4 is pretty rare. I see these failures both locally and when I test
> on a Google Cloud instance with much better resources.
>
> Note that `ALS.trainImplicit` and `model.save` are being called from
> within a Future. Could it be possible that Play threads are closing before
> Spark can finish, thus interrupting it somehow?
>
> We are running Spark 1.6.1 within Play 2.4 and Scala 2.11. All these
> failures have occurred while in Play's Dev mode in SBT.
>
> Thanks for any insight you can give.
>


Re: HashingTF "compatibility" across Python, Scala?

2016-04-07 Thread Nick Pentreath
You're right Sean, the implementation depends on hash code currently so may
differ. I opened a JIRA (which duplicated this one -
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-10574
which is the active JIRA), for using murmurhash3 which should then be
consistent across platforms & langs (as well as more performant).

It's also odd (legacy I think) that the Python version has its own
implementation rather than calling into Java. That should also be changed
probably.
On Thu, 7 Apr 2016 at 17:59, Sean Owen  wrote:

> Let's say I use HashingTF in my Pipeline to hash a string feature.
> This is available in Python and Scala, but they hash strings to
> different values since both use their respective runtime's native hash
> implementation. This means that I create different feature vectors for
> the same input. While I can load/store something like a
> NaiveBayesModel across the two languages successfully, it seems like
> the hashing part doesn't translate.
>
> Is that accurate, or, have I completely missed a way to get the same
> hashing for the same input across languages?
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: ClassCastException when extracting and collecting DF array column type

2016-04-06 Thread Nick Pentreath
Ah I got it - Seq[(Int, Float)] is actually represented as Seq[Row] (seq of
struct type) internally.

So a further extraction is required, e.g. row => row.getSeq[Row](1).map { r
=> r.getInt(0) }

On Wed, 6 Apr 2016 at 13:35 Nick Pentreath <nick.pentre...@gmail.com> wrote:

> Hi there,
>
> In writing some tests for a PR I'm working on, with a more complex array
> type in a DF, I ran into this issue (running off latest master).
>
> Any thoughts?
>
> *// create DF with a column of Array[(Int, Double)]*
> val df = sc.parallelize(Seq(
> (0, Array((1, 6.0), (1, 4.0))),
> (1, Array((1, 3.0), (2, 1.0))),
> (2, Array((3, 3.0), (4, 6.0
> ).toDF("id", "predictions")
>
> *// extract the field from the Row, and use map to extract first element
> of tuple*
> *// the type of RDD appears correct*
> scala> df.rdd.map { row => row.getSeq[(Int, Double)](1).map(_._1) }
> res14: org.apache.spark.rdd.RDD[Seq[Int]] = MapPartitionsRDD[32] at map at
> :27
>
> *// however, calling collect on the same expression throws
> ClassCastException*
> scala> df.rdd.map { row => row.getSeq[(Int, Double)](1).map(_._1) }.collect
> 16/04/06 13:02:49 ERROR Executor: Exception in task 5.0 in stage 10.0 (TID
> 74)
> java.lang.ClassCastException:
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be
> cast to scala.Tuple2
> at
> $line54.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1$$anonfun$apply$1.apply(:27)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> $line54.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:27)
> at
> $line54.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:27)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
> at scala.collection.AbstractIterator.to(Iterator.scala:1194)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
> at
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:880)
> at
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:880)
>
> *// can collect the extracted field*
> *// again, return type appears correct*
> scala> df.rdd.map { row => row.getSeq[(Int, Double)](1) }.collect
> res23: Array[Seq[(Int, Double)]] = Array(WrappedArray([1,6.0], [1,4.0]),
> WrappedArray([1,3.0], [2,1.0]), WrappedArray([3,3.0], [4,6.0]))
>
> *// trying to apply map to extract first element of tuple fails*
> scala> df.rdd.map { row => row.getSeq[(Int, Double)](1)
> }.collect.map(_.map(_._1))
> java.lang.ClassCastException:
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be
> cast to scala.Tuple2
>   at $anonfun$2$$anonfun$apply$1.apply(:27)
>   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at $anonfun$2.apply(:27)
>   at $anonfun$2.apply(:27)
>   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>


ClassCastException when extracting and collecting DF array column type

2016-04-06 Thread Nick Pentreath
Hi there,

In writing some tests for a PR I'm working on, with a more complex array
type in a DF, I ran into this issue (running off latest master).

Any thoughts?

*// create DF with a column of Array[(Int, Double)]*
val df = sc.parallelize(Seq(
(0, Array((1, 6.0), (1, 4.0))),
(1, Array((1, 3.0), (2, 1.0))),
(2, Array((3, 3.0), (4, 6.0
).toDF("id", "predictions")

*// extract the field from the Row, and use map to extract first element of
tuple*
*// the type of RDD appears correct*
scala> df.rdd.map { row => row.getSeq[(Int, Double)](1).map(_._1) }
res14: org.apache.spark.rdd.RDD[Seq[Int]] = MapPartitionsRDD[32] at map at
:27

*// however, calling collect on the same expression throws
ClassCastException*
scala> df.rdd.map { row => row.getSeq[(Int, Double)](1).map(_._1) }.collect
16/04/06 13:02:49 ERROR Executor: Exception in task 5.0 in stage 10.0 (TID
74)
java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be
cast to scala.Tuple2
at
$line54.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1$$anonfun$apply$1.apply(:27)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
$line54.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:27)
at
$line54.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:27)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
at scala.collection.AbstractIterator.to(Iterator.scala:1194)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:880)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:880)

*// can collect the extracted field*
*// again, return type appears correct*
scala> df.rdd.map { row => row.getSeq[(Int, Double)](1) }.collect
res23: Array[Seq[(Int, Double)]] = Array(WrappedArray([1,6.0], [1,4.0]),
WrappedArray([1,3.0], [2,1.0]), WrappedArray([3,3.0], [4,6.0]))

*// trying to apply map to extract first element of tuple fails*
scala> df.rdd.map { row => row.getSeq[(Int, Double)](1)
}.collect.map(_.map(_._1))
java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be
cast to scala.Tuple2
  at $anonfun$2$$anonfun$apply$1.apply(:27)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at $anonfun$2.apply(:27)
  at $anonfun$2.apply(:27)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)


Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Nick Pentreath
+1 for this proposal - as you mention I think it's the defacto current
situation anyway.

Note that from a developer view it's just the user-facing API that will be
only "ml" - the majority of the actual algorithms still operate on RDDs
under the good currently.
On Wed, 6 Apr 2016 at 05:03, Chris Fregly  wrote:

> perhaps renaming to Spark ML would actually clear up code and
> documentation confusion?
>
> +1 for rename
>
> On Apr 5, 2016, at 7:00 PM, Reynold Xin  wrote:
>
> +1
>
> This is a no brainer IMO.
>
>
> On Tue, Apr 5, 2016 at 7:32 PM, Joseph Bradley 
> wrote:
>
>> +1  By the way, the JIRA for tracking (Scala) API parity is:
>> https://issues.apache.org/jira/browse/SPARK-4591
>>
>> On Tue, Apr 5, 2016 at 4:58 PM, Matei Zaharia 
>> wrote:
>>
>>> This sounds good to me as well. The one thing we should pay attention to
>>> is how we update the docs so that people know to start with the spark.ml
>>> classes. Right now the docs list spark.mllib first and also seem more
>>> comprehensive in that area than in spark.ml, so maybe people naturally
>>> move towards that.
>>>
>>> Matei
>>>
>>> On Apr 5, 2016, at 4:44 PM, Xiangrui Meng  wrote:
>>>
>>> Yes, DB (cc'ed) is working on porting the local linear algebra library
>>> over (SPARK-13944). There are also frequent pattern mining algorithms we
>>> need to port over in order to reach feature parity. -Xiangrui
>>>
>>> On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman <
>>> shiva...@eecs.berkeley.edu> wrote:
>>>
 Overall this sounds good to me. One question I have is that in
 addition to the ML algorithms we have a number of linear algebra
 (various distributed matrices) and statistical methods in the
 spark.mllib package. Is the plan to port or move these to the spark.ml
 namespace in the 2.x series ?

 Thanks
 Shivaram

 On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen  wrote:
 > FWIW, all of that sounds like a good plan to me. Developing one API is
 > certainly better than two.
 >
 > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng 
 wrote:
 >> Hi all,
 >>
 >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API
 built
 >> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based
 API has
 >> been developed under the spark.ml package, while the old RDD-based
 API has
 >> been developed in parallel under the spark.mllib package. While it
 was
 >> easier to implement and experiment with new APIs under a new
 package, it
 >> became harder and harder to maintain as both packages grew bigger and
 >> bigger. And new users are often confused by having two sets of APIs
 with
 >> overlapped functions.
 >>
 >> We started to recommend the DataFrame-based API over the RDD-based
 API in
 >> Spark 1.5 for its versatility and flexibility, and we saw the
 development
 >> and the usage gradually shifting to the DataFrame-based API. Just
 counting
 >> the lines of Scala code, from 1.5 to the current master we added
 ~1
 >> lines to the DataFrame-based API while ~700 to the RDD-based API.
 So, to
 >> gather more resources on the development of the DataFrame-based API
 and to
 >> help users migrate over sooner, I want to propose switching
 RDD-based MLlib
 >> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
 >>
 >> * We do not accept new features in the RDD-based spark.mllib
 package, unless
 >> they block implementing new features in the DataFrame-based spark.ml
 >> package.
 >> * We still accept bug fixes in the RDD-based API.
 >> * We will add more features to the DataFrame-based API in the 2.x
 series to
 >> reach feature parity with the RDD-based API.
 >> * Once we reach feature parity (possibly in Spark 2.2), we will
 deprecate
 >> the RDD-based API.
 >> * We will remove the RDD-based API from the main Spark repo in Spark
 3.0.
 >>
 >> Though the RDD-based API is already in de facto maintenance mode,
 this
 >> announcement will make it clear and hence important to both MLlib
 developers
 >> and users. So we’d greatly appreciate your feedback!
 >>
 >> (As a side note, people sometimes use “Spark ML” to refer to the
 >> DataFrame-based API or even the entire MLlib component. This also
 causes
 >> confusion. To be clear, “Spark ML” is not an official name and there
 are no
 >> plans to rename MLlib to “Spark ML” at this time.)
 >>
 >> Best,
 >> Xiangrui
 >
 > -
 > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 > For additional commands, e-mail: user-h...@spark.apache.org
 >

>>>
>>>

Re: is there any way to make WEB UI auto-refresh?

2016-03-15 Thread Nick Pentreath
You may want to check out https://github.com/hammerlab/spree

On Tue, 15 Mar 2016 at 10:43 charles li  wrote:

> every time I can only get the latest info by refreshing the page, that's a
> little boring.
>
> so is there any way to make the WEB UI auto-refreshing ?
>
>
> great thanks
>
>
>
> --
> *--*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>


Re: [MLlib - ALS] Merging two Models?

2016-03-15 Thread Nick Pentreath
By the way, I created a JIRA for supporting initial model for warm start
ALS here: https://issues.apache.org/jira/browse/SPARK-13856

On Fri, 11 Mar 2016 at 09:14, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> Sean's old Myrrix slides contain an overview of the fold-in math:
> http://www.slideshare.net/srowen/big-practical-recommendations-with-alternating-least-squares/14?src=clipshare
>
> I never quite got around to actually incorporating it into my own
> ALS-based systems, because in the end I just re-computed models every day
> and found other ways to incorporate real-time elements using Elasticsearch.
>
>
> On Fri, 11 Mar 2016 at 01:12 Chris Fregly <ch...@fregly.com> wrote:
>
>> @Colin-  you're asking the $1 million dollar question that a lot of
>> people are trying to do.  This was literally the #1 most-asked question in
>> every city on my recent world-wide meetup tour.
>>
>> I've been pointing people to my old Databricks co-worker's
>> streaming-matrix-factorization project:
>> https://github.com/brkyvz/streaming-matrix-factorization  He got tired
>> of everyone asking about this - and cranked it out over a weekend.  Love
>> that guy, Burak!  :)
>>
>> I've attempted (unsuccessfully, so far) to deploy exactly what you're
>> trying to do here:
>> https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/ml/TrainMFIncremental.scala
>>
>> We're a couple pull requests away from making this happen.  You can see
>> my comments and open github issues for the remaining bits.
>>
>> And this will be my focus in the next week or so as I prepare for an
>> upcoming conference.  Keep an eye on this repo if you'd like.
>>
>> @Sean:  thanks for the link.  I knew Oryx was doing this somehow - and I
>> kept meaning to see how you were doing it.  I'll likely incorporate some of
>> your stuff into my final solution.
>>
>>
>> On Thu, Mar 10, 2016 at 3:35 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> While it isn't crazy, I am not sure how valid it is to build a model
>>> off of only a chunk of recent data and then merge it into another
>>> model in any direct way. They're not really sharing a basis, so you
>>> can't just average them.
>>>
>>> My experience with this aspect suggests you should try to update the
>>> existing model in place on the fly. In short, you figure out how much
>>> the new input ought to change your estimate of the (user,item)
>>> association. Positive interactions should increase it a bit, etc. Then
>>> you work out how the item vector would change if the user vector were
>>> fixed in order to accomplish that change, with a bit of linear
>>> algebra. Vice versa for user vector. Of course, those changes affect
>>> the rest of the matrix too but that's the 'approximate' bit.
>>>
>>> I so happen to have an implementation of this in the context of a
>>> Spark ALS model, though raw source code may be hard to read. If it's
>>> of interest we can discuss offline (or online here to the extent it's
>>> relevant to Spark users)
>>>
>>>
>>> https://github.com/OryxProject/oryx/blob/91004a03413eef0fdfd6e75a61b68248d11db0e5/app/oryx-app/src/main/java/com/cloudera/oryx/app/speed/als/ALSSpeedModelManager.java#L192
>>>
>>> On Thu, Mar 10, 2016 at 8:01 PM, Colin Woodbury <coli...@gmail.com>
>>> wrote:
>>> > Hi there, I'm wondering if it's possible (or feasible) to combine the
>>> > feature matrices of two MatrixFactorizationModels that share a user and
>>> > product set.
>>> >
>>> > Specifically, one model would be the "on-going" model, and the other
>>> is one
>>> > trained only on the most recent aggregation of some event data. My
>>> overall
>>> > goal is to try to approximate "online" training, as ALS doesn't support
>>> > streaming, and it also isn't possible to "seed" the ALS training
>>> process
>>> > with an already trained model.
>>> >
>>> > Since the two Models would share a user/product ID space, can their
>>> feature
>>> > matrices be merged? For instance via:
>>> >
>>> > 1. Adding feature vectors together for user/product vectors that
>>> appear in
>>> > both models
>>> > 2. Averaging said vectors instead
>>> > 3. Some other linear algebra operation
>>> >
>>> > Unfortunately, I'm fairly ignorant as to the internal mechanics of ALS
>>> > itself. Is what I'm asking possible?
>>> >
>>> > Thank you,
>>> > Colin
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>>
>> *Chris Fregly*
>> Principal Data Solutions Engineer
>> IBM Spark Technology Center, San Francisco, CA
>> http://spark.tc | http://advancedspark.com
>>
>


  1   2   3   >