Re: GraphX: ShortestPaths does not terminate on a grid graph

2015-02-03 Thread Jay Hutfles
I think this is a separate issue with how the EdgeRDDImpl partitions
edges.  If you can merge this change in and rebuild, it should work:

   https://github.com/apache/spark/pull/4136/files

If you can't, I just called the Graph.partitionBy() method right after
constructing my graph but before performing any operations on it.  That
way, the EdgeRDDImpl class doesn't have to use the default partitioner.
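
For reference, here's roughly what that looks like.  The grid construction
and the choice of EdgePartition2D are just examples; any explicit
PartitionStrategy avoids the default partitioner:

    import org.apache.spark.graphx.{Graph, PartitionStrategy}
    import org.apache.spark.graphx.util.GraphGenerators

    // Repartition the edges explicitly right after building the graph,
    // before running any Pregel-based algorithm such as ShortestPaths.
    val graph = GraphGenerators.gridGraph(sc, 70, 70)
      .partitionBy(PartitionStrategy.EdgePartition2D)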

Hope this helps!
   Jay

On Tue Feb 03 2015 at 12:35:14 AM NicolasC nicolas.ch...@inria.fr wrote:

 On 01/29/2015 08:31 PM, Ankur Dave wrote:
  Thanks for the reminder. I just created a PR:
  https://github.com/apache/spark/pull/4273
  Ankur
 

 Hello,

 Thanks for the patch. I applied it to Pregel.scala (in Spark 1.2.0
 sources) and rebuilt Spark. During execution, checkpointing is done at the
 25th iteration of Pregel, and then the following exception is thrown:

 Exception in thread "main" org.apache.spark.SparkException: Checkpoint RDD
 CheckpointRDD[521] at reduce at VertexRDDImpl.scala:80(0) has different
 number of partitions than original RDD VertexRDD ZippedPartitionsRDD2[518]
 at zipPartitions at VertexRDDImpl.scala:170(2)
   at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:98)
   at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1279)
   at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1281)
   at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1281)
   at scala.collection.immutable.List.foreach(List.scala:318)
   at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1281)
   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1285)
   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1351)
   at org.apache.spark.rdd.RDD.reduce(RDD.scala:867)
   at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:80)
   at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:155)
   at org.apache.spark.graphx.lib.ShortestPaths$.run(ShortestPaths.scala:69)
 

 Pregel.scala:155 is the following line in the pregel loop:

activeMessages = messages.count()






Re: GraphX: ShortestPaths does not terminate on a grid graph

2015-01-29 Thread Jay Hutfles
Just curious, is this set to be merged at some point?

On Thu Jan 22 2015 at 4:34:46 PM Ankur Dave ankurd...@gmail.com wrote:

 At 2015-01-22 02:06:37 -0800, NicolasC nicolas.ch...@inria.fr wrote:
  I try to execute a simple program that runs the ShortestPaths algorithm
  (org.apache.spark.graphx.lib.ShortestPaths) on a small grid graph.
  I use Spark 1.2.0 downloaded from spark.apache.org.
 
  This program runs for more than 2 hours when the grid size is 70x70 as
  above, and is then killed by the resource manager of the cluster (Torque).
  After 5-6 minutes of execution, the Spark master UI does not even respond.
 
  For a grid size of 30x30, the program terminates in about 20 seconds, and
  for a grid size of 50x50 it finishes in about 80 seconds. The problem
  appears for a grid size of 70x70 and above.

 Unfortunately this problem is due to a Spark bug. In later iterations of
 iterative algorithms, the lineage maintained for fault tolerance grows long
 and causes Spark to consume increasing amounts of resources for scheduling
 and task serialization.

 The workaround is to checkpoint the graph periodically, which writes it to
 stable storage and interrupts the lineage chain before it grows too long.

 If you're able to recompile Spark, you can do this by applying the patch
 to GraphX at the end of this mail, and before running graph algorithms,
 calling

 sc.setCheckpointDir("/tmp")

 to set the checkpoint directory as desired.
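
 For example, roughly (the grid construction and the single landmark below
 are only placeholders for your own setup):

     import org.apache.spark.graphx.lib.ShortestPaths
     import org.apache.spark.graphx.util.GraphGenerators

     sc.setCheckpointDir("/tmp")                      // any directory on stable storage
     val graph = GraphGenerators.gridGraph(sc, 70, 70)
     val result = ShortestPaths.run(graph, Seq(0L))   // vertex 0 as the only landmark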

 Ankur

 === patch begins here ===

 diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
 index 5e55620..1fbbb87 100644
 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
 +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
 @@ -126,6 +126,8 @@ object Pregel extends Logging {
      // Loop
      var prevG: Graph[VD, ED] = null
      var i = 0
 +    val checkpoint = g.vertices.context.getCheckpointDir.nonEmpty
 +    val checkpointFrequency = 25
      while (activeMessages > 0 && i < maxIterations) {
        // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
        val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
 @@ -139,6 +141,14 @@ object Pregel extends Logging {
        // get to send messages. We must cache messages so it can be materialized on the next line,
        // allowing us to uncache the previous iteration.
        messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
 +
 +      if (checkpoint && i % checkpointFrequency == checkpointFrequency - 1) {
 +        logInfo("Checkpointing in iteration " + i)
 +        g.vertices.checkpoint()
 +        g.edges.checkpoint()
 +        messages.checkpoint()
 +      }
 +
        // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
        // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
        // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).





spark-shell bug with RDD distinct?

2014-12-19 Thread Jay Hutfles
Found a problem in the spark-shell, but I can't confirm whether it's related
to any open issues on Spark's JIRA page.  I was wondering if anyone could
help identify whether this is a known issue or if it's already being
addressed.

Test:  (in spark-shell)
case class Person(name: String, age: Int)
val peopleList = List(Person("Alice", 35), Person("Bob", 47),
  Person("Alice", 35), Person("Bob", 15))
val peopleRDD = sc.parallelize(peopleList)
assert(peopleList.distinct.size == peopleRDD.distinct.count)


At first I thought it was related to issue SPARK-2620 (
https://issues.apache.org/jira/browse/SPARK-2620), which says case classes
can't be used as keys in spark-shell due to how case classes are compiled
by the REPL.  It lists .reduceByKey, .groupByKey and .distinct as being
affected.  But the associated pull request for adding tests to cover this (
https://github.com/apache/spark/pull/1588) was closed.
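
If SPARK-2620 is the cause, the same test re-keyed on plain tuples should
pass, since tuples aren't wrapped by the REPL the way case classes are.  A
quick (untested) sanity check along those lines:

    // Same data as plain tuples instead of a REPL-defined case class.
    val peopleTuples = List(("Alice", 35), ("Bob", 47), ("Alice", 35), ("Bob", 15))
    val peopleTupleRDD = sc.parallelize(peopleTuples)
    assert(peopleTuples.distinct.size == peopleTupleRDD.distinct.count)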

Is this something I just have to live with when using the REPL?  Or is this
covered by something bigger that's being addressed?

Thanks in advance
   -Jay




GraphX Pregel halting condition

2014-12-03 Thread Jay Hutfles
I'm trying to implement a graph algorithm that does a form of path
searching.  Once a certain criterion is met on any path in the graph, I want
to halt the rest of the iterations.  But I can't see how to do that with the
Pregel API, since a vertex isn't able to know the state of another arbitrary
vertex (if they're not adjacent).

Is there a common pattern for doing something like this?  I was thinking of
using a custom accumulator where the zero is false and the addInPlace is a
boolean OR.  Each vertex (as part of its vprog) could add to the
accumulator, and once a path is found which meets the condition, the
accumulator would then have a value of true.  But since workers can't read
accumulators, I don't see how to use that when deciding whether to iterate
again.  That is, unless I reimplement the Pregel class with the added check
when iterating...
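
To make that concrete, the reimplementation I'm picturing is roughly the
sketch below.  It's a simplified stand-in for the real Pregel loop, and
vprog, sendMsg, mergeMsg and the halt predicate are placeholders:

    import scala.reflect.ClassTag
    import org.apache.spark.graphx._

    def pregelWithHalt[VD: ClassTag, ED: ClassTag, A: ClassTag]
        (g: Graph[VD, ED], initialMsg: A, maxIters: Int)
        (vprog: (VertexId, VD, A) => VD,
         sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
         mergeMsg: (A, A) => A,
         halt: VD => Boolean): Graph[VD, ED] = {
      var graph = g.mapVertices((id, attr) => vprog(id, attr, initialMsg)).cache()
      var messages = graph.mapReduceTriplets(sendMsg, mergeMsg)
      var i = 0
      // Stop as soon as any vertex satisfies the halting predicate --
      // the check runs on the driver between supersteps.
      while (messages.count() > 0 && i < maxIters &&
             graph.vertices.filter { case (_, attr) => halt(attr) }.count() == 0) {
        graph = graph.joinVertices(messages)(vprog).cache()
        messages = graph.mapReduceTriplets(sendMsg, mergeMsg)
        i += 1
      }
      graph
    }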

Any suggestions?  Thanks in advance!




Re: questions about MLLib recommendation models

2014-08-08 Thread Jay Hutfles
Ah, that makes perfect sense.  Thanks for the concise explanation!


On Thu, Aug 7, 2014 at 9:14 PM, Xiangrui Meng men...@gmail.com wrote:

 ratings.map{ case Rating(u, m, r) =>
   val pred = model.predict(u, m)
   (r - pred) * (r - pred)
 }.mean()

 The code doesn't work because the userFeatures and productFeatures
 stored in the model are RDDs. You tried to serialize them into the
 task closure, and execute `model.predict` on an executor, which won't
 work because `model.predict` can only be called on the driver. We
 should make this clear in the doc. You should use what Burak
 suggested:

 val predictions = model.predict(data.map(x => (x.user, x.product)))
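
 From there, a rough sketch of scoring by joining the predictions back to
 the ratings on (user, product) keys (assuming `data` is your RDD[Rating];
 the names here are only illustrative):

   import org.apache.spark.SparkContext._  // pair-RDD functions (pre-imported in the shell)

   val predictionsAndRatings = predictions
     .map(p => ((p.user, p.product), p.rating))
     .join(data.map(r => ((r.user, r.product), r.rating)))
     .values
   val mse = predictionsAndRatings.map { case (pred, actual) =>
     (pred - actual) * (pred - actual)
   }.mean()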

 Best,
 Xiangrui

 On Thu, Aug 7, 2014 at 1:20 PM, Burak Yavuz bya...@stanford.edu wrote:
  Hi Jay,
 
  I've had the same problem you've been having in Question 1 with a
 synthetic dataset. I thought I wasn't producing the dataset well enough.
 This seems to
  be a bug. I will open a JIRA for it.
 
  Instead of using:
 
  ratings.map{ case Rating(u, m, r) =>
    val pred = model.predict(u, m)
    (r - pred) * (r - pred)
  }.mean()
 
  you can use something like:
 
  val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
  val predictionsAndRatings: RDD[(Double, Double)] = predictions.map{ x =>
    def mapPredictedRating(r: Double) =
      if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
    ((x.user, x.product), mapPredictedRating(x.rating))
  }.join(data.map(x => ((x.user, x.product), x.rating))).values

  math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
 
  This workaround worked for me.

  Regarding your question 2, it will be best if you do a special filtering
  of the dataset so that you do train for that user and product.
  If we don't have any data trained on a user, there is no way to predict
  how he would like a product.
  That filtering takes a lot of work though. I can share some code on that
  too if you like.
 
  Best,
  Burak
 
  - Original Message -
  From: Jay Hutfles jayhutf...@gmail.com
  To: user@spark.apache.org
  Sent: Thursday, August 7, 2014 1:06:33 PM
  Subject: questions about MLLib recommendation models
 
  I have a few questions regarding a collaborative filtering model, and was
  hoping for some recommendations (no pun intended...)
 
  *Setup*
 
  I have a csv file with user/movie/ratings named unimaginatively
  'movies.csv'.  Here are the contents:
 
  0,0,5
  0,1,5
  0,2,0
  0,3,0
  1,0,5
  1,3,0
  2,1,4
  2,2,0
  3,0,0
  3,1,0
  3,2,5
  3,3,4
  4,0,0
  4,1,0
  4,2,5
 
  I then load it into an RDD with a nice command like
 
   val ratings = sc.textFile("movies.csv").map(_.split(',') match {
     case Array(u, m, r) => Rating(u.toInt, m.toInt, r.toDouble) })
 
  So far so good.  I'm even okay building a model for predicting the absent
  values in the matrix with
 
  val rank = 10
  val iters = 20
  val model = ALS.train(ratings, rank, iters)
 
  I can then use the model to predict any user/movie rating without
 trouble,
  like
 
  model.predict(2, 0)
 
  *Question 1: *
 
  If I were to calculate, say, the mean squared error of the training set
 (or
  to my next question, a test set), this doesn't work:
 
   ratings.map{ case Rating(u, m, r) =>
     val pred = model.predict(u, m)
     (r - pred) * (r - pred)
   }.mean()
 
  Actually, any action on RDDs created by mapping over the RDD[Rating]
 with a
  model prediction  fails, like
 
  ratings.map{ case Rating(u, m, _) = model.predict(u, m) }.collect
 
  I get errors due to a scala.MatchError: null.  Here's the exact
 verbiage:
 
 
   org.apache.spark.SparkException: Job aborted due to stage failure: Task
   26150.0:1 failed 1 times, most recent failure: Exception failure in TID
   7091 on host localhost: scala.MatchError: null
     org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)
     org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:43)
     $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
     $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
     scala.collection.Iterator$class.foreach(Iterator.scala:727)
     scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
     scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
     scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
     scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
     scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
     scala.collection.AbstractIterator.to(Iterator.scala:1157)
     scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
     scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
     scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
     scala.collection.AbstractIterator.toArray

questions about MLLib recommendation models

2014-08-07 Thread Jay Hutfles
I have a few questions regarding a collaborative filtering model, and was
hoping for some recommendations (no pun intended...)

*Setup*

I have a csv file with user/movie/ratings named unimaginatively
'movies.csv'.  Here are the contents:

0,0,5
0,1,5
0,2,0
0,3,0
1,0,5
1,3,0
2,1,4
2,2,0
3,0,0
3,1,0
3,2,5
3,3,4
4,0,0
4,1,0
4,2,5

I then load it into an RDD with a nice command like

val ratings = sc.textFile("movies.csv").map(_.split(',') match {
  case Array(u, m, r) => Rating(u.toInt, m.toInt, r.toDouble) })

So far so good.  I'm even okay building a model for predicting the absent
values in the matrix with

val rank = 10
val iters = 20
val model = ALS.train(ratings, rank, iters)

I can then use the model to predict any user/movie rating without trouble,
like

model.predict(2, 0)

*Question 1: *

If I were to calculate, say, the mean squared error of the training set (or
to my next question, a test set), this doesn't work:

ratings.map{ case Rating(u, m, r) =>
  val pred = model.predict(u, m)
  (r - pred) * (r - pred)
}.mean()

Actually, any action on RDDs created by mapping over the RDD[Rating] with a
model prediction  fails, like

ratings.map{ case Rating(u, m, _) = model.predict(u, m) }.collect

I get errors due to a scala.MatchError: null.  Here's the exact verbiage:


org.apache.spark.SparkException: Job aborted due to stage failure: Task
26150.0:1 failed 1 times, most recent failure: Exception failure in TID
7091 on host localhost: scala.MatchError: null
  org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)
  org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:43)
  $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
  $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
  scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  scala.collection.Iterator$class.foreach(Iterator.scala:727)
  scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
  scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
  scala.collection.AbstractIterator.to(Iterator.scala:1157)
  scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
  scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
  scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
  scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
  org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
  org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
  org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
  org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
  org.apache.spark.scheduler.Task.run(Task.scala:51)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  java.lang.Thread.run(Thread.java:744)

I think I'm missing something, since I can build up a Scala collection of
the exact (user, movie) tuples I'm testing, map over that with the model
prediction, and it works fine.  But if I map over the RDD[Rating], it
doesn't.  Am I doing something obviously wrong?

*Question 2:*

I have a much larger data set, and instead of running the ALS algorithm on
the whole set, it seems prudent to use the kFold method in
org.apache.spark.mllib.util.MLUtils to generate training/testing splits.
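
Roughly what I mean (the fold count and seed here are arbitrary):

    import org.apache.spark.mllib.util.MLUtils

    // Array of (training, test) pairs.
    val folds = MLUtils.kFold(ratings, 10, 42)
    val (training, test) = folds(0)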

It's rather sparse data, and there are cases where the test set has both
users and movies that are not present in any Ratings in the training set.
 When encountering these, the model shouts at me:

java.util.NoSuchElementException: next on empty iterator

Is it the case that the Alternating Least Squares method doesn't create
models which predict values for untrained users/products?  My high-level
understanding of the ALS implementation makes it seem understandable that
the calculations depend on at least one rating for each user, and at least
one for each movie.  Is that true?

If so, should I simply filter out entries from the test set which have
users or movies absent from the training set?  Or is kFold not an
appropriate way to generate test data for collaborative filtering?
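
The filtering I have in mind would look roughly like this (a sketch only,
assuming training and test are one of the kFold pairs):

    // Collect the trained user and product ids (assumes they fit on the driver),
    // then keep only test ratings covered by the training set.
    val trainedUsers = training.map(_.user).distinct().collect().toSet
    val trainedProducts = training.map(_.product).distinct().collect().toSet
    val testable = test.filter(r =>
      trainedUsers.contains(r.user) && trainedProducts.contains(r.product))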

Actually, I should have probably just asked, "What is the best way to do
testing for recommendation models?"  Leave it nice and general...

Thanks in advance.  Sorry for the long ramble.
   Jay