Re: [GraphX] Excessive value recalculations during aggregateMessages cycles
I changed

```
curGraph = curGraph.outerJoinVertices(curMessages)(
  (vid, vertex, message) => vertex.process(message.getOrElse(List[Message]()), ti)
).cache()
```

to

```
curGraph = curGraph.outerJoinVertices(curMessages)(
  (vid, vertex, message) => (vertex, message.getOrElse(List[Message]()))
).mapVertices( (x, y) => y._1.process(y._2, ti) ).cache()
```

So the call to the 'process' method was moved out of the outerJoinVertices and into a separate mapVertices call, and the problem went away. Now, 'process' is only called once during the correct cycle. So it would appear that outerJoinVertices caches the closure to be recalculated if needed again, while mapVertices actually caches the derived values.

Is this a bug or a feature?

Kyle

On Sat, Feb 7, 2015 at 11:44 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I'm trying to set up a simple iterative message/update problem in GraphX (Spark 1.2.0), but I'm running into issues with the caching and re-calculation of data. I'm trying to follow the example found in the Pregel implementation of materializing and caching messages and graphs and then unpersisting them after the next cycle has been done. It doesn't seem to be working, because every cycle gets progressively slower, and it seems as if more and more of the values are being re-calculated despite my attempts to cache them.

The code:

```
var oldMessages : VertexRDD[List[Message]] = null
var oldGraph : Graph[MyVertex, MyEdge] = null

curGraph = curGraph.mapVertices((x, y) => y.init())

for (i <- 0 to cycle_count) {
  val curMessages = curGraph.aggregateMessages[List[Message]](x => {
    // send messages
    ...
  }, (x, y) => {
    // collect messages into lists
    val out = x ++ y
    out
  }).cache()
  curMessages.count()
  val ti = i
  oldGraph = curGraph
  curGraph = curGraph.outerJoinVertices(curMessages)(
    (vid, vertex, message) => vertex.process(message.getOrElse(List[Message]()), ti)
  ).cache()
  curGraph.vertices.count()
  oldGraph.unpersistVertices(blocking = false)
  oldGraph.edges.unpersist(blocking = false)
  oldGraph = curGraph
  if (oldMessages != null) {
    oldMessages.unpersist(blocking = false)
  }
  oldMessages = curMessages
}
```

The MyVertex.process method takes the list of incoming messages, averages them, and returns a new MyVertex object. I've also set it up to append the cycle number (the second argument) into a log file named after the vertex. What ends up getting dumped into the log file for every vertex (in the exact same pattern) is:

```
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 3
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 4
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 3
Cycle: 5
```

Any ideas about what I might be doing wrong with the caching? And how can I avoid re-calculating so many of the values?

Kyle
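For reference, the two constructions from this message written side by side as a standalone sketch. MyVertex, Message, MyEdge, and process are the placeholder types from this thread; this is illustrative, not the exact code:

```
// Variant 1: compute the new vertex value inside outerJoinVertices
// (the form that was observed to re-run 'process' across cycles).
val joined: Graph[MyVertex, MyEdge] =
  curGraph.outerJoinVertices(curMessages) { (vid, vertex, message) =>
    vertex.process(message.getOrElse(List[Message]()), ti)
  }.cache()

// Variant 2: only attach the messages during the join, then derive the
// value in a separate mapVertices call (the form that fixed the problem).
val attached: Graph[(MyVertex, List[Message]), MyEdge] =
  curGraph.outerJoinVertices(curMessages) { (vid, vertex, message) =>
    (vertex, message.getOrElse(List[Message]()))
  }
val processed: Graph[MyVertex, MyEdge] =
  attached.mapVertices { (vid, pair) => pair._1.process(pair._2, ti) }.cache()
```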
[GraphX] Excessive value recalculations during aggregateMessages cycles
I'm trying to set up a simple iterative message/update problem in GraphX (Spark 1.2.0), but I'm running into issues with the caching and re-calculation of data. I'm trying to follow the example found in the Pregel implementation of materializing and caching messages and graphs and then unpersisting them after the next cycle has been done. It doesn't seem to be working, because every cycle gets progressively slower, and it seems as if more and more of the values are being re-calculated despite my attempts to cache them.

The code:

```
var oldMessages : VertexRDD[List[Message]] = null
var oldGraph : Graph[MyVertex, MyEdge] = null

curGraph = curGraph.mapVertices((x, y) => y.init())

for (i <- 0 to cycle_count) {
  val curMessages = curGraph.aggregateMessages[List[Message]](x => {
    // send messages
    ...
  }, (x, y) => {
    // collect messages into lists
    val out = x ++ y
    out
  }).cache()
  curMessages.count()
  val ti = i
  oldGraph = curGraph
  curGraph = curGraph.outerJoinVertices(curMessages)(
    (vid, vertex, message) => vertex.process(message.getOrElse(List[Message]()), ti)
  ).cache()
  curGraph.vertices.count()
  oldGraph.unpersistVertices(blocking = false)
  oldGraph.edges.unpersist(blocking = false)
  oldGraph = curGraph
  if (oldMessages != null) {
    oldMessages.unpersist(blocking = false)
  }
  oldMessages = curMessages
}
```

The MyVertex.process method takes the list of incoming messages, averages them, and returns a new MyVertex object. I've also set it up to append the cycle number (the second argument) into a log file named after the vertex. What ends up getting dumped into the log file for every vertex (in the exact same pattern) is:

```
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 3
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 4
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 3
Cycle: 5
```

Any ideas about what I might be doing wrong with the caching? And how can I avoid re-calculating so many of the values?

Kyle
Re: dockerized spark executor on mesos?
I'd like to tag a question onto this: has anybody attempted to deploy Spark under Kubernetes ( https://github.com/googlecloudplatform/kubernetes ) or Kubernetes-Mesos ( https://github.com/mesosphere/kubernetes-mesos )?

On Wednesday, December 3, 2014, Matei Zaharia matei.zaha...@gmail.com wrote:

I'd suggest asking about this on the Mesos list (CCed). As far as I know, there was actually some ongoing work for this.

Matei

On Dec 3, 2014, at 9:46 AM, Dick Davies d...@hellooperator.net wrote:

Just wondered if anyone had managed to start Spark jobs on Mesos wrapped in a Docker container?

At present (i.e. very early testing) I'm able to submit executors to Mesos via spark-submit easily enough, but they fall over as we don't have a JVM on our slaves out of the box.

I can push one out via our CM system if push comes to shove, but it'd be nice to have that as part of the job (I'm thinking it might be a way to get some of the dependencies deployed too).

Bear in mind I'm a total clueless newbie at this, so please be gentle if I'm doing this completely wrong.

Thanks!
Re: Large Task Size?
I'm still having trouble with this one. Watching it, I've noticed that the first time around the task size is large, but not terrible (199 KB). It's on the second iteration of the optimization that the task size goes crazy (120 MB).

Does anybody have any ideas why this might be happening? Is there any way I can view the data being encoded in the task description, so that I might be able to get an idea why it is blowing up?

The line in question can be found at:
https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L157

From the logs:

```
14/07/19 17:29:35 WARN scheduler.TaskSetManager: Stage 858 contains a task of very large size (199 KB). The maximum recommended task size is 100 KB.
14/07/19 17:30:22 WARN scheduler.TaskSetManager: Stage 857 contains a task of very large size (185 KB). The maximum recommended task size is 100 KB.
14/07/19 17:30:26 WARN scheduler.TaskSetManager: Stage 1029 contains a task of very large size (185 KB). The maximum recommended task size is 100 KB.
14/07/19 17:30:57 WARN scheduler.TaskSetManager: Stage 1202 contains a task of very large size (123414 KB). The maximum recommended task size is 100 KB.
```

From the web UI (connecting the stage number to the line number):

```
Stage Id  Description
858       sample at GroupedGradientDescent.scala:157
857       collect at GroupedGradientDescent.scala:183
1029      collect at GroupedGradientDescent.scala:194
1202      sample at GroupedGradientDescent.scala:157
```

Kyle

On Tue, Jul 15, 2014 at 2:45 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

Yes, this is a proposed patch to MLlib so that you can use one RDD to train multiple models at the same time. I am hoping that multiplexing several models in the same RDD will be more efficient than trying to get the Spark scheduler to manage a few hundred tasks simultaneously.

I don't think I see stochasticLossHistory being included in the closure (please correct me if I'm wrong). It's used once on line 183 to capture the loss sums (a local operation on the results of a 'collect' call), and again on line 198 to update weightSet, but that's after the loop completes, and the memory blowup definitely happens before then.

Kyle

On Tue, Jul 15, 2014 at 12:00 PM, Aaron Davidson ilike...@gmail.com wrote:

Ah, I didn't realize this was non-MLlib code. Do you mean to be sending stochasticLossHistory in the closure as well?

On Sun, Jul 13, 2014 at 1:05 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

It uses the standard SquaredL2Updater, and I also tried to broadcast it as well. The input is an RDD created by taking the union of several inputs that have all been run against MLUtils.kFold to produce even more RDDs (e.g. 10 different inputs, each with 10 kFolds). I'm pretty certain that all of the input RDDs have clean closures. But I'm curious, is there a high overhead for running union? Could that create larger task sizes?

Kyle

On Sat, Jul 12, 2014 at 7:50 PM, Aaron Davidson ilike...@gmail.com wrote:

I also did a quick glance through the code and couldn't find anything worrying that should be included in the task closures. The only possibly unsanitary part is the Updater you pass in -- what is your Updater and is it possible it's dragging in a significant amount of extra state?

On Sat, Jul 12, 2014 at 7:27 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I'm working on a patch to MLlib that allows for multiplexing several different model optimizations using the same RDD (SPARK-2372: https://issues.apache.org/jira/browse/SPARK-2372).

In testing larger datasets, I've started to see some memory errors (java.lang.OutOfMemoryError and "exceeds max allowed: spark.akka.frameSize" errors). My main clue is that Spark will start logging warnings like:

```
14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a task of very large size (10119 KB). The maximum recommended task size is 100 KB.
```

Looking up stage '2862' in the UI leads to a 'sample at GroupedGradientDescent.scala:156' call. That code can be seen at
https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156

I've looked over the code; I'm broadcasting the larger variables, and between the sampler and the combineByKey I wouldn't think there's much data being moved over the network, much less a 10 MB chunk.

Any ideas of what this might be a symptom of?

Kyle
Re: Large Task Size?
Yes, this is a proposed patch to MLlib so that you can use one RDD to train multiple models at the same time. I am hoping that multiplexing several models in the same RDD will be more efficient than trying to get the Spark scheduler to manage a few hundred tasks simultaneously.

I don't think I see stochasticLossHistory being included in the closure (please correct me if I'm wrong). It's used once on line 183 to capture the loss sums (a local operation on the results of a 'collect' call), and again on line 198 to update weightSet, but that's after the loop completes, and the memory blowup definitely happens before then.

Kyle

On Tue, Jul 15, 2014 at 12:00 PM, Aaron Davidson ilike...@gmail.com wrote:

Ah, I didn't realize this was non-MLlib code. Do you mean to be sending stochasticLossHistory in the closure as well?

On Sun, Jul 13, 2014 at 1:05 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

It uses the standard SquaredL2Updater, and I also tried to broadcast it as well. The input is an RDD created by taking the union of several inputs that have all been run against MLUtils.kFold to produce even more RDDs (e.g. 10 different inputs, each with 10 kFolds). I'm pretty certain that all of the input RDDs have clean closures. But I'm curious, is there a high overhead for running union? Could that create larger task sizes?

Kyle

On Sat, Jul 12, 2014 at 7:50 PM, Aaron Davidson ilike...@gmail.com wrote:

I also did a quick glance through the code and couldn't find anything worrying that should be included in the task closures. The only possibly unsanitary part is the Updater you pass in -- what is your Updater and is it possible it's dragging in a significant amount of extra state?

On Sat, Jul 12, 2014 at 7:27 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I'm working on a patch to MLlib that allows for multiplexing several different model optimizations using the same RDD (SPARK-2372: https://issues.apache.org/jira/browse/SPARK-2372).

In testing larger datasets, I've started to see some memory errors (java.lang.OutOfMemoryError and "exceeds max allowed: spark.akka.frameSize" errors). My main clue is that Spark will start logging warnings like:

```
14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a task of very large size (10119 KB). The maximum recommended task size is 100 KB.
```

Looking up stage '2862' in the UI leads to a 'sample at GroupedGradientDescent.scala:156' call. That code can be seen at
https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156

I've looked over the code; I'm broadcasting the larger variables, and between the sampler and the combineByKey I wouldn't think there's much data being moved over the network, much less a 10 MB chunk.

Any ideas of what this might be a symptom of?

Kyle
Large Task Size?
I'm working on a patch to MLlib that allows for multiplexing several different model optimizations using the same RDD (SPARK-2372: https://issues.apache.org/jira/browse/SPARK-2372).

In testing larger datasets, I've started to see some memory errors (java.lang.OutOfMemoryError and "exceeds max allowed: spark.akka.frameSize" errors). My main clue is that Spark will start logging warnings like:

```
14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a task of very large size (10119 KB). The maximum recommended task size is 100 KB.
```

Looking up stage '2862' in the UI leads to a 'sample at GroupedGradientDescent.scala:156' call. That code can be seen at
https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156

I've looked over the code; I'm broadcasting the larger variables, and between the sampler and the combineByKey I wouldn't think there's much data being moved over the network, much less a 10 MB chunk.

Any ideas of what this might be a symptom of?

Kyle
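As general background for the task-size warnings discussed in this thread: anything captured by a task's closure is serialized into every task, whereas a broadcast variable is fetched once per executor. A minimal, generic sketch of the difference (the names and sizes are made up for illustration and are not taken from GroupedGradientDescent):

```
import org.apache.spark.{SparkConf, SparkContext}

object TaskSizeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("task-size-sketch"))

    // A large driver-side object (~8 MB of doubles).
    val bigLookup: Array[Double] = Array.fill(1000000)(math.random)
    val data = sc.parallelize(1 to 1000)

    // Captures bigLookup directly: the whole array is serialized into every task,
    // which is what triggers the "very large size" TaskSetManager warning.
    val captured = data.map(i => bigLookup(i % bigLookup.length)).reduce(_ + _)

    // Broadcast it instead: each executor fetches one copy, tasks stay small.
    val bigLookupBc = sc.broadcast(bigLookup)
    val broadcasted = data.map(i => bigLookupBc.value(i % bigLookupBc.value.length)).reduce(_ + _)

    println(s"captured=$captured broadcasted=$broadcasted")
    sc.stop()
  }
}
```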
Re: Improving Spark multithreaded performance?
This all seems pretty hackish and a lot of trouble to get around limitations in MLlib. The big limitation is that right now, the optimization algorithms work on one large dataset at a time. We need a second set of methods that work on a large number of medium-sized datasets.

I've started to code a new set of optimization methods to add into MLlib, starting with GroupedGradientDescent:
https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala

GroupedGradientDescent is based on GradientDescent, but it takes RDD[(Int, (Double, Vector))] as its data input rather than RDD[(Double, Vector)]. The Int serves as a key to mark which elements should be grouped together. This lets you multiplex several dataset optimizations into the same RDD.

I think I've gotten GroupedGradientDescent to work correctly. I need to go up the stack and start adding methods like SVMWithSGD.trainGroup.

Does anybody have any thoughts on this?

Kyle

On Fri, Jun 27, 2014 at 6:36 PM, Xiangrui Meng men...@gmail.com wrote:

The RDD is cached on only one or two workers. All other executors need to fetch its content via the network. Since the dataset is not huge, could you try this?

```
val features: Array[Vector] = ...
val featuresBc = sc.broadcast(features)
// parallel loops
val labels: Array[Double] = ...
val rdd = sc.parallelize(0 until 1, 1).flatMap(i => featuresBc.value.view.zip(labels))
val model = SVMWithSGD.train(rdd)
models(i) = model
```

Using the BT broadcast factory would improve the performance of broadcasting.

Best,
Xiangrui

On Fri, Jun 27, 2014 at 3:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

1) I'm using the static SVMWithSGD.train, with no options.
2) I have about 20,000 features (~5000 samples) that are being attached and trained against 14,000 different sets of labels (i.e. I'll be doing 14,000 different training runs against the same set of features, trying to figure out which labels can be learned), and I would also like to do cross-fold validation.

The driver doesn't seem to be using too much memory. I left it at -Xmx8g and it never complained.

Kyle

On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng men...@gmail.com wrote:

Hi Kyle,

A few questions:
1) Did you use `setIntercept(true)`?
2) How many features? I'm a little worried about the driver's load because the final aggregation and weights update happen on the driver. Did you check the driver's memory usage as well?

Best,
Xiangrui

On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

As far as I can tell there is no data to broadcast (unless there is something internal to MLlib that needs to be broadcast). I've coalesced the input RDDs to keep the number of partitions limited. When running, I've tried to get up to 500 concurrent stages, and I've coalesced the RDDs down to 2 partitions, so about 1000 tasks. Despite having over 500 threads in the threadpool working on MLlib tasks, the total CPU usage never really goes above 150%. I've tried increasing 'spark.akka.threads' but that doesn't seem to do anything.

My one thought is that, because I'm using MLUtils.kFold to generate the RDDs, I have many tasks working off RDDs that are permutations of the original RDDs, and maybe that is creating some sort of dependency bottleneck.

Kyle

On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com wrote:

I don't have specific solutions for you, but the general things to try are:

- Decrease task size by broadcasting any non-trivial objects.
- Increase duration of tasks by making them less fine-grained.

How many tasks are you sending? I've seen in the past something like 25 seconds for ~10k total medium-sized tasks.

On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I'm working to set up a calculation that involves calling MLlib's SVMWithSGD.train several thousand times on different permutations of the data. I'm trying to run the separate jobs using a threadpool to dispatch the different requests to a Spark context connected to a Mesos cluster, using coarse scheduling and a max of 2000 cores, on Spark 1.0.

Total utilization of the system is terrible. Most of the 'aggregate at GradientDescent.scala:178' stages (where MLlib spends most of its time) take about 3 seconds, but have ~25 seconds of scheduler delay time.

What kind of things can I do to improve this?

Kyle
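To illustrate the RDD[(Int, (Double, Vector))] input format that GroupedGradientDescent takes (described at the top of the message above), here is a rough sketch of tagging several labeled datasets with an Int key and unioning them into one RDD. The helper name and dataset layout are hypothetical:

```
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Sketch: tag each dataset with an Int key so several optimizations
// can be multiplexed into the same keyed RDD.
def multiplexDatasets(
    sc: SparkContext,
    datasets: Seq[RDD[LabeledPoint]]): RDD[(Int, (Double, Vector))] = {
  val keyed = datasets.zipWithIndex.map { case (rdd, key) =>
    rdd.map(p => (key, (p.label, p.features)))
  }
  sc.union(keyed)
}
```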
Re: Improving Spark multithreaded performance?
1) I'm using the static SVMWithSGD.train, with no options.
2) I have about 20,000 features (~5000 samples) that are being attached and trained against 14,000 different sets of labels (i.e. I'll be doing 14,000 different training runs against the same set of features, trying to figure out which labels can be learned), and I would also like to do cross-fold validation.

The driver doesn't seem to be using too much memory. I left it at -Xmx8g and it never complained.

Kyle

On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng men...@gmail.com wrote:

Hi Kyle,

A few questions:
1) Did you use `setIntercept(true)`?
2) How many features? I'm a little worried about the driver's load because the final aggregation and weights update happen on the driver. Did you check the driver's memory usage as well?

Best,
Xiangrui

On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

As far as I can tell there is no data to broadcast (unless there is something internal to MLlib that needs to be broadcast). I've coalesced the input RDDs to keep the number of partitions limited. When running, I've tried to get up to 500 concurrent stages, and I've coalesced the RDDs down to 2 partitions, so about 1000 tasks. Despite having over 500 threads in the threadpool working on MLlib tasks, the total CPU usage never really goes above 150%. I've tried increasing 'spark.akka.threads' but that doesn't seem to do anything.

My one thought is that, because I'm using MLUtils.kFold to generate the RDDs, I have many tasks working off RDDs that are permutations of the original RDDs, and maybe that is creating some sort of dependency bottleneck.

Kyle

On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com wrote:

I don't have specific solutions for you, but the general things to try are:

- Decrease task size by broadcasting any non-trivial objects.
- Increase duration of tasks by making them less fine-grained.

How many tasks are you sending? I've seen in the past something like 25 seconds for ~10k total medium-sized tasks.

On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I'm working to set up a calculation that involves calling MLlib's SVMWithSGD.train several thousand times on different permutations of the data. I'm trying to run the separate jobs using a threadpool to dispatch the different requests to a Spark context connected to a Mesos cluster, using coarse scheduling and a max of 2000 cores, on Spark 1.0.

Total utilization of the system is terrible. Most of the 'aggregate at GradientDescent.scala:178' stages (where MLlib spends most of its time) take about 3 seconds, but have ~25 seconds of scheduler delay time.

What kind of things can I do to improve this?

Kyle
Improving Spark multithreaded performance?
I'm working to set up a calculation that involves calling MLlib's SVMWithSGD.train several thousand times on different permutations of the data. I'm trying to run the separate jobs using a threadpool to dispatch the different requests to a Spark context connected to a Mesos cluster, using coarse scheduling and a max of 2000 cores, on Spark 1.0.

Total utilization of the system is terrible. Most of the 'aggregate at GradientDescent.scala:178' stages (where MLlib spends most of its time) take about 3 seconds, but have ~25 seconds of scheduler delay time.

What kind of things can I do to improve this?

Kyle
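A minimal sketch of the dispatch pattern described above: a fixed thread pool submitting independent SVMWithSGD.train jobs against a shared SparkContext. The helper name, thread count, and iteration count are illustrative assumptions, and construction of the data permutations is elided:

```
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Sketch: train many independent SVM models concurrently from one driver.
// Each Future submits its own Spark job; the SparkContext is shared.
def trainAll(permutations: Seq[RDD[LabeledPoint]], threads: Int = 16): Seq[SVMModel] = {
  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(threads))
  val futures = permutations.map { rdd =>
    Future { SVMWithSGD.train(rdd.cache(), /* numIterations = */ 100) }
  }
  val models = futures.map(f => Await.result(f, Duration.Inf))
  ec.shutdown()
  models
}
```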
Re: Parallel LogisticRegression?
I've tried to parallelize the separate regressions using

```
allResponses.toParArray.map( x => do logistic regression against labels in x )
```

But I start to see messages like

```
14/06/20 10:10:26 WARN scheduler.TaskSetManager: Lost TID 4193 (task 363.0:4)
14/06/20 10:10:27 WARN scheduler.TaskSetManager: Loss was due to fetch failure from null
```

and finally

```
14/06/20 10:10:26 ERROR scheduler.TaskSetManager: Task 363.0:4 failed 4 times; aborting job
```

Then

```
14/06/20 10:10:26 ERROR scheduler.DAGSchedulerActorSupervisor: eventProcesserActor failed due to the error null; shutting down SparkContext
14/06/20 10:10:26 ERROR actor.OneForOneStrategy: java.lang.UnsupportedOperationException
        at org.apache.spark.scheduler.SchedulerBackend$class.killTask(SchedulerBackend.scala:32)
        at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.killTask(MesosSchedulerBackend.scala:41)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:185)
```

This doesn't happen when I don't use toParArray. I read that Spark was thread safe, but I seem to be running into problems. Am I doing something wrong?

Kyle

On Thu, Jun 19, 2014 at 11:21 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I'm working on a problem learning several different sets of responses against the same set of training features. Right now I've written the program to cycle through all of the different label sets, attach them to the training data, and run LogisticRegressionWithSGD on each of them, i.e.

```
foreach curResponseSet in allResponses:
    currentRDD : RDD[LabeledPoints] = curResponseSet joined with trainingData
    LogisticRegressionWithSGD.train(currentRDD)
```

Each of the different training runs is independent. It seems like I should be able to parallelize them as well. Is there a better way to do this?

Kyle
Re: Parallel LogisticRegression?
It looks like I was running into https://issues.apache.org/jira/browse/SPARK-2204. The issues went away when I changed to spark.mesos.coarse.

Kyle

On Fri, Jun 20, 2014 at 10:36 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I've tried to parallelize the separate regressions using

```
allResponses.toParArray.map( x => do logistic regression against labels in x )
```

But I start to see messages like

```
14/06/20 10:10:26 WARN scheduler.TaskSetManager: Lost TID 4193 (task 363.0:4)
14/06/20 10:10:27 WARN scheduler.TaskSetManager: Loss was due to fetch failure from null
```

and finally

```
14/06/20 10:10:26 ERROR scheduler.TaskSetManager: Task 363.0:4 failed 4 times; aborting job
```

Then

```
14/06/20 10:10:26 ERROR scheduler.DAGSchedulerActorSupervisor: eventProcesserActor failed due to the error null; shutting down SparkContext
14/06/20 10:10:26 ERROR actor.OneForOneStrategy: java.lang.UnsupportedOperationException
        at org.apache.spark.scheduler.SchedulerBackend$class.killTask(SchedulerBackend.scala:32)
        at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.killTask(MesosSchedulerBackend.scala:41)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:185)
```

This doesn't happen when I don't use toParArray. I read that Spark was thread safe, but I seem to be running into problems. Am I doing something wrong?

Kyle

On Thu, Jun 19, 2014 at 11:21 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I'm working on a problem learning several different sets of responses against the same set of training features. Right now I've written the program to cycle through all of the different label sets, attach them to the training data, and run LogisticRegressionWithSGD on each of them, i.e.

```
foreach curResponseSet in allResponses:
    currentRDD : RDD[LabeledPoints] = curResponseSet joined with trainingData
    LogisticRegressionWithSGD.train(currentRDD)
```

Each of the different training runs is independent. It seems like I should be able to parallelize them as well. Is there a better way to do this?

Kyle
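For reference, switching to coarse-grained Mesos mode is a one-line configuration change. A sketch (the master URL and app name are placeholders):

```
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("parallel-logreg")             // placeholder app name
  .setMaster("mesos://host:5050")            // placeholder Mesos master URL
  .set("spark.mesos.coarse", "true")         // coarse-grained mode, the SPARK-2204 workaround above
val sc = new SparkContext(conf)
```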
Parallel LogisticRegression?
I'm working on a problem learning several different sets of responses against the same set of training features. Right now I've written the program to cycle through all of the different label sets, attach them to the training data, and run LogisticRegressionWithSGD on each of them, i.e.

```
foreach curResponseSet in allResponses:
    currentRDD : RDD[LabeledPoints] = curResponseSet joined with trainingData
    LogisticRegressionWithSGD.train(currentRDD)
```

Each of the different training runs is independent. It seems like I should be able to parallelize them as well. Is there a better way to do this?

Kyle
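A rough Scala sketch of the loop above, and of where a parallel collection (.par / .toParArray) would fit to run the independent trainings concurrently. The key type, the join between responses and features, and the iteration count are illustrative assumptions, not the original code:

```
import org.apache.spark.SparkContext._   // pair-RDD implicits for join (needed on Spark 1.0/1.1)
import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithSGD}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Sketch: one model per response set, all sharing the same feature RDD.
// The assumption that responses and features are co-keyed by a Long sample id is illustrative.
def trainPerResponse(
    trainingData: RDD[(Long, Vector)],        // sample id -> features
    allResponses: Seq[RDD[(Long, Double)]]    // one label set per response
): Seq[LogisticRegressionModel] = {
  allResponses.map { responses =>             // allResponses.par.map(...) would run these concurrently
    val labeled = responses.join(trainingData).map { case (_, (label, features)) =>
      LabeledPoint(label, features)
    }
    LogisticRegressionWithSGD.train(labeled.cache(), /* numIterations = */ 100)
  }
}
```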
GraphX vertices and connected edges
What is the most efficient way to get an RDD of GraphX vertices and their connected edges? Initially I thought I could use mapReduceTriplets, but I realized that would neglect vertices that aren't connected to anything.

Would I have to do a mapReduceTriplets and then do a join with all of the vertices to pick up the ones lost because they aren't part of a triplet? I'm hoping there is a simpler way I've neglected.

Kyle
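A sketch of the two-step approach described above: collect the incident edges per vertex (aggregateMessages is used here in place of mapReduceTriplets), then left-join against the full vertex set so isolated vertices keep an empty list. The helper name and return type are illustrative:

```
import scala.reflect.ClassTag
import org.apache.spark.graphx.{Edge, Graph, VertexRDD}

// Sketch: every vertex paired with the edges that touch it; isolated vertices get Nil.
def verticesWithEdges[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED]): VertexRDD[(VD, List[Edge[ED]])] = {

  // Send each edge to both of its endpoints and concatenate the lists per vertex.
  val incident: VertexRDD[List[Edge[ED]]] = graph.aggregateMessages[List[Edge[ED]]](
    ctx => {
      val e = Edge(ctx.srcId, ctx.dstId, ctx.attr)
      ctx.sendToSrc(List(e))
      ctx.sendToDst(List(e))
    },
    _ ++ _
  )

  // leftJoin keeps every vertex, filling in an empty list where no edges arrived.
  graph.vertices.leftJoin(incident) { (id, attr, edges) => (attr, edges.getOrElse(Nil)) }
}
```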