Dropping parquet file partitions

2016-03-01 Thread sparkuser2345
Is there a way to drop parquet file partitions through Spark? I'm
partitioning a parquet file by a date field and I would like to drop old
partitions in a file-system-agnostic manner. I guess I could read the whole
parquet file into a DataFrame, filter out the dates to be dropped, and
overwrite the parquet file, but that is quite a heavy task if there are lots
of partitions and I only need to drop a couple of them.
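
Since each date partition ends up as its own "date=..." subdirectory, I guess
one file-system-agnostic workaround could be to delete the old partition
directories directly through the Hadoop FileSystem API instead of rewriting
the data. A rough sketch, not tested; the base path, the "date=" directory
naming, and the cut-off value are just placeholders:

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: delete partition directories older than a cut-off date,
// assuming the file was written with partitionBy("date") so that each
// partition is a "date=YYYY-MM-DD" subdirectory of the base path.
val basePath = new Path("/path/to/parquet_table")   // placeholder
val cutoff   = "2016-01-01"                         // placeholder

val fs = basePath.getFileSystem(sc.hadoopConfiguration)
fs.listStatus(basePath)
  .filter(_.isDirectory)
  .map(_.getPath)
  .filter { dir =>
    val name = dir.getName                          // e.g. "date=2015-12-31"
    name.startsWith("date=") && name.stripPrefix("date=") < cutoff
  }
  .foreach(dir => fs.delete(dir, true))             // recursive delete of one partition

This would work against whatever FileSystem implementation the path resolves
to, but I'd still be interested in a way to do it through Spark itself.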






Experiences about NoSQL databases with Spark

2015-11-24 Thread sparkuser2345
I'm interested in knowing which NoSQL databases you use with Spark and what
your experiences with them have been.

On a general level, I would like to use Spark streaming to process incoming
data, fetch relevant aggregated data from the database, and update the
aggregates in the DB based on the incoming records. The data in the DB
should be indexed to be able to fetch the relevant data fast and to allow
fast interactive visualization of the data. 
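
To make the access pattern concrete, the kind of loop I have in mind is
roughly the sketch below. 'AggregateStore' is just a stand-in for whatever DB
client ends up being used (not a real API), and the socket source and the
(key, count) record format are placeholders:

import org.apache.spark.streaming.StreamingContext

// Stand-in for the real database client (MongoDB, HBase, ...), only here to
// make the read-modify-write pattern explicit.
trait AggregateStore extends Serializable {
  def fetch(key: String): Long                  // current aggregate for a key
  def upsert(key: String, value: Long): Unit    // write the updated aggregate
}

def updateAggregates(ssc: StreamingContext, store: AggregateStore): Unit = {
  // Incoming records as (key, count) pairs; the source is a placeholder.
  val records = ssc.socketTextStream("localhost", 9999)
    .map(_.split(","))
    .map(fields => (fields(0), fields(1).toLong))

  records.foreachRDD { rdd =>
    // Pre-aggregate within the batch, then one read and one write per key.
    rdd.reduceByKey(_ + _).foreachPartition { partition =>
      partition.foreach { case (key, delta) =>
        store.upsert(key, store.fetch(key) + delta)
      }
    }
  }
}

So for this part the DB mainly needs fast keyed reads and writes, on top of
the indexing needed for the interactive visualization queries.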

I've been reading about MongoDB+Spark and I've got the impression that there
are some challenges in fetching data by index and in updating documents, but
things are moving so fast that I don't know whether those issues are still
relevant. Do you see any benefit in using HBase with Spark, given that HBase
is built on top of HDFS?






Preventing an RDD from shuffling

2015-12-16 Thread sparkuser2345
Is there a way to prevent an RDD from shuffling in a join operation without
repartitioning it? 

I'm reading an RDD from sharded MongoDB, joining it with an RDD of incoming
data (plus some additional calculations), and writing the resulting RDD back
to MongoDB. It would make sense to shuffle only the incoming data RDD so that
the joined RDD would already be partitioned correctly according to the
MongoDB shard key.

I know I can prevent an RDD from shuffling in a join operation by
partitioning it beforehand, but partitioning would already shuffle the RDD.
In addition, I'm only doing the join once per RDD read from MongoDB. Is there
a way to tell Spark to shuffle only the incoming data RDD?
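
To illustrate the mechanism I mean with a dummy example (the data and the
partition count are placeholders): once an RDD carries a partitioner, a join
against it only shuffles the other side, but the only way I know of to attach
that partitioner to the MongoDB RDD is partitionBy, which already shuffles it
once:

import org.apache.spark.HashPartitioner

// Dummy stand-ins for the MongoDB RDD and the incoming-data RDD, both keyed
// by the shard key.
val mongoRdd    = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val incomingRdd = sc.parallelize(Seq(("a", 10), ("c", 30)))

// partitionBy shuffles the MongoDB RDD once to attach the partitioner ...
val partitioner = new HashPartitioner(4)            // placeholder partition count
val mongoByKey  = mongoRdd.partitionBy(partitioner).cache()

// ... after which the join shuffles only incomingRdd, because mongoByKey
// already has the chosen partitioner.
val joined = mongoByKey.join(incomingRdd)

What I'd like is to skip that first shuffle of the MongoDB RDD entirely.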






How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread sparkuser2345
Hi, 

I am trying to fit a logistic regression model with cross validation in
Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
each element is a pair of RDDs containing the training and test data: 

(training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint],
test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])

scala> data_kfolded
res21: Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint], org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] = Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at <console>:23), (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map at <console>:23), (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map at <console>:23))

Everything works fine when using data_kfolded: 

val validationErrors = data_kfolded.map { datafold =>
  val svmAlg = new SVMWithSGD()
  val model_reg = svmAlg.run(datafold._1)
  val labelAndPreds = datafold._2.map { point =>
    val prediction = model_reg.predict(point.features)
    (point.label, prediction)
  }
  val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
  trainErr.toDouble
}

scala> validationErrors
res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837, 0.29833546734955185)

However, I have understood that the models are not fitted in parallel
because data_kfolded is not an RDD (although it's an array of pairs of RDDs).
When I run the same code with data_kfolded replaced by
sc.parallelize(data_kfolded), I get a null pointer exception from the line
where the run method of the SVMWithSGD object is called with the training
data. I guess this is somehow related to the fact that RDDs can't be accessed
from inside a closure, but I fail to understand why the first version works
and the second doesn't. Most importantly, is there a way to fit the models in
parallel? I would really appreciate your help.

val validationErrors = sc.parallelize(data_kfolded).map { datafold =>
  val svmAlg = new SVMWithSGD()
  val model_reg = svmAlg.run(datafold._1) // This line gives the null pointer exception
  val labelAndPreds = datafold._2.map { point =>
    val prediction = model_reg.predict(point.features)
    (point.label, prediction)
  }
  val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
  trainErr.toDouble
}
validationErrors.collect

java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
        at org.apache.spark.rdd.RDD.take(RDD.scala:824)
        at org.apache.spark.rdd.RDD.first(RDD.scala:856)
        at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
        at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
        at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
        at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.jav

Re: How to parallelize model fitting with different cross-validation folds?

2014-07-07 Thread sparkuser2345
Thank you for all the replies! 

Realizing that this approach distributes the per-fold model fitting only
across driver threads, not across the cluster nodes, I decided not to create
nfolds separate data sets. Instead, I parallelize the calculation
(thread-wise) over the folds and zip the original dataset with a sequence of
indices indicating the fold division:
 
val data = sc.parallelize(orig_data zip fold_division)

(1 to nfolds).par.map { fold_i =>
  val svmAlg    = new SVMWithSGD()
  val tr_data   = data.filter(x => x._2 != fold_i).map(x => x._1)
  val test_data = data.filter(x => x._2 == fold_i).map(x => x._1)
  val model = svmAlg.run(tr_data)
  val labelAndPreds = test_data.map { point =>
    val prediction = model.predict(point.features)
    (point.label, prediction)
  }
  val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / test_data.count
  trainErr.toDouble
}

Really looking forward to the new functionalities in Spark 1.1!  
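
Christopher's suggestion below about collecting all the k models and doing
the predictions in a single pass might look roughly like the following. This
is only a sketch I haven't run; it reuses the (LabeledPoint, fold index) RDD
'data' from above and scores each point with the model of its own fold:

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}

// Train the k fold models first (still serially) ...
val models: Array[SVMModel] =
  (1 to nfolds).map { fold_i =>
    new SVMWithSGD().run(data.filter(_._2 != fold_i).map(_._1))
  }.toArray

// ... then compute all k test errors in one pass over 'data': each point is
// test data only for the model of its own fold.
val validationErrors = data.map { case (point, fold_i) =>
  val wrong = if (models(fold_i - 1).predict(point.features) != point.label) 1L else 0L
  (fold_i, (wrong, 1L))
}.reduceByKey { case ((w1, n1), (w2, n2)) => (w1 + w2, n1 + n2) }
 .mapValues { case (wrong, n) => wrong.toDouble / n }
 .collect()
 .sortBy(_._1)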



Nick Pentreath wrote
> For linear models the 3rd option is by far most efficient and I suspect
> what Evan is alluding to. 
> 
> 
> Unfortunately it's not directly possible with the classes in Mllib now so
> you'll have to roll your own using underlying sgd / bfgs primitives.
> —
> Sent from Mailbox
> 
> On Sat, Jul 5, 2014 at 10:45 AM, Christopher Nguyen <ctn@...> wrote:
> 
>> Hi sparkuser2345,
>> I'm inferring the problem statement is something like "how do I make this
>> complete faster (given my compute resources)?" Several comments.
>>
>> First, Spark only allows launching parallel tasks from the driver, not from
>> workers, which is why you're seeing the exception when you try. Whether the
>> latter is a sensible/doable idea is another discussion, but I can
>> appreciate why many people assume this should be possible.
>>
>> Second, on optimization, you may be able to apply Sean's idea about
>> (thread) parallelism at the driver, combined with the knowledge that often
>> these cluster tasks bottleneck while competing for the same resources at
>> the same time (cpu vs disk vs network, etc.) You may be able to achieve
>> some performance optimization by randomizing these timings. This is not
>> unlike GMail randomizing user storage locations around the world for load
>> balancing. Here, you would partition each of your RDDs into a different
>> number of partitions, making some tasks larger than others, and thus some
>> may be in cpu-intensive map while others are shuffling data around the
>> network. This is rather cluster-specific; I'd be interested in what you
>> learn from such an exercise.
>>
>> Third, I find it useful always to consider doing as much as possible in
>> one pass, subject to memory limits, e.g., mapPartitions() vs map(), thus
>> minimizing map/shuffle/reduce boundaries with their context switches and
>> data shuffling. In this case, notice how you're running the
>> training+prediction k times over mostly the same rows, with map/reduce
>> boundaries in between. While the training phase is sealed in this context,
>> you may be able to improve performance by collecting all the k models
>> together, and do a [m x k] predictions all at once which may end up being
>> faster.
>>
>> Finally, as implied from the above, for the very common k-fold
>> cross-validation pattern, the algorithm itself might be written to be
>> smart enough to take both train and test data and "do the right thing"
>> within itself, thus obviating the need for the user to prepare k data sets
>> and running over them serially, and likely saving a lot of repeated
>> computations in the right internal places.
>> Enjoy,
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>;
>> linkedin.com/in/ctnguyen
>> On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen <sowen@...> wrote:
>>> If you call .par on data_kfolded it will become a parallel collection in
>>> Scala and so the maps will happen in parallel.
>>> On Jul 5, 2014 9:35 AM, "sparkuser2345" <hm.spark.user@...> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to fit a logistic regression model with cross validation in
>>>> Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded
>>>> where
>>>> each element is a pair of RDDs containing the training and test data:
>>>>
>>>> (training_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint],
>>>> test_data: (RDD[org.apache

Problem reading from S3 in standalone application

2014-08-06 Thread sparkuser2345
Hi, 

I'm running Spark in an EMR cluster and I'm able to read from S3 using the
REPL without problems:

val input_file = "s3:///test_data.txt"
val rawdata = sc.textFile(input_file)  
val test = rawdata.collect

but when I try to run a simple standalone application reading the same data,
I get an error saying that I should provide the access keys: 

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object test {

  def main(args: Array[String]) {

    val master = "spark://ec2-xx-xx-xxx-xxx.eu-west-1.compute.amazonaws.com:7077"
    val sparkHome = "/home/hadoop/spark/"

    val sc = new SparkContext(master, "test", sparkHome, Seq())

    val input_file = "s3:///test_data.txt"
    val rawdata = sc.textFile(input_file)
    val test = rawdata.collect
    sc.stop()
  }
}

[error] (run-main-0) java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
        at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
        at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:93)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
        at com.sun.proxy.$Proxy13.initialize(Unknown Source)
        at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1094)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:717)
        at test$.main(test.scala:17)
        at test.main(test.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)

When I add the keys to the file name

val input_file = "s3://:@/test_data.txt"

I get an "Input path does not exist" error (keys and bucket name changed
from the error message, naturally): 

[error] (run-main-0) org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://:@/test_data.txt
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://:@/test_data.txt
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$par
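
For completeness, the other way I've seen the credentials supplied, besides
embedding them in the URL, is through the Hadoop configuration properties
named in the exception above. A sketch with placeholder key values, reusing
master and sparkHome from the app above, and not verified in my setup:

val sc = new SparkContext(master, "test", sparkHome, Seq())

// Placeholder credentials; the property names are the ones mentioned in the
// IllegalArgumentException above. For s3n:// URLs the corresponding
// fs.s3n.awsAccessKeyId / fs.s3n.awsSecretAccessKey properties apply.
sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_KEY")

val rawdata = sc.textFile("s3:///test_data.txt")
val test = rawdata.collect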

Re: Problem reading from S3 in standalone application

2014-08-06 Thread sparkuser2345
I'm getting the same "Input path does not exist" error even after setting the
AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables and using
the format "s3:///test_data.txt" for the input file.






Re: Problem reading from S3 in standalone application

2014-08-06 Thread sparkuser2345
Evan R. Sparks wrote
> Try s3n://

Thanks, that works! In the REPL, I can successfully load the data using both
s3:// and s3n://; why the difference?






Re: How to read a multipart s3 file?

2014-08-07 Thread sparkuser2345
Matei Zaharia wrote
> If you use s3n:// for both, you should be able to pass the exact same file
> to load as you did to save. 

I'm trying to write a file to s3n in a Spark app and to read it in another
one using the same file name, but without luck. Writing data to s3n as

val data = Array(1.0, 1.0, 1.0)
sc.parallelize(data).saveAsTextFile("s3n://:@/test")

creates the following files: 

test/_SUCCESS
test/_temporary/0/task_201408071147_m_00_$folder$
test/_temporary/0/task_201408071147_m_00/part-0
test/_temporary/0/task_201408071147_m_01_$folder$
test/_temporary/0/task_201408071147_m_01/part-1

When trying to read the file as

val data2 = sc.textFile("s3n://:@/test")

data2 is an empty array:

scala> data2.collect
14/08/07 11:49:56 INFO mapred.FileInputFormat: Total input paths to process : 0
14/08/07 11:49:56 INFO spark.SparkContext: Starting job: collect at <console>:15
14/08/07 11:49:56 INFO spark.SparkContext: Job finished: collect at <console>:15, took 3.7227E-5 s
res5: Array[String] = Array()

I'm using Spark 1.0.0. 






Re: How to read a multipart s3 file?

2014-08-07 Thread sparkuser2345
sparkuser2345 wrote
> I'm using Spark 1.0.0.

The same works when:
- Using Spark 0.9.1.
- Saving to and reading from local file system (Spark 1.0.0)
- Saving to and reading from HDFS (Spark 1.0.0)






Re: How to read a multipart s3 file?

2014-08-07 Thread sparkuser2345
Ashish Rangole wrote
> Specify a folder instead of a file name for input and output code, as in:
> 
> Output:
> s3n://your-bucket-name/your-data-folder
> 
> Input: (when consuming the above output)
> 
> s3n://your-bucket-name/your-data-folder/*

Unfortunately no luck: 

Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n:///test/* matches 0 files






Unable to access worker web UI or application UI (EC2)

2014-08-08 Thread sparkuser2345
I'm running Spark 1.0.0 on EMR. I'm able to access the master web UI but not
the worker web UIs or the application detail UI ("Server not found").

I added the following inbound rule to the ElasticMapreduce-slave security
group but it didn't help: 
Type = All TCP
Port range = 0 - 65535
Source = My IP

Any suggestions?






Parallelizing a task makes it freeze

2014-08-11 Thread sparkuser2345
I have an array 'dataAll' of key-value pairs where each value is an array of
arrays. I would like to parallelize a task over the elements of 'dataAll' to
the workers. In the dummy example below, the number of elements in 'dataAll'
is 3, but in the real application it would be tens to hundreds.

Without parallelizing dataAll, 'result' is calculated in less than a second: 

import org.jblas.DoubleMatrix  

val nY = 5000
val nX = 400

val dataAll = Array((1, Array.fill(nY)(Array.fill(nX)(1.0))),
                    (2, Array.fill(nY)(Array.fill(nX)(1.0))),
                    (3, Array.fill(nY)(Array.fill(nX)(1.0))))

val w1 = DoubleMatrix.ones(400)

// This finishes in less than a second: 
val result = dataAll.map { dat =>
  val c   = dat._1
  val dataArr = dat._2
  // Map over the Arrays within dataArr: 
  val test = dataArr.map { arr =>
val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
val out = test2.dot(w1)
out
  }
  (c, test)
}

However, when I parallelize dataAll, the same task freezes: 

val dataAllRDD = sc.parallelize(dataAll, 3)

// This doesn't finish in several minutes: 
val result = dataAllRDD.map { dat =>
  val c   = dat._1
  val dataArr = dat._2
  // Map over the Arrays within dataArr: 
  val test = dataArr.map { arr =>
val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
val out = test2.dot(w1)
out
  }
  (c, test)
}.collect

After sending the above task, nothing is written to the worker logs (as
viewed through the web UI), but the following output is printed in the Spark
shell where I'm running the task: 

14/08/11 18:17:31 INFO SparkContext: Starting job: collect at <console>:33
14/08/11 18:17:31 INFO DAGScheduler: Got job 0 (collect at <console>:33) with 3 output partitions (allowLocal=false)
14/08/11 18:17:31 INFO DAGScheduler: Final stage: Stage 0 (collect at <console>:33)
14/08/11 18:17:31 INFO DAGScheduler: Parents of final stage: List()
14/08/11 18:17:31 INFO DAGScheduler: Missing parents: List()
14/08/11 18:17:31 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at <console>:23), which has no missing parents
14/08/11 18:17:32 INFO DAGScheduler: Submitting 3 missing tasks from Stage 0 (MappedRDD[1] at map at <console>:23)
14/08/11 18:17:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2:  (PROCESS_LOCAL)
14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:0 as 16154060 bytes in 69 ms
14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 1:  (PROCESS_LOCAL)
14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:1 as 16154060 bytes in 81 ms
14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:2 as TID 2 on executor 0:  (PROCESS_LOCAL)
14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:2 as 16154060 bytes in 66 ms


dataAllRDD.map does work with a smaller array, though (e.g. with nY = 100 it
finishes in less than a second).

Why is dataAllRDD.map so much slower than dataAll.map, or not executing at
all?

The Spark version I'm using is 0.9.0. 






Re: Parallelizing a task makes it freeze

2014-08-12 Thread sparkuser2345
Actually the program hangs just by calling dataAllRDD.count(). I suspect
creating the RDD is not successful when its elements are too big. When nY =
3000, dataAllRDD.count() works (each element of dataAll = 3000*400*64 bits =
9.6 MB), but when nY = 4000, it hangs (4000*400*64 bits = 12.8 MB). 

What are the limiting factors to the size of the elements of an RDD? 
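
One setting I'm wondering about here is spark.akka.frameSize, which defaults
to 10 (MB) and limits the size of Akka messages such as the serialized tasks
shown in the log above (16154060 bytes). This is only a guess and I haven't
verified that it removes the hang, but bumping it would look like this:

import org.apache.spark.{SparkConf, SparkContext}

// Guesswork, not verified: raise the Akka frame size (in MB) before creating
// the context. The master URL and the value 64 are placeholders.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")
  .setAppName("parallelize-test")
  .set("spark.akka.frameSize", "64")

val sc = new SparkContext(conf)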






