laziness in textFile reading from HDFS?
Hello, I need to process a significant amount of data every day, about 4 TB, in batches of about 140 GB. The cluster this will run on doesn't have enough memory to hold a dataset that size at once, so I am trying to understand how this works internally.

When using textFile to read an HDFS folder (containing multiple files), I understand that the number of partitions created equals the number of HDFS blocks, correct? Are those partitions created lazily? That is, if the number of blocks/partitions is larger than the number of cores/threads Spark was launched with (N), are N partitions materialized initially and the rest only when required? Or are all partitions materialized up front? I want to avoid reading the whole dataset into memory just to spill it to disk when there is not enough memory. Thanks!

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textFile-reading-from-HDFS-tp24837.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
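[Editor's note: to the best of my understanding, partitions are not materialized up front — an RDD from textFile only records the block locations, and each partition's records are streamed through a task's iterator when an action runs, so roughly one in-flight partition per running task is in memory at a time. A minimal plain-Scala sketch (no Spark, names and counts are illustrative) of this pull-driven iterator model:]

```scala
// Sketch: an Iterator produces records only when consumed, which is
// how a textFile partition is streamed through a task rather than
// being read into memory up front.
object LazyReadDemo {
  def main(args: Array[String]): Unit = {
    var linesRead = 0
    // Simulates a partition's record iterator over 10 "lines".
    val partition: Iterator[String] =
      Iterator.tabulate(10) { i => linesRead += 1; s"line-$i" }
    // Nothing has been read yet: the iterator is lazy.
    assert(linesRead == 0)
    // Consuming only 3 records reads only 3 lines.
    partition.take(3).foreach(_ => ())
    assert(linesRead == 3)
    println("records are materialized on demand")
  }
}
```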
Re: ClassCastException: BlockManagerId cannot be cast to [B
Hello, just in case someone hits the same issue: it was caused by running the streaming app with a different version of the jars than the cluster's (the uber jar contained both YARN and Spark). Regards, J

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ClassCastException-BlockManagerId-cannot-be-cast-to-B-tp23276p23296.html
ClassCastException: BlockManagerId cannot be cast to [B
Hello, I am running a streaming app on Spark 1.2.1. When running local everything works fine; on yarn-cluster it fails and I see a ClassCastException in the log (see below). Non-streaming Spark apps run in the cluster with no problem. Any ideas? Thanks in advance!

WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, yarn-slave1): java.lang.ClassCastException: org.apache.spark.storage.BlockManagerId cannot be cast to [B
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ClassCastException-BlockManagerId-cannot-be-cast-to-B-tp23276.html
Re: Custom paritioning of DSTream
Hello Evo, Ranjitiyer, I am also looking for the same thing. Using foreach is not useful for me: processing the RDD as a whole would not be distributed across workers, and that would kill performance in my application :-/ Let me know if you find a solution for this. Regards

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574p22630.html
Re: Identify the performance bottleneck from hardware prospective
Hello Julaiti, maybe I am just asking the obvious :-) but did you check disk I/O? Depending on what you are doing, that could be the bottleneck. In my case none of the hardware resources was the bottleneck; instead, some distributed features (e.g. Hazelcast) were blocking execution. Could that be your case as well? Regards

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Identify-the-performance-bottleneck-from-hardware-prospective-tp21684p21927.html
Re: Extra output from Spark run
If you do not want those progress indications to appear, just set spark.ui.showConsoleProgress to false, e.g.:

    System.setProperty("spark.ui.showConsoleProgress", "false");

Regards

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Extra-output-from-Spark-run-tp21920p21931.html
Re: Does Spark automatically run different stages concurrently when possible?
Hi Jon, I am looking for an answer to a similar question in the docs now; so far no clue. I would need to know Spark's behaviour in a situation like your example, but also taking into account that there are multiple partitions/workers. I can imagine that different Spark workers are not synchronized with each other, i.e. they do not wait for each other before progressing to the next step/stage for the partitions they are assigned, whereas in streaming I believe they wait for the current batch to complete before starting work on a new one. In the code I am working on, I need to make sure a particular step has completed (in all workers, for all partitions) before the next transformation is applied. It would be great if someone could clarify this or point to where the docs cover it! :-)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-automatically-run-different-stages-concurrently-when-possible-tp21075p21227.html
Re: Newbie Question on How Tasks are Executed
Hello Mixtou, if you want to look at the partition ID, I believe you want to use mapPartitionsWithIndex.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-Question-on-How-Tasks-are-Executed-tp21064p21228.html
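[Editor's note: for reference, mapPartitionsWithIndex expects a function of shape (Int, Iterator[T]) => Iterator[U], receiving each partition's index along with its records. A hedged sketch of such a function — the tagging format is made up for illustration, and only the final commented line touches the Spark API:]

```scala
object PartitionTagDemo {
  // The shape of function rdd.mapPartitionsWithIndex expects:
  // partition index plus an iterator over that partition's records.
  def tagWithPartition(idx: Int, iter: Iterator[String]): Iterator[String] =
    iter.map(rec => s"partition-$idx: $rec")

  def main(args: Array[String]): Unit = {
    // Simulate partition 2 holding two records.
    val out = tagWithPartition(2, Iterator("a", "b")).toList
    assert(out == List("partition-2: a", "partition-2: b"))
    println(out.mkString(", "))
    // On a real RDD this would be:
    //   rdd.mapPartitionsWithIndex(tagWithPartition)
  }
}
```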
Re: Spark Streaming scheduling control
Thanks Akhil Das-2: I actually tried setting spark.default.parallelism but it had no effect :-/ I am running standalone and performing a mix of map/filter/foreachRDD. I had to force parallelism with repartition to get both workers to process tasks, but I do not think this should be required (and I am not sure it is optimal). As I mentioned, without forcing it with repartition, scheduled tasks keep accumulating on the queue over time, so I would expect Spark to assign those to idle workers. Is my assumption wrong? :-) Thanks

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-scheduling-control-tp16778p16805.html
Re: Spark Streaming scheduling control
One detail: even when forcing partitions (repartition), Spark is still holding back some tasks. If I increase the load on the system (raising spark.streaming.receiver.maxRate), even when all workers are used, the one hosting the receiver gets twice as many tasks as the other workers. Total delay keeps growing in this scenario, even though some workers are not 100% loaded :-/ What is the load distribution criterion/policy in Spark? Is there any documentation? Anything will help, thanks :-)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-scheduling-control-tp16778p16825.html
Spark Streaming scheduling control
Hello, I have a cluster with 1 master and 2 slaves running 1.1.0, and I am having problems getting both slaves to work at the same time. When I launch the driver on the master, one of the slaves is assigned the receiver task, and initially both slaves process tasks. After a few tens of batches, the slave running the receiver starts processing all the tasks, and the other no longer executes any. If I cancel the execution and start over, the roles may switch if the other slave gets assigned the receiver, but the behaviour is the same: the non-receiver slave stops processing tasks after a short while. So both slaves work, essentially, but never at the same time in a consistent way. No errors in the logs, etc. I have tried increasing partitions (up to 100, while the slaves have 4 cores each) with no success :-/

I understand that Spark may decide not to distribute tasks to all workers due to data locality, etc., but in this case I think there is something else going on, since one slave cannot keep up with the processing rate and the total delay keeps growing: I have set the batch interval to 1s, but each batch takes 1.6s to process, so after some time the delay (and the enqueued data) is just too much. Does Spark take this time restriction into account when scheduling, i.e. that total processing time must stay within the batch duration? Is there any configuration affecting that? Am I missing something important? Any hints or things to test? Thanks in advance! ;-)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-scheduling-control-tp16778.html
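[Editor's note: two settings worth checking in a setup like this are spark.locality.wait, which controls how long the scheduler waits for a data-local slot before falling back to any free executor (lowering it spreads tasks off the receiver node sooner), and spark.streaming.receiver.maxRate, which caps ingestion so batch processing time can stay under the batch interval. A hedged config sketch — the values below are illustrative, not recommendations, and in Spark 1.x spark.locality.wait is specified in milliseconds:]

```
# spark-defaults.conf -- illustrative values, tune for your cluster
# Wait at most 500 ms for a data-local slot before scheduling the
# task on any idle executor (Spark 1.x: plain milliseconds).
spark.locality.wait                 500
# Cap records/sec per receiver so each 1s batch can finish in time.
spark.streaming.receiver.maxRate    5000
```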
bug with MapPartitions?
Hello, maybe there is something I have not understood, but I believe this code should not throw any serialization error when I run it in the spark shell. Similar code using map instead of mapPartitions works just fine.

import java.io.BufferedInputStream
import java.io.FileInputStream
import com.testing.DataPacket

val inStream = new BufferedInputStream(new FileInputStream(inputFile))
val p = new DataPacket(inStream)
val c = Array(p)

def myfunc[T](iter: Iterator[T]): Iterator[String] = {
  var res = List[String]()
  while (iter.hasNext) {
    val cur = iter.next
    res ::= cur.toString
  }
  res.iterator
}

var r = sc.parallelize(c).mapPartitions(myfunc).collect()

This throws the following:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
    ...
Caused by: java.io.NotSerializableException: java.io.BufferedInputStream
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    ...

Why is this code failing? The constructor of DataPacket just reads data and does not keep any reference to the BufferedInputStream. Note that this is not the real code, but a simplification produced while isolating the cause of the error I get. Using map on this instead of mapPartitions works fine.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/bug-with-MapPartitions-tp16689.html
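[Editor's note: the likely cause here is the shell itself. Every val defined on a REPL line lives in a generated wrapper object, so a closure defined alongside inStream can drag the whole wrapper — BufferedInputStream included — into the serialized task, even when the stream is never used; whether the closure cleaner manages to null the unused reference can differ between the map and mapPartitions paths. A standalone sketch of the underlying failure, using plain JVM serialization with no Spark involved:]

```scala
import java.io.{BufferedInputStream, ByteArrayInputStream,
                ByteArrayOutputStream, NotSerializableException,
                ObjectOutputStream}

object CaptureDemo {
  def main(args: Array[String]): Unit = {
    val inStream =
      new BufferedInputStream(new ByteArrayInputStream(Array[Byte](1, 2)))
    // A closure that merely mentions inStream captures it, much like a
    // shell-line wrapper object holds every val defined on that line.
    val f: Iterator[Int] => Iterator[String] = { iter =>
      val captured = inStream // captured, never used at runtime
      iter.map(_.toString)
    }
    val failed =
      try {
        new ObjectOutputStream(new ByteArrayOutputStream).writeObject(f)
        false
      } catch { case _: NotSerializableException => true }
    assert(failed, "serializing a closure over a stream should fail")
    println("NotSerializableException, as in the shell")
  }
}
```

A common workaround in the shell is to build the non-serializable resource inside the closure (or inside each partition's function) instead of referencing one defined on an earlier line.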