Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-05-04 Thread Nikaash Puri
Hi,

OK, so another interesting result. When I compute cross-cooccurrences with
a user profile attribute that has high cardinality (for instance, city), the
AtB step completes in roughly 11 minutes on a given data set. Now, if I do the
same calculation on a profile attribute such as gender, which has just two
distinct values, the AtB step is much slower. In the slow runs I reported
earlier, the profile attribute I was using also had a small number of
distinct values.

Could this be because the indicator matrix no longer remains sparse
(just venturing a guess here)?
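A quick back-of-envelope check of that guess (plain Scala with made-up sizes; this is not the Mahout API): with k distinct attribute values, the cross-cooccurrence matrix A'B is items x k, and when k is tiny nearly every item has been seen with every value, so the result is effectively dense.

// Hypothetical sizes, for illustration only.
val nItems = 50000L
for (k <- Seq(2L, 10000L)) {   // k = distinct attribute values (gender vs. city)
  val cells = nItems * k       // cells in the items x values matrix A'B
  // With k = 2, nearly every cell is non-zero, so density approaches 1.0;
  // with a high-cardinality attribute the same matrix stays sparse.
  println(s"k=$k -> A'B has $cells cells")
}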

These results are from Mahout 0.10 and Spark 1.2.0.

Thank you,
Nikaash Puri


On Tue, May 3, 2016 at 6:26 AM Dmitriy Lyubimov  wrote:

> graph = graft, sorry. Graft just the AtB class into the 0.12 codebase.
>
> On Mon, May 2, 2016 at 9:06 AM, Dmitriy Lyubimov 
> wrote:
>
> > ok.
> >
> > Nikaash,
> > could you perhaps do one more experiment and graph the 0.10 A'B code into
> > the 0.12 code (or whatever branch you say is not working the same) so we
> > could confirm that the culprit change is indeed A'B?
> >
> > thank you very much.
> >
> > -d
> >
> > On Mon, May 2, 2016 at 3:35 AM, Nikaash Puri 
> > wrote:
> >
> >> Hi,
> >>
> >> I tried commenting out those lines and it did marginally improve the
> >> performance, although the 0.10 version still significantly outperforms
> >> it.
> >>
> >> Here is a screenshot of the saveAsTextFile job (attached as selection1).
> >> The AtB step took about 34 mins, which is significantly more than with
> >> 0.10. Similarly, the saveAsTextFile action takes about 9 mins.
> >>
> >> The selection2 file is a screenshot of the flatMap at AtB.scala job,
> >> which ran for 34 minutes.
> >>
> >> Also, I'm using multiple indicators. As of Mahout 0.10, the first AtB
> >> would take time, while subsequent such operations for the other
> >> indicators would be orders of magnitude faster. In the current job, the
> >> subsequent AtB operations take time similar to the first one.
> >>
> >> A snapshot of my code is as follows:
> >>
> >> var existingRowIDs: Option[BiDictionary] = None
> >>
> >> // The first action named in the sequence is the "primary" action and
> >> // begins to fill up the user dictionary
> >> for (actionDescription <- actionInput) {
> >>   // grab the path to actions
> >>   val action: IndexedDataset = SparkEngine.indexedDatasetDFSReadElements(
> >>     actionDescription._2,
> >>     schema = DefaultIndexedDatasetElementReadSchema,
> >>     existingRowIDs = existingRowIDs)
> >>   existingRowIDs = Some(action.rowIDs)
> >>
> >>   ...
> >> }
> >>
> >> which seems fairly standard, so I hope I'm not making a mistake here.
> >>
> >> It looks like the 0.11-onward version uses computeAtBZipped3 to perform
> >> the multiplication in atb_nograph_mmul, unlike 0.10, which used
> >> atb_nograph. Though I'm not really sure whether that makes much of a
> >> difference.
> >>
> >> Thank you,
> >> Nikaash Puri
> >>
> >> On Sat, Apr 30, 2016 at 12:36 AM Pat Ferrel 
> >> wrote:
> >>
> >>> Right, will do. But Nikaash, if you could just comment out those lines
> >>> and see if it has an effect, it would be informative and perhaps even
> >>> solve your problem sooner than my changes. No great rush. Playing around
> >>> with different values, as Dmitriy says, might yield better results, and
> >>> for that you can mess with the code or wait for my changes.
> >>>
> >>> Yeah, it’s fast enough in most cases. The main work is the optimized
> >>> A’A, A’B stuff in the BLAS optimizer Dmitriy put in. It is something
> like
> >>> 10x faster than a similar algo in Hadoop MR. This particular calc and
> >>> generalization is not in any other Spark or now Flink lib that I know
> of.
> >>>
> >>>
> >>> On Apr 29, 2016, at 11:24 AM, Dmitriy Lyubimov 
> >>> wrote:
> >>>
> >>> Nikaash,
> >>>
> >>> yes unfortunately you may need to play with parallelism for your
> >>> particular
> >>> load/cluster manually to get the best out of it. I guess Pat will be
> >>> adding
> >>> the option.
> >>>
> >>> On Fri, Apr 29, 2016 at 11:14 AM, Nikaash Puri 
> >>> wrote:
> >>>
> >>> > Hi,
> >>> >
> >>> > Sure, I’ll do some more detailed analysis of the jobs on the UI and
> >>> share
> >>> > screenshots if possible.
> >>> >
> >>> > Pat, yup, I’ll only be able to get to this on Monday, though. I’ll
> >>> comment
> >>> > out the line and see the difference in performance.
> >>> >
> >>> > Thanks so much for helping guys, I really appreciate it.
> >>> >
> >>> > Also, the algorithm implementation for LLR is extremely performant,
> at
> >>> > least as of Mahout 0.10. I ran some tests for around 61 days of data
> >>> (which
> >>> > in our case is a fair amount) and the model was built in about 20
> >>> minutes,
> >>> > which is pretty amazing. This was using a pretty decent sized
> cluster,
> >>> > though.
> >>> >
> >>> > Thank you,
> >>> > Nikaash Puri
> >>> >
> >>> > On 29-Apr-2016, at 10:18 PM, Pat Ferrel 
> wrote:
> >>> >
> >>> > There are some other changes I want to make for the

Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-05-02 Thread Dmitriy Lyubimov
graph = graft, sorry. Graft just the AtB class into the 0.12 codebase.

On Mon, May 2, 2016 at 9:06 AM, Dmitriy Lyubimov  wrote:

> ok.
>
> Nikaash,
> could you perhaps do one more experiment and graph the 0.10 A'B code into
> the 0.12 code (or whatever branch you say is not working the same) so we
> could confirm that the culprit change is indeed A'B?
>
> thank you very much.
>
> -d
>
> On Mon, May 2, 2016 at 3:35 AM, Nikaash Puri 
> wrote:
>
>> Hi,
>>
>> I tried commenting out those lines and it did marginally improve the
>> performance, although the 0.10 version still significantly outperforms it.
>>
>> Here is a screenshot of the saveAsTextFile job (attached as selection1).
>> The AtB step took about 34 mins, which is significantly more than with
>> 0.10. Similarly, the saveAsTextFile action takes about 9 mins.
>>
>> The selection2 file is a screenshot of the flatMap at AtB.scala job,
>> which ran for 34 minutes.
>>
>> Also, I'm using multiple indicators. As of Mahout 0.10, the first AtB
>> would take time, while subsequent such operations for the other indicators
>> would be orders of magnitude faster. In the current job, the subsequent
>> AtB operations take time similar to the first one.
>>
>> A snapshot of my code is as follows:
>>
>> var existingRowIDs: Option[BiDictionary] = None
>>
>> // The first action named in the sequence is the "primary" action and
>> // begins to fill up the user dictionary
>> for (actionDescription <- actionInput) {
>>   // grab the path to actions
>>   val action: IndexedDataset = SparkEngine.indexedDatasetDFSReadElements(
>>     actionDescription._2,
>>     schema = DefaultIndexedDatasetElementReadSchema,
>>     existingRowIDs = existingRowIDs)
>>   existingRowIDs = Some(action.rowIDs)
>>
>>   ...
>> }
>>
>> which seems fairly standard, so I hope I'm not making a mistake here.
>>
>> It looks like the 0.11-onward version uses computeAtBZipped3 to perform
>> the multiplication in atb_nograph_mmul, unlike 0.10, which used
>> atb_nograph. Though I'm not really sure whether that makes much of a
>> difference.
>>
>> Thank you,
>> Nikaash Puri
>>
>> On Sat, Apr 30, 2016 at 12:36 AM Pat Ferrel 
>> wrote:
>>
>>> Right, will do. But Nikaash, if you could just comment out those lines
>>> and see if it has an effect, it would be informative and perhaps even solve
>>> your problem sooner than my changes. No great rush. Playing around with
>>> different values, as Dmitriy says, might yield better results, and for that
>>> you can mess with the code or wait for my changes.
>>>
>>> Yeah, it’s fast enough in most cases. The main work is the optimized
>>> A’A, A’B stuff in the BLAS optimizer Dmitriy put in. It is something like
>>> 10x faster than a similar algo in Hadoop MR. This particular calc and
>>> generalization is not in any other Spark or now Flink lib that I know of.
>>>
>>>
>>> On Apr 29, 2016, at 11:24 AM, Dmitriy Lyubimov 
>>> wrote:
>>>
>>> Nikaash,
>>>
>>> yes unfortunately you may need to play with parallelism for your
>>> particular
>>> load/cluster manually to get the best out of it. I guess Pat will be
>>> adding
>>> the option.
>>>
>>> On Fri, Apr 29, 2016 at 11:14 AM, Nikaash Puri 
>>> wrote:
>>>
>>> > Hi,
>>> >
>>> > Sure, I’ll do some more detailed analysis of the jobs on the UI and
>>> share
>>> > screenshots if possible.
>>> >
>>> > Pat, yup, I’ll only be able to get to this on Monday, though. I’ll
>>> comment
>>> > out the line and see the difference in performance.
>>> >
>>> > Thanks so much for helping guys, I really appreciate it.
>>> >
>>> > Also, the algorithm implementation for LLR is extremely performant, at
>>> > least as of Mahout 0.10. I ran some tests for around 61 days of data
>>> (which
>>> > in our case is a fair amount) and the model was built in about 20
>>> minutes,
>>> > which is pretty amazing. This was using a pretty decent sized cluster,
>>> > though.
>>> >
>>> > Thank you,
>>> > Nikaash Puri
>>> >
>>> > On 29-Apr-2016, at 10:18 PM, Pat Ferrel  wrote:
>>> >
>>> > There are some other changes I want to make for the next rev so I’ll do
>>> > that.
>>> >
>>> > Nikaash, it would still be nice to verify this fixes your problem,
>>> also if
>>> > you want to create a Jira it will guarantee I don’t forget.
>>> >
>>> >
>>> > On Apr 29, 2016, at 9:23 AM, Dmitriy Lyubimov 
>>> wrote:
>>> >
> >>> yes -- i would do it as an optional parameter -- just like par does -- do
> >>> nothing; try auto, or try an exact number of splits
>>> >
>>> > On Fri, Apr 29, 2016 at 9:15 AM, Pat Ferrel 
>>> wrote:
>>> >
>>> >> It’s certainly easy to put this in the driver, taking it out of the
>>> algo.
>>> >>
>>> >> Dmitriy, is it a candidate for an Option param to the algo? That would
>>> >> catch cases where people rely on it now (like my old DStream example)
>>> but
>>> >> easily allow it to be overridden to None to imitate pre 0.11, or
>>> passed in
>>> >> when the app knows better.
>>> >>
>>> >> Nikaash, are you in a position to comment out the .par(auto=true

Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-05-02 Thread Dmitriy Lyubimov
ok.

Nikaash,
could you perhaps do one more experiment and graph the 0.10 A'B code into
the 0.12 code (or whatever branch you say is not working the same) so we
could confirm that the culprit change is indeed A'B?

thank you very much.

-d

On Mon, May 2, 2016 at 3:35 AM, Nikaash Puri  wrote:

> Hi,
>
> I tried commenting out those lines and it did marginally improve the
> performance, although the 0.10 version still significantly outperforms it.
>
> Here is a screenshot of the saveAsTextFile job (attached as selection1).
> The AtB step took about 34 mins, which is significantly more than with
> 0.10. Similarly, the saveAsTextFile action takes about 9 mins.
>
> The selection2 file is a screenshot of the flatMap at AtB.scala job, which
> ran for 34 minutes.
>
> Also, I'm using multiple indicators. As of Mahout 0.10, the first AtB
> would take time, while subsequent such operations for the other indicators
> would be orders of magnitude faster. In the current job, the subsequent
> AtB operations take time similar to the first one.
>
> A snapshot of my code is as follows:
>
> var existingRowIDs: Option[BiDictionary] = None
>
> // The first action named in the sequence is the "primary" action and
> // begins to fill up the user dictionary
> for (actionDescription <- actionInput) {
>   // grab the path to actions
>   val action: IndexedDataset = SparkEngine.indexedDatasetDFSReadElements(
>     actionDescription._2,
>     schema = DefaultIndexedDatasetElementReadSchema,
>     existingRowIDs = existingRowIDs)
>   existingRowIDs = Some(action.rowIDs)
>
>   ...
> }
>
> which seems fairly standard, so I hope I'm not making a mistake here.
>
> It looks like the 0.11-onward version uses computeAtBZipped3 to perform
> the multiplication in atb_nograph_mmul, unlike 0.10, which used
> atb_nograph. Though I'm not really sure whether that makes much of a
> difference.
>
> Thank you,
> Nikaash Puri
>
> On Sat, Apr 30, 2016 at 12:36 AM Pat Ferrel  wrote:
>
>> Right, will do. But Nikaash, if you could just comment out those lines and
>> see if it has an effect, it would be informative and perhaps even solve your
>> problem sooner than my changes. No great rush. Playing around with
>> different values, as Dmitriy says, might yield better results, and for that
>> you can mess with the code or wait for my changes.
>>
>> Yeah, it’s fast enough in most cases. The main work is the optimized A’A,
>> A’B stuff in the BLAS optimizer Dmitriy put in. It is something like 10x
>> faster than a similar algo in Hadoop MR. This particular calc and
>> generalization is not in any other Spark or now Flink lib that I know of.
>>
>>
>> On Apr 29, 2016, at 11:24 AM, Dmitriy Lyubimov  wrote:
>>
>> Nikaash,
>>
>> yes unfortunately you may need to play with parallelism for your
>> particular
>> load/cluster manually to get the best out of it. I guess Pat will be
>> adding
>> the option.
>>
>> On Fri, Apr 29, 2016 at 11:14 AM, Nikaash Puri 
>> wrote:
>>
>> > Hi,
>> >
>> > Sure, I’ll do some more detailed analysis of the jobs on the UI and
>> share
>> > screenshots if possible.
>> >
>> > Pat, yup, I’ll only be able to get to this on Monday, though. I’ll
>> comment
>> > out the line and see the difference in performance.
>> >
>> > Thanks so much for helping guys, I really appreciate it.
>> >
>> > Also, the algorithm implementation for LLR is extremely performant, at
>> > least as of Mahout 0.10. I ran some tests for around 61 days of data
>> (which
>> > in our case is a fair amount) and the model was built in about 20
>> minutes,
>> > which is pretty amazing. This was using a pretty decent sized cluster,
>> > though.
>> >
>> > Thank you,
>> > Nikaash Puri
>> >
>> > On 29-Apr-2016, at 10:18 PM, Pat Ferrel  wrote:
>> >
>> > There are some other changes I want to make for the next rev so I’ll do
>> > that.
>> >
>> > Nikaash, it would still be nice to verify this fixes your problem, also
>> if
>> > you want to create a Jira it will guarantee I don’t forget.
>> >
>> >
>> > On Apr 29, 2016, at 9:23 AM, Dmitriy Lyubimov 
>> wrote:
>> >
>> > yes -- i would do it as an optional parameter -- just like par does -- do
>> > nothing; try auto, or try an exact number of splits
>> >
>> > On Fri, Apr 29, 2016 at 9:15 AM, Pat Ferrel 
>> wrote:
>> >
>> >> It’s certainly easy to put this in the driver, taking it out of the
>> algo.
>> >>
>> >> Dmitriy, is it a candidate for an Option param to the algo? That would
>> >> catch cases where people rely on it now (like my old DStream example)
>> but
>> >> easily allow it to be overridden to None to imitate pre 0.11, or
>> passed in
>> >> when the app knows better.
>> >>
>> >> Nikaash, are you in a position to comment out the .par(auto=true) and
>> see
>> >> if it makes a difference?
>> >>
>> >>
>> >> On Apr 29, 2016, at 8:53 AM, Dmitriy Lyubimov 
>> wrote:
>> >>
> >> can you please look into the Spark UI and write down how many splits the job
>> >> generates in the first stage of the pipeline, or anywhere else there's

Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-05-02 Thread Nikaash Puri
Hi,

I tried commenting out those lines and it did marginally improve the
performance, although the 0.10 version still significantly outperforms it.

Here is a screenshot of the saveAsTextFile job (attached as selection1).
The AtB step took about 34 mins, which is significantly more than with
0.10. Similarly, the saveAsTextFile action takes about 9 mins.

The selection2 file is a screenshot of the flatMap at AtB.scala job, which
ran for 34 minutes.

Also, I'm using multiple indicators. As of Mahout 0.10, the first AtB would
take time, while subsequent such operations for the other indicators would
be orders of magnitude faster. In the current job, the subsequent AtB
operations take time similar to the first one.

A snapshot of my code is as follows:

var existingRowIDs: Option[BiDictionary] = None

// The first action named in the sequence is the "primary" action and
// begins to fill up the user dictionary
for (actionDescription <- actionInput) {
  // grab the path to actions
  val action: IndexedDataset = SparkEngine.indexedDatasetDFSReadElements(
    actionDescription._2,
    schema = DefaultIndexedDatasetElementReadSchema,
    existingRowIDs = existingRowIDs)
  existingRowIDs = Some(action.rowIDs)

  ...
}

which seems fairly standard, so I hope I'm not making a mistake here.

It looks like the 0.11-onward version uses computeAtBZipped3 to perform
the multiplication in atb_nograph_mmul, unlike 0.10, which used
atb_nograph. Though I'm not really sure whether that makes much of a
difference.

Thank you,
Nikaash Puri

On Sat, Apr 30, 2016 at 12:36 AM Pat Ferrel  wrote:

> Right, will do. But Nikaash, if you could just comment out those lines and
> see if it has an effect, it would be informative and perhaps even solve your
> problem sooner than my changes. No great rush. Playing around with
> different values, as Dmitriy says, might yield better results, and for that
> you can mess with the code or wait for my changes.
>
> Yeah, it’s fast enough in most cases. The main work is the optimized A’A,
> A’B stuff in the BLAS optimizer Dmitriy put in. It is something like 10x
> faster than a similar algo in Hadoop MR. This particular calc and
> generalization is not in any other Spark or now Flink lib that I know of.
>
>
> On Apr 29, 2016, at 11:24 AM, Dmitriy Lyubimov  wrote:
>
> Nikaash,
>
> yes unfortunately you may need to play with parallelism for your particular
> load/cluster manually to get the best out of it. I guess Pat will be adding
> the option.
>
> On Fri, Apr 29, 2016 at 11:14 AM, Nikaash Puri 
> wrote:
>
> > Hi,
> >
> > Sure, I’ll do some more detailed analysis of the jobs on the UI and share
> > screenshots if possible.
> >
> > Pat, yup, I’ll only be able to get to this on Monday, though. I’ll
> comment
> > out the line and see the difference in performance.
> >
> > Thanks so much for helping guys, I really appreciate it.
> >
> > Also, the algorithm implementation for LLR is extremely performant, at
> > least as of Mahout 0.10. I ran some tests for around 61 days of data
> (which
> > in our case is a fair amount) and the model was built in about 20
> minutes,
> > which is pretty amazing. This was using a pretty decent sized cluster,
> > though.
> >
> > Thank you,
> > Nikaash Puri
> >
> > On 29-Apr-2016, at 10:18 PM, Pat Ferrel  wrote:
> >
> > There are some other changes I want to make for the next rev so I’ll do
> > that.
> >
> > Nikaash, it would still be nice to verify this fixes your problem, also
> if
> > you want to create a Jira it will guarantee I don’t forget.
> >
> >
> > On Apr 29, 2016, at 9:23 AM, Dmitriy Lyubimov  wrote:
> >
> > yes -- i would do it as an optional parameter -- just like par does -- do
> > nothing; try auto, or try an exact number of splits
> >
> > On Fri, Apr 29, 2016 at 9:15 AM, Pat Ferrel 
> wrote:
> >
> >> It’s certainly easy to put this in the driver, taking it out of the
> algo.
> >>
> >> Dmitriy, is it a candidate for an Option param to the algo? That would
> >> catch cases where people rely on it now (like my old DStream example)
> but
> >> easily allow it to be overridden to None to imitate pre 0.11, or passed
> in
> >> when the app knows better.
> >>
> >> Nikaash, are you in a position to comment out the .par(auto=true) and
> see
> >> if it makes a difference?
> >>
> >>
> >> On Apr 29, 2016, at 8:53 AM, Dmitriy Lyubimov 
> wrote:
> >>
> >> can you please look into the Spark UI and write down how many splits the
> >> job generates in the first stage of the pipeline, or anywhere else
> >> there's significant variation in # of splits in both cases?
> >>
> >> the row similarity is a very short pipeline (in comparison with what
> >> would normally be on average). so only the first input re-splitting is
> >> critical.
> >>
> >> The splitting along the products is adjusted automatically by the
> >> optimizer to match the amount of data segments observed on average in
> >> the input(s),
> >> e.g. if you compute val C = A %*% B and A has 500 elements per split and

Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-29 Thread Pat Ferrel
Right, will do. But Nikaash, if you could just comment out those lines and see
if it has an effect, it would be informative and perhaps even solve your problem
sooner than my changes. No great rush. Playing around with different values, as
Dmitriy says, might yield better results, and for that you can mess with the
code or wait for my changes.

Yeah, it’s fast enough in most cases. The main work is the optimized A’A, A’B 
stuff in the BLAS optimizer Dmitriy put in. It is something like 10x faster 
than a similar algo in Hadoop MR. This particular calc and generalization is 
not in any other Spark or now Flink lib that I know of.
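For readers following along, the two products being referred to look like this in the Samsara Scala DSL (a minimal sketch; drmA and drmB are assumed, pre-loaded inputs, not code from this thread):

import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

// drmA: users x items for the primary action; drmB: a second action (or
// profile attribute) matrix over the same users. Both are assumptions.
def indicators(drmA: DrmLike[Int], drmB: DrmLike[Int]) = {
  val ata = (drmA.t %*% drmA).checkpoint() // A'A: item co-occurrence
  val atb = (drmA.t %*% drmB).checkpoint() // A'B: cross-cooccurrence
  (ata, atb)
}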


On Apr 29, 2016, at 11:24 AM, Dmitriy Lyubimov  wrote:

Nikaash,

yes unfortunately you may need to play with parallelism for your particular
load/cluster manually to get the best out of it. I guess Pat will be adding
the option.

On Fri, Apr 29, 2016 at 11:14 AM, Nikaash Puri 
wrote:

> Hi,
> 
> Sure, I’ll do some more detailed analysis of the jobs on the UI and share
> screenshots if possible.
> 
> Pat, yup, I’ll only be able to get to this on Monday, though. I’ll comment
> out the line and see the difference in performance.
> 
> Thanks so much for helping guys, I really appreciate it.
> 
> Also, the algorithm implementation for LLR is extremely performant, at
> least as of Mahout 0.10. I ran some tests for around 61 days of data (which
> in our case is a fair amount) and the model was built in about 20 minutes,
> which is pretty amazing. This was using a pretty decent sized cluster,
> though.
> 
> Thank you,
> Nikaash Puri
> 
> On 29-Apr-2016, at 10:18 PM, Pat Ferrel  wrote:
> 
> There are some other changes I want to make for the next rev so I’ll do
> that.
> 
> Nikaash, it would still be nice to verify this fixes your problem, also if
> you want to create a Jira it will guarantee I don’t forget.
> 
> 
> On Apr 29, 2016, at 9:23 AM, Dmitriy Lyubimov  wrote:
> 
> yes -- i would do it as an optional parameter -- just like par does -- do
> nothing; try auto, or try an exact number of splits
> 
> On Fri, Apr 29, 2016 at 9:15 AM, Pat Ferrel  wrote:
> 
>> It’s certainly easy to put this in the driver, taking it out of the algo.
>> 
>> Dmitriy, is it a candidate for an Option param to the algo? That would
>> catch cases where people rely on it now (like my old DStream example) but
>> easily allow it to be overridden to None to imitate pre 0.11, or passed in
>> when the app knows better.
>> 
>> Nikaash, are you in a position to comment out the .par(auto=true) and see
>> if it makes a difference?
>> 
>> 
>> On Apr 29, 2016, at 8:53 AM, Dmitriy Lyubimov  wrote:
>> 
>> can you please look into the Spark UI and write down how many splits the
>> job generates in the first stage of the pipeline, or anywhere else there's
>> significant variation in # of splits in both cases?
>>
>> the row similarity is a very short pipeline (in comparison with what would
>> normally be on average). so only the first input re-splitting is critical.
>>
>> The splitting along the products is adjusted automatically by the
>> optimizer to match the amount of data segments observed on average in the
>> input(s), e.g. if you compute val C = A %*% B and A has 500 elements per
>> split and B has 5000 elements per split then C would approximately have
>> 5000 elements per split (the larger average in binary operator cases).
>> That's approximately how it works.
>>
>> However, the par() that has been added is messing with initial
>> parallelism, which would naturally affect the rest of the pipeline per the
>> above. I now doubt it was a good thing -- when i suggested Pat try this, i
>> did not mean to put it _inside_ the algorithm itself, but rather into the
>> accurate input preparation code in his particular case. However, I don't
>> think it will work in any given case. Actually, the sweet-spot parallelism
>> for multiplication unfortunately depends on tons of factors -- network
>> bandwidth and hardware configuration, so it is difficult to give it a good
>> guess universally. More likely, for CLI-based prepackaged algorithms (I
>> don't use the CLI but rather assemble pipelines in Scala via scripting and
>> Scala application code) the initial parallelization adjustment options
>> should probably be provided to the CLI.
>> 
>> 
> 
> 
> 



Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-29 Thread Dmitriy Lyubimov
Nikaash,

yes unfortunately you may need to play with parallelism for your particular
load/cluster manually to get the best out of it. I guess Pat will be adding
the option.
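A hedged sketch of what playing with parallelism manually could look like in a Samsara session (the par(exact = ...) knob follows this thread's description; drmA, drmB, and the candidate split counts are assumptions, and colSums() is called only to force evaluation):

import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

// Toy sweep: time A'B at a few candidate split counts.
for (n <- Seq(20, 30, 60, 120)) {
  val t0 = System.currentTimeMillis()
  val atb = (drmA.par(exact = n).t %*% drmB.par(exact = n)).checkpoint()
  atb.colSums()  // force the pipeline to actually run
  println(s"splits=$n -> ${System.currentTimeMillis() - t0} ms")
}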

On Fri, Apr 29, 2016 at 11:14 AM, Nikaash Puri 
wrote:

> Hi,
>
> Sure, I’ll do some more detailed analysis of the jobs on the UI and share
> screenshots if possible.
>
> Pat, yup, I’ll only be able to get to this on Monday, though. I’ll comment
> out the line and see the difference in performance.
>
> Thanks so much for helping guys, I really appreciate it.
>
> Also, the algorithm implementation for LLR is extremely performant, at
> least as of Mahout 0.10. I ran some tests for around 61 days of data (which
> in our case is a fair amount) and the model was built in about 20 minutes,
> which is pretty amazing. This was using a pretty decent sized cluster,
> though.
>
> Thank you,
> Nikaash Puri
>
> On 29-Apr-2016, at 10:18 PM, Pat Ferrel  wrote:
>
> There are some other changes I want to make for the next rev so I’ll do
> that.
>
> Nikaash, it would still be nice to verify this fixes your problem, also if
> you want to create a Jira it will guarantee I don’t forget.
>
>
> On Apr 29, 2016, at 9:23 AM, Dmitriy Lyubimov  wrote:
>
> yes -- i would do it as an optional parameter -- just like par does -- do
> nothing; try auto, or try an exact number of splits
>
> On Fri, Apr 29, 2016 at 9:15 AM, Pat Ferrel  wrote:
>
>> It’s certainly easy to put this in the driver, taking it out of the algo.
>>
>> Dmitriy, is it a candidate for an Option param to the algo? That would
>> catch cases where people rely on it now (like my old DStream example) but
>> easily allow it to be overridden to None to imitate pre 0.11, or passed in
>> when the app knows better.
>>
>> Nikaash, are you in a position to comment out the .par(auto=true) and see
>> if it makes a difference?
>>
>>
>> On Apr 29, 2016, at 8:53 AM, Dmitriy Lyubimov  wrote:
>>
>> can you please look into the Spark UI and write down how many splits the
>> job generates in the first stage of the pipeline, or anywhere else there's
>> significant variation in # of splits in both cases?
>>
>> the row similarity is a very short pipeline (in comparison with what would
>> normally be on average). so only the first input re-splitting is critical.
>>
>> The splitting along the products is adjusted automatically by the
>> optimizer to match the amount of data segments observed on average in the
>> input(s), e.g. if you compute val C = A %*% B and A has 500 elements per
>> split and B has 5000 elements per split then C would approximately have
>> 5000 elements per split (the larger average in binary operator cases).
>> That's approximately how it works.
>>
>> However, the par() that has been added is messing with initial
>> parallelism, which would naturally affect the rest of the pipeline per the
>> above. I now doubt it was a good thing -- when i suggested Pat try this, i
>> did not mean to put it _inside_ the algorithm itself, but rather into the
>> accurate input preparation code in his particular case. However, I don't
>> think it will work in any given case. Actually, the sweet-spot parallelism
>> for multiplication unfortunately depends on tons of factors -- network
>> bandwidth and hardware configuration, so it is difficult to give it a good
>> guess universally. More likely, for CLI-based prepackaged algorithms (I
>> don't use the CLI but rather assemble pipelines in Scala via scripting and
>> Scala application code) the initial parallelization adjustment options
>> should probably be provided to the CLI.
>>
>>
>
>
>


Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-29 Thread Nikaash Puri
Hi,

Sure, I’ll do some more detailed analysis of the jobs on the UI and share 
screenshots if possible. 

Pat, yup, I’ll only be able to get to this on Monday, though. I’ll comment out 
the line and see the difference in performance.

Thanks so much for helping guys, I really appreciate it. 

Also, the algorithm implementation for LLR is extremely performant, at least as 
of Mahout 0.10. I ran some tests for around 61 days of data (which in our case 
is a fair amount) and the model was built in about 20 minutes, which is pretty 
amazing. This was using a pretty decent sized cluster, though. 

Thank you,
Nikaash Puri
> On 29-Apr-2016, at 10:18 PM, Pat Ferrel  wrote:
> 
> There are some other changes I want to make for the next rev so I’ll do that.
> 
> Nikaash, it would still be nice to verify this fixes your problem, also if 
> you want to create a Jira it will guarantee I don’t forget.
> 
> 
> On Apr 29, 2016, at 9:23 AM, Dmitriy Lyubimov wrote:
> 
> yes -- i would do it as an optional parameter -- just like par does -- do
> nothing; try auto, or try an exact number of splits
> 
> On Fri, Apr 29, 2016 at 9:15 AM, Pat Ferrel wrote:
> It’s certainly easy to put this in the driver, taking it out of the algo.
> 
> Dmitriy, is it a candidate for an Option param to the algo? That would catch 
> cases where people rely on it now (like my old DStream example) but easily 
> allow it to be overridden to None to imitate pre 0.11, or passed in when the 
> app knows better.
> 
> Nikaash, are you in a position to comment out the .par(auto=true) and see if 
> it makes a difference?
> 
> 
> On Apr 29, 2016, at 8:53 AM, Dmitriy Lyubimov wrote:
> 
> can you please look into the Spark UI and write down how many splits the job
> generates in the first stage of the pipeline, or anywhere else there's
> significant variation in # of splits in both cases?
> 
> the row similarity is a very short pipeline (in comparison with what would
> normally be on average). so only the first input re-splitting is critical.
> 
> The splitting along the products is adjusted automatically by the optimizer
> to match the amount of data segments observed on average in the input(s),
> e.g. if you compute val C = A %*% B and A has 500 elements per split and B
> has 5000 elements per split then C would approximately have 5000 elements
> per split (the larger average in binary operator cases). That's
> approximately how it works.
> 
> However, the par() that has been added is messing with initial parallelism,
> which would naturally affect the rest of the pipeline per the above. I now
> doubt it was a good thing -- when i suggested Pat try this, i did not mean
> to put it _inside_ the algorithm itself, but rather into the accurate input
> preparation code in his particular case. However, I don't think it will
> work in any given case. Actually, the sweet-spot parallelism for
> multiplication unfortunately depends on tons of factors -- network
> bandwidth and hardware configuration, so it is difficult to give it a good
> guess universally. More likely, for CLI-based prepackaged algorithms (I
> don't use the CLI but rather assemble pipelines in Scala via scripting and
> Scala application code) the initial parallelization adjustment options
> should probably be provided to the CLI.
> 
> 
> 



Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-29 Thread Pat Ferrel
There are some other changes I want to make for the next rev so I’ll do that.

Nikaash, it would still be nice to verify this fixes your problem, also if you 
want to create a Jira it will guarantee I don’t forget.


On Apr 29, 2016, at 9:23 AM, Dmitriy Lyubimov  wrote:

yes -- i would do it as an optional parameter -- just like par does -- do
nothing; try auto, or try an exact number of splits

On Fri, Apr 29, 2016 at 9:15 AM, Pat Ferrel <p...@occamsmachete.com> wrote:
It’s certainly easy to put this in the driver, taking it out of the algo.

Dmitriy, is it a candidate for an Option param to the algo? That would catch 
cases where people rely on it now (like my old DStream example) but easily 
allow it to be overridden to None to imitate pre 0.11, or passed in when the 
app knows better.

Nikaash, are you in a position to comment out the .par(auto=true) and see if it 
makes a difference?


On Apr 29, 2016, at 8:53 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

can you please look into the Spark UI and write down how many splits the job
generates in the first stage of the pipeline, or anywhere else there's
significant variation in # of splits in both cases?

the row similarity is a very short pipeline (in comparison with what would
normally be on average). so only the first input re-splitting is critical.

The splitting along the products is adjusted automatically by the optimizer
to match the amount of data segments observed on average in the input(s),
e.g. if you compute val C = A %*% B and A has 500 elements per split and B
has 5000 elements per split then C would approximately have 5000 elements
per split (the larger average in binary operator cases). That's
approximately how it works.

However, the par() that has been added is messing with initial parallelism,
which would naturally affect the rest of the pipeline per the above. I now
doubt it was a good thing -- when i suggested Pat try this, i did not mean
to put it _inside_ the algorithm itself, but rather into the accurate input
preparation code in his particular case. However, I don't think it will
work in any given case. Actually, the sweet-spot parallelism for
multiplication unfortunately depends on tons of factors -- network
bandwidth and hardware configuration, so it is difficult to give it a good
guess universally. More likely, for CLI-based prepackaged algorithms (I
don't use the CLI but rather assemble pipelines in Scala via scripting and
Scala application code) the initial parallelization adjustment options
should probably be provided to the CLI.





Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-29 Thread Dmitriy Lyubimov
yes -- i would do it as an optional parameter -- just like par does -- do
nothing; try auto, or try an exact number of splits
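A sketch of what such an optional knob could look like (ParSpec and applyPar are invented, illustrative names, not the committed Mahout API; the par(min/exact/auto) operator is as discussed in this thread):

import scala.reflect.ClassTag
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

// None reproduces the pre-0.11 behavior (touch nothing); the other cases
// delegate to par().
sealed trait ParSpec
case object ParAuto extends ParSpec
final case class ParExact(splits: Int) extends ParSpec

def applyPar[K: ClassTag](drm: DrmLike[K], spec: Option[ParSpec]): DrmLike[K] =
  spec match {
    case None              => drm
    case Some(ParAuto)     => drm.par(auto = true)
    case Some(ParExact(n)) => drm.par(exact = n)
  }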

On Fri, Apr 29, 2016 at 9:15 AM, Pat Ferrel  wrote:

> It’s certainly easy to put this in the driver, taking it out of the algo.
>
> Dmitriy, is it a candidate for an Option param to the algo? That would
> catch cases where people rely on it now (like my old DStream example) but
> easily allow it to be overridden to None to imitate pre 0.11, or passed in
> when the app knows better.
>
> Nikaash, are you in a position to comment out the .par(auto=true) and see
> if it makes a difference?
>
>
> On Apr 29, 2016, at 8:53 AM, Dmitriy Lyubimov  wrote:
>
> can you please look into the Spark UI and write down how many splits the job
> generates in the first stage of the pipeline, or anywhere else there's
> significant variation in # of splits in both cases?
>
> the row similarity is a very short pipeline (in comparison with what would
> normally be on average). so only the first input re-splitting is critical.
>
> The splitting along the products is adjusted automatically by the optimizer
> to match the amount of data segments observed on average in the input(s),
> e.g. if you compute val C = A %*% B and A has 500 elements per split and B
> has 5000 elements per split then C would approximately have 5000 elements
> per split (the larger average in binary operator cases). That's
> approximately how it works.
>
> However, the par() that has been added is messing with initial parallelism,
> which would naturally affect the rest of the pipeline per the above. I now
> doubt it was a good thing -- when i suggested Pat try this, i did not mean
> to put it _inside_ the algorithm itself, but rather into the accurate input
> preparation code in his particular case. However, I don't think it will
> work in any given case. Actually, the sweet-spot parallelism for
> multiplication unfortunately depends on tons of factors -- network
> bandwidth and hardware configuration, so it is difficult to give it a good
> guess universally. More likely, for CLI-based prepackaged algorithms (I
> don't use the CLI but rather assemble pipelines in Scala via scripting and
> Scala application code) the initial parallelization adjustment options
> should probably be provided to the CLI.
>
>


Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-29 Thread Pat Ferrel
It’s certainly easy to put this in the driver, taking it out of the algo. 

Dmitriy, is it a candidate for an Option param to the algo? That would catch 
cases where people rely on it now (like my old DStream example) but easily 
allow it to be overridden to None to imitate pre 0.11, or passed in when the 
app knows better.

Nikaash, are you in a position to comment out the .par(auto=true) and see if it 
makes a difference?


On Apr 29, 2016, at 8:53 AM, Dmitriy Lyubimov  wrote:

can you please look into the Spark UI and write down how many splits the job
generates in the first stage of the pipeline, or anywhere else there's
significant variation in # of splits in both cases?

the row similarity is a very short pipeline (in comparison with what would
normally be on average). so only the first input re-splitting is critical.

The splitting along the products is adjusted automatically by the optimizer
to match the amount of data segments observed on average in the input(s),
e.g. if you compute val C = A %*% B and A has 500 elements per split and B
has 5000 elements per split then C would approximately have 5000 elements
per split (the larger average in binary operator cases). That's
approximately how it works.

However, the par() that has been added is messing with initial parallelism,
which would naturally affect the rest of the pipeline per the above. I now
doubt it was a good thing -- when i suggested Pat try this, i did not mean
to put it _inside_ the algorithm itself, but rather into the accurate input
preparation code in his particular case. However, I don't think it will
work in any given case. Actually, the sweet-spot parallelism for
multiplication unfortunately depends on tons of factors -- network
bandwidth and hardware configuration, so it is difficult to give it a good
guess universally. More likely, for CLI-based prepackaged algorithms (I
don't use the CLI but rather assemble pipelines in Scala via scripting and
Scala application code) the initial parallelization adjustment options
should probably be provided to the CLI.



Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-29 Thread Dmitriy Lyubimov
I was replying to Nikaash.

Sorry -- list keeps rejecting replies because of the size, i had to remove
the content

On Fri, Apr 29, 2016 at 9:05 AM, Khurrum Nasim 
wrote:

> Is that for me, Dmitriy?
>
>
>
> > On Apr 29, 2016, at 11:53 AM, Dmitriy Lyubimov 
> wrote:
> >
> > can you please look into the Spark UI and write down how many splits the
> > job generates in the first stage of the pipeline, or anywhere else
> > there's significant variation in # of splits in both cases?
> >
> > the row similarity is a very short pipeline (in comparison with what
> > would normally be on average). so only the first input re-splitting is
> > critical.
> >
> > The splitting along the products is adjusted automatically by the
> > optimizer to match the amount of data segments observed on average in
> > the input(s), e.g. if you compute val C = A %*% B and A has 500 elements
> > per split and B has 5000 elements per split then C would approximately
> > have 5000 elements per split (the larger average in binary operator
> > cases). That's approximately how it works.
> >
> > However, the par() that has been added is messing with initial
> > parallelism, which would naturally affect the rest of the pipeline per
> > the above. I now doubt it was a good thing -- when i suggested Pat try
> > this, i did not mean to put it _inside_ the algorithm itself, but rather
> > into the accurate input preparation code in his particular case. However,
> > I don't think it will work in any given case. Actually, the sweet-spot
> > parallelism for multiplication unfortunately depends on tons of factors
> > -- network bandwidth and hardware configuration, so it is difficult to
> > give it a good guess universally. More likely, for CLI-based prepackaged
> > algorithms (I don't use the CLI but rather assemble pipelines in Scala
> > via scripting and Scala application code) the initial parallelization
> > adjustment options should probably be provided to the CLI.
>
>


Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-29 Thread Khurrum Nasim
Is that for me, Dmitriy?



> On Apr 29, 2016, at 11:53 AM, Dmitriy Lyubimov  wrote:
> 
> can you please look into the Spark UI and write down how many splits the job
> generates in the first stage of the pipeline, or anywhere else there's
> significant variation in # of splits in both cases?
> 
> the row similarity is a very short pipeline (in comparison with what would
> normally be on average). so only the first input re-splitting is critical.
> 
> The splitting along the products is adjusted automatically by the optimizer
> to match the amount of data segments observed on average in the input(s),
> e.g. if you compute val C = A %*% B and A has 500 elements per split and B
> has 5000 elements per split then C would approximately have 5000 elements
> per split (the larger average in binary operator cases). That's
> approximately how it works.
> 
> However, the par() that has been added is messing with initial parallelism,
> which would naturally affect the rest of the pipeline per the above. I now
> doubt it was a good thing -- when i suggested Pat try this, i did not mean
> to put it _inside_ the algorithm itself, but rather into the accurate input
> preparation code in his particular case. However, I don't think it will
> work in any given case. Actually, the sweet-spot parallelism for
> multiplication unfortunately depends on tons of factors -- network
> bandwidth and hardware configuration, so it is difficult to give it a good
> guess universally. More likely, for CLI-based prepackaged algorithms (I
> don't use the CLI but rather assemble pipelines in Scala via scripting and
> Scala application code) the initial parallelization adjustment options
> should probably be provided to the CLI.



Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-29 Thread Dmitriy Lyubimov
can you please look into the Spark UI and write down how many splits the job
generates in the first stage of the pipeline, or anywhere else there's
significant variation in # of splits in both cases?

the row similarity is a very short pipeline (in comparison with what would
normally be on average). so only the first input re-splitting is critical.

The splitting along the products is adjusted automatically by the optimizer
to match the amount of data segments observed on average in the input(s),
e.g. if you compute val C = A %*% B and A has 500 elements per split and B
has 5000 elements per split then C would approximately have 5000 elements
per split (the larger average in binary operator cases). That's
approximately how it works.

However, the par() that has been added is messing with initial parallelism,
which would naturally affect the rest of the pipeline per the above. I now
doubt it was a good thing -- when i suggested Pat try this, i did not mean
to put it _inside_ the algorithm itself, but rather into the accurate input
preparation code in his particular case. However, I don't think it will
work in any given case. Actually, the sweet-spot parallelism for
multiplication unfortunately depends on tons of factors -- network
bandwidth and hardware configuration, so it is difficult to give it a good
guess universally. More likely, for CLI-based prepackaged algorithms (I
don't use the CLI but rather assemble pipelines in Scala via scripting and
Scala application code) the initial parallelization adjustment options
should probably be provided to the CLI.
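For anyone wanting to record those split counts outside the Spark UI, a minimal spark-shell sketch (plain Spark; sc is the shell's SparkContext and the path is a placeholder):

// Count the splits an input will produce in the first stage.
val actions = sc.textFile("hdfs:///data/actions")  // placeholder path
println(s"first-stage splits = ${actions.partitions.length}")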


Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-28 Thread Dmitriy Lyubimov
(sorry for repetition, the list rejects my previous replies due to quoted
message size)

"Auto" just reclusters the input per given _configured cluster capacity_
(there's some safe guard there though i think that doesn't blow up # of
splits if the initial number of splits is ridiculously small though, e.g.
not to recluster 2-split problem into a 300-split problem).

For some algorithms, this is appropriate.

For others such as mmul-bound (A'B) problems, there's a "sweet spot" that i
mentioned due to I/O bandwidth being function of the parallelism  -- which
technically doesn't have anything to do with available cluster capacity. It
is possible that if you do A.par(auto=true).t %*% B.par(auto=true) then you
get a worse performance with 500-task cluster than on 60-task cluster
(depending on the size of operands and product).
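As a toy model only (the threshold and logic here are invented for illustration, not Mahout's actual implementation), the described behavior is roughly:

// Recluster toward the configured cluster capacity, but leave tiny inputs
// alone (e.g. keep a 2-split input at 2 rather than jumping to 300).
def autoSplits(currentSplits: Int, clusterCapacity: Int): Int =
  if (currentSplits * 100 < clusterCapacity) currentSplits  // safeguard
  else math.max(currentSplits, clusterCapacity)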


> On Thu, Apr 28, 2016 at 11:55 AM, Pat Ferrel 
> wrote:
>
>> Actually, on your advice Dmitriy, I think these changes went in at about
>> 0.11. Before 0.11, par was not called. Any clue here?
>>
>> This was in relation to that issue when reading a huge number of part
>> files created by Spark Streaming, which probably trickled down to cause
>> too much parallelization. The auto=true fixed this issue for me, but did
>> it have other effects?
>>
>>
>>
>>


Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-28 Thread Pat Ferrel
Hmm, can’t get images through the Apache mail servers.

The image is here: 
https://drive.google.com/file/d/0B4cAk1SMC1ChWFZiRG9DSEpkdzg/view?usp=sharing

 
On Apr 28, 2016, at 11:55 AM, Pat Ferrel  wrote:

Actually, on your advice Dmitriy, I think these changes went in at about 0.11.
Before 0.11, par was not called. Any clue here?

This was in relation to that issue when reading a huge number of part files
created by Spark Streaming, which probably trickled down to cause too much
parallelization. The auto=true fixed this issue for me, but did it have other
effects?
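One hedged way to attack the many-tiny-part-files symptom at the source, before it reaches the pipeline (plain Spark; sc, the glob path, and the target split count are illustrative assumptions):

// Collapse thousands of tiny Spark Streaming part files into fewer splits.
val raw = sc.textFile("hdfs:///streaming-out/part-*")
println(s"raw splits = ${raw.partitions.length}")
val compact = raw.coalesce(64)  // merges partitions without a shuffle
println(s"compacted splits = ${compact.partitions.length}")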
 




On Apr 28, 2016, at 10:12 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

Yes.

Parallelism in Spark makes all the difference.

Since scatter-type exchange in Spark increases I/O with an increase in the #
of splits, strong scaling is not achievable. If you just keep increasing
parallelism, there's a point where individual cpu load decreases but
cumulative I/O cancels out any gains from the parallelism increase. So it is
important to carefully pre-split algorithm inputs using the par() operator.

But assuming the same parallelization strategy before and after, the release
change probably should not affect that.

-d

On Thu, Apr 28, 2016 at 6:02 AM, Nikaash Puri <nikaashp...@gmail.com> wrote:

> Hi,
> 
> OK, so interestingly enough, when I repartition my input data across
> indicators on the user IDs, I get a significant speedup. This is probably
> because shuffle goes down, since RDDs with the same user IDs are more
> likely located on the same nodes. What’s even more interesting is the
> behaviour as a function of the number of partitions.
>
> Concretely, in my case I was using around 20 cores. Setting the number
> of partitions to 200 or more leads to greater shuffle and poorer
> performance. Setting the number of partitions to slightly more than the
> number of cores (30 in my case) gives significant speedups in the AtB
> calculations. Again, my guess is that shuffle is the reason.
> 
> I’ll keep experimenting and share more results.
> 
> All of these tests are with Spark 1.2.0 and Mahout 0.10.
> 
> Thank you,
> Nikaash Puri
>> On 28-Apr-2016, at 2:50 AM, Pat Ferrel wrote:
>> 
>> I have been using the same function through all those versions of
>> Mahout. I'm running on newer versions of Spark, 1.4-1.6.2. Using my
>> datasets there has been no slowdown. I assume that you are only changing
>> the Mahout version, leaving data, Spark, HDFS, and all config the same,
>> in which case I wonder if you are somehow running into limits of your
>> machine, like memory. Have you allocated a fixed executor memory limit?
>> 
>> There has been almost no code change to item similarity. Dmitriy, do you
>> know if the underlying AtB has changed? I seem to recall the partitioning
>> was set to "auto" at about 0.11. We were having problems with large
>> numbers of small part files from Spark Streaming causing partitioning
>> headaches, as I recall. In some unexpected way the input structure was
>> trickling down into partitioning decisions made in Spark.
>> 
>> The first thing I'd try is giving the job more executor memory; the
>> second is to upgrade Spark. A 3x increase in execution time is a pretty
>> big deal if it isn't helped by these easy fixes, so can you share your
>> data?
>> 
>> On Apr 27, 2016, at 8:37 AM, Dmitriy Lyubimov wrote:
>> 
>> 0.11 targets 1.3+.
>> 
>> I don't quite have anything off the top of my head affecting A'B
>> specifically, but i think there were some changes affecting in-memory
>> multiplication (which is of course used in distributed A'B).
>> 
>> I am not particularly familiar with, nor do i remember, the details of
>> row similarity off the top of my head; i really wish the original
>> contributor would comment on that. trying to see if i can come up with
>> anything useful though.
>> 
>> what behavior do you see in this job -- cpu-bound or i/o-bound?
>> 
>> there are a few pointers to look at:
>> 
>> (1) I/O many times exceeds the input size, so spills are inevitable. So
>> tuning memory sizes and looking at Spark spill locations to make sure
>> disks are not slow there is critical. Also, i think Spark 1.6 added a
>> lot of flexibility in managing task/cache/shuffle memory sizes; it may
>> help in some unexpected way.
>> 
>> (2) sufficient cache: many pipelines commit reused matrices into cache
>> (MEMORY_ONLY), which is the default Mahout algebra behavior, assuming
>> there is enough cache memory for only good things to happen. if there is
>> not, however, it will cause recomputation of results that were evicted
>> (not saying it is a known case for row similarity in particular). make
>> sure this is not the case. For cases of scatter-type exchanges it is
>> especially super bad.
>> 
>> (3) A'B -- try to hack and play with the implementation in the AtB
>> (Spark-side) class. See if you can come up with a better arrangement.
>> 
>> (4) in-memory computations (MMul class) if that's the

Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-28 Thread Dmitriy Lyubimov
Yes.

Parallelism in Spark makes all the difference.

Since scatter-type exchange in Spark increases I/O with an increase in the #
of splits, strong scaling is not achievable. If you just keep increasing
parallelism, there's a point where individual cpu load decreases but
cumulative I/O cancels out any gains from the parallelism increase. So it is
important to carefully pre-split algorithm inputs using the par() operator.

But assuming the same parallelization strategy before and after, the release
change probably should not affect that.

-d
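A minimal sketch of that pre-splitting advice in the Samsara Spark bindings (the paths, split counts, and the par(min/exact/auto) signature reflect this thread's discussion and should be treated as assumptions):

import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

// Assumes an implicit DistributedContext; paths are placeholders.
val drmA = drmDfsRead("hdfs:///data/actionA").par(exact = 60).checkpoint()
val drmB = drmDfsRead("hdfs:///data/actionB").par(exact = 60).checkpoint()
val atb  = (drmA.t %*% drmB).checkpoint()  // A'B on pre-split inputs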

On Thu, Apr 28, 2016 at 6:02 AM, Nikaash Puri  wrote:

> Hi,
>
> OK, so interestingly enough, when I repartition my input data across
> indicators on the user IDs, I get a significant speedup. This is probably
> because shuffle goes down, since RDDs with the same user IDs are more
> likely located on the same nodes. What’s even more interesting is the
> behaviour as a function of the number of partitions.
>
> Concretely, in my case I was using around 20 cores. Setting the number
> of partitions to 200 or more leads to greater shuffle and poorer
> performance. Setting the number of partitions to slightly more than the
> number of cores (30 in my case) gives significant speedups in the AtB
> calculations. Again, my guess is that shuffle is the reason.
>
> I’ll keep experimenting and share more results.
>
> All of these tests are with Spark 1.2.0 and Mahout 0.10.
>
> Thank you,
> Nikaash Puri
> > On 28-Apr-2016, at 2:50 AM, Pat Ferrel  wrote:
> >
> > I have been using the same function through all those versions of
> > Mahout. I’m running on newer versions of Spark, 1.4-1.6.2. Using my
> > datasets there has been no slowdown. I assume that you are only changing
> > the Mahout version, leaving data, Spark, HDFS, and all config the same,
> > in which case I wonder if you are somehow running into limits of your
> > machine, like memory. Have you allocated a fixed executor memory limit?
> >
> > There has been almost no code change to item similarity. Dmitriy, do you
> > know if the underlying AtB has changed? I seem to recall the partitioning
> > was set to “auto” at about 0.11. We were having problems with large
> > numbers of small part files from Spark Streaming causing partitioning
> > headaches, as I recall. In some unexpected way the input structure was
> > trickling down into partitioning decisions made in Spark.
> >
> > The first thing I’d try is giving the job more executor memory; the
> > second is to upgrade Spark. A 3x increase in execution time is a pretty
> > big deal if it isn’t helped by these easy fixes, so can you share your
> > data?
> >
> > On Apr 27, 2016, at 8:37 AM, Dmitriy Lyubimov  wrote:
> >
> > 0.11 targets 1.3+.
> >
> > I don't quite have anything off the top of my head affecting A'B
> > specifically, but i think there were some changes affecting in-memory
> > multiplication (which is of course used in distributed A'B).
> >
> > I am not particularly familiar with, nor do i remember, the details of
> > row similarity off the top of my head; i really wish the original
> > contributor would comment on that. trying to see if i can come up with
> > anything useful though.
> >
> > what behavior do you see in this job -- cpu-bound or i/o-bound?
> >
> > there are a few pointers to look at:
> >
> > (1) I/O many times exceeds the input size, so spills are inevitable. So
> > tuning memory sizes and looking at Spark spill locations to make sure
> > disks are not slow there is critical. Also, i think Spark 1.6 added a
> > lot of flexibility in managing task/cache/shuffle memory sizes; it may
> > help in some unexpected way.
> >
> > (2) sufficient cache: many pipelines commit reused matrices into cache
> > (MEMORY_ONLY), which is the default Mahout algebra behavior, assuming
> > there is enough cache memory for only good things to happen. if there is
> > not, however, it will cause recomputation of results that were evicted
> > (not saying it is a known case for row similarity in particular). make
> > sure this is not the case. For cases of scatter-type exchanges it is
> > especially super bad.
> >
> > (3) A'B -- try to hack and play with the implementation in the AtB
> > (Spark-side) class. See if you can come up with a better arrangement.
> >
> > (4) in-memory computations (MMul class), if that's the bottleneck, can
> > in practice be quick-hacked with multithreaded multiplication and a
> > bridge to native solvers (netlib-java), at least for dense cases. this
> > is found to improve performance of distributed multiplications a bit.
> > Works best if you get 2 threads in the backend and all threads in the
> > front end.
> >
> > There are other known things that can improve multiplication speed in
> > the public mahout version; i hope mahout will improve on those in the
> > future.
> >
> > -d
> >
> > On Wed, Apr 27, 2016 at 6:14 AM, Nikaash Puri 
> wrote:
> >
> >> Hi,
> >>
> >> I’ve been working with LLR in Mahout for a while now. Mostly using the
> >> SimilarityAnalysis.cooccurenceIDss function. I recently upgraded t
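For reference, the function named in the truncated quote above is spelled SimilarityAnalysis.cooccurrencesIDSs in the Mahout sources; a minimal, hedged sketch of the call pattern (parameter names and values are approximate, from memory of the 0.10-era API, and should be treated as assumptions):

import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset.IndexedDataset

// actions(0) is the primary action; the rest share its user dictionary.
def buildIndicators(actions: Array[IndexedDataset]): List[IndexedDataset] =
  SimilarityAnalysis.cooccurrencesIDSs(
    actions,
    maxInterestingItemsPerThing = 50,  // top LLR-scored items kept per item
    maxNumInteractions = 500)          // per-user interaction cap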

Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-28 Thread Nikaash Puri
Hi,

OK, so interestingly enough, when I repartition my input data across indicators
on the user IDs, I get a significant speedup. This is probably because shuffle
goes down, since RDDs with the same user IDs are more likely located on the
same nodes. What’s even more interesting is the behaviour as a function of the
number of partitions.

Concretely, in my case I was using around 20 cores. So, setting the number of 
partitions as 200 or more leads to greater shuffle and poorer performance. 
Setting the number of partitions to slightly more than the number of cores, 30 
in my case gives significant speedups in the AtB calculations. Again, my guess 
is that shuffle is the reason.
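
A minimal sketch of the repartitioning, assuming the raw interactions
arrive as (userID, itemID) pairs; the paths and names are placeholders,
not my actual job:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // PairRDDFunctions implicits on Spark 1.2

val sc = new SparkContext(new SparkConf().setAppName("itemsimilarity-prep"))

// One (userID, itemID) pair per tab-separated line.
def readPairs(path: String) = sc.textFile(path).map { line =>
  val Array(user, item) = line.split("\t")
  (user, item)
}

// Slightly more partitions than cores; 200+ increased shuffle for me.
val byUser = new HashPartitioner(30)

// Partition every indicator on user ID before building the IndexedDatasets,
// so rows for the same user tend to land on the same nodes.
val primary   = readPairs("hdfs:///actions/primary").partitionBy(byUser)
val secondary = readPairs("hdfs:///actions/secondary").partitionBy(byUser)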

I’ll keep experimenting and share more results.

All of these tests are with Spark 1.2.0 and Mahout 0.10. 

Thank you,
Nikaash Puri



Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-27 Thread Nikaash Puri
Hi Pat, Dmitriy

Thanks so much. I'll run some more experiments to validate the initial
outcomes. A Spark upgrade is definitely in the pipeline and will likely
solve some of these performance issues.

Pat, yup, the tests were conducted under identical code bases and data
sets, with only the Mahout version changed. I'm sorry, the data is
sensitive, so sharing it won't be possible.

Also, as far as I can tell, spark-itemsimilarity now uses computeAtBZipped3
instead of computeAtBZipped for AtB. That change is meant to speed things
up, though, so I'm not sure it's relevant to this problem.

Thank you,
Nikaash Puri



Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-27 Thread Pat Ferrel
I have been using the same function through all those versions of Mahout. I’m 
running on newer versions of Spark, 1.4-1.6.2, and with my datasets there has 
been no slowdown. I assume that you are only changing the Mahout version, 
leaving data, Spark, HDFS, and all config the same. In that case I wonder if 
you are somehow running into limits of your machine, like memory. Have you 
allocated a fixed executor memory limit?

There has been almost no code change to item similarity. Dmitriy, do you know 
if the underlying AtB has changed? I seem to recall the partitioning was set to 
“auto” around 0.11. We were having problems with large numbers of small part 
files from Spark Streaming causing partitioning headaches, as I recall. In some 
unexpected way the input structure was trickling down into partitioning 
decisions made in Spark.

The first thing I’d try is giving the job more executor memory (a sketch is 
below); the second is to upgrade Spark. A 3x increase in execution time is a 
pretty big deal if it isn’t helped by these easy fixes, so can you share your 
data?
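
A hedged sketch of the first suggestion; the setting names are standard
Spark configuration, the values are placeholders to size for your cluster:

import org.apache.spark.{SparkConf, SparkContext}

// Fix the executor memory budget explicitly instead of relying on defaults.
val conf = new SparkConf()
  .setAppName("spark-itemsimilarity")
  .set("spark.executor.memory", "8g")  // placeholder value
  .set("spark.executor.cores", "4")    // placeholder value
val sc = new SparkContext(conf)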




Re: spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-27 Thread Dmitriy Lyubimov
0.11 targets Spark 1.3+.

I don't have anything off the top of my head affecting A'B specifically,
but I think there were some changes affecting in-memory multiplication
(which is of course used in distributed A'B).

I am not particularly familiar with the details of row similarity, nor do I
remember them off the top of my head; I really wish the original contributor
would comment on that. Trying to see if I can come up with anything useful,
though.

What behavior do you see in this job -- CPU-bound or I/O-bound?

There are a few pointers to look at:

(1) I/O many times exceeds the input size, so spills are inevitable.
Tuning memory sizes, and checking the Spark spill locations to make sure
the disks there are not slow, is critical. Also, I think Spark 1.6 added a
lot of flexibility in managing task/cache/shuffle memory sizes; it may help
in some unexpected way.
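
A hedged sketch of these knobs; the setting names are real Spark
configuration (the memory-fraction ones exist as of Spark 1.6), the values
are only illustrative:

import org.apache.spark.SparkConf

// Point spills at fast local disks and tune the 1.6 unified memory manager.
val conf = new SparkConf()
  .set("spark.local.dir", "/mnt/fast1/spark,/mnt/fast2/spark")  // spill scratch space
  .set("spark.memory.fraction", "0.75")        // heap share for execution + storage
  .set("spark.memory.storageFraction", "0.5")  // storage share protected from eviction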

(2) Sufficient cache: many pipelines commit reused matrices to cache with
MEMORY_ONLY, which is the default Mahout algebra behavior, assuming there
is enough cache memory for only good things to happen. If there is not,
however, it will cause recomputation of results that were evicted (not
saying it is a known case for row similarity in particular). Make sure this
is not happening. For scatter-type exchanges it is especially bad.
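
A hedged sketch of guarding against this, using Mahout's DRM checkpoint
API as I understand it; MEMORY_AND_DISK makes eviction degrade to disk
reads instead of recomputation:

import org.apache.mahout.math.drm._

// Pin a reused matrix with a spill-friendly hint instead of the
// MEMORY_ONLY default.
def pinForReuse[K](drm: DrmLike[K]): CheckpointedDrm[K] =
  drm.checkpoint(CacheHint.MEMORY_AND_DISK)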

(3) A'B -- try to hack on and play with the implementation in the AtB
(Spark-side) class. See if you can come up with a better arrangement.
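
A lighter-weight alternative to rewriting AtB, assuming the par() operator
that I believe recent Mahout ships for overriding the automatic parallelism
choice:

import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

// Pin a concrete split count on both operands before taking the product.
def controlledAtB(drmA: DrmLike[Int], drmB: DrmLike[Int]): DrmLike[Int] =
  drmA.par(exact = 30).t %*% drmB.par(exact = 30)  // A'B with pinned splits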

(4) In-memory computation (the MMul class), if that's the bottleneck, can
in practice be quick-hacked with multithreaded multiplication and a bridge
to native solvers (netlib-java), at least for dense cases. This is found to
improve the performance of distributed multiplications a bit. It works best
if you get 2 threads in the backend and all threads in the frontend.
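
A hedged illustration of the netlib-java bridge for the dense case; this
is a standalone dgemm call, not Mahout's actual MMul hook:

import com.github.fommil.netlib.BLAS

// Dense C = A * B via BLAS dgemm; arrays are column-major,
// "N" means neither operand is transposed.
def denseMultiply(m: Int, n: Int, k: Int,
                  a: Array[Double], b: Array[Double]): Array[Double] = {
  val c = new Array[Double](m * n)
  BLAS.getInstance.dgemm("N", "N", m, n, k, 1.0, a, m, b, k, 0.0, c, m)
  c
}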

There are other known things that can improve multiplication speed over the
public Mahout version; I hope Mahout will improve on those in the future.

-d



spark-itemsimilarity runs orders of times slower from Mahout 0.11 onwards

2016-04-27 Thread Nikaash Puri
Hi,

I’ve been working with LLR in Mahout for a while now, mostly using the 
SimilarityAnalysis.cooccurrencesIDSs function (a sketch of the call is 
below). I recently upgraded the Mahout libraries to 0.11, and subsequently 
also tried 0.12, and the same program runs dramatically slower (at least 3x 
based on initial analysis).
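
For reference, a minimal sketch of the call, with dataset loading elided;
the parameter names reflect the 0.1x API as I understand it:

import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset.IndexedDataset

// First dataset is the primary action; the rest drive cross-cooccurrence.
def indicators(primary: IndexedDataset, secondary: IndexedDataset) =
  SimilarityAnalysis.cooccurrencesIDSs(
    Array(primary, secondary),
    maxInterestingItemsPerThing = 50,  // library defaults, shown explicitly
    maxNumInteractions = 500)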

Looking into the tasks more carefully, a comparison of 0.10 and 0.11 shows 
that the amount of shuffle done in 0.11 is significantly higher, especially 
in the AtB step. This could be a reason for the reduced performance.

I am working on Spark 1.2.0, though, so it's possible that this is causing 
the problem. It works fine with Mahout 0.10.

Any ideas why this might be happening?

Thank you,
Nikaash Puri