laziness in textFile reading from HDFS?

2015-09-28 Thread davidkl
Hello,

I need to process a significant amount of data every day, about 4TB. This
will be processed in batches of about 140GB. The cluster this will be
running on doesn't have enough memory to hold the dataset at once, so I am
trying to understand how this works internally.

When using textFile to read an HDFS folder (containing multiple files), I
understand that the number of partitions created is equal to the number of
HDFS blocks, correct? Are those created in a lazy way? I mean, if the number
of blocks/partitions is larger than the number of cores/threads the Spark
driver was launched with (N), are N partitions created initially and then
the rest when required? Or are all those partitions created up front?

I want to avoid reading the whole dataset into memory just to spill it out to
disk if there is not enough memory.
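
For reference, the kind of job I am running looks roughly like this (the path
and names are illustrative, not the real code):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("daily-batch"))

// My assumption: textFile is lazy, so nothing is read from HDFS here yet.
val lines = sc.textFile("hdfs:///data/daily/batch-001/")

// I expect the actual reading to happen here, one partition per HDFS block,
// as the tasks of the action execute.
val nonEmptyLines = lines.filter(_.nonEmpty).count()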

Thanks! 






Re: ClassCastException: BlockManagerId cannot be cast to [B

2015-06-12 Thread davidkl
Hello,

Just in case someone runs into the same issue: it was caused by running the
streaming app with a different version of the jars than the ones on the cluster
(the uber jar contained both the YARN and Spark jars).
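
For anyone hitting the same thing, the kind of change that avoids it is to stop
bundling those jars in the uber jar and let the cluster provide them; in sbt
terms, something along these lines (just a sketch, assuming an sbt build and
the Spark version from this thread):

// build.sbt (sketch): mark Spark as "provided" so the uber jar does not ship
// its own copy and only the cluster's jars end up on the classpath.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.2.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.2.1" % "provided"
)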

Regards







ClassCastException: BlockManagerId cannot be cast to [B

2015-06-11 Thread davidkl
Hello,

I am running a streaming app on Spark 1.2.1. When running locally, everything
works fine. When I try yarn-cluster mode it fails, and I see a ClassCastException
in the log (see below). I can run Spark (non-streaming) apps on the cluster
with no problem.

Any ideas here? Thanks in advance!

WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, yarn-slave1): java.lang.ClassCastException: org.apache.spark.storage.BlockManagerId cannot be cast to [B
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)






Re: Custom partitioning of DStream

2015-04-23 Thread davidkl
Hello Evo, Ranjitiyer,

I am also looking for the same thing. Using foreach is not useful for me, as
processing the RDD as a whole would not be distributed across the workers, and
that would kill performance in my application :-/
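
In case it helps, the closest thing I have found so far is repartitioning each
batch inside transform, roughly like this (a sketch with made-up types and
partition count; it is not a real custom partitioner on the DStream itself):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._  // pair-RDD implicits on older Spark versions
import org.apache.spark.streaming.dstream.DStream

// keyed is a hypothetical DStream[(String, String)] built upstream.
// transform applies RDD-level partitioning to every batch, so the per-partition
// work done afterwards is spread across the workers.
def withHashPartitioning(keyed: DStream[(String, String)],
                         numPartitions: Int): DStream[(String, String)] =
  keyed.transform(rdd => rdd.partitionBy(new HashPartitioner(numPartitions)))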

Let me know if you find a solution for this. 

Regards






Re: Identify the performance bottleneck from hardware perspective

2015-03-05 Thread davidkl
Hello Julaiti,

Maybe I am just asking the obvious :-) but did you check disk IO? Depending
on what you are doing, that could be the bottleneck.

In my case none of the HW resources was the bottleneck; the problem was the use
of some distributed features that were blocking execution (e.g. Hazelcast).
Could that be your case as well?

Regards






Re: Extra output from Spark run

2015-03-05 Thread davidkl
If you do not want those progress indications to appear, just set
spark.ui.showConsoleProgress to false, e.g.:

System.setProperty("spark.ui.showConsoleProgress", "false");

Regards






Re: Does Spark automatically run different stages concurrently when possible?

2015-01-19 Thread davidkl
Hi Jon, I am looking for an answer to a similar question in the docs now; so
far, no clue.

I would need to know what Spark's behaviour is in a situation like the example
you provided, but also taking into account that there are multiple
partitions/workers.

I could imagine it is possible that different Spark workers are not
synchronized in terms of waiting for each other to progress to the next
step/stage for the partitions of data they are assigned, while I believe in
streaming they would wait for the current batch to complete before starting
on a new one.

In the code I am working on, I need to make sure a particular step is
completed (in all workers, for all partitions) before the next transformation
is applied.
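
To make that concrete, this is roughly the pattern I have today (input is an
existing RDD; step1 and step2 are placeholders for my own functions); right now
I force the barrier with cache plus an action, and I am not sure that is the
right approach:

val intermediate = input.map(step1).cache()

// An action is a synchronization point: step1 has run on every partition,
// on every worker, before anything after this line starts.
intermediate.count()

val result = intermediate.map(step2)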

It would be great if someone could clarify this or point to where these issues
are covered in the docs! :-)
 






Re: Newbie Question on How Tasks are Executed

2015-01-19 Thread davidkl
Hello Mixtou, if you want to look at the partition ID, I believe you want to
use mapPartitionsWithIndex.
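
Something like this minimal sketch (rdd stands for whatever RDD you are
working with):

// Tag every element with the ID of the partition it lives in.
val tagged = rdd.mapPartitionsWithIndex { (partitionId, iter) =>
  iter.map(element => (partitionId, element))
}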








Re: Spark Streaming scheduling control

2014-10-20 Thread davidkl
Thanks Akhil Das-2: actually I tried setting spark.default.parallelism, but it
had no effect :-/

I am running standalone and performing a mix of map/filter/foreachRDD. 

I had to force parallelism with repartition to get both workers to process
tasks, but I do not think this should be required (and I am not sure it is
optimal). As I mentioned, without forcing it with repartition, there are
scheduled tasks in the queue that keep accumulating over time, so I would
expect Spark to assign those to idle workers. Is my assumption wrong? :-)
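
For reference, what I mean by forcing it is just this kind of thing (a sketch;
the partition count is made up):

// Spread each received batch over more partitions so tasks can be scheduled
// on both workers, at the cost of moving data between them.
val spread = inputStream.repartition(8)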

Thanks






Re: Spark Streaming scheduling control

2014-10-20 Thread davidkl
One detail: even when forcing partitions (repartition), Spark is still holding
back some tasks; if I increase the load on the system (by increasing
spark.streaming.receiver.maxRate), even if all workers are used, the one with
the receiver gets twice as many tasks as the other workers.

Total delay keeps growing in this scenario, even though there are workers that
are not 100% loaded :-/

What are the load distribution criteria/policy in Spark? Is there any
documentation? Anything will help, thanks :-)








Spark Streaming scheduling control

2014-10-19 Thread davidkl
Hello,

I have a cluster with 1 master and 2 slaves running Spark 1.1.0. I am having
problems getting both slaves to work at the same time. When I launch the
driver on the master, one of the slaves is assigned the receiver task, and
initially both slaves start processing tasks. After a few tens of batches,
the slave running the receiver starts processing all tasks, and the other
won't execute any more tasks. If I cancel the execution and start over, the
roles may switch if the other slave gets assigned the receiver, but the
behaviour is the same, and the other slave will stop processing tasks
after a short while. So, essentially, both slaves are working, but never at
the same time in a consistent way. No errors in the logs, etc.

I have tried increasing the number of partitions (up to 100, while the slaves
have 4 cores each), but with no success :-/

I understand that Spark may decide not to distribute tasks to all workers
due to data locality, etc., but in this case I think there is something else
going on, since one slave cannot keep up with the processing rate and the total
delay keeps growing: I have set the batch interval to 1s, but each batch is
processed in 1.6s, so after some time the delay (and the enqueued data) is
just too much.

Does Spark take this time restriction into consideration when scheduling? I
mean keeping the total processing time <= the batch duration. Is there any
configuration affecting that?

Am I missing something important? Any hints or things to test?

Thanks in advance! ;-)






bug with MapPartitions?

2014-10-17 Thread davidkl
Hello,

Maybe there is something I do not understand, but I believe this code should
not throw any serialization error when I run it in the Spark shell. Similar
code with map instead of mapPartitions works just fine.

import java.io.BufferedInputStream
import java.io.FileInputStream
import com.testing.DataPacket

// inputFile is the path to a local file; DataPacket is our own class.
val inStream = new BufferedInputStream(new FileInputStream(inputFile))
val p = new DataPacket(inStream)
val c = Array(p)

def myfunc[T](iter: Iterator[T]): Iterator[String] = {
  var res = List[String]()
  while (iter.hasNext) {
    val cur = iter.next()
    res ::= cur.toString
  }
  res.iterator
}

val r = sc.parallelize(c).mapPartitions(myfunc).collect()

This throws the following:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
  at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
  ...
Caused by: java.io.NotSerializableException: java.io.BufferedInputStream
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
  ...

Why is this code failing? The constructor of DataPacket just reads data, but
it does not keep any reference to the BufferedInputStream. Note that this is
not the real code, but a simplification written while trying to isolate the
cause of the error I get. Using map on this instead of mapPartitions works fine.
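
For comparison, this is the kind of map-based version that works for me (same
setup as above; toString is just a stand-in for the real per-element
processing):

// Element-by-element variant of the same job; this one does not hit the
// serialization error in my shell session.
val r2 = sc.parallelize(c).map(_.toString).collect()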



