Re: reading compress lzo files

2014-07-05 Thread Gurvinder Singh
On 07/06/2014 05:19 AM, Nicholas Chammas wrote:
> On Fri, Jul 4, 2014 at 3:33 PM, Gurvinder Singh
> <gurvinder.si...@uninett.no> wrote:
> 
> csv = sc.newAPIHadoopFile(opts.input, "com.hadoop.mapreduce.LzoTextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text").count()
> 
> Does anyone know what the rough equivalent of this would be in the Scala
> API?
> 
I am not sure; I haven't tested it using Scala. The
com.hadoop.mapreduce.LzoTextInputFormat class comes from this package:
https://github.com/twitter/hadoop-lzo

I installed it from the Cloudera "hadoop-lzo" package along with the
liblzo2-2 Debian package on all of my workers. Make sure you have
hadoop-lzo.jar on the classpath for Spark.
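
For reference, a rough Scala equivalent might look like the sketch below
(untested; it assumes hadoop-lzo.jar is on the driver and executor classpath,
and passes the classes with classOf[...] rather than class-name strings):

import com.hadoop.mapreduce.LzoTextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

// hypothetical path; replace with your own LZO file or directory
val lzoRdd = sc.newAPIHadoopFile(
  "hdfs:///path/to/data.lzo",
  classOf[LzoTextInputFormat],
  classOf[LongWritable],
  classOf[Text])
println(lzoRdd.count())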

- Gurvinder

> I am trying the following, but the first import yields an error on my
> spark-ec2 cluster:
> 
> import com.hadoop.mapreduce.LzoTextInputFormat
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.io.Text
> 
> sc.newAPIHadoopFile("s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data",
>  LzoTextInputFormat, LongWritable, Text)
> 
> scala> import com.hadoop.mapreduce.LzoTextInputFormat
> :12: error: object hadoop is not a member of package com
>    import com.hadoop.mapreduce.LzoTextInputFormat
> 
> Nick




Re: Execution stalls in LogisticRegressionWithSGD

2014-07-05 Thread Xiangrui Meng
Hi Bharath,

1) Did you sync the Spark jar and conf to the worker nodes after the build?
2) Since the dataset is not large, could you try local mode first
using `spark-submit --driver-memory 12g --master local[*]`?
3) Try using fewer partitions, say 5.

If the problem is still there, please attach the full master/worker log files.

Best,
Xiangrui

On Fri, Jul 4, 2014 at 12:16 AM, Bharath Ravi Kumar  wrote:
> Xiangrui,
>
> Leaving the frameSize unspecified led to an error message (and failure)
> stating that the task size (~11M) was larger. I hence set it to an
> arbitrarily large value ( I realize 500 was unrealistic & unnecessary in
> this case). I've now set the size to 20M and repeated the runs. The earlier
> runs were on an uncached RDD. Caching the RDD (and setting
> spark.storage.memoryFraction=0.5) resulted in marginal speed up of
> execution, but the end result remained the same. The cached RDD size is as
> follows:
>
> RDD Name: 1084
> Storage Level: Memory Deserialized 1x Replicated
> Cached Partitions: 80
> Fraction Cached: 100%
> Size in Memory: 165.9 MB
> Size in Tachyon: 0.0 B
> Size on Disk: 0.0 B
>
>
>
> The corresponding master logs were:
>
> 14/07/04 06:29:34 INFO Master: Removing executor app-20140704062238-0033/1
> because it is EXITED
> 14/07/04 06:29:34 INFO Master: Launching executor app-20140704062238-0033/2
> on worker worker-20140630124441-slave1-40182
> 14/07/04 06:29:34 INFO Master: Removing executor app-20140704062238-0033/0
> because it is EXITED
> 14/07/04 06:29:34 INFO Master: Launching executor app-20140704062238-0033/3
> on worker worker-20140630102913-slave2-44735
> 14/07/04 06:29:37 INFO Master: Removing executor app-20140704062238-0033/2
> because it is EXITED
> 14/07/04 06:29:37 INFO Master: Launching executor app-20140704062238-0033/4
> on worker worker-20140630124441-slave1-40182
> 14/07/04 06:29:37 INFO Master: Removing executor app-20140704062238-0033/3
> because it is EXITED
> 14/07/04 06:29:37 INFO Master: Launching executor app-20140704062238-0033/5
> on worker worker-20140630102913-slave2-44735
> 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
> disassociated, removing it.
> 14/07/04 06:29:39 INFO Master: Removing app app-20140704062238-0033
> 14/07/04 06:29:39 INFO LocalActorRef: Message
> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
> Actor[akka://sparkMaster/deadLetters] to
> Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.3.1.135%3A33061-123#1986674260]
> was not delivered. [39] dead letters encountered. This logging can be turned
> off or adjusted with configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
> 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
> disassociated, removing it.
> 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
> disassociated, removing it.
> 14/07/04 06:29:39 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkMaster@master:7077] -> [akka.tcp://spark@slave2:45172]:
> Error [Association failed with [akka.tcp://spark@slave2:45172]] [
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://spark@slave2:45172]
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: slave2/10.3.1.135:45172
> ]
> 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
> disassociated, removing it.
> 14/07/04 06:29:39 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkMaster@master:7077] -> [akka.tcp://spark@slave2:45172]:
> Error [Association failed with [akka.tcp://spark@slave2:45172]] [
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://spark@slave2:45172]
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: slave2/10.3.1.135:45172
> ]
> 14/07/04 06:29:39 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkMaster@master:7077] -> [akka.tcp://spark@slave2:45172]:
> Error [Association failed with [akka.tcp://spark@slave2:45172]] [
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://spark@slave2:45172]
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: slave2/10.3.1.135:45172
> ]
> 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
> disassociated, removing it.
> 14/07/04 06:29:40 WARN Master: Got status update for unknown executor
> app-20140704062238-0033/5
> 14/07/04 06:29:40 WARN Master: Got status update for unknown executor
> app-20140704062238-0033/4
>
>
> Coincidentally, after the initial executor failed, each following executor
> that was re-spawned failed with the following logs:
> (e.g the following was from
> slave1:~/spark-1.0.1-rc1/work/app-20140704062238-0033/2/stderr)
>
> log4j:WARN No append

Re: reading compress lzo files

2014-07-05 Thread Sean Owen
The package com.hadoop.mapreduce certainly looks wrong. If it is a Hadoop
class, it starts with org.apache.hadoop.
On Jul 6, 2014 4:20 AM, "Nicholas Chammas" 
wrote:

> On Fri, Jul 4, 2014 at 3:33 PM, Gurvinder Singh <
> gurvinder.si...@uninett.no> wrote:
>
>> csv = sc.newAPIHadoopFile(opts.input, "com.hadoop.mapreduce.LzoTextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text").count()
>>
> Does anyone know what the rough equivalent of this would be in the Scala
> API?
>
> I am trying the following, but the first import yields an error on my
> spark-ec2 cluster:
>
> import com.hadoop.mapreduce.LzoTextInputFormat
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.io.Text
>
> sc.newAPIHadoopFile("s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data",
>  LzoTextInputFormat, LongWritable, Text)
>
> scala> import com.hadoop.mapreduce.LzoTextInputFormat
> :12: error: object hadoop is not a member of package com
>import com.hadoop.mapreduce.LzoTextInputFormat
>
> Nick
>


Re: graphx Joining two VertexPartitions with different indexes is slow.

2014-07-05 Thread Ankur Dave
When joining two VertexRDDs with identical indexes, GraphX can use a fast
code path (a zip join without any hash lookups). However, the check for
identical indexes is performed using reference equality.

Without caching, two copies of the index are created. Although the two
indexes are structurally identical, they fail reference equality, and so
GraphX mistakenly uses the slow path involving a hash lookup per joined
element.

I'm working on a patch that
attempts an optimistic zip join with per-element fallback to hash lookups,
which would improve this situation.
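
In the meantime, caching both operands before the join (so the index is built
once and reused) should help; a minimal sketch with hypothetical names:

// graph: Graph[VD, ED]; otherRdd: RDD[(VertexId, U)] -- both hypothetical
val verts = graph.vertices.cache()
val other = otherRdd.cache()
val joined = verts.innerJoin(other) { (id, attr, u) => (attr, u) }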

Ankur 


Re: reading compress lzo files

2014-07-05 Thread Nicholas Chammas
On Fri, Jul 4, 2014 at 3:33 PM, Gurvinder Singh 
wrote:

> csv = sc.newAPIHadoopFile(opts.input, "com.hadoop.mapreduce.LzoTextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text").count()
>
Does anyone know what the rough equivalent of this would be in the Scala
API?

I am trying the following, but the first import yields an error on my
spark-ec2 cluster:

import com.hadoop.mapreduce.LzoTextInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

sc.newAPIHadoopFile("s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data",
LzoTextInputFormat, LongWritable, Text)

scala> import com.hadoop.mapreduce.LzoTextInputFormat
:12: error: object hadoop is not a member of package com
   import com.hadoop.mapreduce.LzoTextInputFormat

Nick


Re: graphx Joining two VertexPartitions with different indexes is slow.

2014-07-05 Thread Koert Kuipers
thanks for replying. why is joining two vertexrdds without caching slow?
what is recomputed unnecessarily?
i am not sure what is different here from joining 2 regular RDDs (where
nobody seems to recommend to cache before joining i think...)


On Thu, Jul 3, 2014 at 10:52 PM, Ankur Dave  wrote:

> Oh, I just read your message more carefully and noticed that you're
> joining a regular RDD with a VertexRDD. In that case I'm not sure why the
> warning is occurring, but it might be worth caching both operands
> (graph.vertices and the regular RDD) just to be sure.
>
> Ankur 
>
>


Re: window analysis with Spark and Spark streaming

2014-07-05 Thread Mayur Rustagi
The key idea is to simulate your application time as you enter data. So you can
connect Spark Streaming to a queue and insert data into it spaced by time.
Easier said than done :). What are the parallelism issues you are hitting
with your static approach?
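
A minimal sketch of the queue idea (assuming Scala and a 1-second batch
interval; `files` is a hypothetical list of input paths):

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(1))
val queue = new mutable.Queue[RDD[String]]()
val stream = ssc.queueStream(queue)        // one queued RDD is consumed per batch
stream.window(Seconds(10)).count().print() // windowing now follows the simulated time
ssc.start()

files.foreach { f =>
  queue += sc.textFile(f)                  // feed one "file" per simulated time step
  Thread.sleep(1000)
}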

On Friday, July 4, 2014, alessandro finamore 
wrote:

> Thanks for the replies
>
> What is not completely clear to me is how time is managed.
> I can create a DStream from file.
> But if I set the window property that will be bounded to the application
> time, right?
>
> If I got it right, with a receiver I can control the way DStream are
> created.
> But, how can apply then the windowing already shipped with the framework if
> this is bounded to the "application time"?
> I would like to do define a window of N files but the window() function
> requires a duration as input...
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/window-analysis-with-Spark-and-Spark-streaming-tp8806p8824.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


-- 
Sent from Gmail Mobile


Re: [mllib] strange/buggy results with RidgeRegressionWithSGD

2014-07-05 Thread DB Tsai
You may try LBFGS to get more stable convergence. In Spark 1.1, we will be
able to use LBFGS instead of GD in the training process.
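
Until then, a rough sketch of simply lowering the SGD step size (which Thomas
mentions below); the numbers are illustrative only, not recommendations:

import org.apache.spark.mllib.regression.RidgeRegressionWithSGD

// trainingData: RDD[LabeledPoint], as in the code quoted below
val model = RidgeRegressionWithSGD.train(
  trainingData,
  1000,   // numIterations
  0.01,   // stepSize -- much smaller than the default of 1.0
  0.1,    // regParam
  1.0)    // miniBatchFraction
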
On Jul 4, 2014 1:23 PM, "Thomas Robert"  wrote:

> Hi all,
>
> I too am having some issues with *RegressionWithSGD algorithms.
>
> Concerning your issue Eustache, this could be due to the fact that these
> regression algorithms use a fixed step (that is divided by
> sqrt(iteration)). During my tests, quite often, the algorithm diverged to an
> infinite cost, I guessed because the step was too big. I reduced it and
> managed to get good results on a very simple generated dataset.
>
> But I was wondering if anyone here had some advice concerning the use of
> these regression algorithms, for example how to choose a good step and
> number of iterations? I wonder if I'm using those right...
>
> Thanks,
>
> --
>
> *Thomas ROBERT*
> www.creativedata.fr
>
>
> 2014-07-03 16:16 GMT+02:00 Eustache DIEMERT :
>
>> Printing the model show the intercept is always 0 :(
>>
>> Should I open a bug for that ?
>>
>>
>> 2014-07-02 16:11 GMT+02:00 Eustache DIEMERT :
>>
>>> Hi list,
>>>
>>> I'm benchmarking MLlib for a regression task [1] and get strange
>>> results.
>>>
>>> Namely, using RidgeRegressionWithSGD it seems the predicted points miss
>>> the intercept:
>>>
>>> {code}
>>> val trainedModel = RidgeRegressionWithSGD.train(trainingData, 1000)
>>> ...
>>> valuesAndPreds.take(10).map(t => println(t))
>>> {code}
>>>
>>> output:
>>>
>>> (2007.0,-3.784588726958493E75)
>>> (2003.0,-1.9562390324037716E75)
>>> (2005.0,-4.147413202985629E75)
>>> (2003.0,-1.524938024096847E75)
>>> ...
>>>
>>> If I change the parameters (step size, regularization and iterations) I
>>> get NaNs more often than not:
>>> (2007.0,NaN)
>>> (2003.0,NaN)
>>> (2005.0,NaN)
>>> ...
>>>
>>> On the other hand DecisionTree model give sensible results.
>>>
>>> I see there is a `setIntercept()` method in abstract class
>>> GeneralizedLinearAlgorithm that seems to trigger the use of the intercept
>>> but I'm unable to use it from the public interface :(
>>>
>>> Any help appreciated :)
>>>
>>> Eustache
>>>
>>> [1] https://archive.ics.uci.edu/ml/datasets/YearPredictionMSD
>>>
>>
>


Re: Spark 1.0 failed on HDP 2.0 with absurd exception

2014-07-05 Thread Cesar Arevalo
From looking at the exception message that was returned, I would try the
following command for running the application:

./bin/spark-submit --class test.etl.RunETL --master yarn-cluster
--num-workers 14 --driver-memory 3200m --worker-memory 3g --worker-cores 2
--jar my-etl-1.0-SNAPSHOT-hadoop2.2.0.jar


I didn't try this, so it may not work.

Best,
-Cesar



On Sat, Jul 5, 2014 at 2:48 AM, Konstantin Kudryavtsev <
kudryavtsev.konstan...@gmail.com> wrote:

> Hi all,
>
> I have cluster with HDP 2.0. I built Spark 1.0 on edge node and trying to
> run with a command
> ./bin/spark-submit --class test.etl.RunETL --master yarn-cluster
> --num-executors 14 --driver-memory 3200m --executor-memory 3g
> --executor-cores 2 my-etl-1.0-SNAPSHOT-hadoop2.2.0.jar
>
> in result I got failed YARN application with following stack trace
>
> Application application_1404481778533_0068 failed 3 times due to AM
> Container for appattempt_1404481778533_0068_03 exited with exitCode: 1
> due to: Exception from container-launch:
> org.apache.hadoop.util.Shell$ExitCodeException:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:464)
> at org.apache.hadoop.util.Shell.run(Shell.java:379)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:283)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:79)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
>  .Failing this attempt.. Failing the application
>
> Log Type: stderr
>
> Log Length: 686
>
> Unknown/unsupported param List(--executor-memory, 3072, --executor-cores, 2, 
> --num-executors, 14)
> Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
> Options:
>   --jar JAR_PATH   Path to your application's JAR file (required)
>   --class CLASS_NAME   Name of your application's main class (required)
>   --args ARGS  Arguments to be passed to your application's main 
> class.
>Mutliple invocations are possible, each will be passed 
> in order.
>   --num-workers NUMNumber of workers to start (Default: 2)
>   --worker-cores NUM   Number of cores for the workers (Default: 1)
>   --worker-memory MEM  Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
>
>
> Seems like the old spark notation any ideas?
>
> Thank you,
> Konstantin Kudryavtsev
>



-- 
Cesar Arevalo
Software Engineer ❘ Zephyr Health
450 Mission Street, Suite #201 ❘ San Francisco, CA 94105
m: +1 415-571-7687 ❘ s: arevalocesar | t: @zephyrhealth

o: +1 415-529-7649 ❘ f: +1 415-520-9288
http://www.zephyrhealth.com


Re: How to use groupByKey and CqlPagingInputFormat

2014-07-05 Thread Martin Gammelsæter
Ah, I see. Thank you!

As we are in the process of building the system we have not tried with
any large amounts of data yet, but when the time comes I'll try both
implementations and do a small benchmark.
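
And just to be sure I follow the map({}) vs map{} point below, a tiny
illustration with a throwaway RDD:

val xs = sc.parallelize(Seq(1, 2, 3))
val a = xs.map({ x => x * 2 })  // function literal wrapped in (redundant) parentheses
val b = xs.map { x => x * 2 }   // same call, parentheses dropped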

On Fri, Jul 4, 2014 at 9:20 PM, Mohammed Guller  wrote:
> As far as I know, there is not much difference, except that the outer 
> parenthesis is redundant. The problem with your original code was that there 
> was mismatch in the opening and closing parenthesis. Sometimes the error 
> messages are misleading :-)
>
> Do you see any performance difference with the Datastax spark driver?
>
> Mohammed
>
> -Original Message-
> From: Martin Gammelsæter [mailto:martingammelsae...@gmail.com]
> Sent: Friday, July 4, 2014 12:43 AM
> To: user@spark.apache.org
> Subject: Re: How to use groupByKey and CqlPagingInputFormat
>
> On Thu, Jul 3, 2014 at 10:29 PM, Mohammed Guller  
> wrote:
>> Martin,
>>
>> 1) The first map contains the columns in the primary key, which could be a 
>> compound primary key containing multiple columns,  and the second map 
>> contains all the non-key columns.
>
> Ah, thank you, that makes sense.
>
>> 2) try this fixed code:
>> val navnrevmap = casRdd.map{
>>   case (key, value) =>
>> (ByteBufferUtil.string(value.get("navn")),
>>ByteBufferUtil.toInt(value.get("revisjon")))
>>}.groupByKey()
>
> I changed from CqlPagingInputFormat to the new Datastax cassandra-spark 
> driver, which is a bit easier to work with, but thanks! I'm curious though, 
> what is the semantic difference between
> map({}) and map{}?



-- 
Mvh.
Martin Gammelsæter
92209139


Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Nick Pentreath
For linear models the 3rd option is by far the most efficient, and I suspect it
is what Evan is alluding to.


Unfortunately it's not directly possible with the classes in MLlib now, so
you'll have to roll your own using the underlying SGD / L-BFGS primitives.
—
Sent from Mailbox

On Sat, Jul 5, 2014 at 10:45 AM, Christopher Nguyen 
wrote:

> Hi sparkuser2345,
> I'm inferring the problem statement is something like "how do I make this
> complete faster (given my compute resources)?"
> Several comments.
> First, Spark only allows launching parallel tasks from the driver, not from
> workers, which is why you're seeing the exception when you try. Whether the
> latter is a sensible/doable idea is another discussion, but I can
> appreciate why many people assume this should be possible.
> Second, on optimization, you may be able to apply Sean's idea about
> (thread) parallelism at the driver, combined with the knowledge that often
> these cluster tasks bottleneck while competing for the same resources at
> the same time (cpu vs disk vs network, etc.) You may be able to achieve
> some performance optimization by randomizing these timings. This is not
> unlike GMail randomizing user storage locations around the world for load
> balancing. Here, you would partition each of your RDDs into a different
> number of partitions, making some tasks larger than others, and thus some
> may be in cpu-intensive map while others are shuffling data around the
> network. This is rather cluster-specific; I'd be interested in what you
> learn from such an exercise.
> Third, I find it useful always to consider doing as much as possible in one
> pass, subject to memory limits, e.g., mapPartitions() vs map(), thus
> minimizing map/shuffle/reduce boundaries with their context switches and
> data shuffling. In this case, notice how you're running the
> training+prediction k times over mostly the same rows, with map/reduce
> boundaries in between. While the training phase is sealed in this context,
> you may be able to improve performance by collecting all the k models
> together, and do a [m x k] predictions all at once which may end up being
> faster.
> Finally, as implied from the above, for the very common k-fold
> cross-validation pattern, the algorithm itself might be written to be smart
> enough to take both train and test data and "do the right thing" within
> itself, thus obviating the need for the user to prepare k data sets and
> running over them serially, and likely saving a lot of repeated
> computations in the right internal places.
> Enjoy,
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao 
> linkedin.com/in/ctnguyen
> On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen  wrote:
>> If you call .par on data_kfolded it will become a parallel collection in
>> Scala and so the maps will happen in parallel .
>> On Jul 5, 2014 9:35 AM, "sparkuser2345"  wrote:
>>
>>> Hi,
>>>
>>> I am trying to fit a logistic regression model with cross validation in
>>> Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
>>> each element is a pair of RDDs containing the training and test data:
>>>
>>> (training_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint],
>>> test_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint])
>>>
>>> scala> data_kfolded
>>> res21:
>>>
>>> Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
>>> org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])]
>>> =
>>> Array((MappedRDD[9] at map at :24,MappedRDD[7] at map at
>>> :23), (MappedRDD[13] at map at :24,MappedRDD[11] at map
>>> at
>>> :23), (MappedRDD[17] at map at :24,MappedRDD[15] at map
>>> at
>>> :23))
>>>
>>> Everything works fine when using data_kfolded:
>>>
>>> val validationErrors =
>>> data_kfolded.map { datafold =>
>>>   val svmAlg = new SVMWithSGD()
>>>   val model_reg = svmAlg.run(datafold._1)
>>>   val labelAndPreds = datafold._2.map { point =>
>>> val prediction = model_reg.predict(point.features)
>>> (point.label, prediction)
>>>   }
>>>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
>>> datafold._2.count
>>>   trainErr.toDouble
>>> }
>>>
>>> scala> validationErrors
>>> res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
>>> 0.29833546734955185)
>>>
>>> However, I have understood that the models are not fitted in parallel as
>>> data_kfolded is not an RDD (although it's an array of pairs of RDDs). When
>>> running the same code where data_kfolded has been replaced with
>>> sc.parallelize(data_kfolded), I get a null pointer exception from the line
>>> where the run method of the SVMWithSGD object is called with the traning
>>> data. I guess this is somehow related to the fact that RDDs can't be
>>> accessed from inside a closure. I fail to understand though why the first
>>> version works and the second doesn't. Most importantly, is there a way to
>>> fit the models in parallel? I would really appreciate your help.
>>>

Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Christopher Nguyen
Hi sparkuser2345,

I'm inferring the problem statement is something like "how do I make this
complete faster (given my compute resources)?"

Several comments.

First, Spark only allows launching parallel tasks from the driver, not from
workers, which is why you're seeing the exception when you try. Whether the
latter is a sensible/doable idea is another discussion, but I can
appreciate why many people assume this should be possible.

Second, on optimization, you may be able to apply Sean's idea about
(thread) parallelism at the driver, combined with the knowledge that often
these cluster tasks bottleneck while competing for the same resources at
the same time (cpu vs disk vs network, etc.) You may be able to achieve
some performance optimization by randomizing these timings. This is not
unlike GMail randomizing user storage locations around the world for load
balancing. Here, you would partition each of your RDDs into a different
number of partitions, making some tasks larger than others, and thus some
may be in cpu-intensive map while others are shuffling data around the
network. This is rather cluster-specific; I'd be interested in what you
learn from such an exercise.

Third, I find it useful always to consider doing as much as possible in one
pass, subject to memory limits, e.g., mapPartitions() vs map(), thus
minimizing map/shuffle/reduce boundaries with their context switches and
data shuffling. In this case, notice how you're running the
training+prediction k times over mostly the same rows, with map/reduce
boundaries in between. While the training phase is sealed in this context,
you may be able to improve performance by collecting all the k models
together, and do a [m x k] predictions all at once which may end up being
faster.
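
A hedged sketch of that last idea (train the k models first, then score all of
them in one pass over the data); per-fold test membership is glossed over here,
and `folds` plays the role of data_kfolded from the code quoted below:

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def scoreAllModels(data: RDD[LabeledPoint],
                   folds: Array[(RDD[LabeledPoint], RDD[LabeledPoint])]): Array[Long] = {
  val models = folds.map { case (train, _) => new SVMWithSGD().run(train) }
  val bcModels = data.context.broadcast(models)     // ship the k models once
  data.map { p =>
    // one Array[Long] of 0/1 errors per point, one slot per model
    bcModels.value.map(m => if (m.predict(p.features) != p.label) 1L else 0L)
  }.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })   // error count per model
}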

Finally, as implied from the above, for the very common k-fold
cross-validation pattern, the algorithm itself might be written to be smart
enough to take both train and test data and "do the right thing" within
itself, thus obviating the need for the user to prepare k data sets and
running over them serially, and likely saving a lot of repeated
computations in the right internal places.

Enjoy,
--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen  wrote:

> If you call .par on data_kfolded it will become a parallel collection in
> Scala and so the maps will happen in parallel .
> On Jul 5, 2014 9:35 AM, "sparkuser2345"  wrote:
>
>> Hi,
>>
>> I am trying to fit a logistic regression model with cross validation in
>> Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
>> each element is a pair of RDDs containing the training and test data:
>>
>> (training_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint],
>> test_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint])
>>
>> scala> data_kfolded
>> res21:
>>
>> Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
>> org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])]
>> =
>> Array((MappedRDD[9] at map at :24,MappedRDD[7] at map at
>> :23), (MappedRDD[13] at map at :24,MappedRDD[11] at map
>> at
>> :23), (MappedRDD[17] at map at :24,MappedRDD[15] at map
>> at
>> :23))
>>
>> Everything works fine when using data_kfolded:
>>
>> val validationErrors =
>> data_kfolded.map { datafold =>
>>   val svmAlg = new SVMWithSGD()
>>   val model_reg = svmAlg.run(datafold._1)
>>   val labelAndPreds = datafold._2.map { point =>
>> val prediction = model_reg.predict(point.features)
>> (point.label, prediction)
>>   }
>>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
>> datafold._2.count
>>   trainErr.toDouble
>> }
>>
>> scala> validationErrors
>> res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
>> 0.29833546734955185)
>>
>> However, I have understood that the models are not fitted in parallel as
>> data_kfolded is not an RDD (although it's an array of pairs of RDDs). When
>> running the same code where data_kfolded has been replaced with
>> sc.parallelize(data_kfolded), I get a null pointer exception from the line
>> where the run method of the SVMWithSGD object is called with the traning
>> data. I guess this is somehow related to the fact that RDDs can't be
>> accessed from inside a closure. I fail to understand though why the first
>> version works and the second doesn't. Most importantly, is there a way to
>> fit the models in parallel? I would really appreciate your help.
>>
>> val validationErrors =
>> sc.parallelize(data_kfolded).map { datafold =>
>>   val svmAlg = new SVMWithSGD()
>>   val model_reg = svmAlg.run(datafold._1) // This line gives null pointer
>> exception
>>   val labelAndPreds = datafold._2.map { point =>
>> val prediction = model_reg.predict(point.features)
>> (point.label, prediction)
>>   }
>>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
>> datafold._2.count
>>

Re: taking top k values of rdd

2014-07-05 Thread Nick Pentreath
Right. That is unavoidable unless, as you say, you repartition into 1 partition,
which may do the trick.


When I say send the top k per partition, I don't mean send the pq but the actual
values. This may end up being relatively small if k and p are not too big. (I'm
not sure how large a serialized pq is.)
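
A minimal sketch of that shape (assuming an RDD[Int] and a hypothetical k; note
that recent Spark versions also expose rdd.top(k), which keeps a bounded queue
per partition and returns only values):

val k = 10
val candidates = rdd.mapPartitions { iter =>
  iter.toArray.sorted(Ordering[Int].reverse).take(k).iterator  // top k values of this partition
}.collect()                                                    // k values per partition reach the driver
val topK = candidates.sorted(Ordering[Int].reverse).take(k)    // final top k on the driver
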
—
Sent from Mailbox

On Sat, Jul 5, 2014 at 10:29 AM, Koert Kuipers  wrote:

> hey nick,
> you are right. i didnt explain myself well and my code example was wrong...
> i am keeping a priority-queue with k items per partition (using
> com.twitter.algebird.mutable.PriorityQueueMonoid.build to limit the sizes
> of the queues).
> but this still means i am sending k items per partition to my driver, so k
> x p, while i only need k.
> thanks! koert
> On Sat, Jul 5, 2014 at 1:21 PM, Nick Pentreath 
> wrote:
>> To make it efficient in your case you may need to do a bit of custom code
>> to emit the top k per partition and then only send those to the driver. On
>> the driver you can just top k the combined top k from each partition
>> (assuming you have (object, count) for each top k list).
>>
>> —
>> Sent from Mailbox 
>>
>>
>> On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers  wrote:
>>
>>> my initial approach to taking top k values of a rdd was using a
>>> priority-queue monoid. along these lines:
>>>
>>>  rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
>>> false).reduce(monoid.plus)
>>>
>>> this works fine, but looking at the code for reduce it first reduces
>>> within a partition (which doesnt help me) and then sends the results to the
>>> driver where these again get reduced. this means that for every partition
>>> the (potentially very bulky) priorityqueue gets shipped to the driver.
>>>
>>> my driver is client side, not inside cluster, and i cannot change this,
>>> so this shipping to driver of all these queues can be expensive.
>>>
>>> is there a better way to do this? should i try to a shuffle first to
>>> reduce the partitions to the minimal amount (since number of queues shipped
>>> is equal to number of partitions)?
>>>
>>> is was a way to reduce to a single item RDD, so the queues stay inside
>>> cluster and i can retrieve the final result with RDD.first?
>>>
>>
>>

Re: taking top k values of rdd

2014-07-05 Thread Koert Kuipers
so i was thinking along these lines, assuming i start with p partitions:
1) create a priority queue of size k per partition
2) repartition to create one partition
3) reduce

i guess the worry is that in step 2 the one partition needs to hold p
priority queues of size k in memory...
the benefit is that the p priority queues do not get send to the driver
(which is not on cluster)
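
a rough sketch of steps 1-3 (assuming the Algebird PriorityQueueMonoid mentioned
elsewhere in this thread, an RDD[Int], and that the single merged partition --
p queues of size k -- fits in one executor's memory):

import com.twitter.algebird.mutable.PriorityQueueMonoid

val monoid = new PriorityQueueMonoid[Int](k)
val topKQueue = rdd
  .mapPartitions(items => Iterator.single(monoid.build(items)), false)
  .repartition(1)        // the p bounded queues meet inside the cluster, not on the driver
  .reduce(monoid.plus)   // only the final k-element queue comes back to the driver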


On Sat, Jul 5, 2014 at 1:20 PM, Koert Kuipers  wrote:

> i guess i could create a single priorityque per partition, then shuffle to
> a new rdd with 1 partition, and then reduce?
>
>
> On Sat, Jul 5, 2014 at 1:16 PM, Koert Kuipers  wrote:
>
>> my initial approach to taking top k values of a rdd was using a
>> priority-queue monoid. along these lines:
>>
>> rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
>> false).reduce(monoid.plus)
>>
>> this works fine, but looking at the code for reduce it first reduces
>> within a partition (which doesnt help me) and then sends the results to the
>> driver where these again get reduced. this means that for every partition
>> the (potentially very bulky) priorityqueue gets shipped to the driver.
>>
>> my driver is client side, not inside cluster, and i cannot change this,
>> so this shipping to driver of all these queues can be expensive.
>>
>> is there a better way to do this? should i try to a shuffle first to
>> reduce the partitions to the minimal amount (since number of queues shipped
>> is equal to number of partitions)?
>>
>> is was a way to reduce to a single item RDD, so the queues stay inside
>> cluster and i can retrieve the final result with RDD.first?
>>
>
>


Re: taking top k values of rdd

2014-07-05 Thread Koert Kuipers
hey nick,
you are right. i didnt explain myself well and my code example was wrong...
i am keeping a priority-queue with k items per partition (using
com.twitter.algebird.mutable.PriorityQueueMonoid.build to limit the sizes
of the queues).
but this still means i am sending k items per partition to my driver, so k
x p, while i only need k.
thanks! koert



On Sat, Jul 5, 2014 at 1:21 PM, Nick Pentreath 
wrote:

> To make it efficient in your case you may need to do a bit of custom code
> to emit the top k per partition and then only send those to the driver. On
> the driver you can just top k the combined top k from each partition
> (assuming you have (object, count) for each top k list).
>
> —
> Sent from Mailbox 
>
>
> On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers  wrote:
>
>> my initial approach to taking top k values of a rdd was using a
>> priority-queue monoid. along these lines:
>>
>>  rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
>> false).reduce(monoid.plus)
>>
>> this works fine, but looking at the code for reduce it first reduces
>> within a partition (which doesnt help me) and then sends the results to the
>> driver where these again get reduced. this means that for every partition
>> the (potentially very bulky) priorityqueue gets shipped to the driver.
>>
>> my driver is client side, not inside cluster, and i cannot change this,
>> so this shipping to driver of all these queues can be expensive.
>>
>> is there a better way to do this? should i try to a shuffle first to
>> reduce the partitions to the minimal amount (since number of queues shipped
>> is equal to number of partitions)?
>>
>> is was a way to reduce to a single item RDD, so the queues stay inside
>> cluster and i can retrieve the final result with RDD.first?
>>
>
>


Re: taking top k values of rdd

2014-07-05 Thread Nick Pentreath
To make it efficient in your case you may need to do a bit of custom code to 
emit the top k per partition and then only send those to the driver. On the 
driver you can just top k the combined top k from each partition (assuming you 
have (object, count) for each top k list).

—
Sent from Mailbox

On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers  wrote:

> my initial approach to taking top k values of a rdd was using a
> priority-queue monoid. along these lines:
> rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
> false).reduce(monoid.plus)
> this works fine, but looking at the code for reduce it first reduces within
> a partition (which doesnt help me) and then sends the results to the driver
> where these again get reduced. this means that for every partition the
> (potentially very bulky) priorityqueue gets shipped to the driver.
> my driver is client side, not inside cluster, and i cannot change this, so
> this shipping to driver of all these queues can be expensive.
> is there a better way to do this? should i try to a shuffle first to reduce
> the partitions to the minimal amount (since number of queues shipped is
> equal to number of partitions)?
> is was a way to reduce to a single item RDD, so the queues stay inside
> cluster and i can retrieve the final result with RDD.first?

Re: taking top k values of rdd

2014-07-05 Thread Koert Kuipers
i guess i could create a single priorityque per partition, then shuffle to
a new rdd with 1 partition, and then reduce?


On Sat, Jul 5, 2014 at 1:16 PM, Koert Kuipers  wrote:

> my initial approach to taking top k values of a rdd was using a
> priority-queue monoid. along these lines:
>
> rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
> false).reduce(monoid.plus)
>
> this works fine, but looking at the code for reduce it first reduces
> within a partition (which doesnt help me) and then sends the results to the
> driver where these again get reduced. this means that for every partition
> the (potentially very bulky) priorityqueue gets shipped to the driver.
>
> my driver is client side, not inside cluster, and i cannot change this, so
> this shipping to driver of all these queues can be expensive.
>
> is there a better way to do this? should i try to a shuffle first to
> reduce the partitions to the minimal amount (since number of queues shipped
> is equal to number of partitions)?
>
> is was a way to reduce to a single item RDD, so the queues stay inside
> cluster and i can retrieve the final result with RDD.first?
>


taking top k values of rdd

2014-07-05 Thread Koert Kuipers
my initial approach to taking top k values of a rdd was using a
priority-queue monoid. along these lines:

rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
false).reduce(monoid.plus)

this works fine, but looking at the code for reduce it first reduces within
a partition (which doesnt help me) and then sends the results to the driver
where these again get reduced. this means that for every partition the
(potentially very bulky) priorityqueue gets shipped to the driver.

my driver is client side, not inside cluster, and i cannot change this, so
this shipping to driver of all these queues can be expensive.

is there a better way to do this? should i try a shuffle first to reduce
the partitions to the minimal amount (since the number of queues shipped is
equal to the number of partitions)?

or is there a way to reduce to a single item RDD, so the queues stay inside the
cluster and i can retrieve the final result with RDD.first?


Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Evan R. Sparks
To be clear - each of the RDDs is still a distributed dataset and each of
the individual SVM models will be trained in parallel across the cluster.
Sean's suggestion effectively has you submitting multiple spark jobs
simultaneously, which, depending on your cluster configuration and the size
of your dataset, may or may not be a good idea.

There are some tricks you can do to make training multiple models on the
same dataset faster, which we're hoping to expose to users in an upcoming
release.

- Evan


On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen  wrote:

> If you call .par on data_kfolded it will become a parallel collection in
> Scala and so the maps will happen in parallel .
> On Jul 5, 2014 9:35 AM, "sparkuser2345"  wrote:
>
>> Hi,
>>
>> I am trying to fit a logistic regression model with cross validation in
>> Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
>> each element is a pair of RDDs containing the training and test data:
>>
>> (training_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint],
>> test_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint])
>>
>> scala> data_kfolded
>> res21:
>>
>> Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
>> org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])]
>> =
>> Array((MappedRDD[9] at map at :24,MappedRDD[7] at map at
>> :23), (MappedRDD[13] at map at :24,MappedRDD[11] at map
>> at
>> :23), (MappedRDD[17] at map at :24,MappedRDD[15] at map
>> at
>> :23))
>>
>> Everything works fine when using data_kfolded:
>>
>> val validationErrors =
>> data_kfolded.map { datafold =>
>>   val svmAlg = new SVMWithSGD()
>>   val model_reg = svmAlg.run(datafold._1)
>>   val labelAndPreds = datafold._2.map { point =>
>> val prediction = model_reg.predict(point.features)
>> (point.label, prediction)
>>   }
>>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
>> datafold._2.count
>>   trainErr.toDouble
>> }
>>
>> scala> validationErrors
>> res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
>> 0.29833546734955185)
>>
>> However, I have understood that the models are not fitted in parallel as
>> data_kfolded is not an RDD (although it's an array of pairs of RDDs). When
>> running the same code where data_kfolded has been replaced with
>> sc.parallelize(data_kfolded), I get a null pointer exception from the line
>> where the run method of the SVMWithSGD object is called with the traning
>> data. I guess this is somehow related to the fact that RDDs can't be
>> accessed from inside a closure. I fail to understand though why the first
>> version works and the second doesn't. Most importantly, is there a way to
>> fit the models in parallel? I would really appreciate your help.
>>
>> val validationErrors =
>> sc.parallelize(data_kfolded).map { datafold =>
>>   val svmAlg = new SVMWithSGD()
>>   val model_reg = svmAlg.run(datafold._1) // This line gives null pointer
>> exception
>>   val labelAndPreds = datafold._2.map { point =>
>> val prediction = model_reg.predict(point.features)
>> (point.label, prediction)
>>   }
>>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
>> datafold._2.count
>>   trainErr.toDouble
>> }
>> validationErrors.collect
>>
>> java.lang.NullPointerException
>> at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
>> at
>> org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>> at org.apache.spark.rdd.RDD.take(RDD.scala:824)
>> at org.apache.spark.rdd.RDD.first(RDD.scala:856)
>> at
>>
>> org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
>> at
>> $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:36)
>> at
>> $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>> at
>> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>   

[no subject]

2014-07-05 Thread Konstantin Kudryavtsev
I ran into very strange behavior of a job that I ran on a YARN Hadoop
cluster. One of the stages (a map function) was split into 80 tasks; 10 of them
finished successfully in ~2 min, but all the other tasks have been running > 40 min
and still have not finished... I suspect they are hung.
Any ideas what's going on and how it can be fixed?


Thank you,
Konstantin Kudryavtsev


Re: Graphx traversal and merge interesting edges

2014-07-05 Thread HHB
Thanks Ankur,

Cannot thank you enough for this!!! I am still reading your example, digesting &
grokking it :-)

I was breaking my head over this for the past few hours.

In my last futile attempts over the past few hours I was looking at Pregel, e.g.
whether it could be used to track what step of a path match each vertex is at and
send a message to the next vertex with the history of the traversal... then, when
merging messages, append the historical traversal path for each message :-P.

--Gautam

On 05-Jul-2014, at 3:23 pm, Ankur Dave  wrote:

> Interesting problem! My understanding is that you want to (1) find paths 
> matching a particular pattern, and (2) add edges between the start and end 
> vertices of the matched paths.
> 
> For (1), I implemented a pattern matcher for GraphX that iteratively 
> accumulates partial pattern matches. I used your example in the unit test.
> 
> For (2), you can take the output of the pattern matcher (the set of matching 
> paths organized by their terminal vertices) and construct a set of new edges 
> using the initial and terminal vertices of each path. Then you can make a new 
> graph consisting of the union of the original edge set and the new edges. Let 
> me know if you'd like help with this.
> 
> Ankur
> 



Re: Graphx traversal and merge interesting edges

2014-07-05 Thread Ankur Dave
Interesting problem! My understanding is that you want to (1) find paths
matching a particular pattern, and (2) add edges between the start and end
vertices of the matched paths.

For (1), I implemented a pattern matcher for GraphX that iteratively
accumulates partial pattern matches. I used your example in the unit test.

For (2), you can take the output of the pattern matcher (the set of
matching paths organized by their terminal vertices) and construct a set of
new edges using the initial and terminal vertices of each path. Then you
can make a new graph consisting of the union of the original edge set and
the new edges. Let me know if you'd like help with this.
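
For reference, a minimal sketch of (2), with hypothetical names and assuming
String vertex/edge attributes and that the matcher yields (initial, terminal)
vertex-id pairs:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

def addMatchEdges(graph: Graph[String, String],
                  matchedPaths: RDD[(VertexId, VertexId)]): Graph[String, String] = {
  val newEdges = matchedPaths.map { case (src, dst) => Edge(src, dst, "matched") }
  Graph(graph.vertices, graph.edges.union(newEdges))   // union of original and new edges
}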

Ankur 


Spark 1.0 failed on HDP 2.0 with absurd exception

2014-07-05 Thread Konstantin Kudryavtsev
Hi all,

I have a cluster with HDP 2.0. I built Spark 1.0 on an edge node and am trying to
run it with the command
./bin/spark-submit --class test.etl.RunETL --master yarn-cluster
--num-executors 14 --driver-memory 3200m --executor-memory 3g
--executor-cores 2 my-etl-1.0-SNAPSHOT-hadoop2.2.0.jar

in result I got failed YARN application with following stack trace

Application application_1404481778533_0068 failed 3 times due to AM
Container for appattempt_1404481778533_0068_03 exited with exitCode: 1
due to: Exception from container-launch:
org.apache.hadoop.util.Shell$ExitCodeException:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:464)
at org.apache.hadoop.util.Shell.run(Shell.java:379)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:283)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:79)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
 .Failing this attempt.. Failing the application

Log Type: stderr

Log Length: 686

Unknown/unsupported param List(--executor-memory, 3072,
--executor-cores, 2, --num-executors, 14)
Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
Options:
  --jar JAR_PATH   Path to your application's JAR file (required)
  --class CLASS_NAME   Name of your application's main class (required)
  --args ARGS  Arguments to be passed to your application's main class.
   Mutliple invocations are possible, each will be
passed in order.
  --num-workers NUMNumber of workers to start (Default: 2)
  --worker-cores NUM   Number of cores for the workers (Default: 1)
  --worker-memory MEM  Memory per Worker (e.g. 1000M, 2G) (Default: 1G)


Seems like the old Spark notation. Any ideas?

Thank you,
Konstantin Kudryavtsev


Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Sean Owen
If you call .par on data_kfolded it will become a parallel collection in
Scala, and so the maps will happen in parallel.
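
A minimal sketch of that (the same code as quoted below, with .par added; note
the k jobs will then compete for the same cluster resources):

val validationErrors = data_kfolded.par.map { datafold =>
  val svmAlg = new SVMWithSGD()
  val model_reg = svmAlg.run(datafold._1)
  val labelAndPreds = datafold._2.map { point =>
    (point.label, model_reg.predict(point.features))
  }
  labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
}
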
On Jul 5, 2014 9:35 AM, "sparkuser2345"  wrote:

> Hi,
>
> I am trying to fit a logistic regression model with cross validation in
> Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
> each element is a pair of RDDs containing the training and test data:
>
> (training_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint],
> test_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint])
>
> scala> data_kfolded
> res21:
>
> Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
> org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])]
> =
> Array((MappedRDD[9] at map at :24,MappedRDD[7] at map at
> :23), (MappedRDD[13] at map at :24,MappedRDD[11] at map
> at
> :23), (MappedRDD[17] at map at :24,MappedRDD[15] at map
> at
> :23))
>
> Everything works fine when using data_kfolded:
>
> val validationErrors =
> data_kfolded.map { datafold =>
>   val svmAlg = new SVMWithSGD()
>   val model_reg = svmAlg.run(datafold._1)
>   val labelAndPreds = datafold._2.map { point =>
> val prediction = model_reg.predict(point.features)
> (point.label, prediction)
>   }
>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
> datafold._2.count
>   trainErr.toDouble
> }
>
> scala> validationErrors
> res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
> 0.29833546734955185)
>
> However, I have understood that the models are not fitted in parallel as
> data_kfolded is not an RDD (although it's an array of pairs of RDDs). When
> running the same code where data_kfolded has been replaced with
> sc.parallelize(data_kfolded), I get a null pointer exception from the line
> where the run method of the SVMWithSGD object is called with the traning
> data. I guess this is somehow related to the fact that RDDs can't be
> accessed from inside a closure. I fail to understand though why the first
> version works and the second doesn't. Most importantly, is there a way to
> fit the models in parallel? I would really appreciate your help.
>
> val validationErrors =
> sc.parallelize(data_kfolded).map { datafold =>
>   val svmAlg = new SVMWithSGD()
>   val model_reg = svmAlg.run(datafold._1) // This line gives null pointer
> exception
>   val labelAndPreds = datafold._2.map { point =>
> val prediction = model_reg.predict(point.features)
> (point.label, prediction)
>   }
>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
> datafold._2.count
>   trainErr.toDouble
> }
> validationErrors.collect
>
> java.lang.NullPointerException
> at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
> at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
> at org.apache.spark.rdd.RDD.take(RDD.scala:824)
> at org.apache.spark.rdd.RDD.first(RDD.scala:856)
> at
>
> org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
> at
> $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:36)
> at
> $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
> at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
> at
>
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
> at
>
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> at org.apache.spark.scheduler.Task.run(Task.scala:53)
> at
>
> org.apache.spa

How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread sparkuser2345
Hi, 

I am trying to fit a logistic regression model with cross validation in
Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
each element is a pair of RDDs containing the training and test data: 

(training_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint],
test_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint])

scala> data_kfolded
res21:
Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] =
Array((MappedRDD[9] at map at :24,MappedRDD[7] at map at
:23), (MappedRDD[13] at map at :24,MappedRDD[11] at map at
:23), (MappedRDD[17] at map at :24,MappedRDD[15] at map at
:23))

Everything works fine when using data_kfolded: 

val validationErrors = 
data_kfolded.map { datafold => 
  val svmAlg = new SVMWithSGD() 
  val model_reg = svmAlg.run(datafold._1)
  val labelAndPreds = datafold._2.map { point =>
val prediction = model_reg.predict(point.features)
(point.label, prediction)
  }
  val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
datafold._2.count
  trainErr.toDouble
}

scala> validationErrors
res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
0.29833546734955185)

However, I have understood that the models are not fitted in parallel as
data_kfolded is not an RDD (although it's an array of pairs of RDDs). When
running the same code where data_kfolded has been replaced with
sc.parallelize(data_kfolded), I get a null pointer exception from the line
where the run method of the SVMWithSGD object is called with the training
data. I guess this is somehow related to the fact that RDDs can't be
accessed from inside a closure. I fail to understand though why the first
version works and the second doesn't. Most importantly, is there a way to
fit the models in parallel? I would really appreciate your help. 

val validationErrors = 
sc.parallelize(data_kfolded).map { datafold => 
  val svmAlg = new SVMWithSGD() 
  val model_reg = svmAlg.run(datafold._1) // This line gives null pointer
exception
  val labelAndPreds = datafold._2.map { point =>
val prediction = model_reg.predict(point.features)
(point.label, prediction)
  }
  val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
datafold._2.count
  trainErr.toDouble
}
validationErrors.collect

java.lang.NullPointerException
at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.RDD.take(RDD.scala:824)
at org.apache.spark.rdd.RDD.first(RDD.scala:856)
at
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
at
$line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:36)
at
$line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.jav