Re: Connecting an Application to the Cluster
Spark has the concepts of Driver and Master. The Driver is the Spark program that you run on your local machine; the SparkContext resides in the driver, together with the DAG scheduler. The Master is responsible for managing cluster resources, e.g. giving the Driver the workers it needs. The Master can be a Mesos master (for a Mesos cluster), a Spark master (for a Spark standalone cluster), or the ResourceManager (for a Hadoop YARN cluster). Given the resources assigned by the Master, the Driver uses the DAG to assign tasks to workers. So yes, the results of Spark's actions will be sent back to the driver, which is your local console.

On Feb 17, 2014, at 10:54 AM, David Thomas dt5434...@gmail.com wrote:
So if I do a Spark action, say collect, will I be able to see the result on my local console? Or would it be available only on the cluster master?

On Mon, Feb 17, 2014 at 9:50 AM, purav aggarwal puravaggarwal...@gmail.com wrote:
Your local machine simply submits your job (in the form of a jar) to the cluster. The master node is where the SparkContext object is created, a DAG of your job is formed, and tasks (stages) are assigned to different workers - which are not aware of anything but the computation of the task assigned to them.

On Mon, Feb 17, 2014 at 10:07 PM, David Thomas dt5434...@gmail.com wrote:
Where is the SparkContext object created then? On my local machine or on the master node in the cluster?

On Mon, Feb 17, 2014 at 4:17 AM, Nhan Vu Lam Chi nhani...@adatao.com wrote:
Your local app will be called the driver program, which creates jobs and submits them to the cluster for running.

On Mon, Feb 17, 2014 at 9:19 AM, David Thomas dt5434...@gmail.com wrote:
From the docs: "Connecting an Application to the Cluster: To run an application on the Spark cluster, simply pass the spark://IP:PORT URL of the master to the SparkContext constructor." Could someone enlighten me on what happens if I run the app from, say, Eclipse on my local machine, but use the URL of the master node, which is in the cloud.
What role does my local JVM play then?
Re: Connecting an Application to the Cluster
It used to be that you had to read the Spark code to figure this information out. However, the Spark team has recently published this info here: http://spark.incubator.apache.org/docs/latest/cluster-overview.html

On Feb 17, 2014, at 11:35 AM, purav aggarwal puravaggarwal...@gmail.com wrote:
Sorry for the incorrect information. Where can I pick up these architectural/design concepts for Spark? I seem to have misunderstood the responsibilities of the master and the driver.

On Mon, Feb 17, 2014 at 10:51 PM, Michael (Bach) Bui free...@adatao.com wrote:
Spark has the concepts of Driver and Master. The Driver is the Spark program that you run on your local machine; the SparkContext resides in the driver, together with the DAG scheduler. The Master is responsible for managing cluster resources, e.g. giving the Driver the workers it needs. The Master can be a Mesos master (for a Mesos cluster), a Spark master (for a Spark standalone cluster), or the ResourceManager (for a Hadoop YARN cluster). Given the resources assigned by the Master, the Driver uses the DAG to assign tasks to workers. So yes, the results of Spark's actions will be sent back to the driver, which is your local console.

On Feb 17, 2014, at 10:54 AM, David Thomas dt5434...@gmail.com wrote:
So if I do a Spark action, say collect, will I be able to see the result on my local console? Or would it be available only on the cluster master?

On Mon, Feb 17, 2014 at 9:50 AM, purav aggarwal puravaggarwal...@gmail.com wrote:
Your local machine simply submits your job (in the form of a jar) to the cluster. The master node is where the SparkContext object is created, a DAG of your job is formed, and tasks (stages) are assigned to different workers - which are not aware of anything but the computation of the task assigned to them.

On Mon, Feb 17, 2014 at 10:07 PM, David Thomas dt5434...@gmail.com wrote:
Where is the SparkContext object created then? On my local machine or on the master node in the cluster?

On Mon, Feb 17, 2014 at 4:17 AM, Nhan Vu Lam Chi nhani...@adatao.com wrote:
Your local app will be called the driver program, which creates jobs and submits them to the cluster for running.

On Mon, Feb 17, 2014 at 9:19 AM, David Thomas dt5434...@gmail.com wrote:
From the docs: "Connecting an Application to the Cluster: To run an application on the Spark cluster, simply pass the spark://IP:PORT URL of the master to the SparkContext constructor." Could someone enlighten me on what happens if I run the app from, say, Eclipse on my local machine, but use the URL of the master node, which is in the cloud. What role does my local JVM play then?
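To make the driver/master split concrete, here is a minimal sketch of a driver program using the 0.9-era SparkContext constructor; the master URL, SPARK_HOME path, jar name, and app name are all placeholders, not values from this thread:

```scala
import org.apache.spark.SparkContext

object MyApp {
  def main(args: Array[String]) {
    // The SparkContext and DAG scheduler live here, in your local JVM
    // (e.g. the JVM Eclipse launches). Only the tasks run on the
    // workers that the master assigns to this driver.
    val sc = new SparkContext(
      "spark://master-host:7077",   // hypothetical standalone master URL
      "MyApp",                      // application name
      "/path/to/spark",             // SPARK_HOME on the worker nodes
      Seq("target/myapp.jar"))      // app jar shipped to the workers

    // Actions like reduce/collect send their result back to this JVM.
    val sum = sc.parallelize(1 to 100).map(_ * 2).reduce(_ + _)
    println(sum)
    sc.stop()
  }
}
```

So the local JVM is not idle: it holds the SparkContext, builds the DAG, schedules tasks onto the workers the master grants it, and receives action results.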
Re: How to map each line to (line number, line)?
Note that Spark uses the HDFS API to access the file. The HDFS API has KeyValueTextInputFormat, which addresses Aureliano's requirement. I am just not sure if KeyValueTextInputFormat has been pulled into the latest version of Spark yet. Without it, it may be messy to make sure that each partition boundary falls on a newline character. I think this usage pattern is important; if it is not yet available, I can try to pull it in.

Michael (Bach) Bui, PhD, Senior Staff Architect, ADATAO Inc. www.adatao.com

On Dec 30, 2013, at 6:28 AM, Aureliano Buendia buendia...@gmail.com wrote:
Hi, when reading a simple text file in Spark, what's the best way of mapping each line to (line number, line)? RDD doesn't seem to have an equivalent of zipWithIndex.
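For reference, a sketch of the (line number, line) pairing on plain Scala collections; this is the shape the asker wants on an RDD, where it would additionally require offsetting each partition by the number of lines in the partitions before it, since the data is distributed:

```scala
// Pair each line with its 0-based line number, then swap to (lineNo, line).
// On a single machine this is just zipWithIndex; the hard part on an RDD
// is that partition boundaries must fall on newlines and each partition
// must know how many lines precede it.
val lines = Seq("first", "second", "third")
val numbered: Seq[(Int, String)] =
  lines.zipWithIndex.map { case (line, idx) => (idx, line) }
// numbered: Seq((0, "first"), (1, "second"), (2, "third"))
```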
Re: debugging NotSerializableException while using Kryo
What Spark version are you using? By looking at the code at Executor.scala line 195, you will at least know what causes the NPE. We can start from there.

On Dec 23, 2013, at 10:21 AM, Ameet Kini ameetk...@gmail.com wrote:
Thanks Imran. I tried setting spark.closure.serializer to org.apache.spark.serializer.KryoSerializer and now end up seeing a NullPointerException when the executor starts up. This is a snippet of the executor's log. Notice how "registered TileIdWritable" and "registered ArgWritable" are printed, so I see that my KryoRegistrator is being called. However, it's not clear why there's a follow-on NPE. My Spark log level is set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so I'm not sure if there's some other way to get the executor to be more verbose as to the cause of the NPE. When I take out the spark.closure.serializer setting (i.e., go back to the default Java serialization), the executors start up fine and execute other RDD actions, but of course not the lookup action (my original problem). With spark.closure.serializer set to Kryo, it dies with an NPE during executor startup.

13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver: akka.tcp://[redacted]:48147/user/StandaloneScheduler
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered with driver
13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
13/12/23 11:00:36 INFO Remoting: Starting remoting
13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[redacted]:56483]
13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[redacted]:56483]
13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://[redacted]:48147/user/BlockManagerMaster
13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity 323.9 MB.
13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs '/tmp'
13/12/23 11:00:36 INFO DiskStore: Created local directory at /tmp/spark-local-20131223110036-4335
13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with id = ConnectionManagerId([redacted],41617)
13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://[redacted]:48147/user/MapOutputTracker
13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
registered TileIdWritable
registered TileIdWritable
registered TileIdWritable
registered TileIdWritable
registered ArgWritable
registered ArgWritable
registered ArgWritable
registered ArgWritable
13/12/23 11:00:37 INFO Executor: Running task ID 2
13/12/23 11:00:37 INFO Executor: Running task ID 1
13/12/23 11:00:37 INFO Executor: Running task ID 3
13/12/23 11:00:37 INFO Executor: Running task ID 0
13/12/23 11:00:37 INFO Executor: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with timestamp 1387814434436
13/12/23 11:00:37 INFO Utils: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to /tmp/fetchFileTemp2456419097284083628.tmp
13/12/23 11:00:37 INFO Executor: Adding file[redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to class loader
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-4,5,main]
java.lang.NullPointerException
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
    at scala.Option.flatMap(Option.scala:170)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-2,5,main]
java.lang.NullPointerException at
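For readers following along, the setup being debugged looks roughly like the sketch below. The registrator class and the registered case class are hypothetical stand-ins for the geotrellis TileIdWritable/ArgWritable classes in the log; spark.serializer, spark.kryo.registrator, and spark.closure.serializer are the actual 0.8/0.9-era property names:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical stand-in for the writable types in the thread.
case class TileId(id: Long)

// Registrator named by spark.kryo.registrator; Spark instantiates it
// on each executor, which is why "registered ..." shows up in the
// executor log once per task thread.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[TileId])
    println("registered TileId") // mirrors the log lines above
  }
}

// Driver-side settings (0.8/0.9-era system-property style):
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "MyRegistrator")
// The thread is specifically about also setting this one, which swaps
// closure serialization to Kryo and triggers the executor-startup NPE:
// System.setProperty("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
```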
Re: How to access a sub matrix in a spark task?
Hmm, I misread that you need a sliding window. I am thinking out loud here: one way of dealing with this is to improve NLineInputFormat so that partitions have a small overlapping portion; in this case the overlapping portion is 50 columns. So, say the matrix is divided into overlapping partitions like this: [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]], ... Then we can assign each partition to a mapper and run mapPartition on it.

Michael (Bach) Bui, PhD, Senior Staff Architect, ADATAO Inc. www.adatao.com

On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui free...@adatao.com wrote:
Here, Tom assumed that you have your big matrix already loaded on one machine. Now if you want to distribute it to the slave nodes, you will need to broadcast it. I would expect this broadcast to be done once at the beginning of your algorithm, with the computation time dominating the overall execution time. On the other hand, a better way to deal with a huge matrix is to store the data in HDFS and load it into each slave partition-by-partition. This is a fundamental data processing pattern in the Spark/Hadoop world. If you opt to do this, you will have to use a suitable InputFormat to make sure each partition has the right number of rows. For example, if you are lucky and each HDFS partition has exactly n*50 rows, then you can use rdd.mapPartition(func), where func takes care of splitting the n*50-row partition into n sub-matrices. However, the HDFS TextInputFormat or SequenceFileInputFormat will not guarantee that each partition has a certain number of rows. What you want is NLineInputFormat, which I think has not been pulled into Spark yet. If everyone thinks this is needed, I can implement it quickly; it should be pretty easy.

Michael (Bach) Bui, PhD, Senior Staff Architect, ADATAO Inc. www.adatao.com

On Dec 20, 2013, at 12:38 PM, Aureliano Buendia buendia...@gmail.com wrote:
On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek minnesota...@gmail.com wrote:
Oh, I see. I was thinking that there was a computational dependency from one window to the next. If the computations are independent, then I think Spark can help you out quite a bit. I think you would want an RDD where each element is a window of your dense matrix. I'm not aware of a way to distribute the windows of the big matrix that doesn't involve broadcasting the whole thing. You might have to tweak some config options, but I think it would work straightaway. I would initialize the data structure like this:

val matB = sc.broadcast(myBigDenseMatrix)
val distributedChunks = sc.parallelize(0 until numWindows)
  .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))

Here broadcast is used instead of calling parallelize on myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does broadcasting big data mean a big network IO overhead compared to calling parallelize, or is this overhead reduced by the partitioning?

Then just apply your matrix ops as a map. You may have your own tool for dense matrix ops, but I would suggest Scala Breeze. You'll have to use an old version of Breeze (current builds are for 2.10); Spark with Scala 2.10 is a little way off.

On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia buendia...@gmail.com wrote:
On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek minnesota...@gmail.com wrote:
If you use an RDD[Array[Double]] with a row decomposition of the matrix, you can index windows of the rows all you want, but you're limited to 100 concurrent tasks. You could use a column decomposition and access subsets of the columns with a PartitionPruningRDD. I have to say, though, if you're doing dense matrix operations, they will be 100s of times faster on a shared-memory platform. This particular matrix, at 800 MB, could be a Breeze on a single node.

The computation for every submatrix is very expensive; it takes days on a single node. I was hoping this could be reduced to hours or minutes with Spark. Are you saying that Spark is not suitable for this type of job?
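The overlapping-window splitting discussed in this thread can be sketched with plain Scala collections; the matrix shape, window width of 100 columns, and step of 50 are just the figures from the thread, and the function name is made up for illustration:

```scala
// Split a row-major matrix into overlapping column windows: each window
// is `width` columns wide and starts `step` columns after the previous
// one, so consecutive windows overlap by (width - step) columns.
def columnWindows(matrix: Array[Array[Double]],
                  width: Int, step: Int): Seq[Array[Array[Double]]] = {
  val nCols = matrix.head.length
  (0 to nCols - width by step).map { start =>
    matrix.map(row => row.slice(start, start + width))
  }
}

// A small 4 x 150 example matrix; entry (r, c) holds r*150 + c.
val m = Array.tabulate(4, 150)((r, c) => (r * 150 + c).toDouble)
val windows = columnWindows(m, width = 100, step = 50)
// 150 columns with width 100 and step 50 gives windows at columns 0 and 50,
// each 4 rows x 100 columns, overlapping in columns 50..99.
```

In Spark terms, each such window would become one RDD element (or one partition), which is exactly what the overlapping NLineInputFormat variant proposed above would produce at load time instead of after a broadcast.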
Re: Continued performance issues on a small EC2 Spark cluster
Hi Gary,

What other frameworks are running on your Mesos cluster? If they are all Spark frameworks, another option you may want to consider (in order to improve your cluster utilization) is to let all of them share a single SparkContext. We also experienced degraded performance while running multiple Spark frameworks on Mesos, mainly because there were a lot of non-preferred-location tasks.

Best, ~ mbbui

On Nov 15, 2013, at 11:30 AM, Gary Malouf malouf.g...@gmail.com wrote:
Hi everyone, I want to share what helped us resolve the issue short term and also our concerns longer term.

Some Background: Many of our jobs that look at a few weeks of data have task counts around 3500+. We went with a small cluster of 4 EC2 larges for Mesos+Spark in production for now because our data sizes are small (3 files hourly, on the order of 10-20 MB). The idea is to get this steady and scale horizontally as the data flow continues to go up.

Our Problem: In short, the time to execute simple tasks on data of this size for the past 2 weeks or so was not reasonable for us. The jobs were taking well over an hour to complete. We were very concerned that as the hourly data grows in size this would become completely unacceptable.

The (Short Term) Solution: Thanks to Paco and Evan's input, we decided to switch Spark to run in coarse-grained mode for the time being. We saw an immediate 6-8x improvement in our job runtime and were able to complete our short-term tasks. The downside we see is that it takes away many of the benefits of running on Mesos - namely, the fine-grained resource management.

The Longer Term Solution: We are looking at ways of tuning to reduce the number of Spark tasks being generated overall. We think if we can get this down enough, it justifies taking the performance hit of fine-grained mode, since the cluster resources will then be allocated dynamically across multiple jobs.

Hope this is helpful to people,
Gary

On Thu, Nov 14, 2013 at 11:47 AM, Gary Malouf malouf.g...@gmail.com wrote:
I bring this up because the performance we are seeing is dreadful. From CPU usage, it appears the issue is the Spark shell's CPU power. We have increased this node from an EC2 medium to an xl; we are seeing slightly better performance, but still not great. My understanding of Spark was that most of the work should be done on the slaves, with just the results going back to the shell at the end if we do a take. It appears from what we see that the client is doing much more work than expected.

On Wed, Nov 13, 2013 at 10:40 PM, Gary Malouf malouf.g...@gmail.com wrote:
Hi, we have an HDFS setup of a namenode and three datanodes, all on EC2 larges. One of our data partitions basically has files that are fed from a few Flume instances rolling hourly. This equates to around three 4-8 MB files per hour right now. Our Mesos cluster consists of a master and three slave nodes colocated on these EC2 larges as well (slaves on the datanodes, Mesos master on the namenode). Spark jobs are launched ad hoc from the Spark shell today. The data is serialized protobuf messages in sequence files. Our operations typically consist of deserializing the data, grabbing a few primitive fields out of the message, and doing some maps/reduces. For grabbing on the order of 2 days of data at this size, what would the expected Spark performance be? We are seeing simple maps and takes on this data taking on the order of 15 minutes.

Thanks,
Gary
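For anyone else hitting this, the coarse-grained switch described in the thread is a driver-side configuration change, set before the SparkContext is created (0.8-era system-property style; the cores value is a placeholder):

```scala
// Run Spark on Mesos in coarse-grained mode: Spark launches one
// long-running task per node up front and schedules its own tasks
// inside them, instead of launching one short Mesos task per Spark task.
System.setProperty("spark.mesos.coarse", "true")
// Optionally cap the cores this job holds, so other frameworks on the
// Mesos cluster still get resources despite the static allocation.
System.setProperty("spark.cores.max", "8")
// ... then create the SparkContext as usual.
```

This trades Mesos's fine-grained resource sharing for much lower per-task launch overhead, which is the 6-8x effect Gary describes for jobs with thousands of small tasks.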
Re: SPARK + YARN the general case
Tom, more on Shark-type applications on YARN. In the current implementation, for the duration of a SparkContext's execution, YARN gives an unchanged set of nodes to the SparkContext, is that right? If that is the case, IMO it may not be the best architecture for Shark, because users may load data from nodes that are not in the given set. Am I right?

On Nov 15, 2013, at 12:51 PM, Tom Graves tgraves...@yahoo.com wrote:
Shark is not currently supported on YARN. There are two ways this could be done that come to mind. One would be to run Shark as the application itself, started on the application master in the current yarn-standalone mode; the other is to use the yarn-client mode introduced in the spark-shell pull request. I saw some changes that went into Shark to support running it along with the yarn-client pull request (101), but I haven't had time to actually try them yet.
Tom

On Friday, November 15, 2013 10:45 AM, Michael (Bach) Bui free...@adatao.com wrote:
Hi Tom, I have another question on Spark on YARN. It seems like the current implementation will not support interactive applications like Shark, right? Thanks.

On Nov 15, 2013, at 8:15 AM, Tom Graves tgraves...@yahoo.com wrote:
Hey Bill, currently Spark on YARN only supports batch mode, where you submit your job via the yarn Client. Note that this will hook the Spark UI up to the YARN ResourceManager web UI. Is there something more you were looking for than just finding the Spark web UI for various jobs? There is a pull request (101) to get the spark-shell working with YARN.
Tom

On Thursday, November 14, 2013 10:57 AM, Bill Sparks jspa...@cray.com wrote:
Sorry for the following question, but I just need a little clarity on expectations of Spark using YARN. Is it possible to use the spark-shell with YARN? Or is the only way to submit a Spark job to YARN by writing a Java application and submitting it via the yarn.Client application? Also, is there a way of running the Spark master so that it can communicate with YARN, so I can use the web UI for job tracking?

Thanks,
Bill