Re: Reduce memory usage of UnsafeInMemorySorter

2016-12-06 Thread Reynold Xin
This is not supposed to happen. Do you have a repro?


On Tue, Dec 6, 2016 at 6:11 PM, Nicholas Chammas  wrote:

> [Re-titling thread.]
>
> OK, I see that the exception from my original email is being triggered
> from this part of UnsafeInMemorySorter:
>
> https://github.com/apache/spark/blob/v2.0.2/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java#L209-L212
>
> So I can ask a more refined question now: How can I ensure that
> UnsafeInMemorySorter has room to insert new records? In other words, how
> can I ensure that hasSpaceForAnotherRecord() returns a true value?
>
> Do I need:
>
>- More, smaller partitions?
>- More memory per executor?
>- Some Java or Spark option enabled?
>- etc.
>
> I’m running Spark 2.0.2 on Java 7 and YARN. Would Java 8 help here?
> (Unfortunately, I cannot upgrade at this time, but it would be good to know
> regardless.)
>
> This is morphing into a user-list question, so accept my apologies. Since
> I can’t find any information anywhere else about this, and the question is
> about internals like UnsafeInMemorySorter, I hope this is OK here.
>
> Nick
>
> On Mon, Dec 5, 2016 at 9:11 AM Nicholas Chammas nicholas.cham...@gmail.com
>  wrote:
>
> I was testing out a new project at scale on Spark 2.0.2 running on YARN,
>> and my job failed with an interesting error message:
>>
>> TaskSetManager: Lost task 37.3 in stage 31.0 (TID 10684, server.host.name): 
>> java.lang.IllegalStateException: There is no space for new record
>> 05:27:09.573 at 
>> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:211)
>> 05:27:09.574 at 
>> org.apache.spark.sql.execution.UnsafeKVExternalSorter.(UnsafeKVExternalSorter.java:127)
>> 05:27:09.574 at 
>> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:244)
>> 05:27:09.575 at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>>  Source)
>> 05:27:09.575 at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>>  Source)
>> 05:27:09.576 at 
>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>> 05:27:09.576 at 
>> org.apache.spark.sql.execution.WholeStageCodegenExec$anonfun$8$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>> 05:27:09.577 at 
>> scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
>> 05:27:09.577 at 
>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>> 05:27:09.577 at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>> 05:27:09.578 at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>> 05:27:09.578 at org.apache.spark.scheduler.Task.run(Task.scala:86)
>> 05:27:09.578 at 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> 05:27:09.579 at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> 05:27:09.579 at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> 05:27:09.579 at java.lang.Thread.run(Thread.java:745)
>>
>> I’ve never seen this before, and searching on Google/DDG/JIRA doesn’t
>> yield any results. There are no other errors coming from that executor,
>> whether related to memory, storage space, or otherwise.
>>
>> Could this be a bug? If so, how would I narrow down the source?
>> Otherwise, how might I work around the issue?
>>
>> Nick
>> ​
>>
> ​
>


Reduce memory usage of UnsafeInMemorySorter

2016-12-06 Thread Nicholas Chammas
[Re-titling thread.]

OK, I see that the exception from my original email is being triggered from
this part of UnsafeInMemorySorter:

https://github.com/apache/spark/blob/v2.0.2/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java#L209-L212

So I can ask a more refined question now: How can I ensure that
UnsafeInMemorySorter has room to insert new records? In other words, how
can I ensure that hasSpaceForAnotherRecord() returns a true value?

Do I need:

   - More, smaller partitions?
   - More memory per executor?
   - Some Java or Spark option enabled?
   - etc.

I’m running Spark 2.0.2 on Java 7 and YARN. Would Java 8 help here?
(Unfortunately, I cannot upgrade at this time, but it would be good to know
regardless.)
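
For what it's worth, the options listed above map onto ordinary Spark 2.x
configuration keys. A minimal sketch (the specific values are placeholders,
not recommendations):

    import org.apache.spark.sql.SparkSession

    // Hypothetical tuning sketch: more (smaller) shuffle partitions and more
    // executor memory are the usual first knobs for aggregation/sort pressure.
    val spark = SparkSession.builder()
      .appName("tuning-sketch")
      .config("spark.sql.shuffle.partitions", "800")  // default is 200
      .config("spark.executor.memory", "8g")          // per-executor heap
      .config("spark.memory.fraction", "0.6")         // execution + storage share of heap
      .getOrCreate()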

This is morphing into a user-list question, so accept my apologies. Since I
can’t find any information anywhere else about this, and the question is
about internals like UnsafeInMemorySorter, I hope this is OK here.

Nick

On Mon, Dec 5, 2016 at 9:11 AM Nicholas Chammas nicholas.cham...@gmail.com
 wrote:

I was testing out a new project at scale on Spark 2.0.2 running on YARN,
> and my job failed with an interesting error message:
>
> TaskSetManager: Lost task 37.3 in stage 31.0 (TID 10684, server.host.name): 
> java.lang.IllegalStateException: There is no space for new record
> 05:27:09.573 at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:211)
> 05:27:09.574 at 
> org.apache.spark.sql.execution.UnsafeKVExternalSorter.(UnsafeKVExternalSorter.java:127)
> 05:27:09.574 at 
> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:244)
> 05:27:09.575 at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
> 05:27:09.575 at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> 05:27:09.576 at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 05:27:09.576 at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$anonfun$8$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 05:27:09.577 at 
> scala.collection.Iterator$anon$11.hasNext(Iterator.scala:408)
> 05:27:09.577 at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 05:27:09.577 at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 05:27:09.578 at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 05:27:09.578 at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 05:27:09.578 at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 05:27:09.579 at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 05:27:09.579 at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 05:27:09.579 at java.lang.Thread.run(Thread.java:745)
>
> I’ve never seen this before, and searching on Google/DDG/JIRA doesn’t
> yield any results. There are no other errors coming from that executor,
> whether related to memory, storage space, or otherwise.
>
> Could this be a bug? If so, how would I narrow down the source? Otherwise,
> how might I work around the issue?
>
> Nick
> ​
>
​


Re: [MLLIB] RankingMetrics.precisionAt

2016-12-06 Thread Maciej Szymkiewicz
This sounds much better.

A follow-up question is whether we should provide MAP@k, which I believe is
a more widely used metric.
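
For concreteness, a rough local (non-RDD) sketch of the distinction being
discussed, with ap@k following the ml_metrics definition referenced further
down in the thread; this is illustrative only, not the RankingMetrics API:

    // precision@k only counts hits in the top k.
    def precisionAtK(pred: Seq[Int], labels: Set[Int], k: Int): Double =
      pred.take(k).count(labels.contains).toDouble / k

    // ap@k averages the precision at each hit position.
    def averagePrecisionAtK(pred: Seq[Int], labels: Set[Int], k: Int): Double = {
      var hits = 0
      var score = 0.0
      pred.take(k).zipWithIndex.foreach { case (p, i) =>
        if (labels.contains(p)) {
          hits += 1
          score += hits.toDouble / (i + 1)  // precision at this hit position
        }
      }
      if (labels.isEmpty) 0.0 else score / math.min(labels.size, k)
    }

    // Example: relevant items {1, 2}, predictions Seq(3, 1, 2), k = 3
    // precisionAtK        -> 2/3
    // averagePrecisionAtK -> (1/2 + 2/3) / 2 ≈ 0.583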


On 12/06/2016 09:52 PM, Sean Owen wrote:
> As I understand, this might best be called "mean precision@k", not
> "mean average precision, up to k".
>
> On Tue, Dec 6, 2016 at 9:43 PM Maciej Szymkiewicz
> mailto:mszymkiew...@gmail.com>> wrote:
>
> Thank you Sean.
>
> Maybe I am just confused about the language. When I read that it
> returns "the average precision at the first k ranking positions" I
> somehow expect there will be ap@k and that the final output would
> be MAP@k, not the average precision at the k-th position.
>
> I guess it is not enough sleep.
>
> On 12/06/2016 02:45 AM, Sean Owen wrote:
>> I read it again and that looks like it implements mean
>> precision@k as I would expect. What is the issue?
>>
>> On Tue, Dec 6, 2016, 07:30 Maciej Szymkiewicz
>> mailto:mszymkiew...@gmail.com>> wrote:
>>
>> Hi,
>>
>> Could I ask for a fresh pair of eyes on this piece of code:
>>
>> 
>> https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala#L59-L80
>>
>>   @Since("1.2.0")
>>   def precisionAt(k: Int): Double = {
>> require(k > 0, "ranking position k should be positive")
>> predictionAndLabels.map { case (pred, lab) =>
>>   val labSet = lab.toSet
>>
>>   if (labSet.nonEmpty) {
>> val n = math.min(pred.length, k)
>> var i = 0
>> var cnt = 0
>> while (i < n) {
>>   if (labSet.contains(pred(i))) {
>> cnt += 1
>>   }
>>   i += 1
>> }
>> cnt.toDouble / k
>>   } else {
>> logWarning("Empty ground truth set, check input data")
>> 0.0
>>   }
>> }.mean()
>>   }
>>
>>
>> Am I the only one who thinks this doesn't do what it claims?
>> Just for reference:
>>
>>   * 
>> https://web.archive.org/web/20120415101144/http://sas.uwaterloo.ca/stats_navigation/techreports/04WorkingPapers/2004-09.pdf
>>   * 
>> https://github.com/benhamner/Metrics/blob/master/Python/ml_metrics/average_precision.py
>>
>> -- 
>> Best,
>> Maciej
>>
>
> -- 
> Maciej Szymkiewicz
>

-- 
Maciej Szymkiewicz



Re: Can I add a new method to RDD class?

2016-12-06 Thread Teng Long
Hi Jakob, 

It seems like I’ll have to either replace the version with my custom version in 
all the pom.xml files in every subdirectory that has one and publish locally, 
or keep the version (i.e. 2.0.2), manually remove the Spark repository cache 
in ~/.ivy2 and ~/.m2, publish Spark locally, and then compile my application 
against the matching version to make it work. I think there has to be a more 
elegant way to do this. 
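
As an aside, if the goal is only to call a new method on RDDs from application
code, rather than to change Spark internals, one common pattern that avoids
rebuilding Spark entirely is an implicit enrichment class. A minimal sketch with
a made-up method name:

    import org.apache.spark.rdd.RDD

    // Hypothetical example: adds a countDistinct() method to any RDD without
    // touching the Spark build; this compiles into the application itself.
    object RddExtensions {
      implicit class RichRdd[T](rdd: RDD[T]) {
        def countDistinct(): Long = rdd.distinct().count()
      }
    }

    // Usage:
    //   import RddExtensions._
    //   sc.parallelize(Seq(1, 1, 2, 3)).countDistinct()  // 3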

> On Dec 6, 2016, at 1:07 PM, Jakob Odersky-2 [via Apache Spark Developers 
> List]  wrote:
> 
> Yes, I think changing the <version> property (line 29) in Spark's root 
> pom.xml should be sufficient. However, keep in mind that you'll also 
> need to publish spark locally before you can access it in your test 
> application. 
> 
> On Tue, Dec 6, 2016 at 2:50 AM, Teng Long <[hidden email] 
> > wrote:
> 
> > Thank you Jakob for clearing things up for me. 
> > 
> > Before, I thought my application was compiled against my local build since 
> > I 
> > can get all the logs I just added in spark-core. But it was all along using 
> > spark downloaded from remote maven repository, and that’s why I “cannot" 
> > add 
> > new RDD methods in. 
> > 
> > How can I specify a custom version? modify version numbers in all the 
> > pom.xml file? 
> > 
> > 
> > 
> > On Dec 5, 2016, at 9:12 PM, Jakob Odersky <[hidden email] 
> > > wrote: 
> > 
> > m rdds in an "org.apache.spark" package as well 
> > 
> >
> 





Re: [MLLIB] RankingMetrics.precisionAt

2016-12-06 Thread Sean Owen
As I understand, this might best be called "mean precision@k", not "mean
average precision, up to k".

On Tue, Dec 6, 2016 at 9:43 PM Maciej Szymkiewicz 
wrote:

> Thank you Sean.
>
> Maybe I am just confused about the language. When I read that it returns "the
> average precision at the first k ranking positions" I somehow expect there
> will be ap@k and that the final output would be MAP@k, not the average
> precision at the k-th position.
>
> I guess it is not enough sleep.
> On 12/06/2016 02:45 AM, Sean Owen wrote:
>
> I read it again and that looks like it implements mean precision@k as I
> would expect. What is the issue?
>
> On Tue, Dec 6, 2016, 07:30 Maciej Szymkiewicz 
> wrote:
>
> Hi,
>
> Could I ask for a fresh pair of eyes on this piece of code:
>
>
> https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala#L59-L80
>
>   @Since("1.2.0")
>   def precisionAt(k: Int): Double = {
> require(k > 0, "ranking position k should be positive")
> predictionAndLabels.map { case (pred, lab) =>
>   val labSet = lab.toSet
>
>   if (labSet.nonEmpty) {
> val n = math.min(pred.length, k)
> var i = 0
> var cnt = 0
> while (i < n) {
>   if (labSet.contains(pred(i))) {
> cnt += 1
>   }
>   i += 1
> }
> cnt.toDouble / k
>   } else {
> logWarning("Empty ground truth set, check input data")
> 0.0
>   }
> }.mean()
>   }
>
>
> Am I the only one who thinks this doesn't do what it claims? Just for
> reference:
>
>
>-
>
> https://web.archive.org/web/20120415101144/http://sas.uwaterloo.ca/stats_navigation/techreports/04WorkingPapers/2004-09.pdf
>-
>
> https://github.com/benhamner/Metrics/blob/master/Python/ml_metrics/average_precision.py
>
> --
> Best,
> Maciej
>
>
> --
> Maciej Szymkiewicz
>
>


Seeing bytecode of each task executed.

2016-12-06 Thread Mr rty ff
Hi,
Is there some way to see the bytecode of each task that is executed by
Spark?
Thanks
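
One related facility for Dataset/DataFrame jobs is printing the generated Java
source of each whole-stage-codegen subtree. A sketch, assuming a spark-shell
session (so spark is the SparkSession) and the org.apache.spark.sql.execution.debug
helpers of Spark 2.x:

    import org.apache.spark.sql.execution.debug._  // adds debugCodegen() on Datasets

    // Prints the generated Java source for each WholeStageCodegen subtree of the plan.
    val df = spark.range(0, 1000).selectExpr("id % 10 AS k").groupBy("k").count()
    df.debugCodegen()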




Re: driver in queued state and not started

2016-12-06 Thread Michael Gummelt
Client mode or cluster mode?

On Mon, Dec 5, 2016 at 10:05 PM, Yu Wei  wrote:

> Hi Guys,
>
>
> I tried to run spark on mesos cluster.
>
> However, when I tried to submit jobs via spark-submit. The driver is in
> "Queued state" and not started.
>
>
> Which should I check?
>
>
>
> Thanks,
>
> Jared, (韦煜)
> Software developer
> Interested in open source software, big data, Linux
>



-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: SparkR Function for Step Wise Regression

2016-12-06 Thread Miao Wang
I tried one example on sparkR:
 
> training <- suppressWarnings(createDataFrame(iris))
> step(spark.glm(training, Sepal_Width ~ Sepal_Length + Species), direction = "forward")
 
There is an error:
Error: $ operator not defined for this S4 class
 
Based on my understanding of mllib.R, I think it is not supported yet.
 
Miao
 
 
- Original message -
From: Prasann modi
To: u...@spark.apache.org
Cc: dev@spark.apache.org
Subject: SparkR Function for Step Wise Regression
Date: Mon, Dec 5, 2016 10:50 PM

Hello,

I have an issue related to SparkR. I want to build a step wise regression model
using SparkR. Is there any function in SparkR to build that kind of model?
In R, a function is available for step wise regression; the code is given below:

step(glm(formula, data, family), direction = "forward")

If SparkR has a function for step wise regression then please let me know.

Thanks & Regards,
Prasann Modi
 





Re: Spark-9487, Need some insight

2016-12-06 Thread Saikat Kanjilal
Well, other than making the code consistent, what's the high-level goal in doing 
this, and why does it matter so much how many workers we have in different 
scenarios (PySpark versus different components of Spark)? To be honest, I'm OK 
not making the change and working on something else, but spending hours 
troubleshooting issues in a local dev environment that doesn't resemble Jenkins 
closely enough is not a productive use of time. I would love to get input on the 
next logical steps.



From: Reynold Xin 
Sent: Monday, December 5, 2016 6:44 PM
To: Saikat Kanjilal
Cc: dev@spark.apache.org
Subject: Re: Spark-9487, Need some insight

Honestly it is pretty difficult. Given the difficulty, would it still make 
sense to do that change? (the one that sets the same number of 
workers/parallelism across different languages in testing)
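
For context, the kind of setting under discussion looks roughly like this in a
test suite; the suite wiring and the exact values are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical test fixture: pin the local worker-thread count and default
    // parallelism so local runs and CI runs see the same degree of parallelism.
    val conf = new SparkConf()
      .setMaster("local[2]")  // two local worker threads
      .setAppName("parallelism-pinned-test")
      .set("spark.default.parallelism", "2")
    val sc = new SparkContext(conf)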


On Mon, Dec 5, 2016 at 3:33 PM, Saikat Kanjilal 
mailto:sxk1...@hotmail.com>> wrote:

Hello again dev community,

Ping on this, apologies for rerunning this thread but never heard from anyone, 
based on this link:  
https://wiki.jenkins-ci.org/display/JENKINS/Installing+Jenkins  I can try to 
install jenkins locally but is that really needed?


Thanks in advance.



From: Saikat Kanjilal mailto:sxk1...@hotmail.com>>
Sent: Tuesday, November 29, 2016 8:14 PM
To: dev@spark.apache.org
Subject: Spark-9487, Need some insight


Hello Spark dev community,

I took on the following JIRA item 
(https://github.com/apache/spark/pull/15848) and am looking for some general 
pointers. I am running into issues where things work successfully during local 
development on my MacBook Pro but fail on Jenkins for a multitude of reasons and 
errors. Here's an example: if you look at this build output report: 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69297/ you 
will see the DataFrameStatSuite. Locally I am running these individual 
tests with this command: ./build/mvn test -P... -DwildcardSuites=none 
-Dtest=org.apache.spark.sql.DataFrameStatSuite. It seems that I need to 
emulate a Jenkins-like environment locally, which feels like an 
untenable hurdle, granted that my changes involve changing the total number of 
workers in the SparkContext; if so, should I be testing my changes in an 
environment that more closely resembles Jenkins? I really want to work 
on/complete this PR, but I keep getting hamstrung by a dev environment that is 
not equivalent to our CI environment.



I'm guessing/hoping I'm not the first one to run into this, so any insights or 
pointers to get past it would be very appreciated. I would love to keep 
contributing, and I hope this is a hurdle that can be overcome with some tweaks 
to my dev environment.



Thanks in advance.



Re: Can I add a new method to RDD class?

2016-12-06 Thread Jakob Odersky
Yes, I think changing the <version> property (line 29) in Spark's root
pom.xml should be sufficient. However, keep in mind that you'll also
need to publish spark locally before you can access it in your test
application.

On Tue, Dec 6, 2016 at 2:50 AM, Teng Long  wrote:
> Thank you Jakob for clearing things up for me.
>
> Before, I thought my application was compiled against my local build since I
> can get all the logs I just added in spark-core. But it was all along using
> spark downloaded from remote maven repository, and that’s why I “cannot" add
> new RDD methods in.
>
> How can I specify a custom version? modify version numbers in all the
> pom.xml file?
>
>
>
> On Dec 5, 2016, at 9:12 PM, Jakob Odersky  wrote:
>
> m rdds in an "org.apache.spark" package as well
>
>




Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

2016-12-06 Thread Hegner, Travis

Steve,

I appreciate your experience and insight when dealing with large clusters at 
the data-center scale. I'm also well aware of the complex nature of schedulers, 
and that it is an area of ongoing research being done by people/companies with 
many more resources than I have. This might explain my apprehension in even 
calling this idea a *scheduler*: I wanted to avoid this exact kind of debate 
over what I want to accomplish. This is also why I mentioned that this idea 
will mostly benefit users with small clusters.

I've used many of the big named "cluster schedulers" (YARN, Mesos, and 
Kubernetes) and the main thing that they have in common is that they don't work 
well for my use case. Those systems are designed for large scale 1000+ node 
clusters, and become painful to manage in the small cluster range. Most of the 
tools that we've attempted to use don't work well for us, so we've written 
several of our own: https://github.com/trilliumit/.

It can be most easily stated by the fact that *we are not* Google, Facebook, or 
Amazon: we don't have a *data-center* of servers to manage, we barely have half 
of a rack. *We are not trying to solve the problem that you are referring to*. 
We are operating at a level that if we aren't meeting SLAs, then we could just 
buy another server to add to the cluster. I imagine that we are not alone in 
that fact either, I've seen that many of the questions on SO and on the user 
list are from others operating at a level similar to ours.

I understand that pre-emption isn't inherently a bad thing, and that these 
multi-node systems typically handle it gracefully. However, if idle CPU is 
expensive, then how much more does wasted CPU cost when a nearly complete task 
is pre-empted and has to be started over? Fortunately for me, that isn't a 
problem that I have to solve at the moment.

>Instead? Use a multi-user cluster scheduler and spin up different spark 
>instances for the different workloads

See my above comment on how well these cluster schedulers work for us. I have 
considered the avenue of multiple spark clusters, and in reality the 
infrastructure we have set up would allow me to do this relatively easily. In 
fact, in my environment, this is a similar solution to what I'm proposing, just 
managed one layer up the stack and with less flexibility. I am trying to avoid 
this solution however because it does require more overhead and maintenance. 
What if I want two spark apps to run on the same cluster at the same time, 
sharing the available CPU capacity equally? I can't accomplish that easily with 
multiple spark clusters. Also, we are a 1 to 2 man operation at this point, I 
don't have teams of ops people to task with managing as many spark clusters as 
I feel like launching.

>FWIW, it's often memory consumption that's most problematic here.

Perhaps in the use-cases you have experience with, but not currently in mine. 
In fact, my initial proposal is not yet changing the allocation of memory as a 
resource. This would still be statically allocated in a FIFO manner as long as 
memory is available on the cluster, the same way it is now.

>I would strongly encourage you to avoid this topic

Thanks for the suggestion, but I will choose how I spend my time. If I can find 
a simple solution to a problem that I face, and I'm willing to share that 
solution, I'd hope one would encourage that instead.


Perhaps I haven't yet clearly communicated what I'm trying to do. In short, *I 
am not trying to write a scheduler*: I am trying to slightly (and optionally) 
tweak the way executors are allocated and launched, so that I can more 
intuitively and more optimally utilize my small spark cluster.

Thanks,

Travis


From: Steve Loughran 
Sent: Tuesday, December 6, 2016 06:54
To: Hegner, Travis
Cc: Shuai Lin; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling 
utilizing linux cgroups.

This is essentially what the cluster schedulers do: allow different people to 
submit work with different credentials and priority; cgroups & equivalent to 
limit granted resources to requested ones. If you have pre-emption enabled, you 
can even have one job kill work off the others. Spark does recognise 
pre-emption failures and doesn't treat it as a sign of problems in the 
executor, that is: it doesn't over-react.

cluster scheduling is one of the cutting edge bits of datacentre-scale 
computing —if you are curious about what is state of the art, look at the 
Morning Paper https://blog.acolyer.org/ for coverage last week of MS and google 
work there. YARN, Mesos, Borg, whatever Amazon use, at scale it's not just 
meeting SLAs, its about how much idle CPU costs, and how expensive even a 1-2% 
drop in throughput would be.


I would strongly encourage you to avoid this topic, unless you want to dive deep 
into the whole world of cluster scheduling, the debate over centralized vs 
decentralized, the ideological one of "should services ever get allocated 
RAM/CPU in times of low overall load?" …

Re: [MLLIB] RankingMetrics.precisionAt

2016-12-06 Thread Maciej Szymkiewicz
Thank you Sean.

Maybe I am just confused about the language. When I read that it returns
"the average precision at the first k ranking positions" I somehow
expect there will be ap@k and that the final output would be MAP@k, not
the average precision at the k-th position.

I guess it is not enough sleep.

On 12/06/2016 02:45 AM, Sean Owen wrote:
> I read it again and that looks like it implements mean precision@k as
> I would expect. What is the issue?
>
> On Tue, Dec 6, 2016, 07:30 Maciej Szymkiewicz  > wrote:
>
> Hi,
>
> Could I ask for a fresh pair of eyes on this piece of code:
>
> 
> https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala#L59-L80
>
>   @Since("1.2.0")
>   def precisionAt(k: Int): Double = {
> require(k > 0, "ranking position k should be positive")
> predictionAndLabels.map { case (pred, lab) =>
>   val labSet = lab.toSet
>
>   if (labSet.nonEmpty) {
> val n = math.min(pred.length, k)
> var i = 0
> var cnt = 0
> while (i < n) {
>   if (labSet.contains(pred(i))) {
> cnt += 1
>   }
>   i += 1
> }
> cnt.toDouble / k
>   } else {
> logWarning("Empty ground truth set, check input data")
> 0.0
>   }
> }.mean()
>   }
>
>
> Am I the only one who thinks this doesn't do what it claims? Just
> for reference:
>
>   * 
> https://web.archive.org/web/20120415101144/http://sas.uwaterloo.ca/stats_navigation/techreports/04WorkingPapers/2004-09.pdf
>   * 
> https://github.com/benhamner/Metrics/blob/master/Python/ml_metrics/average_precision.py
>
> -- 
> Best,
> Maciej
>

-- 
Maciej Szymkiewicz



Re: unhelpful exception thrown on predict() when ALS trained model doesn't contain user or product?

2016-12-06 Thread chris snow
Ah cool, thanks for the link!

On 6 December 2016 at 12:25, Nick Pentreath 
wrote:

> Indeed, it's being tracked here:
> https://issues.apache.org/jira/browse/SPARK-18230 though no PR has been opened yet.
>
>
> On Tue, 6 Dec 2016 at 13:36 chris snow  wrote:
>
>> I'm using the MatrixFactorizationModel.predict() method and encountered
>> the following exception:
>>
>> Name: java.util.NoSuchElementException
>> Message: next on empty iterator
>> StackTrace: scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
>> scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
>> scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
>> scala.collection.IterableLike$class.head(IterableLike.scala:91)
>> scala.collection.mutable.ArrayBuffer.scala$collection$
>> IndexedSeqOptimized$$super$head(ArrayBuffer.scala:47)
>> scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.
>> scala:120)
>> scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:47)
>> org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(
>> MatrixFactorizationModel.scala:81)
>> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$
>> iwC$$iwC$$iwC$$iwC$$iwC.(:74)
>> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$
>> iwC$$iwC$$iwC$$iwC.(:79)
>> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$
>> iwC$$iwC$$iwC.(:81)
>> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$
>> iwC$$iwC.(:83)
>> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$
>> iwC.(:85)
>> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<
>> init>(:87)
>> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.
>> (:89)
>> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:91)
>> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:93)
>> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:95)
>> $line78.$read$$iwC$$iwC$$iwC$$iwC.(:97)
>> $line78.$read$$iwC$$iwC$$iwC.(:99)
>> $line78.$read$$iwC$$iwC.(:101)
>> $line78.$read$$iwC.(:103)
>> $line78.$read.(:105)
>> $line78.$read$.(:109)
>> $line78.$read$.()
>> $line78.$eval$.(:7)
>> $line78.$eval$.()
>> $line78.$eval.$print()
>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> sun.reflect.NativeMethodAccessorImpl.invoke(
>> NativeMethodAccessorImpl.java:95)
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(
>> DelegatingMethodAccessorImpl.java:55)
>> java.lang.reflect.Method.invoke(Method.java:507)
>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(
>> SparkIMain.scala:1065)
>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(
>> SparkIMain.scala:1346)
>> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$
>> interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:296)
>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$
>> interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:291)
>> com.ibm.spark.global.StreamState$.withStreams(StreamState.scala:80)
>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$
>> interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$
>> interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>> com.ibm.spark.utils.TaskManager$$anonfun$add$2$$
>> anon$1.run(TaskManager.scala:123)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1153)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:628)
>> java.lang.Thread.run(Thread.java:785)
>>
>> It took some debugging to figure out why I received the exception, but
>> looking at the predict() implementation, it seems to assume that there
>> will always be features found for the provided user and product IDs:
>>
>>
>>   /** Predict the rating of one user for one product. */
>>   @Since("0.8.0")
>>   def predict(user: Int, product: Int): Double = {
>> val userVector = userFeatures.lookup(user).head
>> val productVector = productFeatures.lookup(product).head
>> blas.ddot(rank, userVector, 1, productVector, 1)
>>   }
>>
>> It would be helpful if a more useful exception was raised, e.g.
>>
>> MissingUserFeatureException : "User ID ${user} not found in model"
>> MissingProductFeatureException : "Product ID ${product} not found in
>> model"
>>
>> WDYT?
>>
>>
>>
>>


Re: unhelpful exception thrown on predict() when ALS trained model doesn't contain user or product?

2016-12-06 Thread Nick Pentreath
Indeed, it's being tracked here:
https://issues.apache.org/jira/browse/SPARK-18230 though no PR has been
opened yet.

On Tue, 6 Dec 2016 at 13:36 chris snow  wrote:

> I'm using the MatrixFactorizationModel.predict() method and encountered
> the following exception:
>
> Name: java.util.NoSuchElementException
> Message: next on empty iterator
> StackTrace: scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
> scala.collection.IterableLike$class.head(IterableLike.scala:91)
>
> scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:47)
>
> scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:120)
> scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:47)
>
> org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:81)
>
> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:74)
>
> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:79)
>
> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:81)
>
> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:83)
>
> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:85)
>
> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:87)
> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:89)
> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:91)
> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:93)
> $line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:95)
> $line78.$read$$iwC$$iwC$$iwC$$iwC.(:97)
> $line78.$read$$iwC$$iwC$$iwC.(:99)
> $line78.$read$$iwC$$iwC.(:101)
> $line78.$read$$iwC.(:103)
> $line78.$read.(:105)
> $line78.$read$.(:109)
> $line78.$read$.()
> $line78.$eval$.(:7)
> $line78.$eval$.()
> $line78.$eval.$print()
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
> java.lang.reflect.Method.invoke(Method.java:507)
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>
> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:296)
>
> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:291)
> com.ibm.spark.global.StreamState$.withStreams(StreamState.scala:80)
>
> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>
> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>
> com.ibm.spark.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:123)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.lang.Thread.run(Thread.java:785)
>
> It took some debugging to figure out why I received the exception, but
> looking at the predict() implementation, it seems to assume that there
> will always be features found for the provided user and product IDs:
>
>
>   /** Predict the rating of one user for one product. */
>   @Since("0.8.0")
>   def predict(user: Int, product: Int): Double = {
> val userVector = userFeatures.lookup(user).head
> val productVector = productFeatures.lookup(product).head
> blas.ddot(rank, userVector, 1, productVector, 1)
>   }
>
> It would be helpful if a more useful exception was raised, e.g.
>
> MissingUserFeatureException : "User ID ${user} not found in model"
> MissingProductFeatureException : "Product ID ${product} not found in model"
>
> WDYT?
>
>
>
>


Re: Difference between netty and netty-all

2016-12-06 Thread Steve Loughran
Nicholas,

FYI, there's some patch for Hadoop 2.8? 2.9? to move up to Netty

https://issues.apache.org/jira/browse/HADOOP-13866
https://issues.apache.org/jira/browse/HADOOP-12854



On 5 Dec 2016, at 19:46, Nicholas Chammas 
mailto:nicholas.cham...@gmail.com>> wrote:

So if I'm running Spark 2.0.2 built against Hadoop 2.6, I should be running 
[Netty 
4.0.29.Final](https://github.com/apache/spark/blob/v2.0.2/dev/deps/spark-deps-hadoop-2.6#L141),
 right?

And since [the Netty PR I'm interested 
in](https://github.com/netty/netty/pull/5345) is tagged 4.0.37.Final, then I 
guess Spark 2.0.2 isn't using a version of Netty that includes that PR. This 
correlates with what I'm seeing in my environment (warnings related to low 
entropy followed by executor failures).

OK cool! Thanks for the pointers.

Nick
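
One way to double-check which Netty build actually ends up on the classpath at
runtime is Netty's own version registry; a sketch assuming Netty 4.x is present
(io.netty.util.Version was added in 4.0):

    import scala.collection.JavaConverters._

    // Prints the Netty artifacts (e.g. netty-all) and versions visible on the classpath.
    io.netty.util.Version.identify().asScala.foreach { case (artifact, version) =>
      println(s"$artifact -> $version")
    }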

On Mon, Dec 5, 2016 at 2:18 PM Sean Owen 
mailto:so...@cloudera.com>> wrote:
netty should be Netty 3.x. It is all but unused but I couldn't manage to get 
rid of it: https://issues.apache.org/jira/browse/SPARK-17875


I don't know why there's also a dependency on netty 3.x there, except to note 
that HADOOP-12928 covers keeping it in sync with ZK. Cutting it entirely would 
achieve that. Patches there welcome; anything that cuts a dependency is less 
traumatic than those which increment them, which is why I'm happy Hadoop had 
just got rid of Jackson 1.9.x entirely.

netty-all should be 4.x, actually used.


On Tue, Dec 6, 2016 at 12:54 AM Nicholas Chammas 
mailto:nicholas.cham...@gmail.com>> wrote:

I’m looking at the list of dependencies here:

https://github.com/apache/spark/search?l=Groff&q=netty&type=Code&utf8=%E2%9C%93

What’s the difference between netty and netty-all?

The reason I ask is because I’m looking at a Netty 
PR and trying to figure out if Spark 
2.0.2 is using a version of Netty that includes that PR or not.

Nick

​



Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

2016-12-06 Thread Steve Loughran
This is essentially what the cluster schedulers do: allow different people to 
submit work with different credentials and priority; cgroups & equivalent to 
limit granted resources to requested ones. If you have pre-emption enabled, you 
can even have one job kill work off the others. Spark does recognise 
pre-emption failures and doesn't treat it as a sign of problems in the 
executor, that is: it doesn't over-react.

cluster scheduling is one of the cutting edge bits of datacentre-scale 
computing —if you are curious about what is state of the art, look at the 
Morning Paper https://blog.acolyer.org/ for coverage last week of MS and google 
work there. YARN, Mesos, Borg, whatever Amazon use, at scale it's not just 
meeting SLAs, its about how much idle CPU costs, and how expensive even a 1-2% 
drop in throughput would be.

I would strongly encourage you to avoid this topic, unless you want to dive deep 
into the whole world of cluster scheduling, the debate over centralized vs 
decentralized, the ideological one of "should services ever get allocated 
RAM/CPU in times of low overall load?", the challenge of swap, or more 
specifically, "how do you throttle memory consumption", as well as what to do 
when the IO load of a service is actually incurred on a completely different 
host from the one your work is running on.

There's also a fair amount of engineering work; to get a hint download the 
Hadoop tree and look at 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux
 for the cgroup support, and then 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl
 for the native code needed alongside this. Then consider that it's not just a 
matter of writing something similar, it's getting an OSS project to actually 
commit to maintaining such code after you provide that initial contribution.

Instead? Use a multi-user cluster scheduler and spin up different spark 
instances for the different workloads, with different CPU & memory limits, 
queue priorities, etc. Other people have done the work, written the tests, 
deployed it in production, met their own SLAs *and are therefore committed to 
maintaining this stuff*.

-Steve

On 5 Dec 2016, at 15:36, Hegner, Travis 
mailto:theg...@trilliumit.com>> wrote:

My apologies, in my excitement of finding a rather simple way to accomplish the 
scheduling goal I have in mind, I hastily jumped straight into a technical 
solution, without explaining that goal, or the problem it's attempting to solve.

You are correct that I'm looking for an additional running mode for the 
standalone scheduler. Perhaps you could/should classify it as a different 
scheduler, but I don't want to give the impression that this will be as 
difficult to implement as most schedulers are. Initially, from a memory 
perspective, we would still allocate in a FIFO manner. This new scheduling mode 
(or new scheduler, if you'd rather) would mostly benefit any users with 
small-ish clusters, both on-premise and cloud based. Essentially, my end goal 
is to be able to run multiple *applications* simultaneously with each 
application having *access* to the entire core count of the cluster.

I have a very cpu intensive application that I'd like to run weekly. I have a 
second application that I'd like to run hourly. The hourly application is more 
time critical (higher priority), so I'd like it to finish in a small amount of 
time. If I allow the first app to run with all cores (this takes several days 
on my 64 core cluster), then nothing else can be executed when running with the 
default FIFO scheduler. All of the cores have been allocated to the first 
application, and it will not release them until it is finished. Dynamic 
allocation does not help in this case, as there is always a backlog of tasks to 
run until the first application is nearing the end anyway. Naturally, I could 
just limit the number of cores that the first application has access to, but 
then I have idle cpu time when the second app is not running, and that is not 
optimal. Secondly in that case, the second application only has access to the 
*leftover* cores that the first app has not allocated, and will take a 
considerably longer amount of time to run.
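
For reference, the static cap described above is the existing spark.cores.max
setting on the standalone scheduler; a sketch (the values are placeholders):

    import org.apache.spark.sql.SparkSession

    // Hypothetical: statically cap this application at 16 cores on a standalone
    // cluster, leaving the rest for other applications. The capped cores sit idle
    // whenever no other application is running, which is the waste being described.
    val spark = SparkSession.builder()
      .appName("weekly-batch")
      .config("spark.cores.max", "16")
      .getOrCreate()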

You could also imagine a scenario where a developer has a spark-shell running 
without specifying the number of cores they want to utilize (whether 
intentionally or not). As I'm sure you know, the default is to allocate the 
entire cluster to this application. The cores allocated to this shell are 
unavailable to other applications, even if they are just sitting idle while a 
developer is getting their environment set up to run a very big job 
interactively. Other developers that would like to launch interactive shells 
are stuck waiting for the first one to exit their shell.

My proposal would eliminate this st

unhelpful exception thrown on predict() when ALS trained model doesn't contain user or product?

2016-12-06 Thread chris snow
I'm using the MatrixFactorizationModel.predict() method and encountered the
following exception:

Name: java.util.NoSuchElementException
Message: next on empty iterator
StackTrace: scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
scala.collection.IterableLike$class.head(IterableLike.scala:91)
scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:47)
scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:120)
scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:47)
org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:81)
$line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:74)
$line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:79)
$line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:81)
$line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:83)
$line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:85)
$line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:87)
$line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:89)
$line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:91)
$line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:93)
$line78.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:95)
$line78.$read$$iwC$$iwC$$iwC$$iwC.(:97)
$line78.$read$$iwC$$iwC$$iwC.(:99)
$line78.$read$$iwC$$iwC.(:101)
$line78.$read$$iwC.(:103)
$line78.$read.(:105)
$line78.$read$.(:109)
$line78.$read$.()
$line78.$eval$.(:7)
$line78.$eval$.()
$line78.$eval.$print()
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
java.lang.reflect.Method.invoke(Method.java:507)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:296)
com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:291)
com.ibm.spark.global.StreamState$.withStreams(StreamState.scala:80)
com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
com.ibm.spark.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:123)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.lang.Thread.run(Thread.java:785)

It took some debugging to figure out why I received the exception, but
looking at the predict() implementation, it seems to assume that there
will always be features found for the provided user and product IDs:


  /** Predict the rating of one user for one product. */
  @Since("0.8.0")
  def predict(user: Int, product: Int): Double = {
val userVector = userFeatures.lookup(user).head
val productVector = productFeatures.lookup(product).head
blas.ddot(rank, userVector, 1, productVector, 1)
  }

It would be helpful if a more useful exception was raised, e.g.

MissingUserFeatureException : "User ID ${user} not found in model"
MissingProductFeatureException : "Product ID ${product} not found in model"

WDYT?
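
A hedged sketch of the defensive behaviour proposed above, written as a helper
outside MLlib rather than as the actual predict() implementation (the exception
class names above do not exist today; plain descriptive messages are used here):

    import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

    // Illustrative helper: fail with a descriptive message when the user or product
    // id has no feature vector in the trained model, instead of "next on empty iterator".
    def safePredict(model: MatrixFactorizationModel, user: Int, product: Int): Double = {
      val userVector = model.userFeatures.lookup(user).headOption
        .getOrElse(throw new NoSuchElementException(s"User ID $user not found in model"))
      val productVector = model.productFeatures.lookup(product).headOption
        .getOrElse(throw new NoSuchElementException(s"Product ID $product not found in model"))
      userVector.zip(productVector).map { case (u, p) => u * p }.sum  // dot product
    }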


Re: Spark-9487, Need some insight

2016-12-06 Thread Steve Loughran
Jenkins uses SBT, so you need to do the test run with SBT as well. The Maven and 
SBT builds are different, and in particular have different test runners.


On 30 Nov 2016, at 04:14, Saikat Kanjilal 
mailto:sxk1...@hotmail.com>> wrote:

Hello Spark dev community,
I took on the following JIRA item 
(https://github.com/apache/spark/pull/15848) and am looking for some general 
pointers. I am running into issues where things work successfully during local 
development on my MacBook Pro but fail on Jenkins for a multitude of reasons and 
errors. Here's an example: if you look at this build output report: 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69297/ you 
will see the DataFrameStatSuite. Locally I am running these individual 
tests with this command: ./build/mvn test -P... -DwildcardSuites=none 
-Dtest=org.apache.spark.sql.DataFrameStatSuite. It seems that I need to 
emulate a Jenkins-like environment locally, which feels like an 
untenable hurdle, granted that my changes involve changing the total number of 
workers in the SparkContext; if so, should I be testing my changes in an 
environment that more closely resembles Jenkins? I really want to work 
on/complete this PR, but I keep getting hamstrung by a dev environment that is 
not equivalent to our CI environment.

There's always the option of creating a linux VM/container with jenkins in it; 
there's a nice trick there in which you can have it watch a git branch, and 
have it kick off a run whenever you push up to it. That way, you have your own 
personal jenkins to do the full regression tests, while you yourself work on a 
small bit.



I'm guessing/hoping I'm not the first one to run into this, so any insights or 
pointers to get past it would be very appreciated. I would love to keep 
contributing, and I hope this is a hurdle that can be overcome with some tweaks 
to my dev environment.


Thanks in advance.



Re: Can I add a new method to RDD class?

2016-12-06 Thread Teng Long
Thank you Jakob for clearing things up for me. 

Before, I thought my application was compiled against my local build since I 
can get all the logs I just added in spark-core. But it was all along using 
spark downloaded from remote maven repository, and that’s why I “cannot" add 
new RDD methods in. 

How can I specify a custom version? modify version numbers in all the pom.xml 
file?

 
> On Dec 5, 2016, at 9:12 PM, Jakob Odersky  wrote:
> 
> m rdds in an "org.apache.spark" package as well