[ANNOUNCEMENT] Plan for dropping Python 2 support

2019-06-03 Thread Xiangrui Meng
Hi all,

Today we announced the plan for dropping Python 2 support [1] in Apache Spark:

As many of you already know, the Python core development team and many widely
used Python packages like pandas and NumPy will drop Python 2 support on or
before 2020/01/01 [2]. Apache Spark has supported both Python 2 and 3 since
the Spark 1.4 release in 2015. However, maintaining Python 2/3 compatibility
is an increasing burden, and it essentially limits the use of Python 3
features in Spark. Given that the end of life (EOL) of Python 2 is
approaching, we plan to eventually drop Python 2 support as well. The current
plan is as follows:

* In the next major release in 2019, we will deprecate Python 2 support.
PySpark users will see a deprecation warning if Python 2 is used. We will
publish a migration guide for PySpark users to migrate to Python 3.
* We will drop Python 2 support in a future release (excluding patch
releases) in 2020, after Python 2 reaches EOL on 2020/01/01. PySpark users
will see an error if Python 2 is used.
* For releases that already support Python 2, e.g., Spark 2.4, their patch
releases will continue to support Python 2. However, after Python 2 EOL, we
might not take patches that are specific to Python 2.

Best,
Xiangrui

[1]: http://spark.apache.org/news/plan-for-dropping-python-2-support.html
[2]: https://python3statement.org/


Re: Should python-2 be supported in Spark 3.0?

2019-06-03 Thread Xiangrui Meng
I updated Spark website and announced the plan for dropping python 2
support there:
http://spark.apache.org/news/plan-for-dropping-python-2-support.html. I
will send an announcement email to user@ and dev@. -Xiangrui

On Fri, May 31, 2019 at 10:54 PM Felix Cheung 
wrote:

> Very subtle but someone might take
>
> “We will drop Python 2 support in a future release in 2020”
>
> To mean any / first release in 2020. Whereas the next statement indicates
> patch release is not included in above. Might help reorder the items or
> clarify the wording.
>
>
> --
> *From:* shane knapp 
> *Sent:* Friday, May 31, 2019 7:38:10 PM
> *To:* Denny Lee
> *Cc:* Holden Karau; Bryan Cutler; Erik Erlandson; Felix Cheung; Mark
> Hamstra; Matei Zaharia; Reynold Xin; Sean Owen; Wenchen Fen; Xiangrui Meng;
> dev; user
> *Subject:* Re: Should python-2 be supported in Spark 3.0?
>
> +1000  ;)
>
> On Sat, Jun 1, 2019 at 6:53 AM Denny Lee  wrote:
>
>> +1
>>
>> On Fri, May 31, 2019 at 17:58 Holden Karau  wrote:
>>
>>> +1
>>>
>>> On Fri, May 31, 2019 at 5:41 PM Bryan Cutler  wrote:
>>>
>>>> +1 and the draft sounds good
>>>>
>>>> On Thu, May 30, 2019, 11:32 AM Xiangrui Meng  wrote:
>>>>
>>>>> Here is the draft announcement:
>>>>>
>>>>> ===
>>>>> Plan for dropping Python 2 support
>>>>>
>>>>> As many of you already knew, Python core development team and many
>>>>> utilized Python packages like Pandas and NumPy will drop Python 2 support
>>>>> in or before 2020/01/01. Apache Spark has supported both Python 2 and 3
>>>>> since Spark 1.4 release in 2015. However, maintaining Python 2/3
>>>>> compatibility is an increasing burden and it essentially limits the use of
>>>>> Python 3 features in Spark. Given the end of life (EOL) of Python 2 is
>>>>> coming, we plan to eventually drop Python 2 support as well. The current
>>>>> plan is as follows:
>>>>>
>>>>> * In the next major release in 2019, we will deprecate Python 2
>>>>> support. PySpark users will see a deprecation warning if Python 2 is used.
>>>>> We will publish a migration guide for PySpark users to migrate to Python 
>>>>> 3.
>>>>> * We will drop Python 2 support in a future release in 2020, after
>>>>> Python 2 EOL on 2020/01/01. PySpark users will see an error if Python 2 is
>>>>> used.
>>>>> * For releases that support Python 2, e.g., Spark 2.4, their patch
>>>>> releases will continue supporting Python 2. However, after Python 2 EOL, 
>>>>> we
>>>>> might not take patches that are specific to Python 2.
>>>>> ===
>>>>>
>>>>> Sean helped make a pass. If it looks good, I'm going to upload it to
>>>>> Spark website and announce it here. Let me know if you think we should do 
>>>>> a
>>>>> VOTE instead.
>>>>>
>>>>> On Thu, May 30, 2019 at 9:21 AM Xiangrui Meng 
>>>>> wrote:
>>>>>
>>>>>> I created https://issues.apache.org/jira/browse/SPARK-27884 to track
>>>>>> the work.
>>>>>>
>>>>>> On Thu, May 30, 2019 at 2:18 AM Felix Cheung <
>>>>>> felixcheun...@hotmail.com> wrote:
>>>>>>
>>>>>>> We don’t usually reference a future release on website
>>>>>>>
>>>>>>> > Spark website and state that Python 2 is deprecated in Spark 3.0
>>>>>>>
>>>>>>> I suspect people will then ask when is Spark 3.0 coming out then.
>>>>>>> Might need to provide some clarity on that.
>>>>>>>
>>>>>>
>>>>>> We can say the "next major release in 2019" instead of Spark 3.0.
>>>>>> Spark 3.0 timeline certainly requires a new thread to discuss.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> *From:* Reynold Xin 
>>>>>>> *Sent:* Thursday, May 30, 2019 12:59:14 AM
>>>>>>> *To:* shane knapp
>>>>>>> *Cc:* Erik Erlandson; Mark Hamstra; Matei Zaharia; Sean Owen;
>>>>>>> Wenchen Fen; Xiangrui Meng; dev; user
>>>>>>> *Subject:* Re: Should pyth

Re: Should python-2 be supported in Spark 3.0?

2019-05-30 Thread Xiangrui Meng
Here is the draft announcement:

===
Plan for dropping Python 2 support

As many of you already knew, Python core development team and many utilized
Python packages like Pandas and NumPy will drop Python 2 support in or
before 2020/01/01. Apache Spark has supported both Python 2 and 3 since
Spark 1.4 release in 2015. However, maintaining Python 2/3 compatibility is
an increasing burden and it essentially limits the use of Python 3 features
in Spark. Given the end of life (EOL) of Python 2 is coming, we plan to
eventually drop Python 2 support as well. The current plan is as follows:

* In the next major release in 2019, we will deprecate Python 2 support.
PySpark users will see a deprecation warning if Python 2 is used. We will
publish a migration guide for PySpark users to migrate to Python 3.
* We will drop Python 2 support in a future release in 2020, after Python 2
EOL on 2020/01/01. PySpark users will see an error if Python 2 is used.
* For releases that support Python 2, e.g., Spark 2.4, their patch releases
will continue supporting Python 2. However, after Python 2 EOL, we might
not take patches that are specific to Python 2.
===

Sean helped make a pass. If it looks good, I'm going to upload it to Spark
website and announce it here. Let me know if you think we should do a VOTE
instead.

On Thu, May 30, 2019 at 9:21 AM Xiangrui Meng  wrote:

> I created https://issues.apache.org/jira/browse/SPARK-27884 to track the
> work.
>
> On Thu, May 30, 2019 at 2:18 AM Felix Cheung 
> wrote:
>
>> We don’t usually reference a future release on website
>>
>> > Spark website and state that Python 2 is deprecated in Spark 3.0
>>
>> I suspect people will then ask when is Spark 3.0 coming out then. Might
>> need to provide some clarity on that.
>>
>
> We can say the "next major release in 2019" instead of Spark 3.0. Spark
> 3.0 timeline certainly requires a new thread to discuss.
>
>
>>
>>
>> --
>> *From:* Reynold Xin 
>> *Sent:* Thursday, May 30, 2019 12:59:14 AM
>> *To:* shane knapp
>> *Cc:* Erik Erlandson; Mark Hamstra; Matei Zaharia; Sean Owen; Wenchen
>> Fen; Xiangrui Meng; dev; user
>> *Subject:* Re: Should python-2 be supported in Spark 3.0?
>>
>> +1 on Xiangrui’s plan.
>>
>> On Thu, May 30, 2019 at 7:55 AM shane knapp  wrote:
>>
>>> I don't have a good sense of the overhead of continuing to support
>>>> Python 2; is it large enough to consider dropping it in Spark 3.0?
>>>>
>>>> from the build/test side, it will actually be pretty easy to continue
>>> support for python2.7 for spark 2.x as the feature sets won't be expanding.
>>>
>>
>>> that being said, i will be cracking a bottle of champagne when i can
>>> delete all of the ansible and anaconda configs for python2.x.  :)
>>>
>>
> On the development side, in a future release that drops Python 2 support
> we can remove code that maintains python 2/3 compatibility and start using
> python 3 only features, which is also quite exciting.
>
>
>>
>>> shane
>>> --
>>> Shane Knapp
>>> UC Berkeley EECS Research / RISELab Staff Technical Lead
>>> https://rise.cs.berkeley.edu
>>>
>>


Re: Should python-2 be supported in Spark 3.0?

2019-05-30 Thread Xiangrui Meng
I created https://issues.apache.org/jira/browse/SPARK-27884 to track the
work.

On Thu, May 30, 2019 at 2:18 AM Felix Cheung 
wrote:

> We don’t usually reference a future release on website
>
> > Spark website and state that Python 2 is deprecated in Spark 3.0
>
> I suspect people will then ask when is Spark 3.0 coming out then. Might
> need to provide some clarity on that.
>

We can say the "next major release in 2019" instead of Spark 3.0. Spark 3.0
timeline certainly requires a new thread to discuss.


>
>
> --
> *From:* Reynold Xin 
> *Sent:* Thursday, May 30, 2019 12:59:14 AM
> *To:* shane knapp
> *Cc:* Erik Erlandson; Mark Hamstra; Matei Zaharia; Sean Owen; Wenchen
> Fen; Xiangrui Meng; dev; user
> *Subject:* Re: Should python-2 be supported in Spark 3.0?
>
> +1 on Xiangrui’s plan.
>
> On Thu, May 30, 2019 at 7:55 AM shane knapp  wrote:
>
>> I don't have a good sense of the overhead of continuing to support
>>> Python 2; is it large enough to consider dropping it in Spark 3.0?
>>>
>>> from the build/test side, it will actually be pretty easy to continue
>> support for python2.7 for spark 2.x as the feature sets won't be expanding.
>>
>
>> that being said, i will be cracking a bottle of champagne when i can
>> delete all of the ansible and anaconda configs for python2.x.  :)
>>
>
On the development side, in a future release that drops Python 2 support we
can remove code that maintains python 2/3 compatibility and start using
python 3 only features, which is also quite exciting.


>
>> shane
>> --
>> Shane Knapp
>> UC Berkeley EECS Research / RISELab Staff Technical Lead
>> https://rise.cs.berkeley.edu
>>
>


Re: Should python-2 be supported in Spark 3.0?

2019-05-29 Thread Xiangrui Meng
Hi all,

I want to revive this old thread since no action was taken so far. If we
plan to mark Python 2 as deprecated in Spark 3.0, we should do it as early
as possible and let users know ahead. PySpark depends on Python, numpy,
pandas, and pyarrow, all of which are sunsetting Python 2 support by
2020/01/01 per https://python3statement.org/. At that point we cannot really
support Python 2, because those dependencies do not plan to make new releases
for it, not even for security fixes. So I suggest the following:

1. Update Spark website and state that Python 2 is deprecated in Spark 3.0
and its support will be removed in a release after 2020/01/01.
2. Make a formal announcement to dev@ and users@.
3. Add Apache Spark project to https://python3statement.org/ timeline.
4. Update PySpark to check the Python version and print a deprecation warning
if the version is < 3 (a minimal sketch of such a check is shown below).
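
To illustrate item 4, here is a minimal, hypothetical sketch of what the
PySpark-side check could look like; the exact message and mechanism would be
decided in the actual patch:

~~~
import sys
import warnings

# Hypothetical sketch: emit a warning, but keep working, when running on Python 2.
if sys.version_info[0] < 3:
    warnings.warn(
        "Python 2 support is deprecated and will be removed in a future "
        "Spark release. Please migrate to Python 3.",
        DeprecationWarning)
~~~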

Any thoughts and suggestions?

Best,
Xiangrui

On Mon, Sep 17, 2018 at 6:54 PM Erik Erlandson  wrote:

>
> I think that makes sense. The main benefit of deprecating *prior* to 3.0
> would be informational - making the community aware of the upcoming
> transition earlier. But there are other ways to start informing the
> community between now and 3.0, besides formal deprecation.
>
> I have some residual curiosity about what it might mean for a release like
> 2.4 to still be in its support lifetime after Py2 goes EOL. I asked Apache
> Legal  to comment. It is
> possible there are no issues with this at all.
>
>
> On Mon, Sep 17, 2018 at 4:26 PM, Reynold Xin  wrote:
>
>> i'd like to second that.
>>
>> if we want to communicate timeline, we can add to the release notes
>> saying py2 will be deprecated in 3.0, and removed in a 3.x release.
>>
>> --
>> excuse the brevity and lower case due to wrist injury
>>
>>
>> On Mon, Sep 17, 2018 at 4:24 PM Matei Zaharia 
>> wrote:
>>
>>> That’s a good point — I’d say there’s just a risk of creating a
>>> perception issue. First, some users might feel that this means they have to
>>> migrate now, which is before Python itself drops support; they might also
>>> be surprised that we did this in a minor release (e.g. might we drop Python
>>> 2 altogether in a Spark 2.5 if that later comes out?). Second, contributors
>>> might feel that this means new features no longer have to work with Python
>>> 2, which would be confusing. Maybe it’s OK on both fronts, but it just
>>> seems scarier for users to do this now if we do plan to have Spark 3.0 in
>>> the next 6 months anyway.
>>>
>>> Matei
>>>
>>> > On Sep 17, 2018, at 1:04 PM, Mark Hamstra 
>>> wrote:
>>> >
>>> > What is the disadvantage to deprecating now in 2.4.0? I mean, it
>>> doesn't change the code at all; it's just a notification that we will
>>> eventually cease supporting Py2. Wouldn't users prefer to get that
>>> notification sooner rather than later?
>>> >
>>> > On Mon, Sep 17, 2018 at 12:58 PM Matei Zaharia <
>>> matei.zaha...@gmail.com> wrote:
>>> > I’d like to understand the maintenance burden of Python 2 before
>>> deprecating it. Since it is not EOL yet, it might make sense to only
>>> deprecate it once it’s EOL (which is still over a year from now).
>>> Supporting Python 2+3 seems less burdensome than supporting, say, multiple
>>> Scala versions in the same codebase, so what are we losing out?
>>> >
>>> > The other thing is that even though Python core devs might not support
>>> 2.x later, it’s quite possible that various Linux distros will if moving
>>> from 2 to 3 remains painful. In that case, we may want Apache Spark to
>>> continue releasing for it despite the Python core devs not supporting it.
>>> >
>>> > Basically, I’d suggest to deprecate this in Spark 3.0 and then remove
>>> it later in 3.x instead of deprecating it in 2.4. I’d also consider looking
>>> at what other data science tools are doing before fully removing it: for
>>> example, if Pandas and TensorFlow no longer support Python 2 past some
>>> point, that might be a good point to remove it.
>>> >
>>> > Matei
>>> >
>>> > > On Sep 17, 2018, at 11:01 AM, Mark Hamstra 
>>> wrote:
>>> > >
>>> > > If we're going to do that, then we need to do it right now, since
>>> 2.4.0 is already in release candidates.
>>> > >
>>> > > On Mon, Sep 17, 2018 at 10:57 AM Erik Erlandson 
>>> wrote:
>>> > > I like Mark’s concept for deprecating Py2 starting with 2.4: It may
>>> seem like a ways off but even now there may be some spark versions
>>> supporting Py2 past the point where Py2 is no longer receiving security
>>> patches
>>> > >
>>> > >
>>> > > On Sun, Sep 16, 2018 at 12:26 PM Mark Hamstra <
>>> m...@clearstorydata.com> wrote:
>>> > > We could also deprecate Py2 already in the 2.4.0 release.
>>> > >
>>> > > On Sat, Sep 15, 2018 at 11:46 AM Erik Erlandson 
>>> wrote:
>>> > > In case this didn't make it onto this thread:
>>> > >
>>> > > There is a 3rd option, which is to deprecate Py2 for Spark-3.0, and
>>> remove it entirely on a later 3.x release.
>>> > >
>>> > > On 

SPIP: DataFrame-based Property Graphs, Cypher Queries, and Algorithms

2019-01-15 Thread Xiangrui Meng
Hi all,

I want to re-send the previous SPIP on introducing a DataFrame-based graph
component to collect more feedback. It supports property graphs, Cypher
graph queries, and graph algorithms built on top of the DataFrame API. If
you are a GraphX user or your workload is essentially graph queries, please
help review and check how it fits into your use cases. Your feedback would
be greatly appreciated!

# Links to SPIP and design sketch:

* Jira issue for the SPIP: https://issues.apache.org/jira/browse/SPARK-25994
* Google Doc:
https://docs.google.com/document/d/1ljqVsAh2wxTZS8XqwDQgRT6i_mania3ffYSYpEgLx9k/edit?usp=sharing
* Jira issue for a first design sketch:
https://issues.apache.org/jira/browse/SPARK-26028
* Google Doc:
https://docs.google.com/document/d/1Wxzghj0PvpOVu7XD1iA8uonRYhexwn18utdcTxtkxlI/edit?usp=sharing

# Sample code:

~~~
val graph = ...

// query
val result = graph.cypher("""
  MATCH (p:Person)-[r:STUDY_AT]->(u:University)
  RETURN p.name, r.since, u.name
""")

// algorithms
val ranks = graph.pageRank.run()
~~~

Best,
Xiangrui


Re: Spark ML with null labels

2019-01-10 Thread Xiangrui Meng
In your custom transformer that produces labels, can you filter null
labels? A transformer doesn't always need to do 1:1 mapping.
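
For example, a minimal sketch of such a filtering transformer in PySpark; the
class name and label column are hypothetical, and the setting is kept as a
plain attribute to stay short:

~~~
from pyspark.ml import Transformer
from pyspark.sql import functions as F

class NullLabelFilter(Transformer):
    """Hypothetical transformer that drops rows whose label is null, so a
    downstream estimator such as LogisticRegression never sees them."""

    def __init__(self, labelCol="label"):
        super(NullLabelFilter, self).__init__()
        self.labelCol = labelCol

    def _transform(self, dataset):
        # Not a 1:1 row mapping: rows with a null label are simply removed.
        return dataset.where(F.col(self.labelCol).isNotNull())
~~~

An instance of this could sit in the Pipeline right before LogisticRegression,
instead of overriding fit().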

On Thu, Jan 10, 2019, 7:53 AM Patrick McCarthy wrote:

> I'm trying to implement an algorithm on the MNIST digits that runs like so:
>
>
>- for every pair of digits (0,1), (0,2), (0,3)... assign a 0/1 label
>to the digits and build a LogisticRegression Classifier -- 45 in total
>- Fit every classifier on the test set separately
>- Aggregate the results per record of the test set and compute a
>prediction from the 45 predictions
>
> I tried implementing this with a Pipeline, composed of
>
>- stringIndexer
>- a custom transformer which accepts a lower-digit and upper-digit
>argument, producing the 0/1 label
>- a custom transformer to assemble the indexed strings to VectorUDT
>- LogisticRegression
>
> fed by a list of paramMaps. It failed because the fit() method of logistic
> couldn't handle cases of null labels, i.e. a case where my 0/1 transformer
> found neither the lower nor the upper digit label. I fixed this by
> extending the LogisticRegression class and overriding the fit() method to
> include a filter for labels in (0,1) -- I didn't want to alter the
> transform method.
>
> Now, I'd like to tune these models using CrossValidator with an estimator
> of pipeline but when I run either fitMultiple on my paramMap or I loop over
> the paramMaps, I get arcane Scala errors.
>
>
> Is there a better way to build this procedure? Thanks!
>


Re: question about barrier execution mode in Spark 2.4.0

2018-12-19 Thread Xiangrui Meng
On Mon, Nov 12, 2018 at 7:33 AM Joe  wrote:

> Hello,
> I was reading Spark 2.4.0 release docs and I'd like to find out more
> about barrier execution mode.
> In particular I'd like to know what happens when number of partitions
> exceeds number of nodes (which I think is allowed, Spark tuning doc
> mentions that)?
>

The barrier execution mode is different: it needs to run the tasks for all
partitions together. So when the number of partitions is greater than the
number of nodes, it will wait until more nodes become available and print
warning messages.


> Does Spark guarantee that all tasks process all partitions
> simultaneously?


They will start all together. We provide a barrier() method in the task scope
to help simple coordination among tasks.
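
For reference, a small sketch of the barrier API in PySpark; an existing
SparkContext `sc` is assumed, and the per-task work is only a placeholder:

~~~
from pyspark import BarrierTaskContext

rdd = sc.parallelize(range(8), numSlices=4)

def train_partition(iterator):
    context = BarrierTaskContext.get()
    context.barrier()    # blocks until every task in the barrier stage reaches this point
    yield sum(iterator)  # placeholder for the real training step

result = rdd.barrier().mapPartitions(train_partition).collect()
~~~

All four tasks must be scheduled at the same time, which is why the stage
waits if there are not enough resources.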


> If not then how does barrier mode handle partitions that
> are waiting to be processed?
> If there are partitions waiting to be processed then I don't think it's
> possible to send all data from given stage to a DL process, even when
> using barrier mode?
> Thanks a lot,
>
> Joe
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Custom Spark Error on Hadoop Cluster

2016-07-18 Thread Xiangrui Meng
Glad to hear. Could you please share your solution on the user mailing
list? -Xiangrui

On Mon, Jul 18, 2016 at 2:26 AM Alger Remirata <abremirat...@gmail.com>
wrote:

> Hi Xiangrui,
>
> We have now solved the problem. Thanks for all the tips you've given.
>
> Best Regards,
>
> Alger
>
> On Thu, Jul 14, 2016 at 2:43 AM, Alger Remirata <abremirat...@gmail.com>
> wrote:
>
>> By the using cloudera manager for standalone cluster manager
>>
>> On Thu, Jul 14, 2016 at 2:20 AM, Alger Remirata <abremirat...@gmail.com>
>> wrote:
>>
>>> It looks like there are a lot of people already having posted on
>>> classNotFoundError on the cluster mode for version 1.5.1.
>>>
>>> https://www.mail-archive.com/user@spark.apache.org/msg43089.html
>>>
>>>
>>>
>>> On Thu, Jul 14, 2016 at 12:45 AM, Alger Remirata <abremirat...@gmail.com
>>> > wrote:
>>>
>>>> Hi Xiangrui,
>>>>
>>>> I check all the nodes of the cluster. It is working locally on each
>>>> node but there's an error upon deploying it on the cluster itself. I don't
>>>> know why it is and still don't understand why on individual node, it is
>>>> working locally but when deployed to hadoop cluster, it gets the error
>>>> mentioned.
>>>>
>>>> Thanks,
>>>>
>>>> Alger
>>>>
>>>> On Wed, Jul 13, 2016 at 4:38 AM, Alger Remirata <abremirat...@gmail.com
>>>> > wrote:
>>>>
>>>>> Since we're using mvn to build, it looks like mvn didn't add the
>>>>> class. Is there something on pom.xml to be added so that the new class can
>>>>> be recognized?
>>>>>
>>>>> On Wed, Jul 13, 2016 at 4:21 AM, Alger Remirata <
>>>>> abremirat...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the reply however I couldn't locate the MLlib jar. What I
>>>>>> have is a fat 'spark-assembly-1.5.1-hadoop2.6.0.jar'.
>>>>>>
>>>>>> There's an error on me copying user@spark.apache.org. The message
>>>>>> suddenly is not sent when I do that.
>>>>>>
>>>>>> On Wed, Jul 13, 2016 at 4:13 AM, Alger Remirata <
>>>>>> abremirat...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the reply however I couldn't locate the MLlib jar. What I
>>>>>>> have is a fat 'spark-assembly-1.5.1-hadoop2.6.0.jar'.
>>>>>>>
>>>>>>> On Tue, Jul 12, 2016 at 3:23 AM, Xiangrui Meng <m...@databricks.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> (+user@spark. Please copy user@ so other people could see and
>>>>>>>> help.)
>>>>>>>>
>>>>>>>> The error message means you have an MLlib jar on the classpath but
>>>>>>>> it didn't contain ALS$StandardNNLSSolver. So it is either the
>>>>>>>> modified jar not deployed to the workers or there existing an 
>>>>>>>> unmodified
>>>>>>>> MLlib jar sitting in front of the modified one on the classpath. You 
>>>>>>>> can
>>>>>>>> check the worker logs and see the classpath used in launching the 
>>>>>>>> worker,
>>>>>>>> and then check the MLlib jars on that classpath. -Xiangrui
>>>>>>>>
>>>>>>>> On Sun, Jul 10, 2016 at 10:18 PM Alger Remirata <
>>>>>>>> abremirat...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Xiangrui,
>>>>>>>>>
>>>>>>>>> We have the modified jars deployed both on master and slave nodes.
>>>>>>>>>
>>>>>>>>> What do you mean by this line?: 1. The unmodified Spark jars were
>>>>>>>>> not on the classpath (already existed on the cluster or pulled in by 
>>>>>>>>> other
>>>>>>>>> packages).
>>>>>>>>>
>>>>>>>>> How would I check that the unmodified Spark jars are not on the
>>>>>>>>> classpath? We change entirely the contents of the directory for 
>>>>>>>>> SPARK_HOME.
>>>>>>>>> The newly built customized spark is the new contents of 

Re: Custom Spark Error on Hadoop Cluster

2016-07-11 Thread Xiangrui Meng
(+user@spark. Please copy user@ so other people could see and help.)

The error message means you have an MLlib jar on the classpath, but it didn't
contain ALS$StandardNNLSSolver. So either the modified jar was not deployed to
the workers, or an unmodified MLlib jar sits in front of the modified one on
the classpath. You can check the worker logs to see the classpath used to
launch the worker, and then check the MLlib jars on that classpath. -Xiangrui

On Sun, Jul 10, 2016 at 10:18 PM Alger Remirata <abremirat...@gmail.com>
wrote:

> Hi Xiangrui,
>
> We have the modified jars deployed both on master and slave nodes.
>
> What do you mean by this line?: 1. The unmodified Spark jars were not on
> the classpath (already existed on the cluster or pulled in by other
> packages).
>
> How would I check that the unmodified Spark jars are not on the classpath?
> We change entirely the contents of the directory for SPARK_HOME. The newly
> built customized spark is the new contents of the current SPARK_HOME we
> have right now.
>
> Thanks,
>
> Alger
>
> On Fri, Jul 8, 2016 at 1:32 PM, Xiangrui Meng <m...@databricks.com> wrote:
>
>> This seems like a deployment or dependency issue. Please check the
>> following:
>> 1. The unmodified Spark jars were not on the classpath (already existed
>> on the cluster or pulled in by other packages).
>> 2. The modified jars were indeed deployed to both master and slave nodes.
>>
>> On Tue, Jul 5, 2016 at 12:29 PM Alger Remirata <abremirat...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> First of all, we like to thank you for developing spark. This helps us a
>>> lot on our data science task.
>>>
>>> I have a question. We have build a customized spark using the following
>>> command:
>>> mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
>>> -Phive-thriftserver -DskipTests clean package.
>>>
>>> On the custom spark we built, we've added a new scala file or package
>>> called StandardNNLS file however it got an error saying:
>>>
>>> Name: org.apache.spark.SparkException
>>> Message: Job aborted due to stage failure: Task 21 in stage 34.0 failed
>>> 4 times, most recent failure: Lost task 21.3 in stage 34.0 (TID 2547,
>>> 192.168.60.115): java.lang.ClassNotFoundException:
>>> org.apache.spark.ml.recommendation.ALS$StandardNNLSSolver
>>>
>>> StandardNNLSolver is found on another scala file called
>>> StandardNNLS.scala
>>> as we replace the original NNLS solver from scala with StandardNNLS
>>> Do you guys have some idea about the error. Is there a config file we
>>> need to edit to add the classpath? Even if we insert the added codes in
>>> ALS.scala and not create another file like StandardNNLS.scala, the inserted
>>> code is not recognized. It still gets an error regarding
>>> ClassNotFoundException
>>>
>>> However, when we run this on our local machine and not on the hadoop
>>> cluster, it is working. We don't know if the error is because we are using
>>> mvn to build custom spark or it has something to do with communicating to
>>> hadoop cluster.
>>>
>>> We would like to ask some ideas from you how to solve this problem. We
>>> can actually create another package not dependent to Apache Spark but this
>>> is so slow. As of now, we are still learning scala and spark. Using Apache
>>> spark utilities make the code faster. However, if we'll make another
>>> package not dependent to apache spark, we have to recode the utilities that
>>> are set private in Apache Spark. So, it is better to use Apache Spark and
>>> insert some code that we can use.
>>>
>>> Thanks,
>>>
>>> Alger
>>>
>>
>


Re: Custom Spark Error on Hadoop Cluster

2016-07-07 Thread Xiangrui Meng
This seems like a deployment or dependency issue. Please check the
following:
1. The unmodified Spark jars were not on the classpath (already existed on
the cluster or pulled in by other packages).
2. The modified jars were indeed deployed to both master and slave nodes.

On Tue, Jul 5, 2016 at 12:29 PM Alger Remirata 
wrote:

> Hi all,
>
> First of all, we like to thank you for developing spark. This helps us a
> lot on our data science task.
>
> I have a question. We have build a customized spark using the following
> command:
> mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver
> -DskipTests clean package.
>
> On the custom spark we built, we've added a new scala file or package
> called StandardNNLS file however it got an error saying:
>
> Name: org.apache.spark.SparkException
> Message: Job aborted due to stage failure: Task 21 in stage 34.0 failed 4
> times, most recent failure: Lost task 21.3 in stage 34.0 (TID 2547,
> 192.168.60.115): java.lang.ClassNotFoundException:
> org.apache.spark.ml.recommendation.ALS$StandardNNLSSolver
>
> StandardNNLSolver is found on another scala file called StandardNNLS.scala
> as we replace the original NNLS solver from scala with StandardNNLS
> Do you guys have some idea about the error. Is there a config file we need
> to edit to add the classpath? Even if we insert the added codes in
> ALS.scala and not create another file like StandardNNLS.scala, the inserted
> code is not recognized. It still gets an error regarding
> ClassNotFoundException
>
> However, when we run this on our local machine and not on the hadoop
> cluster, it is working. We don't know if the error is because we are using
> mvn to build custom spark or it has something to do with communicating to
> hadoop cluster.
>
> We would like to ask some ideas from you how to solve this problem. We can
> actually create another package not dependent to Apache Spark but this is
> so slow. As of now, we are still learning scala and spark. Using Apache
> spark utilities make the code faster. However, if we'll make another
> package not dependent to apache spark, we have to recode the utilities that
> are set private in Apache Spark. So, it is better to use Apache Spark and
> insert some code that we can use.
>
> Thanks,
>
> Alger
>


Re: dataframe stat corr for multiple columns

2016-05-19 Thread Xiangrui Meng
This is nice to have. Please create a JIRA for it. Right now, you can merge
all columns into a vector column using RFormula or VectorAssembler, then
convert it into an RDD and call corr from MLlib.
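
For example, a sketch of that approach in PySpark; `df` and the column names
are assumptions about your data:

~~~
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.stat import Statistics

# Assemble the numeric columns into a single vector column.
assembler = VectorAssembler(inputCols=["col_a", "col_b", "col_c"],
                            outputCol="features")
vectors = (assembler.transform(df)
           .select("features")
           .rdd
           .map(lambda row: row["features"].toArray()))

# Pearson correlation matrix over all assembled columns.
corr_matrix = Statistics.corr(vectors, method="pearson")
print(corr_matrix)
~~~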

On Tue, May 17, 2016, 7:09 AM Ankur Jain  wrote:

> Hello Team,
>
>
>
> In my current usecase I am loading data from CSV using spark-csv and
> trying to correlate all variables.
>
>
>
> As of now if we want to correlate 2 column in a dataframe * df.stat.corr*
> works great but if we want to correlate multiple columns this won’t work.
>
> In case of R we can use corrplot and correlate all numeric columns in a
> single line of code. Can you guide me how to achieve the same with
> dataframe or sql?
>
>
>
> There seems a way in spark-mllib
>
> http://spark.apache.org/docs/latest/mllib-statistics.html
>
>
>
>
>
> But it seems that it don’t take input as dataframe…
>
>
>
> Regards,
>
> Ankur
>


Re: Is JavaSparkContext.wholeTextFiles distributed?

2016-04-28 Thread Xiangrui Meng
It implements Hadoop's CombineFileInputFormat. isSplittable=false means each
individual file cannot be split. If you only see one partition even with a
large minPartitions, perhaps the total size of the files is not big enough.
Those settings are configurable in the Hadoop conf. -Xiangrui
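
For example, a quick check from PySpark; the path and minPartitions value are
only placeholders:

~~~
# wholeTextFiles packs many small files into each partition; a larger
# minPartitions only has an effect if the total input size is large enough.
rdd = sc.wholeTextFiles("hdfs:///data/small-files", minPartitions=32)
print(rdd.getNumPartitions())
~~~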

On Tue, Apr 26, 2016, 8:32 AM Hyukjin Kwon  wrote:

> EDIT: not mapper but a task for HadoopRDD maybe as far as I know.
>
> I think the most clear way is just to run a job on multiple files with the
> API and check the number of tasks in the job.
> On 27 Apr 2016 12:06 a.m., "Hyukjin Kwon"  wrote:
>
> wholeTextFile() API uses WholeTextFileInputFormat,
> https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala,
> which returns false for isSplittable. In this case, only single mapper
> appears for the entire file as far as I know.
>
> And also https://spark.apache.org/docs/1.6.0/programming-guide.html
>
> If the file is single file, then this would not be distributed.
> On 26 Apr 2016 11:52 p.m., "Ted Yu"  wrote:
>
>> Please take a look at:
>> core/src/main/scala/org/apache/spark/SparkContext.scala
>>
>>* Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`,
>>*
>>*  then `rdd` contains
>>* {{{
>>*   (a-hdfs-path/part-0, its content)
>>*   (a-hdfs-path/part-1, its content)
>>*   ...
>>*   (a-hdfs-path/part-n, its content)
>>* }}}
>> ...
>>   * @param minPartitions A suggestion value of the minimal splitting
>> number for input data.
>>
>>   def wholeTextFiles(
>>   path: String,
>>   minPartitions: Int = defaultMinPartitions): RDD[(String, String)] =
>> withScope {
>>
>> On Tue, Apr 26, 2016 at 7:43 AM, Vadim Vararu 
>> wrote:
>>
>>> Hi guys,
>>>
>>> I'm trying to read many filed from s3 using
>>> JavaSparkContext.wholeTextFiles(...). Is that executed in a distributed
>>> manner? Please give me a link to the place in documentation where it's
>>> specified.
>>>
>>> Thanks, Vadim.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>


Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Xiangrui Meng
Yes, DB (cc'ed) is working on porting the local linear algebra library over
(SPARK-13944). There are also frequent pattern mining algorithms we need to
port over in order to reach feature parity. -Xiangrui

On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> Overall this sounds good to me. One question I have is that in
> addition to the ML algorithms we have a number of linear algebra
> (various distributed matrices) and statistical methods in the
> spark.mllib package. Is the plan to port or move these to the spark.ml
> namespace in the 2.x series ?
>
> Thanks
> Shivaram
>
> On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen <so...@cloudera.com> wrote:
> > FWIW, all of that sounds like a good plan to me. Developing one API is
> > certainly better than two.
> >
> > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng <men...@gmail.com> wrote:
> >> Hi all,
> >>
> >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API
> built
> >> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based
> API has
> >> been developed under the spark.ml package, while the old RDD-based API
> has
> >> been developed in parallel under the spark.mllib package. While it was
> >> easier to implement and experiment with new APIs under a new package, it
> >> became harder and harder to maintain as both packages grew bigger and
> >> bigger. And new users are often confused by having two sets of APIs with
> >> overlapped functions.
> >>
> >> We started to recommend the DataFrame-based API over the RDD-based API
> in
> >> Spark 1.5 for its versatility and flexibility, and we saw the
> development
> >> and the usage gradually shifting to the DataFrame-based API. Just
> counting
> >> the lines of Scala code, from 1.5 to the current master we added ~1
> >> lines to the DataFrame-based API while ~700 to the RDD-based API. So, to
> >> gather more resources on the development of the DataFrame-based API and
> to
> >> help users migrate over sooner, I want to propose switching RDD-based
> MLlib
> >> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
> >>
> >> * We do not accept new features in the RDD-based spark.mllib package,
> unless
> >> they block implementing new features in the DataFrame-based spark.ml
> >> package.
> >> * We still accept bug fixes in the RDD-based API.
> >> * We will add more features to the DataFrame-based API in the 2.x
> series to
> >> reach feature parity with the RDD-based API.
> >> * Once we reach feature parity (possibly in Spark 2.2), we will
> deprecate
> >> the RDD-based API.
> >> * We will remove the RDD-based API from the main Spark repo in Spark
> 3.0.
> >>
> >> Though the RDD-based API is already in de facto maintenance mode, this
> >> announcement will make it clear and hence important to both MLlib
> developers
> >> and users. So we’d greatly appreciate your feedback!
> >>
> >> (As a side note, people sometimes use “Spark ML” to refer to the
> >> DataFrame-based API or even the entire MLlib component. This also causes
> >> confusion. To be clear, “Spark ML” is not an official name and there
> are no
> >> plans to rename MLlib to “Spark ML” at this time.)
> >>
> >> Best,
> >> Xiangrui
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Xiangrui Meng
Hi all,

More than a year ago, in Spark 1.2 we introduced the ML pipeline API built
on top of Spark SQL’s DataFrames. Since then the new DataFrame-based API
has been developed under the spark.ml package, while the old RDD-based API
has been developed in parallel under the spark.mllib package. While it was
easier to implement and experiment with new APIs under a new package, it
became harder and harder to maintain as both packages grew bigger and
bigger. And new users are often confused by having two sets of APIs with
overlapped functions.

We started to recommend the DataFrame-based API over the RDD-based API in
Spark 1.5 for its versatility and flexibility, and we saw the development
and the usage gradually shifting to the DataFrame-based API. Just counting
the lines of Scala code, from 1.5 to the current master we added ~1
lines to the DataFrame-based API while ~700 to the RDD-based API. So, to
gather more resources on the development of the DataFrame-based API and to
help users migrate over sooner, I want to propose switching RDD-based MLlib
APIs to maintenance mode in Spark 2.0. What does it mean exactly?

* We do not accept new features in the RDD-based spark.mllib package,
unless they block implementing new features in the DataFrame-based spark.ml
package.
* We still accept bug fixes in the RDD-based API.
* We will add more features to the DataFrame-based API in the 2.x series to
reach feature parity with the RDD-based API.
* Once we reach feature parity (possibly in Spark 2.2), we will deprecate
the RDD-based API.
* We will remove the RDD-based API from the main Spark repo in Spark 3.0.

Though the RDD-based API is already in de facto maintenance mode, this
announcement will make it clear and hence important to both MLlib
developers and users. So we’d greatly appreciate your feedback!

(As a side note, people sometimes use “Spark ML” to refer to the
DataFrame-based API or even the entire MLlib component. This also causes
confusion. To be clear, “Spark ML” is not an official name and there are no
plans to rename MLlib to “Spark ML” at this time.)

Best,
Xiangrui


Re: How to train and predict in parallel via Spark MLlib?

2016-02-19 Thread Xiangrui Meng
I put a simple example here:
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1233855/3877825096667927/588180/d9d264e39a.html

On Thu, Feb 18, 2016 at 6:47 AM Игорь Ляхов <tand...@gmail.com> wrote:

> Xiangrui, thnx for your answer!
> Could you clarify some details?
> What do you mean "I can trigger training jobs in different threads on the
> driver"? I have 4-machine cluster (It will grow in future), and I wish
> use them in parallel for training and predicting.
> Do you have any example? It will be great if you show me anyone.
>
> Thanks a lot for your participation!
> --Igor
>
> 2016-02-18 17:24 GMT+03:00 Xiangrui Meng <men...@gmail.com>:
>
>> If you have a big cluster, you can trigger training jobs in different
>> threads on the driver. Putting RDDs inside an RDD won't work. -Xiangrui
>>
>> On Thu, Feb 18, 2016, 4:28 AM Igor L. <tand...@gmail.com> wrote:
>>
>>> Good day, Spark team!
>>> I have to solve regression problem for different restricitons. There is a
>>> bunch of criteria and rules for them, I have to build model and make
>>> predictions for each, combine all and save.
>>> So, now my solution looks like:
>>>
>>> criteria2Rules: List[(String, Set[String])]
>>> var result: RDD[(Id, Double)] = sc.parallelize(Array[(Id, Double)]())
>>> criteria2Rules.foreach {
>>>   case (criterion, rules) =>
>>> val trainDataSet: RDD[LabeledPoint] = prepareTrainSet(criterion,
>>> data)
>>> val model: GradientBoostedTreesModel = buildModel(trainDataSet)
>>> val predictionDataSet = preparePredictionDataSet(criterion, data)
>>> val predictedScores = predictScores(predictionDataSet, model,
>>> criterion, rules)
>>> result = result.union(predictedScores)
>>> }
>>>
>>> It works almost nice, but too slow for the reason
>>> GradientBoostedTreesModel
>>> training not so fast, especially in case of big amount of features,
>>> samples
>>> and also quite big list of using criteria.
>>> I suppose it could work better, if Spark will train models and make
>>> predictions in parallel.
>>>
>>> I've tried to use a relational way of data operation:
>>>
>>> val criteria2RulesRdd: RDD[(String, Set[String])]
>>>
>>> val cartesianCriteriaRules2DataRdd =
>>> criteria2RulesRdd.cartesian(dataRdd)
>>> cartesianCriteriaRules2DataRdd
>>>   .aggregateByKey(List[Data]())(
>>> { case (lst, tuple) => lst :+ tuple }, { case (lstL, lstR) =>
>>> lstL
>>> ::: lstR}
>>>   )
>>>   .map {
>>> case (criteria, rulesSet, scorePredictionDataList) =>
>>>   val trainSet = ???
>>>   val model = ???
>>>   val predictionSet = ???
>>>   val predictedScores = ???
>>>   }
>>>   ...
>>>
>>> but it inevitably brings to situation when one RDD is produced inside
>>> another RDD (GradientBoostedTreesModel is trained on RDD[LabeledPoint])
>>> and
>>> as far as I know it's a bad scenario, e.g.
>>> toy example below doesn't work:
>>> scala> sc.parallelize(1 to 100).map(x => (x,
>>> sc.parallelize(Array(2)).map(_
>>> * 2).collect)).collect.
>>>
>>> Is there any way to use Spark MLlib in parallel way?
>>>
>>> Thank u for attention!
>>>
>>> --
>>> BR,
>>> Junior Scala/Python Developer
>>> Igor L.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-train-and-predict-in-parallel-via-Spark-MLlib-tp26261.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>
>
> --
> --
> Best regards,
> Игорь Ляхов
>


Re: Why no computations run on workers/slaves in cluster mode?

2016-02-18 Thread Xiangrui Meng
Did the test program finish and did you see any error messages from the
console? -Xiangrui

On Wed, Feb 17, 2016, 1:49 PM Junjie Qian  wrote:

> Hi all,
>
> I am new to Spark, and have one problem that, no computations run on
> workers/slave_servers in the standalone cluster mode.
>
> The Spark version is 1.6.0, and environment is CentOS. I run the example
> codes, e.g.
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala#L117
> .
>
> What I did: 1. setup slaves in ./conf/slaves, 2. setup the spark-env.sh
> file, 3. sbin/start-all.sh, 4. run the test program with spark-submit.
> Follow the link, http://spark.apache.org/docs/latest/spark-standalone.html
> .
>
> Could anyone give some suggestions on this? Or the link to how setup this?
>
> Many thanks
> Junjie Qian
>


Re: How to train and predict in parallel via Spark MLlib?

2016-02-18 Thread Xiangrui Meng
If you have a big cluster, you can trigger training jobs in different
threads on the driver. Putting RDDs inside an RDD won't work. -Xiangrui
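
For example, a sketch of the threads-on-the-driver approach in PySpark; it
assumes `datasets` is a list of (criterion, RDD[LabeledPoint]) pairs already
prepared on the driver, and the model parameters are placeholders:

~~~
from concurrent.futures import ThreadPoolExecutor
from pyspark.mllib.tree import GradientBoostedTrees

def train_one(criterion_and_data):
    criterion, train_rdd = criterion_and_data
    model = GradientBoostedTrees.trainRegressor(
        train_rdd, categoricalFeaturesInfo={}, numIterations=20)
    return criterion, model

# Each thread submits its own Spark jobs; the cluster schedules them concurrently.
with ThreadPoolExecutor(max_workers=4) as pool:
    models = dict(pool.map(train_one, datasets))
~~~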

On Thu, Feb 18, 2016, 4:28 AM Igor L.  wrote:

> Good day, Spark team!
> I have to solve regression problem for different restricitons. There is a
> bunch of criteria and rules for them, I have to build model and make
> predictions for each, combine all and save.
> So, now my solution looks like:
>
> criteria2Rules: List[(String, Set[String])]
> var result: RDD[(Id, Double)] = sc.parallelize(Array[(Id, Double)]())
> criteria2Rules.foreach {
>   case (criterion, rules) =>
> val trainDataSet: RDD[LabeledPoint] = prepareTrainSet(criterion,
> data)
> val model: GradientBoostedTreesModel = buildModel(trainDataSet)
> val predictionDataSet = preparePredictionDataSet(criterion, data)
> val predictedScores = predictScores(predictionDataSet, model,
> criterion, rules)
> result = result.union(predictedScores)
> }
>
> It works almost nice, but too slow for the reason GradientBoostedTreesModel
> training not so fast, especially in case of big amount of features, samples
> and also quite big list of using criteria.
> I suppose it could work better, if Spark will train models and make
> predictions in parallel.
>
> I've tried to use a relational way of data operation:
>
> val criteria2RulesRdd: RDD[(String, Set[String])]
>
> val cartesianCriteriaRules2DataRdd =
> criteria2RulesRdd.cartesian(dataRdd)
> cartesianCriteriaRules2DataRdd
>   .aggregateByKey(List[Data]())(
> { case (lst, tuple) => lst :+ tuple }, { case (lstL, lstR) => lstL
> ::: lstR}
>   )
>   .map {
> case (criteria, rulesSet, scorePredictionDataList) =>
>   val trainSet = ???
>   val model = ???
>   val predictionSet = ???
>   val predictedScores = ???
>   }
>   ...
>
> but it inevitably brings to situation when one RDD is produced inside
> another RDD (GradientBoostedTreesModel is trained on RDD[LabeledPoint]) and
> as far as I know it's a bad scenario, e.g.
> toy example below doesn't work:
> scala> sc.parallelize(1 to 100).map(x => (x, sc.parallelize(Array(2)).map(_
> * 2).collect)).collect.
>
> Is there any way to use Spark MLlib in parallel way?
>
> Thank u for attention!
>
> --
> BR,
> Junior Scala/Python Developer
> Igor L.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-train-and-predict-in-parallel-via-Spark-MLlib-tp26261.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: AFTSurvivalRegression Prediction and QuantilProbabilities

2016-02-17 Thread Xiangrui Meng
You can get the response, and also quantiles if you enable quantilesCol. You
can change the quantile probabilities as well. There is some example code in
the user guide:
http://spark.apache.org/docs/latest/ml-classification-regression.html#survival-regression.
-Xiangrui
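
For example, a short sketch mirroring that user guide example, written against
the current DataFrame API (a SparkSession named `spark` is assumed and the
data is made up):

~~~
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import AFTSurvivalRegression

training = spark.createDataFrame([
    (1.218, 1.0, Vectors.dense(1.560, -0.605)),
    (2.949, 0.0, Vectors.dense(0.346, 2.158)),
    (3.627, 0.0, Vectors.dense(1.380, 0.231)),
    (0.273, 1.0, Vectors.dense(0.520, 1.151)),
], ["label", "censor", "features"])

aft = AFTSurvivalRegression(
    quantileProbabilities=[0.25, 0.5, 0.75],  # the quantile probabilities to predict
    quantilesCol="quantiles")                 # enables the extra quantiles output column

model = aft.fit(training)
# "prediction" is the predicted survival time; "quantiles" holds the requested quantiles.
model.transform(training).select("prediction", "quantiles").show(truncate=False)
~~~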


On Mon, Feb 1, 2016 at 9:09 AM Christine Jula <
christine.j...@alexanderthamm.com> wrote:

> Hello,
>
>
>
> I would like to fit a survial model with AFTSurvival Regression. My
> question here is what kind of prediction do I get with this? In the package
> survreg in R I can set a type of prediction ("response", "link", "lp",
> "linear", "terms", "quantile", "uquantile").
>
>
>
> Besides, what can I manipulate with the parameter quantileProbabilities?
>
>
>
> Regards,
>
> Christine
>


Re: Why transformer from ml.Pipeline transform only a DataFrame ?

2015-08-28 Thread Xiangrui Meng
Yes, we will open up the APIs in the next release. There was some discussion
about the APIs. One approach is to have multiple methods for different
outputs, like the predicted class and probabilities. -Xiangrui
On Aug 28, 2015 6:39 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

 Hi there,

 The actual API of ml.Transformer use only DataFrame as input. I have a use
 case where I need to transform a single element. For example transforming
 an element from spark-streaming. Is there any reason for this or the
 ml.Transformer will support transforming a single element later ?

 Cheers,

 Jao



Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-28 Thread Xiangrui Meng
Hi Stahlman,

finalRDDStorageLevel is the storage level for the final user/item
factors. It is not common to set it to StorageLevel.NONE, unless you
want to save the factors directly to disk. If it is NONE, we cannot
unpersist the intermediate RDDs (in/out blocks), because the final
user/item factors returned are not materialized; if we did, we would have
to recompute from the very beginning (or the last checkpoint) when you
materialize the final user/item factors. If you want to have multiple
runs, you can try setting finalRDDStorageLevel to MEMORY_AND_DISK, or
clean up previous runs so the cached RDDs get garbage collected.
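
For reference, a hedged sketch using the DataFrame-based ALS in pyspark.ml,
which exposes these storage levels as params; `ratings_df` and its column
names are assumptions:

~~~
from pyspark.ml.recommendation import ALS

als = ALS(userCol="user", itemCol="item", ratingCol="rating",
          rank=10, maxIter=10,
          # keep both the intermediate blocks and the final factors on
          # disk-backed storage instead of dropping or recomputing them
          intermediateStorageLevel="MEMORY_AND_DISK",
          finalStorageLevel="MEMORY_AND_DISK")
model = als.fit(ratings_df)
~~~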

Best,
Xiangrui

On Wed, Jul 22, 2015 at 11:35 AM, Ganelin, Ilya
ilya.gane...@capitalone.com wrote:
 To be Unpersisted the RDD must be persisted first. If it's set to None, then
 it's not persisted, and as such does not need to be freed. Does that make
 sense ?



 Thank you,
 Ilya Ganelin




 -Original Message-
 From: Stahlman, Jonathan [jonathan.stahl...@capitalone.com]
 Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time
 To: user@spark.apache.org
 Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

 Hello again,

 In trying to understand the caching of intermediate RDDs by ALS, I looked
 into the source code and found what may be a bug.  Looking here:

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

 you see that ALS.train() is being called with finalRDDStorageLevel =
 StorageLevel.NONE, which I would understand to mean that the intermediate
 RDDs will not be persisted.  Looking here:

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

 unpersist() is only being called on the intermediate RDDs (all the *Blocks
 RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

 This doesn’t make sense to me – I would expect the RDDs to be removed from
 the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way
 around.

 Jonathan


 From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com
 Date: Thursday, July 16, 2015 at 2:18 PM
 To: user@spark.apache.org user@spark.apache.org
 Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

 Hello all,

 I am running the Spark recommendation algorithm in MLlib and I have been
 studying its output with various model configurations.  Ideally I would like
 to be able to run one job that trains the recommendation model with many
 different configurations to try to optimize for performance.  A sample code
 in python is copied below.

 The issue I have is that each new model which is trained caches a set of
 RDDs and eventually the executors run out of memory.  Is there any way in
 Pyspark to unpersist() these RDDs after each iteration?  The names of the
 RDDs which I gather from the UI is:

 itemInBlocks
 itemOutBlocks
 Products
 ratingBlocks
 userInBlocks
 userOutBlocks
 users

 I am using Spark 1.3.  Thank you for any help!

 Regards,
 Jonathan




   data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
   functions = [rating] #defined elsewhere
   ranks = [10,20]
   iterations = [10,20]
   lambdas = [0.01,0.1]
   alphas  = [1.0,50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in
 itertools.product( functions, ranks, iterations, lambdas, alphas ):
 #train model
 ratings_train = data_train.map(lambda l: Rating( l.user, l.product,
 ratingFunction(l) ) )
 model   = ALS.trainImplicit( ratings_train, rank, numIterations,
 lambda_=float(m_lambda), alpha=float(m_alpha) )

 #test performance on CV data
  ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product,
 ratingFunction(l) ) )
 auc = areaUnderCurve( ratings_cv, model.predictAll )

 #save results
  result = ",".join(str(l) for l in
 [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
 results.append(result)

 


Re: Proper saving/loading of MatrixFactorizationModel

2015-07-28 Thread Xiangrui Meng
The partitioner is not saved with the RDD. So when you load the model
back, we lose the partitioner information. You can call repartition on
the user/product factors and then create a new
MatrixFactorizationModel object using the repartitioned RDDs. It would
be useful to create a utility method for this, e.g.,
`MatrixFactorizationModel.repartition(num: Int):
MatrixFactorizationModel`. -Xiangrui

On Wed, Jul 22, 2015 at 4:34 AM, PShestov pshes...@nvidia.com wrote:
 Hi all!
 I have MatrixFactorizationModel object. If I'm trying to recommend products
 to single user right after constructing model through ALS.train(...) then it
 takes 300ms (for my data and hardware). But if I save model to disk and load
 it back then recommendation takes almost 2000ms. Also Spark warns:
 15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor does not have a
 partitioner. Prediction on individual records could be slow.
 15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor is not cached.
 Prediction could be slow.
 15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor does not
 have a partitioner. Prediction on individual records could be slow.
 15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor is not
 cached. Prediction could be slow.
 How can I create/set partitioner and cache user and product factors after
 loading model? Following approach didn't help:
 model.userFeatures().cache();
 model.productFeatures().cache();
 Also I was trying to repartition those rdds and create new model from
 repartitioned versions but that also didn't help.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Proper-saving-loading-of-MatrixFactorizationModel-tp23952.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MovieALS Implicit Error

2015-07-28 Thread Xiangrui Meng
Hi Benedict, Did you set lambda to zero? -Xiangrui

On Mon, Jul 13, 2015 at 4:18 AM, Benedict Liang bli...@thecarousell.com wrote:
 Hi Sean,

 This user dataset is organic. What do you think is a good ratings threshold
 then? I am only encountering this with the implicit type though. The
 explicit type works fine though (though it is not suitable for this
 dataset).

 Thank you,
 Benedict

 On Mon, Jul 13, 2015 at 7:15 PM, Sean Owen so...@cloudera.com wrote:

 Is the data set synthetic, or has very few items? or is indeed very
 sparse? those could be reasons. However usually this kind of thing
 happens with very small data sets. I could be wrong about what's going
 on, but it's a decent guess at the immediate cause given the error
 messages.

 On Mon, Jul 13, 2015 at 12:12 PM, Benedict Liang
 bli...@thecarousell.com wrote:
  Hi Sean,
 
  Thank you for your quick response. By very little data, do you mean that
  the
  matrix is too sparse? Or are there too little data points? There are
  3856988
  ratings that are in my dataset currently.
 
  Regards,
  Benedict
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Out of Memory Errors on less number of cores in proportion to Partitions in Data

2015-07-28 Thread Xiangrui Meng
Hi Aniruddh,

Increasing the number of partitions doesn't always help in ALS, due to the
communication/computation trade-off. What rank did you set? If the rank is
not large, I'd recommend a small number of partitions. There are some other
numbers to watch: do you have super-popular items/users in your data?
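
For reference, a hedged sketch of setting the ALS block count explicitly from
PySpark instead of inheriting a very high partition count from the input;
`ratings` is an assumed RDD of Rating objects and the numbers are placeholders:

~~~
from pyspark.mllib.recommendation import ALS

# `blocks` controls how users/products are partitioned inside ALS; keeping it
# modest reduces shuffle size and per-executor memory pressure.
model = ALS.train(ratings, rank=10, iterations=10, lambda_=0.01, blocks=20)
~~~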

Best,
Xiangrui

On Wed, Jul 8, 2015 at 5:32 AM, Evo Eftimov evo.efti...@isecc.com wrote:
 Also try to increase the number of partions gradually – not in one big jump
 from 20 to 100 but adding e.g. 10 at a time and see whether there is a
 correlation with adding more RAM to the executors



 From: Evo Eftimov [mailto:evo.efti...@isecc.com]
 Sent: Wednesday, July 8, 2015 1:26 PM
 To: 'Aniruddh Sharma'; 'user@spark.apache.org'
 Subject: RE: Out of Memory Errors on less number of cores in proportion to
 Partitions in Data



 Are you sure you have actually increased the RAM (how exactly did you do
 that and does it show in Spark UI)



 Also use the SPARK UI and the driver console  to check the RAM allocated for
 each RDD and RDD partition in each of the scenarios



 Re b) the general rule is num of partitions = 2 x num of CPU cores



 All partitions are operated on in parallel (by independently running JVM
 threads); however, if you have a substantially higher number of partitions
 (JVM threads) than cores, then you will get what happens in any JVM or OS –
 there will be switching between the threads and some of them will be
 suspended, waiting for a free core (thread contexts also occupy additional
 RAM)



 From: Aniruddh Sharma [mailto:asharma...@gmail.com]
 Sent: Wednesday, July 8, 2015 12:52 PM
 To: Evo Eftimov
 Subject: Re: Out of Memory Errors on less number of cores in proportion to
 Partitions in Data



 Thanks for your reply...

 I increased executor memory from 4 GB to 35 GB and the out-of-memory error
 still happens. So it seems it may not be entirely due to more buffers from
 more partitions.

 Query a) Is there a way to debug at a more granular level, from a user-code
 perspective, where things could go wrong?



 Query b)

 In general my query is: let's suppose it is not ALS (or some iterative
 algorithm). Let's say it is some sample RDD where each executor has 50
 partitions and each machine has 4 physical cores. Do the 4 physical cores
 try to process these 50 partitions in parallel (multitasking), or will the
 4 cores first process the first 4 partitions, then the next 4 partitions,
 and so on?

 Thanks and Regards

 Aniruddh



 On Wed, Jul 8, 2015 at 5:09 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 This is most likely due to the internal implementation of ALS in MLlib.
 Probably, for each parallel unit of execution (partition in Spark terms), the
 implementation allocates and uses a RAM buffer where it keeps interim
 results during the ALS iterations



 If we assume that the size of that internal RAM buffer is fixed per Unit of
 Execution then Total RAM (20 partitions x fixed RAM buffer) < Total RAM (100
 partitions x fixed RAM buffer)



 From: Aniruddh Sharma [mailto:asharma...@gmail.com]
 Sent: Wednesday, July 8, 2015 12:22 PM
 To: user@spark.apache.org
 Subject: Out of Memory Errors on less number of cores in proportion to
 Partitions in Data



 Hi,

 I am new to Spark. I have done the following tests and I am confused by the
 conclusions. I have 2 queries.

 Following is the detail of test

 Test 1) Used an 11-node cluster where each machine has 64 GB RAM and 4
 physical cores. I ran the ALS algorithm using MLlib on a 1.6 GB data set. I
 ran 10 executors and my Rating data set has 20 partitions. It works. In order
 to increase parallelism, I used 100 partitions instead of 20, and now the
 program does not work and throws an out-of-memory error.



 Query a): I had 4 cores on each machine, but 10 partitions in each executor,
 so my cores are not sufficient for the partitions. Is it supposed to give
 memory errors with this kind of misconfiguration? If there are not enough
 cores and processing cannot be done in parallel, can the partitions not be
 processed sequentially, so that the operation becomes slow rather than
 throwing a memory error?

 Query b) If it gives an error, then the error message is not meaningful.
 Here my DAG was very simple and I could trace that lowering the number of
 partitions works, but if it throws an error on a misconfiguration of cores,
 then how do I debug it in complex DAGs, as the error does not say explicitly
 that the problem could be due to a low number of cores? If my understanding
 is incorrect, then kindly explain the reasons for the error in this case.



 Thanks and Regards

 Aniruddh



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Cluster sizing for recommendations

2015-07-28 Thread Xiangrui Meng
Hi Danny,

You might need to reduce the number of partitions (or set userBlocks
and productBlocks directly in ALS). Using a large number of partitions
increases shuffle size and memory requirement. If you have 16 x 16 =
256 cores. I would recommend 64 or 128 instead of 2048.

model.recommendProductsForUsers is a very expensive operation. How
many users/items do you have in your dataset? The cost is basically
#users * #items * rank.

Best,
Xiangrui
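
A rough sketch of the block settings mentioned above, using the builder-style
ALS (Spark 1.3+) in place of trainImplicit plus the 2048-way repartition of the
ratings built below; the 128 blocks are only an illustrative value for a
256-core cluster, and the setUserBlocks/setProductBlocks setters are assumed to
be available in your MLlib version:

import org.apache.spark.mllib.recommendation.ALS

val als = new ALS()
  .setImplicitPrefs(true)
  .setRank(10)
  .setIterations(10)
  .setLambda(0.1)
  .setAlpha(0.8)
  .setUserBlocks(128)     // fewer, larger blocks => smaller shuffles
  .setProductBlocks(128)

val model = als.run(ratings)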

On Mon, Jul 6, 2015 at 11:58 AM, Danny Yates da...@codeaholics.org wrote:
 Hi,

 I'm having trouble building a recommender and would appreciate a few
 pointers.

 I have 350,000,000 events which are stored in roughly 500,000 S3 files and
 are formatted as semi-structured JSON. These events are not all relevant to
 making recommendations.

 My code is (roughly):

 case class Event(id: String, eventType: String, line: JsonNode)

 val raw = sc.textFile("s3n://bucket/path/dt=*/*")  // Files stored by
 Hive-style daily partitions

 val parsed = raw.map(json => {
 val obj = (new ObjectMapper()).readTree(json);

 Event(obj.get("_id").asText, obj.get("event").asText, obj);   // Parse
 events into Event objects, keeping parse JSON around for later step
 })

 val downloads = parsed.filter(_.eventType == "download")

 val ratings = downloads.map(event => {
 // ... extract userid and assetid (product) from JSON - code elided for
 brevity ...
 Rating(userId, assetId, 1)
 }).repartition(2048)

 ratings.cache

 val model = ALS.trainImplicit(ratings, 10, 10, 0.1, 0.8)

 This gets me to a model in around 20-25 minutes, which is actually pretty
 impressive. But, to get this far in a reasonable time I need to use a fair
 amount of compute power. I've found I need something like 16 x c3.4xl AWS
 instances for the workers (16 cores, 30 GB, SSD storage) and an r3.2xl (8
 cores, 60 GB, SSD storage) for the master. Oddly, the cached Rating objects
 only take a bit under 2GB of RAM.

 I'm developing in a shell at the moment, started like this:

 spark-shell --master yarn-client --executor-cores 16 --executor-memory 23G
 --driver-memory 48G

 --executor-cores: 16 because workers have 16 cores
 --executor-memory: 23GB because that's about the most I can safely allocate
 on a 30GB machine
 --driver-memory: 48GB to make use of the memory on the driver

 I found that if I didn't put the driver/master on a big box with lots of RAM
 I had issues calculating the model, even though the ratings are only taking
 about 2GB of RAM.

 I'm also setting spark.driver.maxResultSize to 40GB.

 If I don't repartition, I end up with 500,000 or so partitions (= number of
 S3 files) and the model doesn't build in any reasonable timescale.

 Now I've got a model, I'm trying (using 1.4.0-rc1 - I can't upgrade to 1.4.0
 yet):

 val recommendations = model.recommendProductsForUsers(5)
 recommendations.cache
 recommendations.first

 This invariably crashes with various memory errors - typically GC errors, or
 errors saying that I'm exceeding the spark.akka.frameSize. Increasing this
 seems to only prolong my agony.

 I would appreciate any advice you can offer. Whilst I appreciate this
 requires a fair amount of CPU, it also seems to need an infeasible amount of
 RAM. To be honest, I probably have far too much because of limitations
 around how I can size EC2 instances in order to get the CPU I need.

 But I've been at this for 3 days now and still haven't actually managed to
 build any recommendations...

 Thanks in advance,

 Danny

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Failed stages and dropped executors when running implicit matrix factorization/ALS : Too many values to unpack

2015-07-28 Thread Xiangrui Meng
)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
 On Jun 26, 2015, at 12:43 PM, Ravi Mody rmody...@gmail.com wrote:

 I set the number of partitions on the input dataset at 50. The number of CPU
 cores I'm using is 84 (7 executors, 12 cores).

 I'll look into getting a full stack trace. Any idea what my errors mean, and
 why increasing memory causes them to go away? Thanks.

 On Fri, Jun 26, 2015 at 11:26 AM, Xiangrui Meng men...@gmail.com wrote:

 Please see my comments inline. It would be helpful if you can attach
 the full stack trace. -Xiangrui

 On Fri, Jun 26, 2015 at 7:18 AM, Ravi Mody rmody...@gmail.com wrote:
  1. These are my settings:
  rank = 100
  iterations = 12
  users = ~20M
  items = ~2M
  training examples = ~500M-1B (I'm running into the issue even with 500M
  training examples)
 

 Did you set number of blocks? If you didn't, could you check how many
 partitions you have in the ratings RDD? Setting a large number of
 blocks would increase shuffle size. If you have enough RAM, try to set
 number of blocks to the number of CPU cores or less.

  2. The memory storage never seems to go too high. The user blocks may go
  up
  to ~10Gb, and each executor will have a few GB used out of 30 free GB.
  Everything seems small compared to the amount of memory I'm using.
 

 This looks correct.

  3. I think I have a lot of disk space - is this on the executors or the
  driver? Is there a way to know if the error is coming from disk space.
 

 You can see the shuffle data size for each iteration from the WebUI.
 Usually, it should throw an out of disk space exception instead of the
 message you posted. But it is worth checking.

  4. I'm not changing checkpointing settings, but I think checkpointing
  defaults to every 10 iterations? One notable thing is the crashes often
  start on or after the 9th iteration, so it may be related to
  checkpointing.
  But this could just be a coincidence.
 

 If you didn't set checkpointDir in SparkContext, the
 checkpointInterval setting in ALS has no effect.

  Thanks!
 
 
 
 
 
  On Fri, Jun 26, 2015 at 1:08 AM, Ayman Farahat ayman.fara...@yahoo.com
  wrote:
 
  was there any resolution to that problem?
  I am also having that with Pyspark 1.4
  380 Million observations
  100 factors and 5 iterations
  Thanks
  Ayman
 
  On Jun 23, 2015, at 6:20 PM, Xiangrui Meng men...@gmail.com wrote:
 
   It shouldn't be hard to handle 1 billion ratings in 1.3. Just need
   more information to guess what happened:
  
   1. Could you share the ALS settings, e.g., number of blocks, rank and
   number of iterations, as well as number of users/items in your
   dataset?
   2. If you monitor the progress in the WebUI, how much data is stored
   in memory and how much data is shuffled per iteration?
   3. Do you have enough disk space for the shuffle files?
   4. Did you set checkpointDir in SparkContext and checkpointInterval
   in
   ALS?
  
   Best,
   Xiangrui
  
   On Fri, Jun 19, 2015 at 11:43 AM, Ravi Mody rmody...@gmail.com
   wrote:
   Hi, I'm running implicit matrix factorization/ALS in Spark 1.3.1 on
   fairly
   large datasets (1+ billion input records). As I grow my dataset I
   often
   run
   into issues with a lot of failed stages and dropped executors,
   ultimately
   leading to the whole application failing. The errors are like
   org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
   output
   location for shuffle 19 and
   org.apache.spark.shuffle.FetchFailedException:
   Failed to connect to These occur during flatMap, mapPartitions,
   and
   aggregate stages. I know that increasing memory fixes this issue,
   but
   most
   of the time my executors are only using a tiny portion of the their
   allocated memory (10%). Often, the stages run fine until the last
   iteration
   or two of ALS, but this could just be a coincidence.
  
   I've tried tweaking a lot of settings, but it's time-consuming to do
   this
   through guess-and-check. Right now I have these set:
   spark.shuffle.memoryFraction = 0.3
   spark.storage.memoryFraction = 0.65
   spark.executor.heartbeatInterval = 60
  
   I'm sure these settings aren't optimal - any idea of what could be
   causing
   my errors, and what direction I can push these settings in to get
   more
   out
   of my memory? I'm currently using 240 GB of memory (on 7 executors)
   for
   a 1
   billion record dataset, which seems like too much. Thanks!
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

Re: Spark FP-Growth algorithm for frequent sequential patterns

2015-06-28 Thread Xiangrui Meng
Hi Ping,

FYI, we just merged Feynman's PR:
https://github.com/apache/spark/pull/6997 that adds sequential pattern
support. Please check out master branch and help test. Thanks!

Best,
Xiangrui
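
Sequential pattern mining eventually shipped as PrefixSpan in Spark 1.5. A
minimal usage sketch (not necessarily the exact API added by the PR above),
assuming org.apache.spark.mllib.fpm.PrefixSpan and the toy sequences below:

import org.apache.spark.mllib.fpm.PrefixSpan

val sequences = sc.parallelize(Seq(
  Array(Array(1, 2), Array(3)),
  Array(Array(1), Array(3, 2), Array(1, 2)),
  Array(Array(1, 2), Array(5)),
  Array(Array(6))
), 2).cache()

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(5)

val model = prefixSpan.run(sequences)
model.freqSequences.collect().foreach { fs =>
  println(fs.sequence.map(_.mkString("[", ", ", "]")).mkString("<", ", ", ">") + ", " + fs.freq)
}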

On Wed, Jun 24, 2015 at 2:16 PM, Feynman Liang fli...@databricks.com wrote:
 There is a JIRA for this which I just submitted a PR for :)

 On Tue, Jun 23, 2015 at 6:09 PM, Xiangrui Meng men...@gmail.com wrote:

 This is on the wish list for Spark 1.5. Assuming that the items from
 the same transaction are distinct, we can still follow FP-Growth's
 steps:

 1. find frequent items
 2. filter transactions and keep only frequent items
 3. do NOT order by frequency
 4. use suffix to partition the transactions (whether to use prefix or
 suffix doesn't really matter in this case)
 5. grow FP-tree locally on each partition (the data structure should
 be the same)
 6. generate frequent sub-sequences

 +Feynman

 Best,
 Xiangrui

 On Fri, Jun 19, 2015 at 10:51 AM, ping yan sharon...@gmail.com wrote:
  Hi,
 
  I have a use case where I'd like to mine frequent sequential patterns
  (consider the clickpath scenario). Transaction A -> B doesn't equal
  Transaction B -> A.
 
  From what I understand about FP-growth in general and the MLlib
  implementation of it, the orders are not preserved. Anyone can provide
  some
  insights or ideas in extending the algorithm to solve frequent
  sequential
  pattern mining problems?
 
  Thanks as always.
 
 
  Ping
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Failed stages and dropped executors when running implicit matrix factorization/ALS

2015-06-26 Thread Xiangrui Meng
No, they use the same implementation.

On Fri, Jun 26, 2015 at 8:05 AM, Ayman Farahat ayman.fara...@yahoo.com wrote:
 I use the mllib not the ML. Does that make a difference ?

 Sent from my iPhone

 On Jun 26, 2015, at 7:19 AM, Ravi Mody rmody...@gmail.com wrote:

 Forgot to mention: rank of 100 usually works ok, 120 consistently cannot
 finish.

 On Fri, Jun 26, 2015 at 10:18 AM, Ravi Mody rmody...@gmail.com wrote:

 1. These are my settings:
 rank = 100
 iterations = 12
 users = ~20M
 items = ~2M
 training examples = ~500M-1B (I'm running into the issue even with 500M
 training examples)

 2. The memory storage never seems to go too high. The user blocks may go
 up to ~10Gb, and each executor will have a few GB used out of 30 free GB.
 Everything seems small compared to the amount of memory I'm using.

 3. I think I have a lot of disk space - is this on the executors or the
 driver? Is there a way to know if the error is coming from disk space.

 4. I'm not changing checkpointing settings, but I think checkpointing
 defaults to every 10 iterations? One notable thing is the crashes often
 start on or after the 9th iteration, so it may be related to checkpointing.
 But this could just be a coincidence.

 Thanks!





 On Fri, Jun 26, 2015 at 1:08 AM, Ayman Farahat ayman.fara...@yahoo.com
 wrote:

 was there any resolution to that problem?
 I am also having that with Pyspark 1.4
 380 Million observations
 100 factors and 5 iterations
 Thanks
 Ayman

 On Jun 23, 2015, at 6:20 PM, Xiangrui Meng men...@gmail.com wrote:

  It shouldn't be hard to handle 1 billion ratings in 1.3. Just need
  more information to guess what happened:
 
  1. Could you share the ALS settings, e.g., number of blocks, rank and
  number of iterations, as well as number of users/items in your
  dataset?
  2. If you monitor the progress in the WebUI, how much data is stored
  in memory and how much data is shuffled per iteration?
  3. Do you have enough disk space for the shuffle files?
  4. Did you set checkpointDir in SparkContext and checkpointInterval in
  ALS?
 
  Best,
  Xiangrui
 
  On Fri, Jun 19, 2015 at 11:43 AM, Ravi Mody rmody...@gmail.com wrote:
  Hi, I'm running implicit matrix factorization/ALS in Spark 1.3.1 on
  fairly
  large datasets (1+ billion input records). As I grow my dataset I
  often run
  into issues with a lot of failed stages and dropped executors,
  ultimately
  leading to the whole application failing. The errors are like
  org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
  output
  location for shuffle 19 and
  org.apache.spark.shuffle.FetchFailedException:
  Failed to connect to These occur during flatMap, mapPartitions,
  and
  aggregate stages. I know that increasing memory fixes this issue, but
  most
  of the time my executors are only using a tiny portion of the their
  allocated memory (10%). Often, the stages run fine until the last
  iteration
  or two of ALS, but this could just be a coincidence.
 
  I've tried tweaking a lot of settings, but it's time-consuming to do
  this
  through guess-and-check. Right now I have these set:
  spark.shuffle.memoryFraction = 0.3
  spark.storage.memoryFraction = 0.65
  spark.executor.heartbeatInterval = 60
 
  I'm sure these settings aren't optimal - any idea of what could be
  causing
  my errors, and what direction I can push these settings in to get more
  out
  of my memory? I'm currently using 240 GB of memory (on 7 executors)
  for a 1
  billion record dataset, which seems like too much. Thanks!
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Failed stages and dropped executors when running implicit matrix factorization/ALS

2015-06-26 Thread Xiangrui Meng
Please see my comments inline. It would be helpful if you can attach
the full stack trace. -Xiangrui

On Fri, Jun 26, 2015 at 7:18 AM, Ravi Mody rmody...@gmail.com wrote:
 1. These are my settings:
 rank = 100
 iterations = 12
 users = ~20M
 items = ~2M
 training examples = ~500M-1B (I'm running into the issue even with 500M
 training examples)


Did you set number of blocks? If you didn't, could you check how many
partitions you have in the ratings RDD? Setting a large number of
blocks would increase shuffle size. If you have enough RAM, try to set
number of blocks to the number of CPU cores or less.

 2. The memory storage never seems to go too high. The user blocks may go up
 to ~10Gb, and each executor will have a few GB used out of 30 free GB.
 Everything seems small compared to the amount of memory I'm using.


This looks correct.

 3. I think I have a lot of disk space - is this on the executors or the
 driver? Is there a way to know if the error is coming from disk space.


You can see the shuffle data size for each iteration from the WebUI.
Usually, it should throw an out of disk space exception instead of the
message you posted. But it is worth checking.

 4. I'm not changing checkpointing settings, but I think checkpointing
 defaults to every 10 iterations? One notable thing is the crashes often
 start on or after the 9th iteration, so it may be related to checkpointing.
 But this could just be a coincidence.


If you didn't set checkpointDir in SparkContext, the
checkpointInterval setting in ALS has no effect.
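
A minimal sketch of wiring both pieces together (the directory is a
placeholder, and setCheckpointInterval is assumed to be available on the MLlib
ALS in your release; ratings is the input RDD[Rating]):

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // without this, the interval below is ignored

val als = new org.apache.spark.mllib.recommendation.ALS()
  .setImplicitPrefs(true)
  .setRank(100)
  .setIterations(12)
  .setCheckpointInterval(5)  // checkpoint intermediate factor RDDs every 5 iterations

val model = als.run(ratings)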

 Thanks!





 On Fri, Jun 26, 2015 at 1:08 AM, Ayman Farahat ayman.fara...@yahoo.com
 wrote:

 was there any resolution to that problem?
 I am also having that with Pyspark 1.4
 380 Million observations
 100 factors and 5 iterations
 Thanks
 Ayman

 On Jun 23, 2015, at 6:20 PM, Xiangrui Meng men...@gmail.com wrote:

  It shouldn't be hard to handle 1 billion ratings in 1.3. Just need
  more information to guess what happened:
 
  1. Could you share the ALS settings, e.g., number of blocks, rank and
  number of iterations, as well as number of users/items in your
  dataset?
  2. If you monitor the progress in the WebUI, how much data is stored
  in memory and how much data is shuffled per iteration?
  3. Do you have enough disk space for the shuffle files?
  4. Did you set checkpointDir in SparkContext and checkpointInterval in
  ALS?
 
  Best,
  Xiangrui
 
  On Fri, Jun 19, 2015 at 11:43 AM, Ravi Mody rmody...@gmail.com wrote:
  Hi, I'm running implicit matrix factorization/ALS in Spark 1.3.1 on
  fairly
  large datasets (1+ billion input records). As I grow my dataset I often
  run
  into issues with a lot of failed stages and dropped executors,
  ultimately
  leading to the whole application failing. The errors are like
  org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
  output
  location for shuffle 19 and
  org.apache.spark.shuffle.FetchFailedException:
  Failed to connect to These occur during flatMap, mapPartitions,
  and
  aggregate stages. I know that increasing memory fixes this issue, but
  most
  of the time my executors are only using a tiny portion of the their
  allocated memory (10%). Often, the stages run fine until the last
  iteration
  or two of ALS, but this could just be a coincidence.
 
  I've tried tweaking a lot of settings, but it's time-consuming to do
  this
  through guess-and-check. Right now I have these set:
  spark.shuffle.memoryFraction = 0.3
  spark.storage.memoryFraction = 0.65
  spark.executor.heartbeatInterval = 60
 
  I'm sure these settings aren't optimal - any idea of what could be
  causing
  my errors, and what direction I can push these settings in to get more
  out
  of my memory? I'm currently using 240 GB of memory (on 7 executors) for
  a 1
  billion record dataset, which seems like too much. Thanks!
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NaiveBayes for MLPipeline is absent

2015-06-25 Thread Xiangrui Meng
FYI, I made a JIRA for this:
https://issues.apache.org/jira/browse/SPARK-8600. -Xiangrui

On Fri, Jun 19, 2015 at 3:01 PM, Xiangrui Meng men...@gmail.com wrote:
 Hi Justin,

 We plan to add it in 1.5, along with some other estimators. We are now
 preparing a list of JIRAs, but feel free to create a JIRA for this and
 submit a PR:)

 Best,
 Xiangrui

 On Thu, Jun 18, 2015 at 6:35 PM, Justin Yip yipjus...@prediction.io wrote:
 Hello,

 Currently, there is no NaiveBayes implementation for MLpipeline. I couldn't
 find the JIRA ticket related to it too (or maybe I missed).

 Is there a plan to implement it? If no one has the bandwidth, I can work on
 it.

 Thanks.

 Justin

 
 View this message in context: NaiveBayes for MLPipeline is absent
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Settings for K-Means Clustering in Mlib for large data set

2015-06-23 Thread Xiangrui Meng
A rough estimate of the worst case memory requirement for driver is
about 2 * k * runs * numFeatures * numPartitions * 8 bytes. I put 2 at
the beginning because the previous centers are still in memory while
receiving new center updates. -Xiangrui
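
Plugging in the numbers from this thread (k = 1500, 80,000 features, 10
partitions, and runs = 1, which is assumed here as the default), a quick
back-of-the-envelope check:

val k = 1500
val runs = 1
val numFeatures = 80000
val numPartitions = 10
val bytes = 2L * k * runs * numFeatures * numPartitions * 8L
println(bytes / 1e9)  // ~19.2 GB of worst-case driver memory, before JVM overhead

The same estimate for k = 1000 is ~12.8 GB, which is consistent with K=1000
working once driver memory was raised while K=1500 still fails.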

On Fri, Jun 19, 2015 at 9:02 AM, Rogers Jeffrey
rogers.john2...@gmail.com wrote:
  Thanks. Setting the driver memory property worked for K=1000. But when I
  increased K to 1500 I get the following error:

 15/06/19 09:38:44 INFO ContextCleaner: Cleaned accumulator 7

 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
 172.31.3.51:45157 in memory (size: 1568.0 B, free: 10.4 GB)

 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
 172.31.9.50:59356 in memory (size: 1568.0 B, free: 73.6 GB)

 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
 172.31.9.50:60934 in memory (size: 1568.0 B, free: 73.6 GB)

 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
 172.31.15.51:37825 in memory (size: 1568.0 B, free: 73.6 GB)

 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
 172.31.15.51:60610 in memory (size: 1568.0 B, free: 73.6 GB)

 15/06/19 09:38:44 INFO ContextCleaner: Cleaned shuffle 5

 Exception in thread Thread-2 java.lang.OutOfMemoryError: Requested array
 size exceeds VM limit

 at java.util.Arrays.copyOf(Arrays.java:2367)

 at
 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)

 at
 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)

 at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587)

 at java.lang.StringBuilder.append(StringBuilder.java:214)

 at py4j.Protocol.getOutputCommand(Protocol.java:305)

 at py4j.commands.CallCommand.execute(CallCommand.java:82)

 at py4j.GatewayConnection.run(GatewayConnection.java:207)

 at java.lang.Thread.run(Thread.java:745)

 Exception in thread Thread-300 java.lang.OutOfMemoryError: Requested array
 size exceeds VM limit

 at java.util.Arrays.copyOf(Arrays.java:2367)

 at
 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)

 at
 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)

 at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587)

 at java.lang.StringBuilder.append(StringBuilder.java:214)

 at py4j.Protocol.getOutputCommand(Protocol.java:305)

 at py4j.commands.CallCommand.execute(CallCommand.java:82)

 at py4j.GatewayConnection.run(GatewayConnection.java:207)


  Is there any method/guideline through which I can understand the memory
  requirement beforehand and make the appropriate configurations?

 Regards,
 Rogers Jeffrey L

 On Thu, Jun 18, 2015 at 8:14 PM, Rogers Jeffrey rogers.john2...@gmail.com
 wrote:

 I am submitting the application from a python notebook. I am launching
 pyspark as follows:

 SPARK_PUBLIC_DNS=ec2-54-165-202-17.compute-1.amazonaws.com
 SPARK_WORKER_CORES=8 SPARK_WORKER_MEMORY=15g SPARK_MEM=30g OUR_JAVA_MEM=30g
  SPARK_DAEMON_JAVA_OPTS="-XX:MaxPermSize=30g -Xms30g -Xmx30g" IPYTHON=1
 PYSPARK_PYTHON=/usr/bin/python SPARK_PRINT_LAUNCH_COMMAND=1
 ./spark/bin/pyspark --master
 spark://54.165.202.17.compute-1.amazonaws.com:7077   --deploy-mode client

 I guess I should be adding another extra argument --conf
 spark.driver.memory=15g . Is that correct?

 Regards,
 Rogers Jeffrey L

 On Thu, Jun 18, 2015 at 7:50 PM, Xiangrui Meng men...@gmail.com wrote:

 With 80,000 features and 1000 clusters, you need 80,000,000 doubles to
 store the cluster centers. That is ~600MB. If there are 10 partitions,
 you might need 6GB on the driver to collect updates from workers. I
 guess the driver died. Did you specify driver memory with
 spark-submit? -Xiangrui

 On Thu, Jun 18, 2015 at 12:22 PM, Rogers Jeffrey
 rogers.john2...@gmail.com wrote:
  Hi All,
 
  I am trying to run KMeans clustering on a large data set with 12,000
  points
  and 80,000 dimensions.  I have a spark cluster in Ec2 stand alone mode
  with
  8  workers running on 2 slaves with 160 GB Ram and 40 VCPU.
 
  My Code is as Follows:
 
  def convert_into_sparse_vector(A):
  non_nan_indices=np.nonzero(~np.isnan(A) )
  non_nan_values=A[non_nan_indices]
  dictionary=dict(zip(non_nan_indices[0],non_nan_values))
  return Vectors.sparse (len(A),dictionary)
 
  X=[convert_into_sparse_vector(A) for A in complete_dataframe.values ]
  sc=SparkContext(appName=parallel_kmeans)
  data=sc.parallelize(X,10)
  model = KMeans.train(data, 1000, initializationMode=k-means||)
 
  where complete_dataframe is a pandas data frame that has my data.
 
  I get the error: Py4JNetworkError: An error occurred while trying to
  connect
  to the Java server.
 
  The error  trace is as follows:
 
   Exception happened during
  processing of request from ('127.0.0.1', 41360) Traceback (most recent
  call last):   File /usr/lib64/python2.6/SocketServer.py, line

Re: Spark FP-Growth algorithm for frequent sequential patterns

2015-06-23 Thread Xiangrui Meng
This is on the wish list for Spark 1.5. Assuming that the items from
the same transaction are distinct, we can still follow FP-Growth's
steps:

1. find frequent items
2. filter transactions and keep only frequent items
3. do NOT order by frequency
4. use suffix to partition the transactions (whether to use prefix or
suffix doesn't really matter in this case)
5. grow FP-tree locally on each partition (the data structure should
be the same)
6. generate frequent sub-sequences

+Feynman

Best,
Xiangrui

On Fri, Jun 19, 2015 at 10:51 AM, ping yan sharon...@gmail.com wrote:
 Hi,

 I have a use case where I'd like to mine frequent sequential patterns
  (consider the clickpath scenario). Transaction A -> B doesn't equal
  Transaction B -> A.

 From what I understand about FP-growth in general and the MLlib
 implementation of it, the orders are not preserved. Anyone can provide some
 insights or ideas in extending the algorithm to solve frequent sequential
 pattern mining problems?

 Thanks as always.


 Ping


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Failed stages and dropped executors when running implicit matrix factorization/ALS

2015-06-23 Thread Xiangrui Meng
It shouldn't be hard to handle 1 billion ratings in 1.3. Just need
more information to guess what happened:

1. Could you share the ALS settings, e.g., number of blocks, rank and
number of iterations, as well as number of users/items in your
dataset?
2. If you monitor the progress in the WebUI, how much data is stored
in memory and how much data is shuffled per iteration?
3. Do you have enough disk space for the shuffle files?
4. Did you set checkpointDir in SparkContext and checkpointInterval in ALS?

Best,
Xiangrui

On Fri, Jun 19, 2015 at 11:43 AM, Ravi Mody rmody...@gmail.com wrote:
 Hi, I'm running implicit matrix factorization/ALS in Spark 1.3.1 on fairly
 large datasets (1+ billion input records). As I grow my dataset I often run
 into issues with a lot of failed stages and dropped executors, ultimately
 leading to the whole application failing. The errors are like
 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 19 and org.apache.spark.shuffle.FetchFailedException:
 Failed to connect to These occur during flatMap, mapPartitions, and
 aggregate stages. I know that increasing memory fixes this issue, but most
 of the time my executors are only using a tiny portion of the their
 allocated memory (10%). Often, the stages run fine until the last iteration
 or two of ALS, but this could just be a coincidence.

 I've tried tweaking a lot of settings, but it's time-consuming to do this
 through guess-and-check. Right now I have these set:
 spark.shuffle.memoryFraction = 0.3
 spark.storage.memoryFraction = 0.65
 spark.executor.heartbeatInterval = 60

 I'm sure these settings aren't optimal - any idea of what could be causing
 my errors, and what direction I can push these settings in to get more out
 of my memory? I'm currently using 240 GB of memory (on 7 executors) for a 1
 billion record dataset, which seems like too much. Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How could output the StreamingLinearRegressionWithSGD prediction result?

2015-06-23 Thread Xiangrui Meng
Please check the input path to your test data, and call `.count()` and
see whether there are records in it. -Xiangrui
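
For reference, a minimal sketch of checking the test stream and printing/saving
predictions, loosely following the streaming linear regression tutorial; the
paths, the feature count, and an existing StreamingContext (ssc) are assumed:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}

val numFeatures = 3  // placeholder
val trainingData = ssc.textFileStream("hdfs:///training/dir").map(LabeledPoint.parse)
val testData = ssc.textFileStream("hdfs:///test/dir").map(LabeledPoint.parse)

// Sanity check: if this prints 0 for every batch, no new test files are being picked up.
testData.count().print()

val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))

model.trainOn(trainingData)

// (label, prediction) pairs per batch; print a few and save every batch to its own directory.
val predictions = model.predictOnValues(testData.map(lp => (lp.label, lp.features)))
predictions.print()
predictions.saveAsTextFiles("hdfs:///output/predictions")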

On Sat, Jun 20, 2015 at 9:23 PM, Gavin Yue yue.yuany...@gmail.com wrote:
 Hey,

 I am testing the StreamingLinearRegressionWithSGD following the tutorial.


 It works, but I could not output the prediction results. I tried the
 saveAsTextFile, but it only output _SUCCESS to the folder.


 I am trying to check the prediction results and use
 BinaryClassificationMetrics to get areaUnderROC.


 Any example for this?

 Thank you !

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: which mllib algorithm for large multi-class classification?

2015-06-23 Thread Xiangrui Meng
We have multinomial logistic regression implemented. For your case,
the model size is 500 * 300,000 = 150,000,000 coefficients. MLlib's
implementation might not be able to handle it efficiently; we plan to have
a more scalable implementation in 1.5. However, it shouldn't give you an
"array larger than MaxInt" exception. Could you paste the stack trace?
-Xiangrui
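
For what it's worth, a minimal sketch of multinomial logistic regression in
MLlib via LogisticRegressionWithLBFGS; the class count and the training RDD
are placeholders:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// training: RDD[LabeledPoint] of tf-idf vectors with labels in [0, numClasses)
def trainMultinomial(training: RDD[LabeledPoint], numClasses: Int) =
  new LogisticRegressionWithLBFGS()
    .setNumClasses(numClasses)  // multinomial (softmax) logistic regression
    .run(training)

// e.g. val model = trainMultinomial(training, 500)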

On Mon, Jun 22, 2015 at 4:21 PM, Danny kont...@dannylinden.de wrote:
 hi,

  I am unfortunately not very well versed in MLlib, so I would appreciate a
  little help:

  Which multi-class classification algorithm should I use if I want to
  classify texts (100-1000 words each) into categories? The number of
  categories is between 100 and 500, and the number of training documents,
  which I have transformed to tf-idf vectors, is at most ~300,000.

  It looks like most algorithms run into OOM exceptions or "array larger than
  MaxInt" exceptions with a large number of classes/categories because there
  are collect steps in them?

 thanks a lot



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/which-mllib-algorithm-for-large-multi-class-classification-tp23439.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: mutable vs. pure functional implementation - StatCounter

2015-06-23 Thread Xiangrui Meng
Creating millions of temporary (immutable) objects is bad for
performance. It should be simple to do a micro-benchmark locally.
-Xiangrui
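
A rough local micro-benchmark sketch of that comparison; ImmutableStat is a
made-up stand-in for a purely functional accumulator, and the timing here is
deliberately crude:

import org.apache.spark.util.StatCounter

// Made-up immutable counterpart, for comparison only.
case class ImmutableStat(n: Long = 0L, sum: Double = 0.0) {
  def merge(x: Double): ImmutableStat = ImmutableStat(n + 1, sum + x)  // allocates a new object per value
}

def time[A](label: String)(body: => A): A = {
  val t0 = System.nanoTime()
  val result = body
  println(f"$label: ${(System.nanoTime() - t0) / 1e6}%.1f ms")
  result
}

val values = Array.fill(10000000)(math.random)

time("mutable StatCounter") {
  val stats = new StatCounter()
  values.foreach(v => stats.merge(v))  // mutates a single object in place
  stats.mean
}

time("immutable accumulator") {
  val acc = values.foldLeft(ImmutableStat())(_ merge _)  // one short-lived object per value
  acc.sum / acc.n
}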

On Mon, Jun 22, 2015 at 7:25 PM, mzeltser mzelt...@gmail.com wrote:
 Using StatCounter as an example, I'd like to understand if pure functional
 implementation would be more or less beneficial for accumulating
 structures used inside RDD.map

 StatCounter.merge is updating mutable class variables and returning
 reference to same object. This is clearly a non-functional implementation
 and it mutates existing state of the instance. (Unless I'm missing
 something)

  Would it be preferable to have all the class variables declared as val and
  create a new instance to hold the merged values?

  The StatCounter would be used inside RDD.map to collect stats on the fly.
  Would mutable state present a bottleneck?

 Can anybody comment on why non-functional implementation has been chosen?






 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/mutable-vs-pure-functional-implementation-StatCounter-tp23441.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NaiveBayes for MLPipeline is absent

2015-06-19 Thread Xiangrui Meng
Hi Justin,

We plan to add it in 1.5, along with some other estimators. We are now
preparing a list of JIRAs, but feel free to create a JIRA for this and
submit a PR:)

Best,
Xiangrui

On Thu, Jun 18, 2015 at 6:35 PM, Justin Yip yipjus...@prediction.io wrote:
 Hello,

 Currently, there is no NaiveBayes implementation for MLpipeline. I couldn't
 find the JIRA ticket related to it too (or maybe I missed).

 Is there a plan to implement it? If no one has the bandwidth, I can work on
 it.

 Thanks.

 Justin

 
 View this message in context: NaiveBayes for MLPipeline is absent
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issue with PySpark UDF on a column of Vectors

2015-06-18 Thread Xiangrui Meng
This is a known issue. See
https://issues.apache.org/jira/browse/SPARK-7902 -Xiangrui

On Thu, Jun 18, 2015 at 6:41 AM, calstad colin.als...@gmail.com wrote:
 I am having trouble using a UDF on a column of Vectors in PySpark which can
 be illustrated here:

 from pyspark import SparkContext
 from pyspark.sql import Row
 from pyspark.sql.types import DoubleType
 from pyspark.sql.functions import udf
 from pyspark.mllib.linalg import Vectors

 FeatureRow = Row('id', 'features')
 data = sc.parallelize([(0, Vectors.dense([9.7, 1.0, -3.2])),
(1, Vectors.dense([2.25, -11.1, 123.2])),
(2, Vectors.dense([-7.2, 1.0, -3.2]))])
 df = data.map(lambda r: FeatureRow(*r)).toDF()

 vector_udf = udf(lambda vector: sum(vector), DoubleType())

 df.withColumn('feature_sums', vector_udf(df.features)).first()

 This fails with the following stack trace:

 Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.collectAndServe.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
 in stage 31.0 failed 1 times, most recent failure: Lost task 5.0 in stage
 31.0 (TID 95, localhost): org.apache.spark.api.python.PythonException:
 Traceback (most recent call last):
   File /Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py,
 line 111, in main
 process()
   File /Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py,
 line 106, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File
 /Users/colin/src/spark/python/lib/pyspark.zip/pyspark/serializers.py, line
 263, in dump_stream
 vs = list(itertools.islice(iterator, batch))
   File /Users/colin/src/spark/python/pyspark/sql/functions.py, line 469,
 in lambda
 func = lambda _, it: map(lambda x: f(*x), it)
   File /Users/colin/pokitdok/spark_mapper/spark_mapper/filters.py, line
 143, in lambda
 TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'


 Looking at what gets passed to the UDF, there seems to be something strange.
 The argument passed should be a Vector, but instead it gets passed a Python
 tuple like this:

 (1, None, None, [9.7, 1.0, -3.2])

 Is it not possible to use UDFs on DataFrame columns of Vectors?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-PySpark-UDF-on-a-column-of-Vectors-tp23393.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does MLLib has attribute importance?

2015-06-18 Thread Xiangrui Meng
ChiSqSelector takes an RDD of labeled points, where the label is the
target. See
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala#L120
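
A minimal sketch along the lines of the MLlib feature-selection docs, with the
supervised target stored as the label (data and the top-feature count are
placeholders):

import org.apache.spark.mllib.feature.ChiSqSelector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// data: RDD[LabeledPoint] with categorical (or discretized) features; the label
// is the target that the chi-squared statistic ranks the features against.
def selectTop(data: RDD[LabeledPoint], numTopFeatures: Int): RDD[LabeledPoint] = {
  val selector = new ChiSqSelector(numTopFeatures)
  val transformer = selector.fit(data)
  data.map(lp => LabeledPoint(lp.label, transformer.transform(lp.features)))
}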

On Wed, Jun 17, 2015 at 10:22 PM, Ruslan Dautkhanov
dautkha...@gmail.com wrote:
 Thank you Xiangrui.

  Oracle's attribute importance mining function has a target variable:
  "Attribute importance is a supervised function that ranks attributes
  according to their significance in predicting a target."
 MLlib's ChiSqSelector does not have a target variable.




 --
 Ruslan Dautkhanov

 On Wed, Jun 17, 2015 at 5:50 PM, Xiangrui Meng men...@gmail.com wrote:

 We don't have it in MLlib. The closest would be the ChiSqSelector,
 which works for categorical data. -Xiangrui

 On Thu, Jun 11, 2015 at 4:33 PM, Ruslan Dautkhanov dautkha...@gmail.com
 wrote:
  What would be closest equivalent in MLLib to Oracle Data Miner's
  Attribute
  Importance mining function?
 
 
  http://docs.oracle.com/cd/B28359_01/datamine.111/b28129/feature_extr.htm#i1005920
 
  Attribute importance is a supervised function that ranks attributes
  according to their significance in predicting a target.
 
 
  Best regards,
  Ruslan Dautkhanov



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Settings for K-Means Clustering in Mlib for large data set

2015-06-18 Thread Xiangrui Meng
With 80,000 features and 1000 clusters, you need 80,000,000 doubles to
store the cluster centers. That is ~600MB. If there are 10 partitions,
you might need 6GB on the driver to collect updates from workers. I
guess the driver died. Did you specify driver memory with
spark-submit? -Xiangrui

On Thu, Jun 18, 2015 at 12:22 PM, Rogers Jeffrey
rogers.john2...@gmail.com wrote:
 Hi All,

 I am trying to run KMeans clustering on a large data set with 12,000 points
 and 80,000 dimensions.  I have a spark cluster in Ec2 stand alone mode  with
 8  workers running on 2 slaves with 160 GB Ram and 40 VCPU.

 My Code is as Follows:

  def convert_into_sparse_vector(A):
      non_nan_indices = np.nonzero(~np.isnan(A))
      non_nan_values = A[non_nan_indices]
      dictionary = dict(zip(non_nan_indices[0], non_nan_values))
      return Vectors.sparse(len(A), dictionary)

  X = [convert_into_sparse_vector(A) for A in complete_dataframe.values]
  sc = SparkContext(appName="parallel_kmeans")
  data = sc.parallelize(X, 10)
  model = KMeans.train(data, 1000, initializationMode="k-means||")

 where complete_dataframe is a pandas data frame that has my data.

 I get the error: Py4JNetworkError: An error occurred while trying to connect
 to the Java server.

 The error  trace is as follows:

  Exception happened during
 processing of request from ('127.0.0.1', 41360) Traceback (most recent
 call last):   File /usr/lib64/python2.6/SocketServer.py, line 283,
 in _handle_request_noblock
 self.process_request(request, client_address)   File
 /usr/lib64/python2.6/SocketServer.py, line 309, in process_request
 self.finish_request(request, client_address)   File
 /usr/lib64/python2.6/SocketServer.py, line 322, in finish_request
 self.RequestHandlerClass(request, client_address, self)   File
 /usr/lib64/python2.6/SocketServer.py, line 617, in __init__
 self.handle()   File /root/spark/python/pyspark/accumulators.py,
 line 235, in handle
 num_updates = read_int(self.rfile)   File
 /root/spark/python/pyspark/serializers.py, line 544, in read_int
 raise EOFError EOFError
 

 ---
 Py4JNetworkError  Traceback (most recent call
 last) ipython-input-13-3dd00c2c5e93 in module()
  1 model = KMeans.train(data, 1000, initializationMode=k-means||)

 /root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k,
 maxIterations, runs, initializationMode, seed, initializationSteps,
 epsilon)
 134 Train a k-means clustering model.
 135 model = callMLlibFunc(trainKMeansModel,
 rdd.map(_convert_to_vector), k, maxIterations,
 -- 136   runs, initializationMode, seed,
 initializationSteps, epsilon)
 137 centers = callJavaFunc(rdd.context, model.clusterCenters)
 138 return KMeansModel([c.toArray() for c in centers])

 /root/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name,
 *args)
 126 sc = SparkContext._active_spark_context
 127 api = getattr(sc._jvm.PythonMLLibAPI(), name)
 -- 128 return callJavaFunc(sc, api, *args)
 129
 130

 /root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func,
 *args)
 119  Call Java Function 
 120 args = [_py2java(sc, a) for a in args]
 -- 121 return _java2py(sc, func(*args))
 122
 123

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 __call__(self, *args)
 534 END_COMMAND_PART
 535
 -- 536 answer = self.gateway_client.send_command(command)
 537 return_value = get_return_value(answer,
 self.gateway_client,
 538 self.target_id, self.name)

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 send_command(self, command, retry)
 367 if retry:
 368 #print_exc()
 -- 369 response = self.send_command(command)
 370 else:
 371 response = ERROR

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 send_command(self, command, retry)
 360  the Py4J protocol.
 361 
 -- 362 connection = self._get_connection()
 363 try:
 364 response = connection.send_command(command)

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 _get_connection(self)
 316 connection = self.deque.pop()
 317 except Exception:
 -- 318 connection = self._create_connection()
 319 return connection
 320

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 _create_connection(self)
 323 connection = GatewayConnection(self.address, self.port,
 324 self.auto_close, self.gateway_property)
 -- 325 connection.start()
 326 return connection
 327


Re: MLLib: instance weight

2015-06-17 Thread Xiangrui Meng
Hi Gajan,

Please subscribe to our user mailing list, which is the best place to get
your questions answered. We don't have weighted instance support, but
it should be easy to add and we plan to do it in the next release
(1.5). Thanks for asking!

Best,
Xiangrui

On Wed, Jun 17, 2015 at 2:33 PM, Gajan S gajans1...@googlemail.com wrote:
 Hi,

  Would you know if Spark supports adding weights to instances? I couldn't
  find any documentation on how to do this.

 Thanks,
 Gajan

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark mlib variance analysis

2015-06-17 Thread Xiangrui Meng
We don't have R-like model summary in MLlib, but we plan to add some
in 1.5. Please watch https://issues.apache.org/jira/browse/SPARK-7674.
-Xiangrui

On Thu, May 28, 2015 at 3:47 PM, rafac rafaelme...@hotmail.com wrote:
  I have a simple problem:
  I have the mean number of people at one place by hour (time-series like),
  and now I want to know if the weather conditions have an impact on the mean
  number. I would do it with a variance analysis like ANOVA in SPSS, or by
  analysing the resulting regression model summary.
  How is it possible to do this with Apache Spark MLlib?

 Thanks



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-mlib-variance-analysis-tp23079.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why the default Params.copy doesn't work for Model.copy?

2015-06-17 Thread Xiangrui Meng
That is a bug, which will be fixed in
https://github.com/apache/spark/pull/6622. I disabled Model.copy
because models usually don't have a default constructor and hence
the default Params.copy implementation won't work. Unfortunately, due
to insufficient test coverage, StringIndexerModel.copy is not
implemented. -Xiangrui

On Thu, Jun 4, 2015 at 10:09 PM, Justin Yip yipjus...@prediction.io wrote:
 Hello,

  I have a question about the Spark 1.4 ml library. In the copy function, it
  is stated that the default implementation of Params doesn't work for
  models.
  (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/Model.scala#L49)

 As a result, some feature generation transformer like StringIndexerModel
 cannot be used in Pipeline.

 Maybe due to my limited knowledge in ML pipeline, can anyone give me some
 hints why Model.copy behaves differently as other Params?

 Thanks!

 Justin

 
 View this message in context: Why the default Params.copy doesn't work for
 Model.copy?
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Optimization module in Python mllib

2015-06-17 Thread Xiangrui Meng
There is no plan at this time. We haven't reached 100% coverage on
user-facing API in PySpark yet, which would have higher priority.
-Xiangrui

On Sun, Jun 7, 2015 at 1:42 AM, martingoodson martingood...@gmail.com wrote:
 Am I right in thinking that Python mllib does not contain the optimization
 module? Are there plans to add this to the Python api?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Optimization-module-in-Python-mllib-tp23191.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does MLLib has attribute importance?

2015-06-17 Thread Xiangrui Meng
We don't have it in MLlib. The closest would be the ChiSqSelector,
which works for categorical data. -Xiangrui

On Thu, Jun 11, 2015 at 4:33 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote:
 What would be closest equivalent in MLLib to Oracle Data Miner's Attribute
 Importance mining function?

 http://docs.oracle.com/cd/B28359_01/datamine.111/b28129/feature_extr.htm#i1005920

 Attribute importance is a supervised function that ranks attributes
 according to their significance in predicting a target.


 Best regards,
 Ruslan Dautkhanov

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: *Metrics API is odd in MLLib

2015-06-17 Thread Xiangrui Meng
LabeledPoint was used for both classification and regression, where label
type is Double for simplicity. So in BinaryClassificationMetrics, we still
use Double for labels. We compute the confusion matrix at each threshold
internally, but this is not exposed to users (
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala#L127).
Feel free to submit a PR to make it public. -Xiangrui
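
A minimal sketch of the current API with Double-encoded labels (0.0 / 1.0),
showing the per-threshold curves that are public today; scoreAndLabels is a
placeholder RDD:

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.rdd.RDD

// scoreAndLabels: (raw score, label), with labels encoded as 0.0 / 1.0 Doubles.
def summarize(scoreAndLabels: RDD[(Double, Double)]): Unit = {
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  println(s"AUC = ${metrics.areaUnderROC()}")
  // Per-threshold curves are exposed; the raw confusion counts behind them are not (yet).
  metrics.precisionByThreshold().take(5).foreach { case (t, p) =>
    println(s"threshold=$t precision=$p")
  }
}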

On Mon, Jun 15, 2015 at 7:13 AM, Sam samthesav...@gmail.com wrote:


 According to
 https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

  The constructor takes `RDD[(Double, Double)]`, meaning labels are Doubles;
  this seems odd, shouldn't it be Boolean?  Similarly for MultilabelMetrics
  (i.e. should be RDD[(Array[Double], Array[Boolean])]), and for
  MulticlassMetrics the type of both should be generic?

  Additionally it would be good if either the ROC output type was changed or
  another method was added that returned confusion matrices, so that the
  hard integer values can be obtained before the divisions. E.g.

 ```
 case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int)
 {
   // bunch of methods for each of the things in the table here
 https://en.wikipedia.org/wiki/Receiver_operating_characteristic
 }
 ...
 def confusions(): RDD[Confusion]
 ```



Re: Parallel parameter tuning: distributed execution of MLlib algorithms

2015-06-17 Thread Xiangrui Meng
On Fri, May 22, 2015 at 6:15 AM, Hugo Ferreira h...@inesctec.pt wrote:
 Hi,

 I am currently experimenting with linear regression (SGD) (Spark + MLlib,
 ver. 1.2). At this point in time I need to fine-tune the hyper-parameters. I
 do this (for now) by an exhaustive grid search of the step size and the
 number of iterations. Currently I am on a dual core that acts as a master
 (local mode for now but will be adding spark worker later). In order to
 maximize throughput I need to execute each execution of the linear
 regression algorithm in parallel.


How big is your dataset? If it is small or medium-sized, you might get better
performance by broadcasting the entire dataset and using a single-machine
solver on each worker.

 According to the documentation it seems like parallel jobs may be scheduled
 if they are executed in separate threads [1]. So this brings me to my first
 question: does this mean I am CPU bound by the Spark master? In other words
 the maximum number of jobs = maximum number of threads of the OS?


We use the driver to collect model updates. Increasing the number of
parallel jobs also increases the driver load for both communication and
computation. I don't think you need to worry much about the max number of
threads, which is usually much larger than the number of parallel jobs we can
actually run.

 I searched the mailing list but did not find anything regarding MLlib
 itself. I even peeked into the new MLlib API that uses pipelines and has
 support for parameter tuning. However, it looks like each job (instance of
 the learning algorithm) is executed in sequence. Can anyone confirm this?
 This brings me to my second question: is there any example that shows how
 one can execute MLlib algorithms as parallel jobs?


The new API is not optimized for performance yet. There is an example
here for k-means:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L393
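
Another way to run several MLlib training jobs as parallel jobs is to submit
each one from its own thread, e.g. via Scala Futures; a minimal sketch,
assuming an existing cached RDD[LabeledPoint] named training and an
illustrative parameter grid:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

val grid = for {
  step  <- Seq(0.001, 0.01, 0.1, 1.0)
  iters <- Seq(50, 100)
} yield (step, iters)

// Each Future submits an independent Spark job from its own thread, so the
// scheduler can run them concurrently if the cluster has spare capacity.
val futures = grid.map { case (step, iters) =>
  Future {
    ((step, iters), LinearRegressionWithSGD.train(training, iters, step))
  }
}

val models = futures.map(f => Await.result(f, Duration.Inf))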

 Finally, is there any general technique I can use to execute an algorithm in
 a distributed manner using Spark? More specifically, I would like to have
 several MLlib algorithms run in parallel. Can anyone show me an example of
 sorts to do this?

 TIA.
 Hugo F.







 [1] https://spark.apache.org/docs/1.2.0/job-scheduling.html




 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Collabrative Filtering

2015-06-17 Thread Xiangrui Meng
Please follow the code examples from the user guide:
http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark.
-Xiangrui

On Tue, May 26, 2015 at 12:34 AM, Yasemin Kaya godo...@gmail.com wrote:
 Hi,

 In CF

  String path = "data/mllib/als/test.data";
  JavaRDD<String> data = sc.textFile(path);
  JavaRDD<Rating> ratings = data.map(new Function<String, Rating>() {
  public Rating call(String s) {
  String[] sarray = s.split(",");
  return new Rating(Integer.parseInt(sarray[0]), Integer
  .parseInt(sarray[1]), Double.parseDouble(sarray[2]));
  }
  });

 implemented like that.

  I want to use CF for my data set, but it is JavaPairRDD<String,
  List<Integer>>. How can I convert my dataset to JavaRDD<Rating>? Thank
  you..

 Best,
 yasemin


 --
 hiç ender hiç

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to use Apache spark mllib Model output in C++ component

2015-06-17 Thread Xiangrui Meng
In 1.3, we added some model save/load support in Parquet format. You
can use Parquet's C++ library (https://github.com/Parquet/parquet-cpp)
to load the data back. -Xiangrui

On Wed, Jun 10, 2015 at 12:15 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
  Hopefully SWIG and JNA can help with accessing C++ libraries from Java.

 Thanks
 Best Regards

 On Wed, Jun 10, 2015 at 11:50 AM, mahesht mahesh.s.tup...@gmail.com wrote:


  There is a C++ component which uses some model that we want to replace
  with the Spark model output, but there is no C++ API support for reading
  the model. What is the best way to solve this problem?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Apache-spark-mllib-Model-output-in-C-component-tp23239.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RandomForest - subsamplingRate parameter

2015-06-17 Thread Xiangrui Meng
Because we don't have random access to the records, sampling still needs
to go through the records sequentially. It does save some computation,
which is perhaps noticeable only if you have the data cached in memory.
Different random seeds are used for the trees. -Xiangrui

On Wed, Jun 3, 2015 at 4:40 PM, Andrew Leverentz andylevere...@fico.com wrote:
 When training a RandomForest model, the Strategy class (in
 mllib.tree.configuration) provides a subsamplingRate parameter.  I was
 hoping to use this to cut down on processing time for large datasets (more
 than 2MM rows and 9K predictors), but I’ve found that the runtime stays
 approximately constant (and sometimes noticeably increases) when I try
 lowering the value of subsamplingRate.



 Is this the expected behavior?  (And, if so, what is the intended purpose of
 this parameter?)



 Of course, I could always just subsample the input dataset prior to running
 RF, but I was hoping that the subsamplingRate (which ostensibly affects the
 sampling used during RF bagging) would decrease the amount of data
 processing without requiring me to entirely ignore large subsets of the
 data.



 Thanks,



 ~ Andrew




 This email and any files transmitted with it are confidential, proprietary
 and intended solely for the individual or entity to whom they are addressed.
 If you have received this email in error please delete it immediately.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: redshift spark

2015-06-17 Thread Xiangrui Meng
Hi Hafiz,

As Ewan mentioned, the path is the path to the S3 files unloaded from
Redshift. This is a more scalable way to get a large amount of data
from Redshift than via JDBC. I'd recommend using the SQL API instead
of the Hadoop API (https://github.com/databricks/spark-redshift).

Best,
Xiangrui
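
Putting Ewan's reply below together with the snippet from the original mail, the call would look roughly like this (the bucket name is made up, and s3n:// vs s3a:// depends on your Hadoop setup):

  import org.apache.spark.{SparkConf, SparkContext}
  import com.databricks.spark.redshift.RedshiftInputFormat

  val sc = new SparkContext(new SparkConf().setAppName("redshift-unload-read"))
  // The same S3 prefix that the Redshift UNLOAD command wrote to.
  val path = "s3n://mybucket/tickit/unload/"
  val records = sc.newAPIHadoopFile(
    path,
    classOf[RedshiftInputFormat],
    classOf[java.lang.Long],
    classOf[Array[String]])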

On Fri, Jun 5, 2015 at 7:29 AM, Ewan Leith ewan.le...@realitymine.com wrote:
 That project is for reading data in from Redshift table exports stored in s3 
 by running commands in redshift like this:

 unload ('select * from venue')
 to 's3://mybucket/tickit/unload/'

 http://docs.aws.amazon.com/redshift/latest/dg/t_Unloading_tables.html

 The path in the parameters below is the s3 bucket path.

 Hope this helps,
 Ewan

 -Original Message-
 From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com]
 Sent: 05 June 2015 15:25
 To: user@spark.apache.org
 Subject: redshift spark

 Hi All,

 I want to read and write data to aws redshift. I found spark-redshift project 
 at following address.
 https://github.com/databricks/spark-redshift

 in its documentation there is following code is written.
 import com.databricks.spark.redshift.RedshiftInputFormat

 val records = sc.newAPIHadoopFile(
   path,
   classOf[RedshiftInputFormat],
   classOf[java.lang.Long],
   classOf[Array[String]])

 I am unable to understand it's parameters. Can somebody explain how to use 
 this? what is meant by path in this case?

 thanks



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/redshift-spark-tp23175.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
 commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: k-means for text mining in a streaming context

2015-06-17 Thread Xiangrui Meng
Yes. You can apply HashingTF on your input stream and then use
StreamingKMeans for training and prediction. -Xiangrui
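
A rough sketch of that pipeline (the socket source, feature dimension and k are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.mllib.feature.HashingTF
  import org.apache.spark.mllib.clustering.StreamingKMeans

  val sc = new SparkContext(new SparkConf().setAppName("streaming-kmeans-text").setMaster("local[2]"))
  val ssc = new StreamingContext(sc, Seconds(10))

  val tf = new HashingTF(numFeatures = 1000)
  val vectors = ssc.socketTextStream("localhost", 9999)
    .map(line => tf.transform(line.toLowerCase.split("\\s+").toSeq))

  val model = new StreamingKMeans()
    .setK(10)
    .setDecayFactor(1.0)
    .setRandomCenters(dim = 1000, weight = 0.0)

  model.trainOn(vectors)
  model.predictOn(vectors).print()   // cluster id per document; distant points can be flagged as anomalies

  ssc.start()
  ssc.awaitTermination()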

On Mon, Jun 8, 2015 at 11:05 AM, Ruslan Dautkhanov dautkha...@gmail.com wrote:
 Hello,

 https://spark.apache.org/docs/latest/mllib-feature-extraction.html
 would Feature Extraction and Transformation work in a streaming context?

 Wanted to extract text features, build K-means clusters for streaming
 context
 to detect anomalies on a continuous text stream.

 Would it be possible?


 Best regards,
 Ruslan Dautkhanov


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: What is the right algorithm to do cluster analysis with mixed numeric, categorical, and string value attributes?

2015-06-17 Thread Xiangrui Meng
You can try hashing to control the feature dimension. MLlib's k-means
implementation can handle sparse data efficiently if the number of
features is not huge. -Xiangrui
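
To make the hashing idea concrete, a toy sketch (column names, dimension and k are made up): each record's categorical values become "column=value" tokens hashed into one sparse vector, so a one-hot-style encoding never has to be materialized densely.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.feature.HashingTF
  import org.apache.spark.mllib.clustering.KMeans

  val sc = new SparkContext(new SparkConf().setAppName("hashed-kmeans").setMaster("local[2]"))
  val tf = new HashingTF(numFeatures = 1 << 18)

  // (city, zip, satisfaction_level) records -- purely illustrative.
  val rows = sc.parallelize(Seq(
    ("San Jose", "95112", 3),
    ("Seattle", "98101", 5)))

  val vectors = rows.map { case (city, zip, sat) =>
    tf.transform(Seq("city=" + city, "zip=" + zip, "satisfaction=" + sat))
  }.cache()

  val model = KMeans.train(vectors, k = 2, maxIterations = 20)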

On Tue, Jun 16, 2015 at 2:44 PM, Rex X dnsr...@gmail.com wrote:
 Hi Sujit,

 That's a good point. But 1-hot encoding will make our data changing from
 Terabytes to Petabytes, because we have tens of categorical attributes, and
 some of them contain thousands of categorical values.

 Is there any way to make a good balance of data size and right
 representation of categories?


 -Rex



 On Tue, Jun 16, 2015 at 1:27 PM, Sujit Pal sujitatgt...@gmail.com wrote:

 Hi Rexx,

 In general (ie not Spark specific), its best to convert categorical data
 to 1-hot encoding rather than integers - that way the algorithm doesn't use
 the ordering implicit in the integer representation.

 -sujit


 On Tue, Jun 16, 2015 at 1:17 PM, Rex X dnsr...@gmail.com wrote:

 Is it necessary to convert categorical data into integers?

 Any tips would be greatly appreciated!

 -Rex

 On Sun, Jun 14, 2015 at 10:05 AM, Rex X dnsr...@gmail.com wrote:

 For clustering analysis, we need a way to measure distances.

 When the data contains different levels of measurement -
 binary / categorical (nominal), counts (ordinal), and ratio (scale)

 To be concrete, for example, working with attributes of
 city, zip, satisfaction_level, price

 In the meanwhile, the real data usually also contains string attributes,
 for example, book titles. The distance between two strings can be measured
 by minimum-edit-distance.


 In SPSS, it provides Two-Step Cluster, which can handle both ratio scale
 and ordinal numbers.


 What is right algorithm to do hierarchical clustering analysis with all
 these four-kind attributes above with MLlib?


 If we cannot find a right metric to measure the distance, an alternative
 solution is to do a topological data analysis (e.g. linkage, and etc). Can
 we do such kind of analysis with GraphX?


 -Rex





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Inconsistent behavior with Dataframe Timestamp between 1.3.1 and 1.4.0

2015-06-17 Thread Xiangrui Meng
That sounds like a bug. Could you create a JIRA and ping Yin Huai
(cc'ed). -Xiangrui

On Wed, May 27, 2015 at 12:57 AM, Justin Yip yipjus...@prediction.io wrote:
 Hello,

 I am trying out 1.4.0 and notice there are some differences in behavior with
 Timestamp between 1.3.1 and 1.4.0.

 In 1.3.1, I can compare a Timestamp with string.
 scala> val df = sqlContext.createDataFrame(Seq((1,
 Timestamp.valueOf("2015-01-01 00:00:00")), (2, Timestamp.valueOf("2014-01-01
 00:00:00"))))
 ...
 scala> df.filter($"_2" <= "2014-06-01").show
 ...
 _1 _2
 2  2014-01-01 00:00:...

 However, in 1.4.0, the filter is always false:
 scala> val df = sqlContext.createDataFrame(Seq((1,
 Timestamp.valueOf("2015-01-01 00:00:00")), (2, Timestamp.valueOf("2014-01-01
 00:00:00"))))
 df: org.apache.spark.sql.DataFrame = [_1: int, _2: timestamp]

 scala> df.filter($"_2" <= "2014-06-01").show
 +--+--+
 |_1|_2|
 +--+--+
 +--+--+

 Not sure if that is intended, but I cannot find any doc mentioning these
 inconsistencies.

 Thanks.

 Justin

 
 View this message in context: Inconsistent behavior with Dataframe Timestamp
 between 1.3.1 and 1.4.0
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Efficient way to get top K values per key in (key, value) RDD?

2015-06-17 Thread Xiangrui Meng
This is implemented in MLlib:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L41.
-Xiangrui
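
A small usage sketch (assuming the implicit conversion in MLPairRDDFunctions is visible to user code in your Spark version):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.rdd.MLPairRDDFunctions.fromPairRDD

  val sc = new SparkContext(new SparkConf().setAppName("top-k-per-key").setMaster("local[2]"))
  val pairs = sc.parallelize(Seq(("a", 5), ("a", 1), ("a", 9), ("b", 3), ("b", 7)))
  // Top 2 values per key, computed with a bounded priority queue per partition.
  val top2 = pairs.topByKey(2)   // RDD[(String, Array[Int])]
  top2.collect().foreach { case (k, vs) => println(k + " -> " + vs.mkString(",")) }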

On Wed, Jun 10, 2015 at 1:53 PM, erisa erisa...@gmail.com wrote:
 Hi,

 I am a Spark newbie, and trying to solve the same problem, and have
 implemented the same exact solution that sowen  is suggesting. I am using
 priorityqueues to keep trak of the top 25 sub_categories, per each category,
 and using the combineByKey function to do that.
 However I run into the following exception when I submit the spark job:

 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 17)
 java.lang.UnsupportedOperationException: unsuitable as hash key
 at
 scala.collection.mutable.PriorityQueue.hashCode(PriorityQueue.scala:226)


 From the error it looks like spark is trying to use the mutable priority
 queue as a hashkey so the error makes sense, but I don't get why it is doing
 that since the value of the RDD record is a priority queue not the key.

 Maybe there is a more straightforward solution to what I want to achieve, so
 any suggestion is appreciated :)

 Thanks,
 Erisa



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-get-top-K-values-per-key-in-key-value-RDD-tp20370p23263.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Not able to run FP-growth Example

2015-06-17 Thread Xiangrui Meng
You should add spark-mllib_2.10 as a dependency instead of declaring
it as the artifactId. And always use the same version for spark-core
and spark-mllib. I saw you used 1.3.0 for spark-core but 1.4.0 for
spark-mllib, which is not guaranteed to work. If you set the scope to
provided, mllib jar won't be included in the run time dependency. It
means that you need to use spark-summit from Spark to launch your
application. Please read the user guide:
http://spark.apache.org/docs/latest/submitting-applications.html.
-Xiangrui

On Sun, Jun 14, 2015 at 11:39 PM, masoom alam masoom.a...@wanclouds.net wrote:
 Even the following POM does not work either:

 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
          http://maven.apache.org/maven-v4_0_0.xsd">
   <parent>
     <artifactId>spark-parent_2.10</artifactId>
     <groupId>org.apache.spark</groupId>
     <version>1.4.0</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-mllib_2.10</artifactId>
   <name>Spark Project ML Library</name>
   <url>http://spark.apache.org/</url>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
   </build>
   <profiles>
     <profile>
       <id>netlib-lgpl</id>
       <dependencies>
         <dependency>
           <groupId>com.github.fommil.netlib</groupId>
           <artifactId>all</artifactId>
           <version>${netlib.java.version}</version>
           <type>pom</type>
         </dependency>
       </dependencies>
     </profile>
   </profiles>
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
       <version>1.4.0</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_2.10</artifactId>
       <version>1.4.0</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_2.10</artifactId>
       <version>1.4.0</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-graphx_2.10</artifactId>
       <version>1.4.0</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.jblas</groupId>
       <artifactId>jblas</artifactId>
       <version>1.2.4</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.scalanlp</groupId>
       <artifactId>breeze_2.10</artifactId>
       <version>0.11.2</version>
       <scope>compile</scope>
       <exclusions>
         <exclusion>
           <artifactId>junit</artifactId>
           <groupId>junit</groupId>
         </exclusion>
         <exclusion>
           <artifactId>commons-math3</artifactId>
           <groupId>org.apache.commons</groupId>
         </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math3</artifactId>
       <version>3.4.1</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_2.10</artifactId>
       <version>1.11.3</version>
       <scope>test</scope>
       <exclusions>
         <exclusion>
           <artifactId>test-interface</artifactId>
           <groupId>org.scala-sbt</groupId>
         </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>4.10</version>
       <scope>test</scope>
       <exclusions>
         <exclusion>
           <artifactId>hamcrest-core</artifactId>
           <groupId>org.hamcrest</groupId>
         </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>com.novocode</groupId>
       <artifactId>junit-interface</artifactId>
       <version>0.10</version>
       <scope>test</scope>
       <exclusions>
         <exclusion>
           <artifactId>junit-dep</artifactId>
           <groupId>junit</groupId>
         </exclusion>
         <exclusion>
           <artifactId>test-interface</artifactId>
           <groupId>org.scala-tools.testing</groupId>
         </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
 

Re: FP Growth saveAsTextFile

2015-05-21 Thread Xiangrui Meng
)
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:428)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:801)
 at
 org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
 at
 org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 On Wed, May 20, 2015 at 2:05 PM, Xiangrui Meng men...@gmail.com wrote:

 Could you post the stack trace? If you are using Spark 1.3 or 1.4, it
 would be easier to save freq itemsets as a Parquet file. -Xiangrui

 On Wed, May 20, 2015 at 12:16 PM, Eric Tanner
 eric.tan...@justenough.com wrote:
  I am having trouble with saving an FP-Growth model as a text file.  I
 can
  print out the results, but when I try to save the model I get a
  NullPointerException.
 
  model.freqItemsets.saveAsTextFile("c://fpGrowth/model")
 
  Thanks,
 
  Eric

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --





 *Eric Tanner* - Big Data Developer

 [image: JustEnough Logo]

 15440 Laguna Canyon, Suite 100
 Irvine, CA 92618

 Cell:  +1 (951) 313-9274
 Tel:   +1 (949) 706-0400
 Skype: eric.tanner.je
 Web:   www.justenough.com

 Confidentiality Note: The information contained in this email and
 document(s) attached are for the exclusive use of the addressee and may
 contain confidential, privileged and non-disclosable information. If the
 recipient of this email is not the addressee, such recipient is strictly
 prohibited from reading, photocopying, distribution or otherwise using this
 email or its contents in any way.



Re: Pandas timezone problems

2015-05-21 Thread Xiangrui Meng
These are relevant:

JIRA: https://issues.apache.org/jira/browse/SPARK-6411
PR: https://github.com/apache/spark/pull/6250

On Thu, May 21, 2015 at 3:16 PM, Def_Os njde...@gmail.com wrote:
 After deserialization, something seems to be wrong with my pandas DataFrames.
 It looks like the timezone information is lost, and subsequent errors ensue.

 Serializing and deserializing a timezone-aware DataFrame tests just fine, so
 it must be Spark that somehow changes the data.

 My program runs timezone-unaware data without problems.

 Anybody have any ideas on what causes this, or how to solve it?





 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Pandas-timezone-problems-tp22985.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: FP Growth saveAsTextFile

2015-05-20 Thread Xiangrui Meng
Could you post the stack trace? If you are using Spark 1.3 or 1.4, it
would be easier to save freq itemsets as a Parquet file. -Xiangrui
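
For example, a rough sketch of the Parquet route (the paths, transactions and minSupport are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.mllib.fpm.FPGrowth

  val sc = new SparkContext(new SparkConf().setAppName("fpgrowth-save").setMaster("local[2]"))
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val transactions = sc.parallelize(Seq(
    Array("a", "b", "c"),
    Array("a", "b"),
    Array("b", "c")))

  val model = new FPGrowth().setMinSupport(0.5).setNumPartitions(2).run(transactions)

  // Flatten each itemset into plain columns so it round-trips cleanly through Parquet.
  model.freqItemsets
    .map(fi => (fi.items.mkString(","), fi.freq))
    .toDF("items", "freq")
    .saveAsParquetFile("/tmp/fpgrowth-freq-itemsets")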

On Wed, May 20, 2015 at 12:16 PM, Eric Tanner
eric.tan...@justenough.com wrote:
 I am having trouble with saving an FP-Growth model as a text file.  I can
 print out the results, but when I try to save the model I get a
 NullPointerException.

 model.freqItemsets.saveAsTextFile("c://fpGrowth/model")

 Thanks,

 Eric

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: User Defined Type (UDT)

2015-05-20 Thread Xiangrui Meng
Probably in 1.5. I made a JIRA for it:
https://issues.apache.org/jira/browse/SPARK-7768. You can watch that
JIRA (and vote). -Xiangrui

On Wed, May 20, 2015 at 11:03 AM, Justin Uang justin.u...@gmail.com wrote:
 Xiangrui, is there a timeline for when UDTs will become a public API? I'm
 currently using them to support java 8's ZonedDateTime.

 On Tue, May 19, 2015 at 3:14 PM Xiangrui Meng men...@gmail.com wrote:

 (Note that UDT is not a public API yet.)

 On Thu, May 7, 2015 at 7:11 AM, wjur wojtek.jurc...@gmail.com wrote:
  Hi all!
 
  I'm using Spark 1.3.0 and I'm struggling with a definition of a new type
  for
  a project I'm working on. I've created a case class Person(name: String)
  and
  now I'm trying to make Spark to be able serialize and deserialize the
  defined type. I made a couple of attempts but none of them worked
  100% (there were issues either in serialization or deserialization).
 
  This is my class and the corresponding UDT.
 
  @SQLUserDefinedType(udt = classOf[PersonUDT])
  case class Person(name: String)

  class PersonUDT extends UserDefinedType[Person] {
    override def sqlType: DataType = StructType(Seq(StructField("name", StringType)))

    override def serialize(obj: Any): Seq[Any] = {

 This should return a Row instance instead of Seq[Any], because the
 sqlType is a struct type.

      obj match {
        case c: Person =>
          Seq(c.name)
      }
    }

    override def userClass: Class[Person] = classOf[Person]

    override def deserialize(datum: Any): Person = {
      datum match {
        case values: Seq[_] =>
          assert(values.length == 1)
          Person(values.head.asInstanceOf[String])
        case values: util.ArrayList[_] =>
          Person(values.get(0).asInstanceOf[String])
      }
    }

    // In some other attempt I was creating RDD of Seq with manually serialized data and
    // I had to override equals because two DFs with the same type weren't actually equal
    // StructField(person,...types.PersonUDT@a096ac3)
    // StructField(person,...types.PersonUDT@613fd937)
    def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]

    override def equals(other: Any): Boolean = other match {
      case that: PersonUDT => true
      case _ => false
    }

    override def hashCode(): Int = 1
  }

  This is how I create RDD of Person and then try to create a DataFrame
  val rdd = sparkContext.parallelize((1 to 100).map(i => Person(i.toString)))
  val sparkDataFrame = sqlContext.createDataFrame(rdd)
 
  The second line throws an exception:
  java.lang.ClassCastException: types.PersonUDT cannot be cast to
  org.apache.spark.sql.types.StructType
  at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)
 
  I looked into the code in SQLContext.scala and it seems that the code
  requires UDT to be extending StructType but in fact it extends
  UserDefinedType which extends directly DataType.
  I'm not sure whether it is a bug or I just don't know how to use UDTs.
 
  Do you have any suggestions how to solve this? I based my UDT on
  ExamplePointUDT but it seems to be incorrect. Is there a working example
  for
  UDT?
 
 
  Thank you for the reply in advance!
  wjur
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/User-Defined-Type-UDT-tp22796.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to implement an Evaluator for a ML pipeline?

2015-05-19 Thread Xiangrui Meng
The documentation needs to be updated to state that higher metric
values are better (https://issues.apache.org/jira/browse/SPARK-7740).
I don't know why if you negate the return value of the Evaluator you
still get the highest regularization parameter candidate. Maybe you
should check the log messages from CrossValidator and see the average
metric values during cross validation. -Xiangrui
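
For illustration, a sketch of an evaluator that returns negated RMSE so that a higher metric really does mean a better model; it follows the 1.3-style evaluate signature used in the message below, and the "rating"/"prediction" column names are assumptions about how the ALS output is materialized (the Evaluator base class moved and gained members in later releases, so adjust accordingly):

  import org.apache.spark.ml.Evaluator
  import org.apache.spark.ml.param.ParamMap
  import org.apache.spark.sql.DataFrame

  val rmseEvaluator = new Evaluator {
    def evaluate(dataset: DataFrame, paramMap: ParamMap): Double = {
      val squaredErrors = dataset
        .selectExpr("cast(rating as double) as rating", "cast(prediction as double) as prediction")
        .rdd
        .map(r => { val d = r.getDouble(0) - r.getDouble(1); d * d })
      // Negate so that the largest metric corresponds to the smallest error.
      val negRmse = -math.sqrt(squaredErrors.mean())
      negRmse
    }
  }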

On Sat, May 9, 2015 at 12:15 PM, Stefan H. twel...@gmx.de wrote:
 Hello everyone,

 I am stuck with the (experimental, I think) API for machine learning
 pipelines. I have a pipeline with just one estimator (ALS) and I want it to
 try different values for the regularization parameter. Therefore I need to
 supply an Evaluator that returns a value of type Double. I guess this could
 be something like accuracy or mean squared error? The only implementation I
 found is BinaryClassificationEvaluator, and I did not understand the
 computation there.

 I could not find detailed documentation so I implemented a dummy Evaluator
 that just returns the regularization parameter:

   new Evaluator {
 def evaluate(dataset: DataFrame, paramMap: ParamMap): Double =
   paramMap.get(als.regParam).getOrElse(throw new Exception)
   }

 I just wanted to see whether the lower or higher value wins. On the
 resulting model I inspected the chosen regularization parameter this way:

   cvModel.bestModel.fittingParamMap.get(als.regParam)

 And it was the highest of my three regularization parameter candidates.
 Strange thing is, if I negate the return value of the Evaluator, that line
 still returns the highest regularization parameter candidate.

 So I am probably working with false assumptions. I'd be grateful if someone
 could point me to some documentation or examples, or has a few hints to
 share.

 Cheers,
 Stefan



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-an-Evaluator-for-a-ML-pipeline-tp22830.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Word2Vec with billion-word corpora

2015-05-19 Thread Xiangrui Meng
With vocabulary size 4M and 400 vector size, you need 400 * 4M = 1.6B
floats to store the model, i.e. more than 6GB, plus the extra copies kept
during training. We store the model on the
driver node in the current implementation. So I don't think it would
work. You might try increasing the minCount to decrease the vocabulary
size and reduce the vector size. I'm interested in learning the
trade-off between the model size and the model quality. If you have
done some experiments, please let me know. Thanks! -Xiangrui
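
For reference, the two knobs mentioned above look like this (the corpus path and values are placeholders, and setMinCount assumes a Spark version that exposes it):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.feature.Word2Vec

  val sc = new SparkContext(new SparkConf().setAppName("word2vec-sizing"))
  val corpus = sc.textFile("hdfs:///path/to/corpus").map(_.split(" ").toSeq)

  val w2v = new Word2Vec()
    .setVectorSize(100)   // smaller vectors => smaller model held on the driver
    .setMinCount(50)      // drop rare words => smaller vocabulary
  val model = w2v.fit(corpus)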

On Wed, May 13, 2015 at 11:17 AM, Shilad Sen s...@macalester.edu wrote:
 Hi all,

 I'm experimenting with Spark's Word2Vec implementation for a relatively
 large (5B word, vocabulary size 4M, 400-dimensional vectors) corpora. Has
 anybody had success running it at this scale?

 Thanks in advance for your guidance!

 -Shilad

 --
 Shilad W. Sen
 Associate Professor
 Mathematics, Statistics, and Computer Science Dept.
 Macalester College
 s...@macalester.edu
 http://www.shilad.com
 https://www.linkedin.com/in/shilad
 651-696-6273

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Discretization

2015-05-19 Thread Xiangrui Meng
Thanks for asking! We should improve the documentation. The sample
dataset is actually mimicking the MNIST digits dataset, where the
values are gray levels (0-255). So by dividing by 16, we want to map
it to 16 coarse bins for the gray levels. Actually, there is a bug in
the doc, we should convert the values to integer first before dividing
by 16. I created https://issues.apache.org/jira/browse/SPARK-7739 for
this issue. Welcome to submit a patch:) Thanks!

Best,
Xiangrui

On Thu, May 7, 2015 at 9:20 PM, spark_user_2015 li...@adobe.com wrote:
 The Spark documentation shows the following example code:

 // Discretize data in 16 equal bins since ChiSqSelector requires categorical
 features
 val discretizedData = data.map { lp =>
   LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => x / 16 }))
 }

 I'm sort of missing why x / 16 is considered a discretization approach
 here.

 [https://spark.apache.org/docs/latest/mllib-feature-extraction.html#feature-selection]



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Discretization-tp22811.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stratified sampling with DataFrames

2015-05-19 Thread Xiangrui Meng
You need to convert DataFrame to RDD, call sampleByKey, and then apply
the schema back to create DataFrame.

val df: DataFrame = ...
val schema = df.schema
val sampledRDD = df.rdd.keyBy(r => r.getAs[Int](0)).sampleByKey(...).values
val sampled = sqlContext.createDataFrame(sampledRDD, schema)

Hopefully this would be much easier in 1.5.

Best,
Xiangrui
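
A toy end-to-end version with made-up fractions, just to show the shape of the sampleByKey arguments:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  val sc = new SparkContext(new SparkConf().setAppName("stratified-sample").setMaster("local[2]"))
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val df = sc.parallelize(Seq((0, "a"), (0, "b"), (1, "c"), (1, "d"))).toDF("label", "value")
  val schema = df.schema
  val fractions = Map(0 -> 0.5, 1 -> 0.1)   // per-stratum sampling rates, purely illustrative
  val sampledRDD = df.rdd
    .keyBy(r => r.getAs[Int](0))
    .sampleByKey(withReplacement = false, fractions = fractions, seed = 11L)
    .values
  val sampled = sqlContext.createDataFrame(sampledRDD, schema)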

On Mon, May 11, 2015 at 12:32 PM, Karthikeyan Muthukumar
mkarthiksw...@gmail.com wrote:
 Hi,
 I'm in Spark 1.3.0 and my data is in DataFrames.
 I need operations like sampleByKey(), sampleByKeyExact().
 I saw the JIRA Add approximate stratified sampling to DataFrame
 (https://issues.apache.org/jira/browse/SPARK-7157).
 That's targeted for Spark 1.5, till that comes through, whats the easiest
 way to accomplish the equivalent of sampleByKey() and sampleByKeyExact() on
 DataFrames.
 Thanks  Regards
 MK


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQL UserDefinedType can't be saved in parquet file when using assembly jar

2015-05-19 Thread Xiangrui Meng
Hey Jaonary,

I saw this line in the error message:

org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)

CaseClassStringParser is only used in older versions of Spark to parse
schema from JSON. So I suspect that the cluster was running on a old
version of Spark when you use spark-submit to run your assembly jar.

Best,
Xiangrui

On Mon, May 11, 2015 at 7:40 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
 In this example, everything works except saving to the parquet file.

 On Mon, May 11, 2015 at 4:39 PM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 MyDenseVectorUDT do exist in the assembly jar and in this example all the
 code is in a single file to make sure every thing is included.

 On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng men...@gmail.com wrote:

 You should check where MyDenseVectorUDT is defined and whether it was
 on the classpath (or in the assembly jar) at runtime. Make sure the
 full class name (with package name) is used. Btw, UDTs are not public
 yet, so please use it with caution. -Xiangrui

 On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:
  Dear all,
 
  Here is an example of code to reproduce the issue I mentioned in a
  previous
  mail about saving an UserDefinedType into a parquet file. The problem
  here
  is that the code works when I run it inside intellij idea but fails
  when I
  create the assembly jar and run it with spark-submit. I use the master
  version of  Spark.
 
  @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
  class MyDenseVector(val data: Array[Double]) extends Serializable {
    override def equals(other: Any): Boolean = other match {
      case v: MyDenseVector =>
        java.util.Arrays.equals(this.data, v.data)
      case _ => false
    }
  }

  class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
    override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)
    override def serialize(obj: Any): Seq[Double] = {
      obj match {
        case features: MyDenseVector =>
          features.data.toSeq
      }
    }

    override def deserialize(datum: Any): MyDenseVector = {
      datum match {
        case data: Seq[_] =>
          new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
      }
    }

    override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]

  }
 
  case class Toto(imageAnnotation: MyDenseVector)
 
  object TestUserDefinedType {

    case class Params(input: String = null,
                      partitions: Int = 12,
                      outputDir: String = "images.parquet")

    def main(args: Array[String]): Unit = {

      val conf = new SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")

      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)

      import sqlContext.implicits._

      val rawImages = sc.parallelize(
        (1 to 5).map(x => Toto(new MyDenseVector(Array[Double](x.toDouble))))).toDF()

      rawImages.printSchema()

      rawImages.show()

      rawImages.save("toto.parquet") // This fails with assembly jar
      sc.stop()

    }
  }
 
 
  My build.sbt is as follows:

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"  % sparkVersion % "provided",
    "org.apache.spark" %% "spark-sql"   % sparkVersion,
    "org.apache.spark" %% "spark-mllib" % sparkVersion
  )

  assemblyMergeStrategy in assembly := {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", xs @ _*)    => MergeStrategy.first
    case PathList("org", "jboss", xs @ _*)     => MergeStrategy.first
  //  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
  //  case "application.conf" => MergeStrategy.concat
    case m if m.startsWith("META-INF") => MergeStrategy.discard
  //case x =>
  //  val oldStrategy = (assemblyMergeStrategy in assembly).value
  //  oldStrategy(x)
    case _ => MergeStrategy.first
  }
 
 
  As I said, this code works without problem when I execute it inside
  intellij
  idea. But when generate the assembly jar with sbt-assembly and
 
  use spark-submit I got the following error :
 
  15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is:
  PARQUET_1_0
  15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0
  (TID 7)
  java.lang.IllegalArgumentException: Unsupported dataType:
 
  {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]},
  [1.1] failure: `TimestampType' expected but `{' found


  {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}
  ^
at
 
  org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
at
 
  org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98

Re: Increase maximum amount of columns for covariance matrix for principal components

2015-05-19 Thread Xiangrui Meng
We use a dense array to store the covariance matrix on the driver
node. So its length is limited by the integer range, which is 65536 *
65536 (actually half). -Xiangrui

On Wed, May 13, 2015 at 1:57 AM, Sebastian Alfers
sebastian.alf...@googlemail.com wrote:
 Hello,


 in order to compute a huge dataset, the amount of columns to calculate the
 covariance matrix is limited:

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala#L129

 What is the reason behind this limitation and can it be extended?

 Greetings

 Sebastian

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: k-means core function for temporal geo data

2015-05-19 Thread Xiangrui Meng
I'm not sure whether k-means would converge with this customized
distance measure. You can list (weighted) time as a feature along with
coordinates, and then use Euclidean distance. For other supported
distance measures, you can check Derrick's package:
http://spark-packages.org/package/derrickburns/generalized-kmeans-clustering.
-Xiangrui
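
A minimal sketch of the "weighted time as an extra coordinate" route (the weight and the sample values are placeholders that would need calibration for real data):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.clustering.KMeans

  val sc = new SparkContext(new SparkConf().setAppName("geo-time-kmeans").setMaster("local[2]"))

  // (lat, lon, epoch seconds) events -- purely illustrative values.
  val events = sc.parallelize(Seq(
    (52.52, 13.40, 1431993600.0),
    (52.53, 13.41, 1431997200.0),
    (48.14, 11.58, 1432000800.0)))

  val timeWeight = 1e-4   // scales time so that it is comparable to the spatial coordinates
  val vectors = events.map { case (lat, lon, t) =>
    Vectors.dense(lat, lon, timeWeight * t)
  }.cache()

  val model = KMeans.train(vectors, k = 2, maxIterations = 20)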

On Mon, May 18, 2015 at 2:30 AM, Pa Rö paul.roewer1...@googlemail.com wrote:
 hallo,

 I want to cluster geo data (lat, long, timestamp) with k-means. Now I am searching for
 a good core (distance) function, but I cannot find good papers or other sources for that.
 At the moment I multiply the time distance and the space distance:

 public static double dis(GeoData input1, GeoData input2)
 {
double timeDis = Math.abs( input1.getTime() - input2.getTime() );
double geoDis = geoDis(input1, input2); //extra function
return timeDis*geoDis;
 }

 maybe someone know a good core function for clustering temporal geo data?
 (need citation)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark mllib kmeans

2015-05-19 Thread Xiangrui Meng
Just curious, what distance measure do you need? -Xiangrui

On Mon, May 11, 2015 at 8:28 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
 take a look at this
 https://github.com/derrickburns/generalized-kmeans-clustering

 Best,

 Jao

 On Mon, May 11, 2015 at 3:55 PM, Driesprong, Fokko fo...@driesprong.frl
 wrote:

 Hi Paul,

 I would say that it should be possible, but you'll need a different
 distance measure which conforms to your coordinate system.

 2015-05-11 14:59 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com:

 hi,

 Is it possible to use a custom distance measure and another data type as the
 vector?
 I want to cluster temporal geo data.

 best regards
 paul




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: LogisticRegressionWithLBFGS with large feature set

2015-05-19 Thread Xiangrui Meng
For ML applications, the best setting is to make the number of partitions
match the number of cores, to reduce the shuffle size. You have 3072
partitions but 128 executors, which causes the overhead. For the
MultivariateOnlineSummarizer, we plan to add flags to specify what
need to be computed to reduce the overhead, in 1.5. -Xiangrui
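
In code, the repartitioning advice is just a coalesce before training (the partition count and path are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.util.MLUtils
  import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

  val sc = new SparkContext(new SparkConf().setAppName("lr-lbfgs"))
  val raw = MLUtils.loadLibSVMFile(sc, "hdfs:///path/to/training")
  // Aim for roughly one partition per core instead of thousands of small ones,
  // so that far fewer partial aggregates are shuffled and sent back to the driver.
  val training = raw.coalesce(512).cache()

  val model = new LogisticRegressionWithLBFGS()
    .setNumClasses(2)
    .run(training)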

On Mon, May 18, 2015 at 7:00 PM, Imran Rashid iras...@cloudera.com wrote:
 I'm not super familiar with this part of the code, but from taking a quick
 look:

 a) the code creates a MultivariateOnlineSummarizer, which stores 7 doubles
 per feature (mean, max, min, etc. etc.)
 b) The limit is on the result size from *all* tasks, not from one task.  You
 start with 3072 tasks
 c) tree aggregate should first merge things down to about 8 partitions
 before bringing results back to the driver, which is how you end up with 54
 tasks at your failure.

 this means you should have about 30 MB / per task per meaure * 54 tasks * 7
 measures, which comes to about 11GB, or in the ballpark of what you found.

 In principle, you could get this working by adding more levels to the
 treeAggregate (the depth parameter), but looks like that isn't exposed.  You
 could also try coalescing your data down to a smaller set of partitions
 first, but that comes with other downsides.

 Perhaps an MLLib expert could chime in on an alternate approach.  My feeling
 (from a very quick look) is that there is room for some optimization in the
 internals

 Imran

 On Thu, May 14, 2015 at 5:44 PM, Pala M Muthaia
 mchett...@rocketfuelinc.com wrote:

 Hi,

 I am trying to validate our modeling data pipeline by running
 LogisticRegressionWithLBFGS on a dataset with ~3.7 million features,
 basically to compute AUC. This is on Spark 1.3.0.

 I am using 128 executors with 4 GB each + driver with 8 GB. The number of
 data partitions is 3072

 The execution fails with the following messages:

 Total size of serialized results of 54 tasks (10.4 GB) is bigger than
 spark.driver.maxResultSize (3.0 GB)

 The associated stage in the job is treeAggregate at
 StandardScaler.scala:52 : The call stack looks as below:

 org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
 org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)

 org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)

 org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)


 I am trying to both understand why such large amount of data needs to be
 passed back to driver as well as figure out a way around this. I also want
 to understand how much memory is required, as a function of dataset size,
 feature set size, and number of iterations performed, for future
 experiments.

 From looking at the MLLib code, the largest data structure seems to be a
 dense vector of the same size as feature set. I am not familiar with
 algorithm or its implementation I would guess 3.7 million features would
 lead to a constant multiple of ~3.7 * 8 ~ 30 MB. So how does the dataset
 size become so large?

 I looked into the treeAggregate and it looks like hierarchical
 aggregation. If the data being sent to the driver is basically the
 aggregated coefficients (i.e. dense vectors) for the final aggregation,
 can't the dense vectors from executors be pulled in one at a time and merged
 in memory, rather than pulling all of them in together? (This is totally
 uneducated guess so i may be completely off here).

 Is there a way to get this running?

 Thanks,
 pala



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Find KNN in Spark SQL

2015-05-19 Thread Xiangrui Meng
Spark SQL doesn't provide spatial features. Large-scale KNN is usually
combined with locality-sensitive hashing (LSH). This Spark package may
be helpful: http://spark-packages.org/package/mrsqueeze/spark-hash.
-Xiangrui

On Sat, May 9, 2015 at 9:25 PM, Dong Li lid...@lidong.net.cn wrote:
 Hello experts,

 I’m new to Spark, and want to find K nearest neighbors on huge scale 
 high-dimension points dataset in very short time.

 The scenario is: the dataset contains more than 10 million points, whose 
 dimension is 200d. I’m building a web service, to receive one new point at 
 each request and return K nearest points inside that dataset, also need to 
 ensure the time-cost not very high. I have a cluster with several high-memory 
 nodes for this service.

 Currently I only have these ideas here:
 1. To create several ball-tree instances in each node when service 
 initializing. This is fast, but not perform well at data scaling ability. I 
 cannot insert new nodes to the ball-trees unless I restart the services and 
 rebuild them.
 2. To use sql based solution. Some database like PostgreSQL and SqlServer 
 have features on spatial search. But these database may not perform well in 
 big data environment. (Does SparkSQL have Spatial features or spatial index?)

 Based on your experience, can I achieve this scenario in Spark SQL? Or do you 
 know other projects in Spark stack acting well for this?
 Any ideas are appreciated, thanks very much.

 Regards,
 Dong




 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: question about customize kmeans distance measure

2015-05-19 Thread Xiangrui Meng
MLlib only supports Euclidean distance for k-means. You can find
Bregman divergence support in Derrick's package:
http://spark-packages.org/package/derrickburns/generalized-kmeans-clustering.
Which distance measure do you want to use? -Xiangrui

On Tue, May 12, 2015 at 7:23 PM, June zhuman.priv...@yahoo.com.invalid wrote:
 Dear list,



 I am new to spark, and I want to use the kmeans algorithm in mllib package.

 I am wondering whether it is possible to customize the distance measure used
 by kmeans, and how?



 Many thanks!



 June

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLlib libsvm issues with data

2015-05-19 Thread Xiangrui Meng
The index should start from 1 for LIBSVM format, as defined in the README
of LIBSVM (https://github.com/cjlin1/libsvm/blob/master/README#L64). The
only exception is the precomputed kernel, which MLlib doesn't support.
-Xiangrui
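
For reference, a valid one-based snippet and the standard loader call (the file path is a placeholder):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.util.MLUtils

  val sc = new SparkContext(new SparkConf().setAppName("libsvm-load").setMaster("local[2]"))
  // LIBSVM lines must use 1-based, increasing feature indices, e.g.:
  //   0 1:0.5 3:1.2
  //   1 2:0.9 4:2.0
  // MLlib converts the indices to 0-based internally, so an index of 0 (as in "0:0")
  // ends up as -1, which is consistent with the ArrayIndexOutOfBoundsException below.
  val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
  println(data.first())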

On Wed, May 6, 2015 at 1:42 AM, doyere doy...@doyere.cn wrote:

 Hi all,

 After doing some tests I finally solved it. I am writing it up here for other people who
 hit this issue. Here is an example of the data format error I faced:

 0 0:0 1:0 2:1
 1 1:1 3:2

 The entries 0:0 and 1:0/1:1 are the reason for the
 ArrayIndexOutOfBoundsException. If you face the same issue, just
 delete them from your data or update it. Since the same data worked in
 the libsvm tools, I guess Spark MLlib just implements it a bit differently.

 ---- Original Message ----
 *From:* doyere doy...@doyere.cn
 *To:* user user@spark.apache.org
 *Sent:* Wednesday, May 6, 2015, 08:59
 *Subject:* MLlib libsvm issues with data

 hi all:

 I've hit an issue with MLlib. I posted to the community earlier but it seems I put it in the
 wrong place :( so I then posted it on Stack Overflow. For well-formatted details please
 see
 http://stackoverflow.com/questions/30048344/spark-mllib-libsvm-isssues-with-data. Hope
 someone can help.

 I guess it's due to my data, but I've tested it with the libsvm tools and it worked
 well, and I've used the libsvm Python data-format check tool and it's
 OK. I just don't know why it errors with java.lang.
 ArrayIndexOutOfBoundsException: -1 :(

 And this is my first time using the mailing list to ask for help. If I did
 something wrong or didn't describe it clearly, please tell me.


 doye



Re: RandomSplit with Spark-ML and Dataframe

2015-05-19 Thread Xiangrui Meng
In 1.4, we added RAND as a DataFrame expression, which can be used for
random split. Please check the example here:
https://github.com/apache/spark/blob/master/python/pyspark/ml/tuning.py#L214.
-Xiangrui

On Thu, May 7, 2015 at 8:39 AM, Olivier Girardot
o.girar...@lateral-thoughts.com wrote:
 Hi,
 is there any best practice to do like in MLLib a randomSplit of
 training/cross-validation set with dataframes and the pipeline API ?

 Regards

 Olivier.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: User Defined Type (UDT)

2015-05-19 Thread Xiangrui Meng
(Note that UDT is not a public API yet.)

On Thu, May 7, 2015 at 7:11 AM, wjur wojtek.jurc...@gmail.com wrote:
 Hi all!

 I'm using Spark 1.3.0 and I'm struggling with a definition of a new type for
 a project I'm working on. I've created a case class Person(name: String) and
 now I'm trying to make Spark to be able serialize and deserialize the
 defined type. I made a couple of attempts but none of them worked in
 100% (there were issues either in serialization or deserialization).

 This is my class and the corresponding UDT.

 @SQLUserDefinedType(udt = classOf[PersonUDT])
 case class Person(name: String)

 class PersonUDT extends UserDefinedType[Person] {
   override def sqlType: DataType = StructType(Seq(StructField("name", StringType)))

   override def serialize(obj: Any): Seq[Any] = {

This should return a Row instance instead of Seq[Any], because the
sqlType is a struct type.

     obj match {
       case c: Person =>
         Seq(c.name)
     }
   }

   override def userClass: Class[Person] = classOf[Person]

   override def deserialize(datum: Any): Person = {
     datum match {
       case values: Seq[_] =>
         assert(values.length == 1)
         Person(values.head.asInstanceOf[String])
       case values: util.ArrayList[_] =>
         Person(values.get(0).asInstanceOf[String])
     }
   }

   // In some other attempt I was creating RDD of Seq with manually serialized data and
   // I had to override equals because two DFs with the same type weren't actually equal
   // StructField(person,...types.PersonUDT@a096ac3)
   // StructField(person,...types.PersonUDT@613fd937)
   def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]

   override def equals(other: Any): Boolean = other match {
     case that: PersonUDT => true
     case _ => false
   }

   override def hashCode(): Int = 1
 }

 This is how I create RDD of Person and then try to create a DataFrame
 val rdd = sparkContext.parallelize((1 to 100).map(i => Person(i.toString)))
 val sparkDataFrame = sqlContext.createDataFrame(rdd)

 The second line throws an exception:
 java.lang.ClassCastException: types.PersonUDT cannot be cast to
 org.apache.spark.sql.types.StructType
 at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)

 I looked into the code in SQLContext.scala and it seems that the code
 requires UDT to be extending StructType but in fact it extends
 UserDefinedType which extends directly DataType.
 I'm not sure whether it is a bug or I just don't know how to use UDTs.

 Do you have any suggestions how to solve this? I based my UDT on
 ExamplePointUDT but it seems to be incorrect. Is there a working example for
 UDT?


 Thank you for the reply in advance!
 wjur



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/User-Defined-Type-UDT-tp22796.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0

2015-05-19 Thread Xiangrui Meng
In implicit feedback model, the coefficients were already penalized
(towards zero) by the number of unobserved ratings. So I think it is
fair to keep the 1.3.0 weighting (by the number of total users/items).
Again, I don't think we have a clear answer. It would be nice to run
some experiments and see which works better. -Xiangrui

On Thu, May 7, 2015 at 9:35 AM, Ravi Mody rmody...@gmail.com wrote:
 After thinking about it more, I do think weighting lambda by sum_i cij is
 the equivalent of the ALS-WR paper's approach for the implicit case. This
 provides scale-invariance for varying products/users and for varying
 ratings, and should behave well for all alphas. What do you guys think?

 On Wed, May 6, 2015 at 12:29 PM, Ravi Mody rmody...@gmail.com wrote:

 Whoops I just saw this thread, it got caught in my spam filter. Thanks for
 looking into this Xiangrui and Sean.

 The implicit situation does seem fairly complicated to me. The cost
 function (not including the regularization term) is affected both by the
 number of ratings and by the number of user/products. As we increase alpha
 the contribution to the cost function from the number of users/products
 diminishes compared to the contribution from the number of ratings. So large
 alphas seem to favor the weighted-lambda approach, even though it's not a
 perfect match. Smaller alphas favor Xiangrui's 1.3.0 approach, but again
 it's not a perfect match.

 I believe low alphas won't work well with regularization because both
 terms in the cost function will just push everything to zero. Some of my
 experiments confirm this. This leads me to think that weighted-lambda would
 work better in practice, but I have no evidence of this. It may make sense
 to weight lambda by sum_i cij instead?





 On Wed, Apr 1, 2015 at 7:59 PM, Xiangrui Meng men...@gmail.com wrote:

 Ravi, we just merged https://issues.apache.org/jira/browse/SPARK-6642
 and used the same lambda scaling as in 1.2. The change will be
 included in Spark 1.3.1, which will be released soon. Thanks for
 reporting this issue! -Xiangrui

 On Tue, Mar 31, 2015 at 8:53 PM, Xiangrui Meng men...@gmail.com wrote:
  I created a JIRA for this:
  https://issues.apache.org/jira/browse/SPARK-6637. Since we don't have
  a clear answer about how the scaling should be handled. Maybe the best
  solution for now is to switch back to the 1.2 scaling. -Xiangrui
 
  On Tue, Mar 31, 2015 at 2:50 PM, Sean Owen so...@cloudera.com wrote:
  Ah yeah I take your point. The squared error term is over the whole
  user-item matrix, technically, in the implicit case. I suppose I am
  used to assuming that the 0 terms in this matrix are weighted so much
  less (because alpha is usually large-ish) that they're almost not
  there, but they are. So I had just used the explicit formulation.
 
  I suppose the result is kind of scale invariant, but not exactly. I
  had not prioritized this property since I had generally built models
  on the full data set and not a sample, and had assumed that lambda
  would need to be retuned over time as the input grew anyway.
 
  So, basically I don't know anything more than you do, sorry!
 
  On Tue, Mar 31, 2015 at 10:41 PM, Xiangrui Meng men...@gmail.com
  wrote:
  Hey Sean,
 
  That is true for explicit model, but not for implicit. The ALS-WR
  paper doesn't cover the implicit model. In implicit formulation, a
  sub-problem (for v_j) is:
 
  min_{v_j} \sum_i c_ij (p_ij - u_i^T v_j)^2 + lambda * X * \|v_j\|_2^2
 
  This is a sum for all i but not just the users who rate item j. In
  this case, if we set X=m_j, the number of observed ratings for item
  j,
  it is not really scale invariant. We have #users user vectors in the
  least squares problem but only penalize lambda * #ratings. I was
  suggesting using lambda * m directly for implicit model to match the
  number of vectors in the least squares problem. Well, this is my
  theory. I don't find any public work about it.
 
  Best,
  Xiangrui
 
  On Tue, Mar 31, 2015 at 5:17 AM, Sean Owen so...@cloudera.com
  wrote:
  I had always understood the formulation to be the first option you
  describe. Lambda is scaled by the number of items the user has rated
  /
  interacted with. I think the goal is to avoid fitting the tastes of
  prolific users disproportionately just because they have many
  ratings
  to fit. This is what's described in the ALS-WR paper we link to on
  the
  Spark web site, in equation 5
 
  (http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf)
 
  I think this also gets you the scale-invariance? For every
  additional
  rating from user i to product j, you add one new term to the
  squared-error sum, (r_ij - u_i . m_j)^2, but also, you'd increase
  the
  regularization term by lambda * (|u_i|^2 + |m_j|^2)  They are at
  least
  both increasing about linearly as ratings increase. If the
  regularization term is multiplied by the total number of users and
  products in the model

Re: MLLib SVMWithSGD is failing for large dataset

2015-05-18 Thread Xiangrui Meng
Reducing the number of instances won't help in this case. We use the
driver to collect partial gradients. Even with tree aggregation, it
still puts heavy workload on the driver with 20M features. Please try
to reduce the number of partitions before training. We are working on
a more scalable implementation of logistic regression now, which
should be able to solve this problem efficiently. -Xiangrui
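
A sketch of the per-partition reduction before training (the partition count and path are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.util.MLUtils
  import org.apache.spark.mllib.classification.SVMWithSGD

  val sc = new SparkContext(new SparkConf().setAppName("svm-sgd"))
  val data = MLUtils.loadLibSVMFile(sc, "hdfs:///path/to/training")
    .coalesce(64)   // fewer partitions => fewer partial gradients shipped back to the driver
    .cache()
  val model = SVMWithSGD.train(data, 100)   // 100 iterations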

On Tue, Apr 28, 2015 at 3:43 PM, sarathkrishn...@gmail.com
sarathkrishn...@gmail.com wrote:
 Hi,

 I'm just calling the standard SVMWithSGD implementation of Spark's MLLib.
 I'm not using any method like collect.

 Thanks,
 Sarath

 On Tue, Apr 28, 2015 at 4:35 PM, ai he heai0...@gmail.com wrote:

 Hi Sarath,

 It might be questionable to set num-executors as 64 if you only has 8
 nodes. Do you use any action like collect which will overwhelm the
 driver since you have a large dataset?

 Thanks

 On Tue, Apr 28, 2015 at 10:50 AM, sarath sarathkrishn...@gmail.com
 wrote:
 
  I am trying to train a large dataset consisting of 8 million data points
  and
  20 million features using SVMWithSGD. But it is failing after running
  for
  some time. I tried increasing num-partitions, driver-memory,
  executor-memory, driver-max-resultSize. Also I tried by reducing the
  size of
  dataset from 8 million to 25K (keeping number of features same 20 M).
  But
  after using the entire 64GB driver memory for 20 to 30 min it failed.
 
  I'm using a cluster of 8 nodes (each with 8 cores and 64G RAM).
  executor-memory - 60G
  driver-memory - 60G
  num-executors - 64
  And other default settings
 
  This is the error log :
 
  15/04/20 11:51:09 WARN NativeCodeLoader: Unable to load native-hadoop
  library for your platform... using builtin-java classes where applicable
  15/04/20 11:51:29 WARN BLAS: Failed to load implementation from:
  com.github.fommil.netlib.NativeSystemBLAS
  15/04/20 11:51:29 WARN BLAS: Failed to load implementation from:
  com.github.fommil.netlib.NativeRefBLAS
  15/04/20 11:56:11 WARN TransportChannelHandler: Exception in connection
  from
  xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
  java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  ...
  15/04/20 11:56:11 ERROR TransportResponseHandler: Still have 7 requests
  outstanding when connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
  is
  closed
  15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting
  block
  fetches
  java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  ...
  15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting
  block
  fetches
  java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  ...
  15/04/20 11:56:12 ERROR RetryingBlockFetcher: Exception while beginning
  fetch of 1 outstanding blocks
  java.io.IOException: Failed to connect to
  xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
  at
 
  org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
  at
 
  org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
  at
 
  org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
  at
 
  org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
  at
 
  org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
  at
 
  org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
  at
 
  org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
  at
 
  org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:290)
  at
 
  org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
  at
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at
 
  org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at
 
  org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at
  org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91)
  at
 
  org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
  at
  org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
  at
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   

Re: StandardScaler failing with OOM errors in PySpark

2015-05-18 Thread Xiangrui Meng
AFAIK, there are two places where you can specify the driver memory.
One is via spark-submit --driver-memory and the other is via
spark.driver.memory in spark-defaults.conf. Please try these
approaches and see whether they work or not. You can find detailed
instructions at http://spark.apache.org/docs/latest/configuration.html
and http://spark.apache.org/docs/latest/submitting-applications.html.
-Xiangrui

On Tue, Apr 28, 2015 at 4:06 AM, Rok Roskar rokros...@gmail.com wrote:
 That's exactly what I'm saying -- I specify the memory options using spark
 options, but this is not reflected in how the JVM is created. No matter
 which memory settings I specify, the JVM for the driver is always made with
 512Mb of memory. So I'm not sure if this is a feature or a bug?

 rok

 On Mon, Apr 27, 2015 at 6:54 PM, Xiangrui Meng men...@gmail.com wrote:

 You might need to specify driver memory in spark-submit instead of
 passing JVM options. spark-submit is designed to handle different
 deployments correctly. -Xiangrui

 On Thu, Apr 23, 2015 at 4:58 AM, Rok Roskar rokros...@gmail.com wrote:
  ok yes, I think I have narrowed it down to being a problem with driver
  memory settings. It looks like the application master/driver is not
  being
  launched with the settings specified:
 
  For the driver process on the main node I see -XX:MaxPermSize=128m
  -Xms512m
  -Xmx512m as options used to start the JVM, even though I specified
 
  'spark.yarn.am.memory', '5g'
  'spark.yarn.am.memoryOverhead', '2000'
 
  The info shows that these options were read:
 
  15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with
  7120 MB
  memory including 2000 MB overhead
 
  Is there some reason why these options are being ignored and instead
  starting the driver with just 512Mb of heap?
 
  On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar rokros...@gmail.com wrote:
 
  the feature dimension is 800k.
 
  yes, I believe the driver memory is likely the problem since it doesn't
  crash until the very last part of the tree aggregation.
 
  I'm running it via pyspark through YARN -- I have to run in client mode
  so
  I can't set spark.driver.memory -- I've tried setting the
  spark.yarn.am.memory and overhead parameters but it doesn't seem to
  have an
  effect.
 
  Thanks,
 
  Rok
 
  On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote:
 
   What is the feature dimension? Did you set the driver memory?
   -Xiangrui
  
   On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote:
   I'm trying to use the StandardScaler in pyspark on a relatively
   small
   (a few
   hundred Mb) dataset of sparse vectors with 800k features. The fit
   method of
   StandardScaler crashes with Java heap space or Direct buffer memory
   errors.
   There should be plenty of memory around -- 10 executors with 2 cores
   each
   and 8 Gb per core. I'm giving the executors 9g of memory and have
   also
   tried
   lots of overhead (3g), thinking it might be the array creation in
   the
   aggregators that's causing issues.
  
   The bizarre thing is that this isn't always reproducible --
   sometimes
   it
   actually works without problems. Should I be setting up executors
   differently?
  
   Thanks,
  
   Rok
  
  
  
  
   --
   View this message in context:
  
   http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
   Sent from the Apache Spark User List mailing list archive at
   Nabble.com.
  
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: bug: numClasses is not a valid argument of LogisticRegressionWithSGD

2015-05-18 Thread Xiangrui Meng
LogisticRegressionWithSGD doesn't support multi-class. Please use
LogisticRegressionWithLBFGS instead. -Xiangrui
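
A minimal Scala sketch of the suggested switch (the class count of 3 is only
illustrative):

  import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

  // trainingData: RDD[LabeledPoint] with labels 0.0, 1.0, ..., numClasses - 1
  val model = new LogisticRegressionWithLBFGS()
    .setNumClasses(3)
    .run(trainingData)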

On Mon, Apr 27, 2015 at 12:37 PM, Pagliari, Roberto
rpagli...@appcomsci.com wrote:
 With the Python APIs, the available arguments I got (using inspect module)
 are the following:

 ['cls', 'data', 'iterations', 'step', 'miniBatchFraction', 'initialWeights',
 'regParam', 'regType', 'intercept']

 numClasses is not available. Can someone comment on this?

 Thanks,






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: OOM error with GMMs on 4GB dataset

2015-05-06 Thread Xiangrui Meng
Did you set `--driver-memory` with spark-submit? -Xiangrui

On Mon, May 4, 2015 at 5:16 PM, Vinay Muttineni vmuttin...@ebay.com wrote:
 Hi, I am training a GMM with 10 gaussians on a 4 GB dataset(720,000 * 760).
 The spark (1.3.1) job is allocated 120 executors with 6GB each and the
 driver also has 6GB.
 Spark Config Params:

 .set("spark.hadoop.validateOutputSpecs", "false")
 .set("spark.dynamicAllocation.enabled", "false")
 .set("spark.driver.maxResultSize", "4g")
 .set("spark.default.parallelism", "300")
 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 .set("spark.kryoserializer.buffer.mb", "500")
 .set("spark.akka.frameSize", "256")
 .set("spark.akka.timeout", "300")

 However, at the aggregate step (Line 168)
 val sums = breezeData.aggregate(ExpectationSum.zero(k, d))(compute.value, _
 += _)

 I get OOM error and the application hangs indefinitely. Is this an issue or
 am I missing something?
 java.lang.OutOfMemoryError: Java heap space
 at akka.util.CompactByteString$.apply(ByteString.scala:410)
 at akka.util.ByteString$.apply(ByteString.scala:22)
 at
 akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
 at
 akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
 at
 akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43)
 at
 akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:180)
 at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
 at
 org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
 at
 org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
 at
 org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
 at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
 at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
 at
 org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
 at
 org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 15/05/04 16:23:38 ERROR util.Utils: Uncaught exception in thread
 task-result-getter-2
 java.lang.OutOfMemoryError: Java heap space
 Exception in thread task-result-getter-2 java.lang.OutOfMemoryError: Java
 heap space
 15/05/04 16:23:45 INFO scheduler.TaskSetManager: Finished task 1070.0 in
 stage 6.0 (TID 8276) in 382069 ms on [] (160/3600)
 15/05/04 16:23:54 WARN channel.DefaultChannelPipeline: An exception was
 thrown by a user handler while handling an exception event ([id: 0xc57da871,
 ] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
 java.lang.OutOfMemoryError: Java heap space
 15/05/04 16:23:55 WARN channel.DefaultChannelPipeline: An exception was
 thrown by a user handler while handling an exception event ([id: 0x3c3dbb0c,
 ] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
 15/05/04 16:24:45 ERROR actor.ActorSystemImpl: Uncaught fatal error from
 thread [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
 ActorSystem [sparkDriver]



 Thanks!
 Vinay

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Getting error running MLlib example with new cluster

2015-04-27 Thread Xiangrui Meng
How did you run the example app? Did you use spark-submit? -Xiangrui

On Thu, Apr 23, 2015 at 2:27 PM, Su She suhsheka...@gmail.com wrote:
 Sorry, accidentally sent the last email before finishing.

 I had asked this question before, but wanted to ask again as I think
 it is now related to my pom file or project setup. Really appreciate the help!

 I have been trying on/off for the past month to try to run this MLlib
 example: 
 https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala

 I am able to build the project successfully. When I run it, it returns:

 features in spam: 8
 features in ham: 7

 and then freezes. According to the UI, the description of the job is
 "count at DataValidators.scala:38". This corresponds to this line in
 the code:

 val model = lrLearner.run(trainingData)

 I've tried just about everything I can think of...changed numFeatures
 from 1 - 10,000, set executor memory to 1g, set up a new cluster, at
 this point I think I might have missed dependencies as that has
 usually been the problem in other spark apps I have tried to run. This
 is my pom file, that I have used for other successful spark apps.
 Please let me know if you think I need any additional dependencies or
 there are incompatibility issues, or a pom.xml that is better to use.
 Thank you!

 Cluster information:

 Spark version: 1.2.0-SNAPSHOT (in my older cluster it is 1.2.0)
 java version 1.7.0_25
 Scala version: 2.10.4
 hadoop version: hadoop 2.5.0-cdh5.3.3 (older cluster was 5.3.0)



 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <groupId>edu.berkely</groupId>
   <artifactId>simple-project</artifactId>
   <modelVersion>4.0.0</modelVersion>
   <name>Simple Project</name>
   <packaging>jar</packaging>
   <version>1.0</version>

   <repositories>
     <repository>
       <id>cloudera</id>
       <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
     </repository>

     <repository>
       <id>scala-tools.org</id>
       <name>Scala-tools Maven2 Repository</name>
       <url>http://scala-tools.org/repo-releases</url>
     </repository>
   </repositories>

   <pluginRepositories>
     <pluginRepository>
       <id>scala-tools.org</id>
       <name>Scala-tools Maven2 Repository</name>
       <url>http://scala-tools.org/repo-releases</url>
     </pluginRepository>
   </pluginRepositories>

   <build>
     <plugins>
       <plugin>
         <groupId>org.scala-tools</groupId>
         <artifactId>maven-scala-plugin</artifactId>
         <executions>
           <execution>
             <id>compile</id>
             <goals>
               <goal>compile</goal>
             </goals>
             <phase>compile</phase>
           </execution>
           <execution>
             <id>test-compile</id>
             <goals>
               <goal>testCompile</goal>
             </goals>
             <phase>test-compile</phase>
           </execution>
           <execution>
             <phase>process-resources</phase>
             <goals>
               <goal>compile</goal>
             </goals>
           </execution>
         </executions>
       </plugin>
       <plugin>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
           <source>1.7</source>
           <target>1.7</target>
         </configuration>
       </plugin>
     </plugins>
   </build>

   <dependencies>
     <dependency> <!-- Spark dependency -->
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
       <version>1.2.0-cdh5.3.0</version>
     </dependency>

     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <version>2.5.0-mr1-cdh5.3.0</version>
     </dependency>

     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
       <version>2.10.4</version>
     </dependency>

     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-compiler</artifactId>
       <version>2.10.4</version>
     </dependency>

     <dependency>
       <groupId>com.101tec</groupId>
       <artifactId>zkclient</artifactId>
       <version>0.3</version>
     </dependency>

Re: MLlib - Collaborative Filtering - trainImplicit task size

2015-04-27 Thread Xiangrui Meng
Could you try different ranks and see whether the task size changes?
We do use YtY in the closure, which should work the same as broadcast.
If that is the case, it should be safe to ignore this warning.
-Xiangrui
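
A quick way to run that check (a sketch; the rank values and the `ratings`
RDD[Rating] are illustrative):

  import org.apache.spark.mllib.recommendation.ALS

  // rerun with a few ranks and watch whether the task size in the warning grows
  for (rank <- Seq(10, 50, 100)) {
    val model = ALS.trainImplicit(ratings, rank, 10)
  }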

On Thu, Apr 23, 2015 at 4:52 AM, Christian S. Perone
christian.per...@gmail.com wrote:
 All these warnings come from ALS iterations, from flatMap and also from
 aggregate, for instance the origin of the state where the flatMap is showing
 these warnings (w/ Spark 1.3.0, they are also shown in Spark 1.3.1):

 org.apache.spark.rdd.RDD.flatMap(RDD.scala:296)
 org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1065)
 org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:530)
 org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
 scala.collection.immutable.Range.foreach(Range.scala:141)
 org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
 org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

 And from the aggregate:

 org.apache.spark.rdd.RDD.aggregate(RDD.scala:968)
 org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1112)
 org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1064)
 org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:538)
 org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
 scala.collection.immutable.Range.foreach(Range.scala:141)
 org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
 org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)



 On Thu, Apr 23, 2015 at 2:49 AM, Xiangrui Meng men...@gmail.com wrote:

 This is the size of the serialized task closure. Is stage 246 part of
 ALS iterations, or something before or after it? -Xiangrui

 On Tue, Apr 21, 2015 at 10:36 AM, Christian S. Perone
 christian.per...@gmail.com wrote:
  Hi Sean, thanks for the answer. I tried to call repartition() on the
  input
  with many different sizes and it still continues to show that warning
  message.
 
  On Tue, Apr 21, 2015 at 7:05 AM, Sean Owen so...@cloudera.com wrote:
 
  I think maybe you need more partitions in your input, which might make
  for smaller tasks?
 
  On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone
  christian.per...@gmail.com wrote:
   I keep seeing these warnings when using trainImplicit:
  
   WARN TaskSetManager: Stage 246 contains a task of very large size
   (208
   KB).
   The maximum recommended task size is 100 KB.
  
   And then the task size starts to increase. Is this a known issue ?
  
   Thanks !
  
   --
   Blog | Github | Twitter
   Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great
   big
   joke on me.
 
 
 
 
  --
  Blog | Github | Twitter
  Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great
  big
  joke on me.




 --
 Blog | Github | Twitter
 Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
 joke on me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: StandardScaler failing with OOM errors in PySpark

2015-04-27 Thread Xiangrui Meng
You might need to specify driver memory in spark-submit instead of
passing JVM options. spark-submit is designed to handle different
deployments correctly. -Xiangrui

On Thu, Apr 23, 2015 at 4:58 AM, Rok Roskar rokros...@gmail.com wrote:
 ok yes, I think I have narrowed it down to being a problem with driver
 memory settings. It looks like the application master/driver is not being
 launched with the settings specified:

 For the driver process on the main node I see -XX:MaxPermSize=128m -Xms512m
 -Xmx512m as options used to start the JVM, even though I specified

 'spark.yarn.am.memory', '5g'
 'spark.yarn.am.memoryOverhead', '2000'

 The info shows that these options were read:

 15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with 7120 MB
 memory including 2000 MB overhead

 Is there some reason why these options are being ignored and instead
 starting the driver with just 512Mb of heap?

 On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar rokros...@gmail.com wrote:

 the feature dimension is 800k.

 yes, I believe the driver memory is likely the problem since it doesn't
 crash until the very last part of the tree aggregation.

 I'm running it via pyspark through YARN -- I have to run in client mode so
 I can't set spark.driver.memory -- I've tried setting the
 spark.yarn.am.memory and overhead parameters but it doesn't seem to have an
 effect.

 Thanks,

 Rok

 On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote:

  What is the feature dimension? Did you set the driver memory? -Xiangrui
 
  On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote:
  I'm trying to use the StandardScaler in pyspark on a relatively small
  (a few
  hundred Mb) dataset of sparse vectors with 800k features. The fit
  method of
  StandardScaler crashes with Java heap space or Direct buffer memory
  errors.
  There should be plenty of memory around -- 10 executors with 2 cores
  each
  and 8 Gb per core. I'm giving the executors 9g of memory and have also
  tried
  lots of overhead (3g), thinking it might be the array creation in the
  aggregators that's causing issues.
 
  The bizarre thing is that this isn't always reproducible -- sometimes
  it
  actually works without problems. Should I be setting up executors
  differently?
 
  Thanks,
 
  Rok
 
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
  Sent from the Apache Spark User List mailing list archive at
  Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: gridsearch - python

2015-04-27 Thread Xiangrui Meng
We will try to make them available in 1.4, which is coming soon. -Xiangrui

On Thu, Apr 23, 2015 at 10:18 PM, Pagliari, Roberto
rpagli...@appcomsci.com wrote:
 I know grid search with cross validation is not supported. However, I was
 wondering if there is something availalable for the time being.



 Thanks,





 From: Punyashloka Biswal [mailto:punya.bis...@gmail.com]
 Sent: Thursday, April 23, 2015 9:06 PM
 To: Pagliari, Roberto; user@spark.apache.org
 Subject: Re: gridsearch - python



 https://issues.apache.org/jira/browse/SPARK-7022.

 Punya



 On Thu, Apr 23, 2015 at 5:47 PM Pagliari, Roberto rpagli...@appcomsci.com
 wrote:

 Can anybody point me to an example, if available, about gridsearch with
 python?



 Thank you,



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: setting cost in linear SVM [Python]

2015-04-23 Thread Xiangrui Meng
If by C you mean the parameter C in LIBLINEAR, the corresponding
parameter in MLlib is regParam:
https://github.com/apache/spark/blob/master/python/pyspark/mllib/classification.py#L273,
while regParam = 1/C. -Xiangrui
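
For example, to mimic LIBLINEAR's C = 100 (a Scala sketch; the PySpark
SVMWithSGD.train wrapper takes the same regParam argument):

  import org.apache.spark.mllib.classification.SVMWithSGD

  val C = 100.0
  val model = SVMWithSGD.train(training, 200 /* iterations */, 1.0 /* step */,
    1.0 / C /* regParam */, 1.0 /* miniBatchFraction */)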

On Wed, Apr 22, 2015 at 3:25 PM, Pagliari, Roberto
rpagli...@appcomsci.com wrote:
 Is there a way to set the cost value C when using linear SVM?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: StackOverflow Error when run ALS with 100 iterations

2015-04-23 Thread Xiangrui Meng
ALS.setCheckpointInterval was added in Spark 1.3.1. You need to
upgrade Spark to use this feature. -Xiangrui
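
Once on 1.3.1+, a minimal sketch of how it is wired up (the directory and
interval are illustrative; a checkpoint directory must be set for checkpointing
to take effect):

  import org.apache.spark.mllib.recommendation.ALS

  sc.setCheckpointDir("/tmp/als-checkpoint")
  val model = new ALS()
    .setRank(10)
    .setIterations(100)
    .setCheckpointInterval(10)  // checkpoint every 10 iterations to cut the lineage
    .run(ratings)               // ratings: RDD[Rating]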

On Wed, Apr 22, 2015 at 9:03 PM, amghost zhengweita...@outlook.com wrote:
 Hi, would you please how to checkpoint the training set rdd since all things
 are done in ALS.train method.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/StackOverflow-Error-when-run-ALS-with-100-iterations-tp4296p22619.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLlib - Collaborative Filtering - trainImplicit task size

2015-04-22 Thread Xiangrui Meng
This is the size of the serialized task closure. Is stage 246 part of
ALS iterations, or something before or after it? -Xiangrui

On Tue, Apr 21, 2015 at 10:36 AM, Christian S. Perone
christian.per...@gmail.com wrote:
 Hi Sean, thanks for the answer. I tried to call repartition() on the input
 with many different sizes and it still continues to show that warning
 message.

 On Tue, Apr 21, 2015 at 7:05 AM, Sean Owen so...@cloudera.com wrote:

 I think maybe you need more partitions in your input, which might make
 for smaller tasks?

 On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone
 christian.per...@gmail.com wrote:
  I keep seeing these warnings when using trainImplicit:
 
  WARN TaskSetManager: Stage 246 contains a task of very large size (208
  KB).
  The maximum recommended task size is 100 KB.
 
  And then the task size starts to increase. Is this a known issue ?
 
  Thanks !
 
  --
  Blog | Github | Twitter
  Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great
  big
  joke on me.




 --
 Blog | Github | Twitter
 Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
 joke on me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.3.1 Dataframe breaking ALS.train?

2015-04-22 Thread Xiangrui Meng
The patched was merged and it will be included in 1.3.2 and 1.4.0.
Thanks for reporting the bug! -Xiangrui

On Tue, Apr 21, 2015 at 2:51 PM, ayan guha guha.a...@gmail.com wrote:
 Thank you all.

 On 22 Apr 2015 04:29, Xiangrui Meng men...@gmail.com wrote:

 SchemaRDD subclasses RDD in 1.2, but DataFrame is no longer an RDD in
 1.3. We should allow DataFrames in ALS.train. I will submit a patch.
 You can use `ALS.train(training.rdd, ...)` for now as a workaround.
 -Xiangrui

 On Tue, Apr 21, 2015 at 10:51 AM, Joseph Bradley jos...@databricks.com
 wrote:
  Hi Ayan,
 
  If you want to use DataFrame, then you should use the Pipelines API
  (org.apache.spark.ml.*) which will take DataFrames:
 
  http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.recommendation.ALS
 
  In the examples/ directory for ml/, you can find a MovieLensALS example.
 
  Good luck!
  Joseph
 
  On Tue, Apr 21, 2015 at 4:58 AM, ayan guha guha.a...@gmail.com wrote:
 
  Hi
 
  I am getting an error
 
  Also, I am getting an error in mlib.ALS.train function when passing
  dataframe (do I need to convert the DF to RDD?)
 
  Code:
  training = ssc.sql(select userId,movieId,rating from ratings where
  partitionKey  6).cache()
  print type(training)
  model = ALS.train(training,rank,numIter,lmbda)
 
  Error:
  class 'pyspark.sql.dataframe.DataFrame'
 
  Traceback (most recent call last):
File D:\Project\Spark\code\movie_sql.py, line 109, in module
  bestConf =
  getBestModel(sc,ssc,training,validation,validationNoRating)
File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
  model = ALS.train(trainingRDD,rank,numIter,lmbda)
File
 
  D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
  line 139, in train
  model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
  iterations,
File
 
  D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
  line 127, in _prepare
  assert isinstance(ratings, RDD), ratings should be RDD
  AssertionError: ratings should be RDD
 
  It was working fine in 1.2.0 (till last night :))
 
  Any solution? I am thinking to map the training dataframe back to a
  RDD,
  byt will lose the schema information.
 
  Best
  Ayan
 
  On Mon, Apr 20, 2015 at 10:23 PM, ayan guha guha.a...@gmail.com
  wrote:
 
  Hi
  Just upgraded to Spark 1.3.1.
 
  I am getting an warning
 
  Warning (from warnings module):
File
 
  D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py,
  line 191
  warnings.warn(inferSchema is deprecated, please use
  createDataFrame
  instead)
  UserWarning: inferSchema is deprecated, please use createDataFrame
  instead
 
  However, documentation still says to use inferSchema.
  Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in
  section
 
  Also, I am getting an error in mlib.ALS.train function when passing
  dataframe (do I need to convert the DF to RDD?)
 
  Code:
  training = ssc.sql(select userId,movieId,rating from ratings where
  partitionKey  6).cache()
  print type(training)
  model = ALS.train(training,rank,numIter,lmbda)
 
  Error:
  class 'pyspark.sql.dataframe.DataFrame'
  Rank:8 Lmbda:1.0 iteration:10
 
  Traceback (most recent call last):
File D:\Project\Spark\code\movie_sql.py, line 109, in module
  bestConf =
  getBestModel(sc,ssc,training,validation,validationNoRating)
File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
  model = ALS.train(trainingRDD,rank,numIter,lmbda)
File
 
  D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
  line 139, in train
  model = callMLlibFunc(trainALSModel, cls._prepare(ratings),
  rank,
  iterations,
File
 
  D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
  line 127, in _prepare
  assert isinstance(ratings, RDD), ratings should be RDD
  AssertionError: ratings should be RDD
 
  --
  Best Regards,
  Ayan Guha
 
 
 
 
  --
  Best Regards,
  Ayan Guha
 
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem with using Spark ML

2015-04-22 Thread Xiangrui Meng
Please try reducing the step size. The native BLAS library is not
required. -Xiangrui
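
A sketch of what reducing the step size looks like with the code quoted below
(the 0.01 value is only illustrative and usually needs tuning):

  val model_gen = new RidgeRegressionWithSGD()
  model_gen.optimizer
    .setStepSize(0.01)       // much smaller than the default
    .setNumIterations(200)
  val model = model_gen.run(trainingLP)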

On Tue, Apr 21, 2015 at 5:15 AM, Staffan staffan.arvids...@gmail.com wrote:
 Hi,
 I've written an application that performs some machine learning on some
 data. I've validated that the data _should_ give a good output with a decent
 RMSE by using Lib-SVM:
 Mean squared error = 0.00922063 (regression)
 Squared correlation coefficient = 0.9987 (regression)

 When I try to use Spark ML to do the exact same thing I get:
 Mean Squared Error = 8.466193152067944E224

 Which is somewhat worse.. I've tried to look at the data before it's
 inputted to the model, printed that data to file (which is actually the data
 used when I got the result from Lib-SVM above). Somewhere there much be a
 huge mistake, but I cannot place it somewhere in my code (see below).
 traningLP and testLP are training and test-data, in RDD[LabeledPoint].

 // Generate model
 val model_gen = new RidgeRegressionWithSGD();
 val model = model_gen.run(trainingLP);

 // Predict on the test-data
 val valuesAndPreds = testLP.map { point =>
   val prediction = model.predict(point.features);
   println("label: " + point.label + ", pred: " + prediction);
   (point.label, prediction);
 }
 val MSE = valuesAndPreds.map { case (v, p) => math.pow((v - p), 2) }.mean();
 println("Mean Squared Error = " + MSE)


 I've printed label and prediction-values for each data-point in the testset,
 and the result is something like this;
 label: 5.04, pred: -4.607899000641277E112
 label: 3.59, pred: -3.96787105480399E112
 label: 5.06, pred: -2.8263294374576145E112
 label: 2.85, pred: -1.1536508029072844E112
 label: 2.1, pred: -4.269312783707508E111
 label: 2.75, pred: -3.0072665148591558E112
 label: -0.29, pred: -2.035681731641989E112
 label: 1.98, pred: -3.163404340354783E112

 So there is obviously something wrong with the prediction step. I'm using
 the SparseVector representation of the Vector in LabeledPoint, looking
 something like this for reference (shortened for convenience);
 (-1.59,(2080,[29,59,62,74,127,128,131,144,149,175,198,200,239,247,267,293,307,364,374,393,410,424,425,431,448,469,477,485,501,525,532,533,538,560,..],[1.0,1.0,2.0,8.0,1.0,1.0,6.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,5.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,8.0,2.0,1.0,1.0,..]))
 (-1.75,(2080,[103,131,149,208,296,335,520,534,603,620,661,694,709,748,859,1053,1116,1156,1186,1207,1208,1223,1256,1278,1356,1375,1399,1480,1569,..],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,2.0,2.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,4.0,1.0,7.0,1.0,3.0,2.0,1.0]))

 I do get one type of warning, but that's about it! (And as to my
 understanding, this native code is not required to get the correct results,
 only to improve performance).
 6010 [main] WARN  com.github.fommil.netlib.BLAS  - Failed to load
 implementation from: com.github.fommil.netlib.NativeSystemBLAS
 6011 [main] WARN  com.github.fommil.netlib.BLAS  - Failed to load
 implementation from: com.github.fommil.netlib.NativeRefBLAS

 So where do I go from here?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-using-Spark-ML-tp22591.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [MLlib] fail to run word2vec

2015-04-22 Thread Xiangrui Meng
We store the vectors on the driver node. So it is hard to handle a
really large vocabulary. You can use setMinCount to filter out
infrequent word to reduce the model size. -Xiangrui
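
A minimal sketch of the suggested filtering (the threshold of 50 is only
illustrative):

  import org.apache.spark.mllib.feature.Word2Vec

  val word2vec = new Word2Vec()
    .setMinCount(50)       // drop words that occur fewer than 50 times
    .setVectorSize(100)
  val model = word2vec.fit(sentences)   // sentences: RDD[Iterable[String]]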

On Wed, Apr 22, 2015 at 12:32 AM, gm yu husty...@gmail.com wrote:
 When using MLlib's Word2Vec, I get the following error:

  allocating large array--thread_id[0x7ff2741ca000]--thread_name[Driver]--array_size[1146093680 bytes]--array_length[1146093656 elememts]
 prio=10 tid=0x7ff2741ca000 nid=0x1405f runnable
   at java.util.Arrays.copyOf(Arrays.java:2786)
   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
   - locked 0x7ff33f7fafd0 (a java.io.ByteArrayOutputStream)
   at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1812)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1504)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1346)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
   at org.apache.spark.SparkContext.clean(SparkContext.scala:1627)
   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
   at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:270)
   at com.taobao.changrui.SynonymFind$.main(SynonymFind.scala:79)
   at com.taobao.changrui.SynonymFind.main(SynonymFind.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
   at java.lang.reflect.Method.invoke(Method.java:597)
   at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:516)


 The data size is: 100M+ sentences, 100M+ words

 Job setting: 50 executors with 20 GB and 4 cores each; the driver memory is 30 GB


 Any ideas? Thank you.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: the indices of SparseVector must be ordered while computing SVD

2015-04-22 Thread Xiangrui Meng
Having ordered indices is a contract of SparseVector:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.SparseVector.
We do not verify it for performance. -Xiangrui
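
For example (a minimal sketch): pass indices in strictly increasing order, or
build the vector from (index, value) pairs, which MLlib sorts for you:

  import org.apache.spark.mllib.linalg.Vectors

  val ok  = Vectors.sparse(3, Array(0, 1, 2), Array(3.0, 4.0, 5.0))
  val ok2 = Vectors.sparse(3, Seq((2, 5.0), (1, 4.0), (0, 3.0)))  // sorted internally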

On Wed, Apr 22, 2015 at 8:26 AM, yaochunnan yaochun...@gmail.com wrote:
 Hi all,
 I am using Spark 1.3.1 to write a Spectral Clustering algorithm. This really
 confused me today. At first I thought my implementation is wrong. It turns
 out it's an issue in MLlib. Fortunately, I've figured it out.

 I suggest to add a hint on user document of MLlib ( as far as I know, there
 have not been such hints yet) that  indices of Local Sparse Vector must be
 ordered in ascending manner. Because of ignorance of this point, I spent a
 lot of time looking for reasons why computeSVD of RowMatrix did not run
 correctly on Sparse data. I don't know the influence of Sparse Vector
 without ordered indices on other functions, but I believe it is necessary to
 let the users know or fix it. Actually, it's very easy to fix. Just add a
 sortBy function in internal construction of SparseVector.

 Here is an example to reproduce the affect of unordered Sparse Vector on
 computeSVD.
 
 //in spark-shell, Spark 1.3.1
  import org.apache.spark.mllib.linalg.distributed.RowMatrix
  import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector,
 Vectors}

   val sparseData_ordered = Seq(
 Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)),
 Vectors.sparse(3, Array(0,1,2), Array(3.0, 4.0, 5.0)),
 Vectors.sparse(3, Array(0,1,2), Array(6.0, 7.0, 8.0)),
 Vectors.sparse(3, Array(0,2), Array(9.0, 1.0))
   )
   val sparseMat_ordered = new RowMatrix(sc.parallelize(sparseData_ordered,
 2))

   val sparseData_not_ordered = Seq(
 Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)),
 Vectors.sparse(3, Array(2,1,0), Array(5.0,4.0,3.0)),
 Vectors.sparse(3, Array(0,1,2), Array(6.0, 7.0, 8.0)),
 Vectors.sparse(3, Array(2,0), Array(1.0,9.0))
   )
  val sparseMat_not_ordered = new
 RowMatrix(sc.parallelize(sparseData_not_ordered, 2))

 //apparently, sparseMat_ordered and sparseMat_not_ordered are essentially
 the same matirx
 //however, the computeSVD result of these two matrixes are different. Users
 should be notified about this situation.
   println(sparseMat_ordered.computeSVD(2, true).U.rows.collect.mkString("\n"))
   println("===")
   println(sparseMat_not_ordered.computeSVD(2, true).U.rows.collect.mkString("\n"))
 ==
 The results are:
 ordered:
 [-0.10972870132786407,-0.18850811494220537]
 [-0.44712472003608356,-0.24828866611663725]
 [-0.784520738744303,-0.3080692172910691]
 [-0.4154110101064339,0.8988385762953358]

 not ordered:
 [-0.10830447119599484,-0.1559341848984378]
 [-0.4522713511277327,-0.23449829541447448]
 [-0.7962382310594706,-0.3130624059305111]
 [-0.43131320303494614,0.8453864703362308]

 Looking into this issue, I can see its cause is located in
 RowMatrix.scala (line 629). The implementation of Sparse dspr here requires
 ordered indices. Because it is scanning the indices consecutively to skip
 empty columns.








 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/the-indices-of-SparseVector-must-be-ordered-while-computing-SVD-tp22611.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: LDA code little error @Xiangrui Meng

2015-04-22 Thread Xiangrui Meng
Thanks! That's a bug .. -Xiangrui
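
For reference, the alias calls itself instead of the parameter it is meant to
set; since beta is an alias for the topic concentration, the intended
definition is presumably along these lines (a sketch, not the committed fix):

  def setBeta(beta: Double): this.type = setTopicConcentration(beta)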

On Wed, Apr 22, 2015 at 9:34 PM, buring qyqb...@gmail.com wrote:
 Hi:
 there is a little error in source code LDA.scala at line 180, as
 follows:
def setBeta(beta: Double): this.type = setBeta(beta)

   which causes a java.lang.StackOverflowError. It's easy to see the error.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/LDA-code-little-error-Xiangrui-Meng-tp22621.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: StandardScaler failing with OOM errors in PySpark

2015-04-22 Thread Xiangrui Meng
What is the feature dimension? Did you set the driver memory? -Xiangrui

On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote:
 I'm trying to use the StandardScaler in pyspark on a relatively small (a few
 hundred Mb) dataset of sparse vectors with 800k features. The fit method of
 StandardScaler crashes with Java heap space or Direct buffer memory errors.
 There should be plenty of memory around -- 10 executors with 2 cores each
 and 8 Gb per core. I'm giving the executors 9g of memory and have also tried
 lots of overhead (3g), thinking it might be the array creation in the
 aggregators that's causing issues.

 The bizarre thing is that this isn't always reproducible -- sometimes it
 actually works without problems. Should I be setting up executors
 differently?

 Thanks,

 Rok




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.3.1 Dataframe breaking ALS.train?

2015-04-21 Thread Xiangrui Meng
SchemaRDD subclasses RDD in 1.2, but DataFrame is no longer an RDD in
1.3. We should allow DataFrames in ALS.train. I will submit a patch.
You can use `ALS.train(training.rdd, ...)` for now as a workaround.
-Xiangrui

On Tue, Apr 21, 2015 at 10:51 AM, Joseph Bradley jos...@databricks.com wrote:
 Hi Ayan,

 If you want to use DataFrame, then you should use the Pipelines API
 (org.apache.spark.ml.*) which will take DataFrames:
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.recommendation.ALS

 In the examples/ directory for ml/, you can find a MovieLensALS example.

 Good luck!
 Joseph

 On Tue, Apr 21, 2015 at 4:58 AM, ayan guha guha.a...@gmail.com wrote:

 Hi

 I am getting an error

 Also, I am getting an error in mlib.ALS.train function when passing
 dataframe (do I need to convert the DF to RDD?)

 Code:
 training = ssc.sql(select userId,movieId,rating from ratings where
 partitionKey  6).cache()
 print type(training)
 model = ALS.train(training,rank,numIter,lmbda)

 Error:
 class 'pyspark.sql.dataframe.DataFrame'

 Traceback (most recent call last):
   File D:\Project\Spark\code\movie_sql.py, line 109, in module
 bestConf = getBestModel(sc,ssc,training,validation,validationNoRating)
   File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
 model = ALS.train(trainingRDD,rank,numIter,lmbda)
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 139, in train
 model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
 iterations,
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 127, in _prepare
 assert isinstance(ratings, RDD), ratings should be RDD
 AssertionError: ratings should be RDD

 It was working fine in 1.2.0 (till last night :))

 Any solution? I am thinking to map the training dataframe back to a RDD,
 byt will lose the schema information.

 Best
 Ayan

 On Mon, Apr 20, 2015 at 10:23 PM, ayan guha guha.a...@gmail.com wrote:

 Hi
 Just upgraded to Spark 1.3.1.

 I am getting an warning

 Warning (from warnings module):
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py,
 line 191
 warnings.warn(inferSchema is deprecated, please use createDataFrame
 instead)
 UserWarning: inferSchema is deprecated, please use createDataFrame
 instead

 However, documentation still says to use inferSchema.
 Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in
 section

 Also, I am getting an error in mlib.ALS.train function when passing
 dataframe (do I need to convert the DF to RDD?)

 Code:
 training = ssc.sql(select userId,movieId,rating from ratings where
 partitionKey  6).cache()
 print type(training)
 model = ALS.train(training,rank,numIter,lmbda)

 Error:
 class 'pyspark.sql.dataframe.DataFrame'
 Rank:8 Lmbda:1.0 iteration:10

 Traceback (most recent call last):
   File D:\Project\Spark\code\movie_sql.py, line 109, in module
 bestConf =
 getBestModel(sc,ssc,training,validation,validationNoRating)
   File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
 model = ALS.train(trainingRDD,rank,numIter,lmbda)
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 139, in train
 model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
 iterations,
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 127, in _prepare
 assert isinstance(ratings, RDD), ratings should be RDD
 AssertionError: ratings should be RDD

 --
 Best Regards,
 Ayan Guha




 --
 Best Regards,
 Ayan Guha



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Streaming Linear Regression problem

2015-04-20 Thread Xiangrui Meng
Did you keep adding new files under the `train/` folder? What was the
exact warn message? -Xiangrui
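
For context: textFileStream only picks up files that are created in (or moved
into) the monitored directory after the streaming job has started, so files
that already sit under train/ when the job starts are not read. A typical way
to drive it is to drop new files in while the job is running, e.g.
(illustrative file name):

  cp batch_001.txt /home/barisakgu/Desktop/Spark/train/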

On Fri, Apr 17, 2015 at 4:56 AM, barisak baris.akg...@gmail.com wrote:
 Hi,

 I wrote this code just to train the streaming linear regression, but I got a
 "no data found" warning, so the weights were not updated.

 Is there any solution for this ?

 Thanks

 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.{LabeledPoint,
 StreamingLinearRegressionWithSGD}
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}


 object StreamingLinearRegression {

   def main(args: Array[String]) {

 val numFeatures=3

 val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingLinearRegression")
 val ssc = new StreamingContext(conf, Seconds(30))

 val trainingData = ssc.textFileStream("/home/barisakgu/Desktop/Spark/train").map(LabeledPoint.parse).cache()
 val testData = ssc.textFileStream("/home/barisakgu/Desktop/Spark/test").map(LabeledPoint.parse)

 val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))

 model.trainOn(trainingData)
 model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

 ssc.start()
 ssc.awaitTermination()

   }

 }



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Linear-Regression-problem-tp22539.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ChiSquared Test from user response flat files to RDD[Vector]?

2015-04-20 Thread Xiangrui Meng
You can find the user guide for vector creation here:
http://spark.apache.org/docs/latest/mllib-data-types.html#local-vector.
-Xiangrui
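
A minimal Scala sketch of the two pieces involved, with made-up counts:
building a local vector/matrix and running the Pearson independence test on a
2x2 contingency table:

  import org.apache.spark.mllib.linalg.{Matrices, Vectors}
  import org.apache.spark.mllib.stat.Statistics

  val v = Vectors.dense(1.0, 0.0, 3.0)                             // a local dense vector
  val table = Matrices.dense(2, 2, Array(13.0, 47.0, 40.0, 80.0))  // cell counts, column-major
  val result = Statistics.chiSqTest(table)                         // chi-squared independence test
  println(result.pValue)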

On Mon, Apr 20, 2015 at 2:32 PM, Dan DeCapria, CivicScience
dan.decap...@civicscience.com wrote:
 Hi Spark community,

 I'm very new to the Apache Spark community; but if this (very active) group
 is anything like the other Apache project user groups I've worked with, I'm
 going to enjoy discussions here very much. Thanks in advance!

 Use Case:
 I am trying to go from flat files of user response data, to contingency
 tables of frequency counts, to Pearson Chi-Square correlation statistics and
 perform a Chi-Squared hypothesis test.  The user response data represents a
 multiple choice question-answer (MCQ) format. The goal is to compute all
 choose-two combinations of question answers (precondition, question X
 question) contingency tables. Each cell of the contingency table is the
 intersection of the users whom responded per each option per each question
 of the table.

 An overview of the problem:
 // data ingestion and typing schema: Observation (u: String, d:
 java.util.Date, t: String, q: String, v: String, a: Int)
 // a question (q) has a finite set of response options (v) per which a user
 (u) responds
 // additional response fields are not required per this test
 for (precondition a) {
   for (q_i in lex ordered questions) {
 for (q_j in lex ordered question, q_j  q_i) {
 forall v_k \in q_i get set of distinct users {u}_ik
 forall v_l \in q_j get set of distinct users {u}_jl
 forall cells per table (a,q_i,q_j) defn C_ijkl = |intersect({u}_ik,
 {u}_jl)| // contingency table construct
 compute chisq test per this contingency table and persist
 }
   }
 }

 The scala main I'm testing is provided below, and I was planning to use the
 provided example https://spark.apache.org/docs/1.3.1/mllib-statistics.html
 however I am not sure how to go from my RDD[Observation] to the necessary
 precondition of RDD[Vector] for ingestion

   def main(args: Array[String]): Unit = {
     // setup main space for test
     val conf = new SparkConf().setAppName("TestMain")
     val sc = new SparkContext(conf)

     // data ETL and typing schema
     case class Observation(u: String, d: java.util.Date, t: String, q: String, v: String, a: Int)
     val date_format = new java.text.SimpleDateFormat("MMdd")
     val data_project_abs_dir = "/my/path/to/data/files"
     val data_files = data_project_abs_dir + "/part-*.gz"
     val data = sc.textFile(data_files)
     val observations = data.map(line => line.split(",").map(_.trim)).map(r =>
       Observation(r(0).toString, date_format.parse(r(1).toString),
         r(2).toString, r(3).toString, r(4).toString, r(5).toInt))
     observations.cache

     // ToDo: the basic keying of the space, possibly...
     val qvu = observations.map(o => ((o.a, o.q, o.v), o.u)).distinct

     // ToDo: ok, so now how to get this into the precondition RDD[Vector]
     // from the Spark example to make a contingency table?...

     // ToDo: perform then persist the resulting chisq and p-value on these
     // contingency tables...
   }


 Any help is appreciated.

 Thanks!  -Dan


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLlib - Naive Bayes Problem

2015-04-20 Thread Xiangrui Meng
Could you attach the full stack trace? Please also include the stack
trace from executors, which you can find on the Spark WebUI. -Xiangrui

On Thu, Apr 16, 2015 at 1:00 PM, riginos samarasrigi...@gmail.com wrote:
 I have a big dataset of categories of cars and descriptions of cars. So i
 want to give a description of a car and the program to classify the category
 of that car.
 So i decided to use multinomial naive Bayes. I created a unique id for each
 word and replaced my whole category,description data.

 //My input
 2,25187 15095 22608 28756 17862 29523 499 32681 9830 24957 18993 19501 16596
 17953 16596
 20,1846 29058 16252 20446 9835
 52,16861 808 26785 17874 18993 18993 18993 18269 34157 33811 18437 6004 2791
 27923 19141
 ...
 ...

 Why do I have errors like:

 //Errors

 3 ERROR Executor: Exception in task 0.0 in stage 211.0 (TID 392)
 java.lang.IndexOutOfBoundsException: 13 not in [-13,13)

 ERROR Executor: Exception in task 1.0 in stage 211.0 (TID 393)
 java.lang.IndexOutOfBoundsException: 17 not in [-17,17)

 ERROR TaskSetManager: Task 0 in stage 211.0 failed 1 times; aborting job
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
 stage 211.0 failed 1 times, most recent failure: Lost task 0.0 in stage
 211.0 (TID 392, localhost): java.lang.IndexOutOfBoundsException: 13 not in
 [-13,13)

 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)






 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Naive-Bayes-Problem-tp22531.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQL UserDefinedType can't be saved in parquet file when using assembly jar

2015-04-20 Thread Xiangrui Meng
You should check where MyDenseVectorUDT is defined and whether it was
on the classpath (or in the assembly jar) at runtime. Make sure the
full class name (with package name) is used. Btw, UDTs are not public
yet, so please use it with caution. -Xiangrui
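
A quick way to check the second point (a sketch; the jar path is whatever
sbt-assembly produced for your project):

  jar tf target/scala-2.10/myapp-assembly-1.0.jar | grep MyDenseVectorUDT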

On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
 Dear all,

 Here is an example of code to reproduce the issue I mentioned in a previous
 mail about saving an UserDefinedType into a parquet file. The problem here
 is that the code works when I run it inside intellij idea but fails when I
 create the assembly jar and run it with spark-submit. I use the master
 version of  Spark.

 @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
 class MyDenseVector(val data: Array[Double]) extends Serializable {
   override def equals(other: Any): Boolean = other match {
     case v: MyDenseVector =>
       java.util.Arrays.equals(this.data, v.data)
     case _ => false
   }
 }

 class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
   override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)
   override def serialize(obj: Any): Seq[Double] = {
     obj match {
       case features: MyDenseVector =>
         features.data.toSeq
     }
   }

   override def deserialize(datum: Any): MyDenseVector = {
     datum match {
       case data: Seq[_] =>
         new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
     }
   }

   override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]

 }

 case class Toto(imageAnnotation: MyDenseVector)

 object TestUserDefinedType {

   case class Params(input: String = null,
                     partitions: Int = 12,
                     outputDir: String = "images.parquet")

   def main(args: Array[String]): Unit = {

     val conf = new SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")

     val sc = new SparkContext(conf)
     val sqlContext = new SQLContext(sc)

     import sqlContext.implicits._

     val rawImages = sc.parallelize((1 to 5).map(x => Toto(new MyDenseVector(Array[Double](x.toDouble))))).toDF()

     rawImages.printSchema()

     rawImages.show()

     rawImages.save("toto.parquet") // This fails with assembly jar
     sc.stop()

   }
 }


 My build.sbt is as follows:

 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
   "org.apache.spark" %% "spark-sql" % sparkVersion,
   "org.apache.spark" %% "spark-mllib" % sparkVersion
 )

 assemblyMergeStrategy in assembly := {
   case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
   case PathList("org", "apache", xs @ _*) => MergeStrategy.first
   case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
   // case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
   // case "application.conf" => MergeStrategy.concat
   case m if m.startsWith("META-INF") => MergeStrategy.discard
   // case x =>
   //   val oldStrategy = (assemblyMergeStrategy in assembly).value
   //   oldStrategy(x)
   case _ => MergeStrategy.first
 }


 As I said, this code works without problems when I execute it inside IntelliJ
 IDEA, but when I generate the assembly jar with sbt-assembly and run it with
 spark-submit I get the following error:

 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7)
 java.lang.IllegalArgumentException: Unsupported dataType:
 {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]},
 [1.1] failure: `TimestampType' expected but `{' found

 {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]}
 ^
   at
 org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
   at
 org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
   at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
   at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
   at scala.util.Try.getOrElse(Try.scala:77)
   at
 org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
   at
 org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
   at
 parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
   at
 parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
   at
 org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:694)
   at
 org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
   at
 

Re: multinomial and Bernoulli model in NaiveBayes

2015-04-15 Thread Xiangrui Meng
CC Leah, who added Bernoulli option to MLlib's NaiveBayes. -Xiangrui

On Wed, Apr 15, 2015 at 4:49 AM, 姜林和 linhe_ji...@163.com wrote:


 Dear Meng:
 Thanks for the great work on Spark machine learning. I saw the changes to the
 NaiveBayes algorithm, separating it into a multinomial model and a Bernoulli
 model, but something confuses me:

 the calculation of
 P(Ci) -- pi(i)
 P(j|Ci) -- theta(i,j)

 should differ between the multinomial and Bernoulli models, yet I can only see
 theta(i,j) being calculated differently, not pi(i).


 Bernoulli:
 the original feature vector of a document must be 0/1, where 1 indicates that
 word j exists in document i,

 pi(i) = (number of documents of class C(i) + lambda) / (number of documents of all classes + 2*lambda)
 theta(i)(j) = (number of documents in which j exists in class C(i) + lambda) / (number of documents of class C(i) + 2*lambda)

 Multinomial:

 pi(i) = (number of words of class C(i) + lambda) / (number of words of all classes + numFeatures*lambda)
 theta(i)(j) = (number of words j in class C(i) + lambda) / (number of words in class C(i) + numFeatures*lambda)

 the comparison of the two algorithms:

 pi(i):
   Multinomial: number of words of class C(i)        -> math.log(numAllWordsOfC + lambda) - piLogDenom
   Bernoulli:   number of documents of class C(i)    -> math.log(n + lambda) - piLogDenom
 piLogDenom:
   Multinomial: number of words of all classes       -> math.log(numAllWords + numFeatures * lambda)
   Bernoulli:   number of documents of all classes   -> math.log(numDocuments + 2 * lambda)
 theta(i)(j):
   Multinomial: number of words j in class C(i)                     -> math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
   Bernoulli:   number of documents in which j exists in class C(i) -> math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
 thetaLogDenom:
   Multinomial: number of words in class C(i)        -> math.log(numAllWordsOfC + numFeatures * lambda)
   Bernoulli:   number of documents of class C(i)    -> math.log(n + 2 * lambda)

 best regards!

 Linhe Jiang






 Linhe  Jiang





Re: org.apache.spark.ml.recommendation.ALS

2015-04-14 Thread Xiangrui Meng
Yes, I think the default Spark builds are on Scala 2.10. You need to
follow instructions at
http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211
to build 2.11 packages. -Xiangrui
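
In other words, the prebuilt spark-1.3.0-bin-hadoop2.4 binaries are compiled
against Scala 2.10, so an application built with the _2.11 artifacts and Scala
2.11.2 will not link against them. If rebuilding Spark for 2.11 is not an
option, the alternative is to switch the application to Scala 2.10 and the
matching artifacts, e.g. (a sketch of one dependency, not a full pom):

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>1.3.0</version>
  </dependency>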

On Mon, Apr 13, 2015 at 4:00 PM, Jay Katukuri jkatuk...@apple.com wrote:

 Hi Xiangrui,

 Here is the class:


 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.ml.recommendation.ALS
 import org.apache.spark.sql.Row

 object ALSNew {

   def main(args: Array[String]) {
     val conf = new SparkConf()
       .setAppName("TrainingDataPurchase")
       .set("spark.executor.memory", "4g")

     conf.set("spark.shuffle.memoryFraction", "0.65") // default is 0.2
     conf.set("spark.storage.memoryFraction", "0.3")  // default is 0.6

     val sc = new SparkContext(conf)
     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
     import sqlContext.implicits._

     val pfile = args(0)
     val purchase = sc.textFile(pfile)

     val ratings = purchase.map(line =>
       line.split(',') match { case Array(user, item, rate) =>
         (user.toInt, item.toInt, rate.toFloat)
       }).toDF()

     val rank = args(1).toInt
     val numIterations = args(2).toInt
     val regParam: Double = 0.01
     val implicitPrefs: Boolean = true
     val numUserBlocks: Int = 100
     val numItemBlocks: Int = 100
     val nonnegative: Boolean = true

     // val paramMap = ParamMap(regParam = 0.01)
     // paramMap.put(numUserBlocks = 100, numItemBlocks = 100)
     val als = new ALS()
       .setRank(rank)
       .setRegParam(regParam)
       .setImplicitPrefs(implicitPrefs)
       .setNumUserBlocks(numUserBlocks)
       .setNumItemBlocks(numItemBlocks)

     val alpha = als.getAlpha

     val model = als.fit(ratings)

     val predictions = model.transform(ratings)
       .select("rating", "prediction")
       .map { case Row(rating: Float, prediction: Float) =>
         (rating.toDouble, prediction.toDouble)
       }

     val rmse =
       if (implicitPrefs) {
         // TODO: Use a better (rank-based?) evaluation metric for implicit feedback.
         // We limit the ratings and the predictions to the interval [0, 1] and
         // compute the weighted RMSE with the confidence scores as weights.
         val (totalWeight, weightedSumSq) = predictions.map { case (rating, prediction) =>
           val confidence = 1.0 + alpha * math.abs(rating)
           val rating01 = math.max(math.min(rating, 1.0), 0.0)
           val prediction01 = math.max(math.min(prediction, 1.0), 0.0)
           val err = prediction01 - rating01
           (confidence, confidence * err * err)
         }.reduce { case ((c0, e0), (c1, e1)) =>
           (c0 + c1, e0 + e1)
         }
         math.sqrt(weightedSumSq / totalWeight)
       } else {
         val mse = predictions.map { case (rating, prediction) =>
           val err = rating - prediction
           err * err
         }.mean()
         math.sqrt(mse)
       }

     println("Mean Squared Error = " + rmse)
   }
 }




 I am using the following in my maven build (pom.xml):


 <dependencies>
   <dependency>
     <groupId>org.scala-lang</groupId>
     <artifactId>scala-library</artifactId>
     <version>2.11.2</version>
   </dependency>
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.11</artifactId>
     <version>1.3.0</version>
   </dependency>
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-mllib_2.11</artifactId>
     <version>1.3.0</version>
   </dependency>
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql_2.11</artifactId>
     <version>1.3.0</version>
   </dependency>
 </dependencies>


 I am using scala version 2.11.2.

 Could it be that spark-1.3.0-bin-hadoop2.4.tgz requires a different
 version of Scala?

 Thanks,
 Jay



 On Apr 9, 2015, at 4:38 PM, Xiangrui Meng men...@gmail.com wrote:

 Could you share ALSNew.scala? Which Scala version did you use? -Xiangrui

 On Wed, Apr 8, 2015 at 4:09 PM, Jay Katukuri jkatuk...@apple.com wrote:

 Hi Xiangrui,

 I tried running this on my local machine  (laptop) and got the same error:

 Here is what I did:

 1. downloaded the Spark 1.3.0 release (prebuilt for Hadoop 2.4 and later)
 spark-1.3.0-bin-hadoop2.4.tgz.
 2. Ran the following command:

 spark-submit --class ALSNew  --master local[8] ALSNew.jar  /input_path


 The stack trace is exactly same.

 Thanks,
 Jay



 On Apr 8, 2015, at 10:47 AM, Jay Katukuri jkatuk...@apple.com wrote:

 some additional context:

 Since I am using features of Spark 1.3.0, I downloaded Spark 1.3.0 and
 used spark-submit from there.
 The cluster is still on spark-1.2.0.

 So it looks to me like, at runtime, the executors could not find some
 libraries of spark-1.3.0, even though I ran spark-submit from my downloaded
 spark-1.3.0.



 On Apr 6, 2015, at 1:37 PM, Jay Katukuri jkatuk...@apple.com wrote:

 Here is the command that I have used :

 spark-submit --class packagename.ALSNew --num-executors 100 --master yarn
 ALSNew.jar -jar spark-sql_2.11-1.3.0.jar hdfs://input_path

 Btw - I could run the old ALS in mllib package.





 On Apr 6, 2015, at 12:32 PM, Xiangrui Meng men...@gmail.com wrote

Re: spark ml model info

2015-04-14 Thread Xiangrui Meng
If you are using Scala/Java or
pyspark.mllib.classification.LogisticRegressionModel, you should be
able to call weights and intercept to get the model coefficients. If
you are using the pipeline API in Python, you can try
model._java_model.weights(); we are going to add a method to get the
weights directly. Btw, if you are on the master branch, you can call
model.save(sc, path) to persist models on disk and
LogisticRegression.load(sc, path) to load it back. -Xiangrui
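
For the Scala path mentioned above, a minimal sketch of pulling out the
coefficients, assuming an existing RDD[LabeledPoint] called training (the
names are illustrative, not from this thread):

  // Minimal sketch: train with the RDD-based MLlib API and read weights/intercept.
  // `training` is an assumed RDD[LabeledPoint]; the point here is only the accessor calls.
  import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.rdd.RDD

  def coefficients(training: RDD[LabeledPoint]): (Array[Double], Double) = {
    val model = new LogisticRegressionWithLBFGS().run(training)
    // One weight per feature plus the intercept; these can be exported for real-time scoring.
    (model.weights.toArray, model.intercept)
  }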

On Tue, Apr 14, 2015 at 1:22 PM, Jianguo Li flyingfromch...@gmail.com wrote:
 Hi,

 I am training a model using the logistic regression algorithm in ML. I was
 wondering if there is any API to access the weight vectors (aka the
 coefficients for each feature). I need those coefficients for real-time
 predictions.

 Thanks,

 Jianguo



