Re: Problem with using Spark ML

2015-04-23 Thread Staffan
I got the tip to try reducing the step size, and that finally gave decent
results. I had hoped that the default parameters would give at least OK
results, which is why I assumed the problem was somewhere else in the code.
Problem solved!
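
For reference, a minimal sketch of lowering the step size (and raising the
iteration count) through the optimizer on RidgeRegressionWithSGD; the concrete
values are illustrative only, not necessarily the ones that were used here:

import org.apache.spark.mllib.regression.{LabeledPoint, RidgeRegressionWithSGD}
import org.apache.spark.rdd.RDD

// Sketch only: trainingLP is an RDD[LabeledPoint] as in the original post.
def trainWithSmallerStep(trainingLP: RDD[LabeledPoint]) = {
  val algo = new RidgeRegressionWithSGD()
  algo.optimizer
    .setStepSize(0.01)      // far smaller than the default (1.0)
    .setNumIterations(200)
  algo.run(trainingLP)      // returns a RidgeRegressionModel
}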



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-using-Spark-ML-tp22591p22628.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Problem with using Spark ML

2015-04-21 Thread Staffan
Hi,
I've written an application that performs some machine learning on some
data. I've validated that the data _should_ give a good output with a decent
RMSE by using Lib-SVM:
Mean squared error = 0.00922063 (regression)
Squared correlation coefficient = 0.9987 (regression)

When I try to use Spark ML to do the exact same thing I get:
Mean Squared Error = 8.466193152067944E224

Which is somewhat worse. I've looked at the data right before it is input to
the model and printed it to file (this is in fact the same data that gave the
Lib-SVM result above). Somewhere there must be a huge mistake, but I cannot
find it in my code (see below). trainingLP and testLP are the training and
test data, as RDD[LabeledPoint].

import org.apache.spark.mllib.regression.RidgeRegressionWithSGD

// Generate the model
val model_gen = new RidgeRegressionWithSGD()
val model = model_gen.run(trainingLP)

// Predict on the test data
val valuesAndPreds = testLP.map { point =>
  val prediction = model.predict(point.features)
  println("label: " + point.label + ", pred: " + prediction)
  (point.label, prediction)
}
val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.mean()
println("Mean Squared Error = " + MSE)
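
As a sanity check on the hand-rolled MSE above, the same number can be
computed with MLlib's RegressionMetrics (available from Spark 1.2); this is
only a cross-check sketch, not part of the original code. Note that
RegressionMetrics expects (prediction, observation) pairs, so the tuples are
swapped first:

import org.apache.spark.mllib.evaluation.RegressionMetrics

// Cross-check sketch: swap (label, prediction) into (prediction, label).
val metrics = new RegressionMetrics(valuesAndPreds.map { case (v, p) => (p, v) })
println("MSE via RegressionMetrics = " + metrics.meanSquaredError)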


I've printed the label and prediction values for each data point in the test
set, and the result looks something like this:
label: 5.04, pred: -4.607899000641277E112
label: 3.59, pred: -3.96787105480399E112
label: 5.06, pred: -2.8263294374576145E112
label: 2.85, pred: -1.1536508029072844E112
label: 2.1, pred: -4.269312783707508E111
label: 2.75, pred: -3.0072665148591558E112
label: -0.29, pred: -2.035681731641989E112
label: 1.98, pred: -3.163404340354783E112

So there is obviously something wrong with the prediction step. I'm using the
SparseVector representation of the Vector in LabeledPoint, which looks
something like this for reference (shortened for convenience):
(-1.59,(2080,[29,59,62,74,127,128,131,144,149,175,198,200,239,247,267,293,307,364,374,393,410,424,425,431,448,469,477,485,501,525,532,533,538,560,..],[1.0,1.0,2.0,8.0,1.0,1.0,6.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,5.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,8.0,2.0,1.0,1.0,..]))
(-1.75,(2080,[103,131,149,208,296,335,520,534,603,620,661,694,709,748,859,1053,1116,1156,1186,1207,1208,1223,1256,1278,1356,1375,1399,1480,1569,..],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,2.0,2.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,4.0,1.0,7.0,1.0,3.0,2.0,1.0]))
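
For reference, this is roughly how such a row is constructed (a sketch using
only the first few indices/values of the first record above; not the actual
feature-generation code):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Sketch only: a 2080-dimensional sparse count vector paired with its label.
val example = LabeledPoint(
  -1.59,
  Vectors.sparse(2080, Array(29, 59, 62, 74), Array(1.0, 1.0, 2.0, 8.0))
)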

I do get one kind of warning, but that's about it! (And as I understand it,
this native code is not required for correct results, only for better
performance.)
6010 [main] WARN  com.github.fommil.netlib.BLAS  - Failed to load
implementation from: com.github.fommil.netlib.NativeSystemBLAS
6011 [main] WARN  com.github.fommil.netlib.BLAS  - Failed to load
implementation from: com.github.fommil.netlib.NativeRefBLAS

So where do I go from here? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-using-Spark-ML-tp22591.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Pipelines for controlling workflow

2015-04-07 Thread Staffan
Hi,
I am building a pipeline and I've read most of what I can find on the topic
(the spark.ml library and the AMPcamp version of pipelines:
http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html).
I do not have structured data as in the case of the new spark.ml library,
which uses SchemaRDD/DataFrames, so the second alternative seems the most
convenient to me. I'm writing in Scala.

The problem is that I want to build a pipeline that can branch in (at least)
two ways:
1. One of my steps outputs an Either type (the output is either an object
containing statistics on why this step/data failed, or the expected output).
I would like to branch the pipeline so that it either skips the rest of the
pipeline and continues with a reporting step (writing a report with the help
of the statistics object), or continues to the next step in the pipeline. In
the generic case this could of course be two independent pipelines (like a
first pipeline node that accepts multiple data types and passes the input to
the correct pipeline in the following step).
 
2. The other way I would like to branch the pipeline is to send the same data
to multiple new pipeline nodes. These nodes are not dependent on each other,
so they should simply branch off. In the generic case these could be two new
pipelines themselves.

Has anyone tried this, or does anyone have a nice idea of how this could be
done? I like the simplicity of the AMPcamp pipeline, which relies on type
safety, but I'm confused about how to create a branching pipeline using only
type declarations.
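
To make the first kind of branching concrete, here is a rough sketch of the
sort of typed node and Either-based branch point meant above; all names
(PipelineNode, FailureStats, ValidateNode) are made up for illustration and
come from neither spark.ml nor the AMPcamp code:

// Sketch only: a typed pipeline node plus an Either-based branch point.
trait PipelineNode[In, Out] extends Serializable {
  def apply(in: In): Out
  // Chain two nodes; the types must line up at compile time.
  def andThen[Next](next: PipelineNode[Out, Next]): PipelineNode[In, Next] = {
    val self = this
    new PipelineNode[In, Next] { def apply(in: In): Next = next(self(in)) }
  }
}

case class FailureStats(reason: String)

// A node whose output is either failure statistics or data for the next step.
class ValidateNode extends PipelineNode[Seq[Double], Either[FailureStats, Seq[Double]]] {
  def apply(xs: Seq[Double]): Either[FailureStats, Seq[Double]] =
    if (xs.isEmpty) Left(FailureStats("empty input")) else Right(xs)
}

// Branch: Left goes to a reporting step, Right continues down the pipeline.
def branch[A, B](result: Either[FailureStats, A])
                (report: FailureStats => B)(continue: A => B): B =
  result.fold(report, continue)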

Thanks,
Staffan  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pipelines-for-controlling-workflow-tp22403.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




How to efficiently control concurrent Spark jobs

2015-02-25 Thread Staffan
Hi,
Is there a good (recommended) way to control and run multiple Spark jobs
within the same application? My application works as follows:

1) Run one Spark job on a 'full' dataset, which then creates a few thousand
RDDs containing sub-datasets of the complete dataset. Each sub-dataset is
independent of the others (the 'full' dataset is simply a dump from a database
containing several different types of records).
2) Run some filtering and manipulation on each of the RDDs and finally do some
ML on the data. (Each RDD created in step 1) is completely independent, so
this should be run concurrently.)

I've implemented this by using Scala Futures and launching the Spark jobs in
2) from a separate thread for each RDD. This works and improves runtime
compared to a naive for-loop over step 2). Scaling is, however, not as good as
I would expect (28 minutes with 4 cores on 1 machine vs. 19 minutes with 12
cores on 3 machines).
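
For reference, a stripped-down sketch of that Futures approach (subRdds and
process are placeholders, not the actual code):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.rdd.RDD

// Sketch only: submit one Spark job per sub-dataset from its own Future so the
// independent jobs run concurrently, then wait for all of the results.
def runConcurrently[T](subRdds: Seq[RDD[T]])(process: RDD[T] => Double): Seq[Double] = {
  val futures = subRdds.map(rdd => Future { process(rdd) })
  Await.result(Future.sequence(futures), Duration.Inf)
}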

Each of the sub-datasets is fairly small, so in step 1) I've used
'repartition' and 'cache' to keep each sub-dataset on only one machine; this
improved runtime by a few percent.
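
That colocation step is essentially the following sketch (the helper name is
made up):

import org.apache.spark.rdd.RDD

// Sketch only: shrink a small sub-dataset to a single partition and cache it,
// so the later filtering/ML stages reuse it without another shuffle.
def colocate[T](subRdd: RDD[T]): RDD[T] = subRdd.repartition(1).cache()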

So, does anyone have a suggestion for how to do this in a better way, or
perhaps a higher-level workflow tool that I can use on top of Spark? (The cool
solution would have been to use nested RDDs and just map over them in a
high-level way, but that is not supported afaik.)

Thanks!
Staffan 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-efficiently-control-concurrent-Spark-jobs-tp21800.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Issues when combining Spark and a third party java library

2015-01-27 Thread Staffan
To clarify: I'm currently working on this locally, running on a laptop, and I
do not use spark-submit (I currently run my applications from Eclipse). I've
tried running both on Mac OS X and in a VM running Ubuntu. Furthermore, I got
the VM from a co-worker who has no issues running his Spark and Hadoop
applications on his machine using the same VM. (So all the issues seem to
appear just by introducing a new dependency into the project.)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issues-when-combining-Spark-and-a-third-party-java-library-tp21367p21384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Issues when combining Spark and a third party java library

2015-01-27 Thread Staffan
Okay, I finally tried changing the hadoop-client version from 2.4.0 to 2.5.2,
and that mysteriously fixed everything.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issues-when-combining-Spark-and-a-third-party-java-library-tp21367p21387.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Issues when combining Spark and a third party java library

2015-01-26 Thread Staffan
I'm using Maven and Eclipse to build my project, letting Maven download
everything I need, which has worked fine up until now. I need to use the CDK
library (https://github.com/egonw/cdk, http://sourceforge.net/projects/cdk/),
and once I add its dependencies to my pom.xml Spark starts to complain (this
is without calling any function or importing any new library in my code;
merely introducing the new dependencies in the pom.xml is enough). Trying to
set up a SparkContext gives me the following errors in the log:

[main] DEBUG org.apache.spark.rdd.HadoopRDD - SplitLocationInfo and other
new Hadoop classes are unavailable. Using the older Hadoop location info
code.
java.lang.ClassNotFoundException:
org.apache.hadoop.mapred.InputSplitWithLocationInfo
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at
org.apache.spark.rdd.HadoopRDD$SplitInfoReflections.<init>(HadoopRDD.scala:381)
at org.apache.spark.rdd.HadoopRDD$.liftedTree1$1(HadoopRDD.scala:391)
at org.apache.spark.rdd.HadoopRDD$.<init>(HadoopRDD.scala:390)
at org.apache.spark.rdd.HadoopRDD$.<clinit>(HadoopRDD.scala)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:159)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1328)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:765)

later in the log:
[Executor task launch worker-0] DEBUG
org.apache.spark.deploy.SparkHadoopUtil - Couldn't find method for
retrieving thread-level FileSystem input data
java.lang.NoSuchMethodException:
org.apache.hadoop.fs.FileSystem$Statistics.getThreadStatistics()
at java.lang.Class.getDeclaredMethod(Class.java:2009)
at org.apache.spark.util.Utils$.invoke(Utils.scala:1733)
at
org.apache.spark.deploy.SparkHadoopUtil$$anonfun$getFileSystemThreadStatistics$1.apply(SparkHadoopUtil.scala:178)
at
org.apache.spark.deploy.SparkHadoopUtil$$anonfun$getFileSystemThreadStatistics$1.apply(SparkHadoopUtil.scala:178)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.deploy.SparkHadoopUtil.getFileSystemThreadStatistics(SparkHadoopUtil.scala:178)
at
org.apache.spark.deploy.SparkHadoopUtil.getFSBytesReadOnThreadCallback(SparkHadoopUtil.scala:138)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:220)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:210)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:99)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

There have also been issues related to HADOOP_HOME not being set etc., but
these seem to be intermittent and only occur sometimes.


After testing different versions of both CDK and Spark, I've found that Spark
version 0.9.1 and earlier DO NOT have this problem, so there is something in
the newer versions of Spark that does not play well with others... However, I
need the functionality in the later versions of Spark, so this does not solve
my problem. Anyone willing