Re: Connecting an Application to the Cluster

2014-02-17 Thread Michael (Bach) Bui
Spark has the concepts of Driver and Master.

The Driver is the Spark program that you run on your local machine. The 
SparkContext resides in the driver together with the DAG scheduler.
The Master is responsible for managing cluster resources, e.g. giving the Driver 
the workers that it needs. The Master can be either the Mesos master (for a Mesos 
cluster), the Spark master (for a Spark standalone cluster), or the ResourceManager 
(for a Hadoop cluster).
Given the resources assigned by the Master, the Driver uses the DAG to assign tasks 
to workers.

So yes, the results of Spark's actions will be sent back to the driver, which is 
your local console.
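
For illustration, a minimal standalone-mode driver (master URL, app name and jar 
path are placeholders; constructor signature as in the 0.8/0.9 releases) might look 
roughly like this:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object CollectToDriver {
      def main(args: Array[String]) {
        // "spark://master-host:7077" is a placeholder for your standalone master URL.
        val sc = new SparkContext("spark://master-host:7077", "CollectToDriver",
          System.getenv("SPARK_HOME"), Seq("target/my-app.jar"))

        val doubled = sc.parallelize(1 to 1000).map(_ * 2)

        // collect() runs the tasks on the cluster's workers, but the results come
        // back to this JVM -- the driver -- and print on your local console.
        val result = doubled.collect()
        println(result.take(10).mkString(", "))

        sc.stop()
      }
    }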


On Feb 17, 2014, at 10:54 AM, David Thomas dt5434...@gmail.com wrote:

 So if I do a Spark action, say collect, will I be able to see the result on 
 my local console? Or would it only be available on the cluster master?
 
 
 On Mon, Feb 17, 2014 at 9:50 AM, purav aggarwal puravaggarwal...@gmail.com 
 wrote:
 Your local machine simply submits your job (in the form of a jar) to the 
 cluster.
 The master node is where the SparkContext object is created, a DAG of your 
 job is formed, and tasks (stages) are assigned to different workers - which 
 are not aware of anything but the computation of the tasks assigned to them.
 
 
 On Mon, Feb 17, 2014 at 10:07 PM, David Thomas dt5434...@gmail.com wrote:
 Where is the SparkContext object created then? On my local machine or on the 
 master node in the cluster?
 
 
 On Mon, Feb 17, 2014 at 4:17 AM, Nhan Vu Lam Chi nhani...@adatao.com wrote:
 Your local app will be called the driver program, which creates jobs and 
 submits them to the cluster for running.
 
 
 On Mon, Feb 17, 2014 at 9:19 AM, David Thomas dt5434...@gmail.com wrote:
 From docs:
 Connecting an Application to the Cluster
 
 To run an application on the Spark cluster, simply pass the spark://IP:PORT 
 URL of the master to the SparkContext constructor.
 
 Could someone enlighten me on what happens if I run the app, from say, 
 Eclipse on my local machine, but use the url of the master node which is on 
 cloud. What role does my local JVM play then?
 
 
 
 



Re: Connecting an Application to the Cluster

2014-02-17 Thread Michael (Bach) Bui
It used to be that you had to read the Spark code to figure this information out.
However, the Spark team has recently published this info here: 
http://spark.incubator.apache.org/docs/latest/cluster-overview.html






On Feb 17, 2014, at 11:35 AM, purav aggarwal puravaggarwal...@gmail.com wrote:

 Sorry for the incorrect information. Where can I pick up these 
 architectural/design concepts for Spark?
 I seem to have misunderstood the responsibilities of the master and the 
 driver.
 
 
 On Mon, Feb 17, 2014 at 10:51 PM, Michael (Bach) Bui free...@adatao.com 
 wrote:
 Spark has the concepts of Driver and Master.
 
 The Driver is the Spark program that you run on your local machine. The 
 SparkContext resides in the driver together with the DAG scheduler.
 The Master is responsible for managing cluster resources, e.g. giving the Driver 
 the workers that it needs. The Master can be either the Mesos master (for a Mesos 
 cluster), the Spark master (for a Spark standalone cluster), or the ResourceManager 
 (for a Hadoop cluster).
 Given the resources assigned by the Master, the Driver uses the DAG to assign tasks 
 to workers.
 
 So yes, the results of Spark's actions will be sent back to the driver, which is 
 your local console.
 
 
 On Feb 17, 2014, at 10:54 AM, David Thomas dt5434...@gmail.com wrote:
 
 So if I do a Spark action, say collect, will I be able to see the result on 
 my local console? Or would it only be available on the cluster master?
 
 
 On Mon, Feb 17, 2014 at 9:50 AM, purav aggarwal puravaggarwal...@gmail.com 
 wrote:
 Your local machine simply submits your job (in the form of a jar) to the 
 cluster.
 The master node is where the SparkContext object is created, a DAG of your 
 job is formed, and tasks (stages) are assigned to different workers - which 
 are not aware of anything but the computation of the tasks assigned to them.
 
 
 On Mon, Feb 17, 2014 at 10:07 PM, David Thomas dt5434...@gmail.com wrote:
 Where is the SparkContext object created then? On my local machine or on the 
 master node in the cluster?
 
 
 On Mon, Feb 17, 2014 at 4:17 AM, Nhan Vu Lam Chi nhani...@adatao.com wrote:
 Your local app will be called the driver program, which creates jobs and 
 submits them to the cluster for running.
 
 
 On Mon, Feb 17, 2014 at 9:19 AM, David Thomas dt5434...@gmail.com wrote:
 From docs:
 Connecting an Application to the Cluster
 
 To run an application on the Spark cluster, simply pass the spark://IP:PORT 
 URL of the master to the SparkContext constructor.
 
 Could someone enlighten me on what happens if I run the app, from say, 
 Eclipse on my local machine, but use the url of the master node which is on 
 cloud. What role does my local JVM play then?
 
 
 
 
 
 



Re: How to map each line to (line number, line)?

2013-12-30 Thread Michael (Bach) Bui
Note that Spark uses the HDFS API to access the file. 
The HDFS API has KeyValueTextInputFormat, which addresses Aureliano's requirement.

I am just not sure whether KeyValueTextInputFormat has been pulled into the latest 
version of Spark yet.
Without that, it may be messy to make sure that the partition boundary is a newline 
character.

I think this usage pattern is important; if it is not yet available, I can try 
to pull it in.
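
In the meantime, a two-pass sketch with mapPartitionsWithIndex can assign line 
numbers without any new InputFormat; the path below is a placeholder and the app 
is assumed to run against a plain textFile:

    import org.apache.spark.SparkContext

    object LineNumbers {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "LineNumbers")
        val lines = sc.textFile("hdfs:///path/to/file.txt")  // placeholder path

        // Pass 1: count how many lines each partition holds.
        val counts = lines.mapPartitionsWithIndex { (i, it) =>
          Iterator((i, it.size))
        }.collect().sortBy(_._1).map(_._2)

        // Prefix sums give the global line number at which each partition starts.
        val offsets = counts.scanLeft(0L)(_ + _)

        // Pass 2: number the lines within each partition, shifted by its offset.
        val numbered = lines.mapPartitionsWithIndex { (i, it) =>
          it.zipWithIndex.map { case (line, j) => (offsets(i) + j, line) }
        }

        numbered.take(5).foreach(println)
        sc.stop()
      }
    }

This relies on both passes seeing the same partitioning of the file, which holds for 
a plain textFile read; caching lines avoids re-reading it.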


Michael (Bach) Bui, PhD,
Senior Staff Architect, ADATAO Inc.
www.adatao.com




On Dec 30, 2013, at 6:28 AM, Aureliano Buendia buendia...@gmail.com wrote:

 Hi,
 
 When reading a simple text file in spark, what's the best way of mapping each 
 line to (line number, line)? RDD doesn't seem to have an equivalent of 
 zipWithIndex.



Re: debugging NotSerializableException while using Kryo

2013-12-23 Thread Michael (Bach) Bui
What Spark version are you using? By looking at the code at Executor.scala 
line 195, you will at least know what caused the NPE.
We can start from there.



On Dec 23, 2013, at 10:21 AM, Ameet Kini ameetk...@gmail.com wrote:

 Thanks Imran. 
 
 I tried setting spark.closure.serializer to 
 org.apache.spark.serializer.KryoSerializer and now I end up seeing a 
 NullPointerException when the executor starts up. This is a snippet of the 
 executor's log. Notice how registered TileIdWritable and registered 
 ArgWritable is called, so I see that my KryoRegistrator is being called. 
 However, it's not clear why there's a follow-on NPE. My spark log level is 
 set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so I'm not sure if 
 there is some other way to get the executor to be more verbose as to the cause 
 of the NPE. 
 
 When I take out the spark.closure.serializer setting (i.e., go back to the 
 default Java serialization), the executors start up fine and execute other 
 RDD actions, but of course not the lookup action (my original problem). With 
 spark.closure.serializer set to kryo, it dies with an NPE during 
 executor startup. 
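 
 For reference, a registrator of that era is just a class extending 
 org.apache.spark.serializer.KryoRegistrator, wired up through system properties 
 set before the SparkContext is created. A minimal sketch, with placeholder 
 classes standing in for TileIdWritable/ArgWritable:
 
    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // Placeholder types standing in for the real TileIdWritable / ArgWritable.
    class TileId(var id: Long) { def this() = this(0L) }
    class Arg(var data: Array[Byte]) { def this() = this(Array.empty[Byte]) }

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[TileId])
        kryo.register(classOf[Arg])
        println("registered TileId and Arg")
      }
    }

    // Set before the SparkContext is constructed (0.8.x-style configuration):
    // System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // System.setProperty("spark.kryo.registrator", "my.pkg.MyRegistrator")
    // System.setProperty("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")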
 
 
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver: 
 akka.tcp://[redacted]:48147/user/StandaloneScheduler
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered 
 with driver
 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
 13/12/23 11:00:36 INFO Remoting: Starting remoting
 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp:/[redacted]:56483]
 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: 
 [akka.tcp://[redacted]:56483]
 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster: 
 akka.tcp://[redacted]:48147/user/BlockManagerMaster
 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity 323.9 
 MB.
 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs 
 '/tmp'
 13/12/23 11:00:36 INFO DiskStore: Created local directory at 
 /tmp/spark-local-20131223110036-4335
 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with id 
 = ConnectionManagerId([redacted],41617)
 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker: 
 akka.tcp:/[redacted]:48147/user/MapOutputTracker
 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is 
 /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: 
 geotrellis.spark.KryoRegistrator
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: 
 geotrellis.spark.KryoRegistrator
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: 
 geotrellis.spark.KryoRegistrator
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: 
 geotrellis.spark.KryoRegistrator
 registered TileIdWritable
 registered TileIdWritable
 registered TileIdWritable
 registered TileIdWritable
 registered ArgWritable
 registered ArgWritable
 registered ArgWritable
 registered ArgWritable
 13/12/23 11:00:37 INFO Executor: Running task ID 2
 13/12/23 11:00:37 INFO Executor: Running task ID 1
 13/12/23 11:00:37 INFO Executor: Running task ID 3
 13/12/23 11:00:37 INFO Executor: Running task ID 0
 13/12/23 11:00:37 INFO Executor: Fetching 
 http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with 
 timestamp 1387814434436
 13/12/23 11:00:37 INFO Utils: Fetching 
 http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to 
 /tmp/fetchFileTemp2456419097284083628.tmp
 13/12/23 11:00:37 INFO Executor: Adding 
 file[redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
  to class loader
 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread 
 Thread[pool-7-thread-4,5,main]
 java.lang.NullPointerException
 at 
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
 at 
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
 at scala.Option.flatMap(Option.scala:170)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)
 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread 
 Thread[pool-7-thread-2,5,main]
 java.lang.NullPointerException
 at 
 

Re: How to access a sub matrix in a spark task?

2013-12-20 Thread Michael (Bach) Bui
Hmm, I misread that you need a sliding window.
I am thinking out loud here: one way of dealing with this is to improve 
NLineInputFormat so that partitions have a small overlapping portion; in this case 
the overlapping portion is 50 columns.
So, say the matrix is divided into overlapping partitions like this: [100 x 
col[1, n*50] ], [100 x col[(n-1)*50+1, (2n-1)*50] ] … then we can assign each 
partition to a mapper to do mapPartition on it.



Michael (Bach) Bui, PhD,
Senior Staff Architect, ADATAO Inc.
www.adatao.com




On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui free...@adatao.com wrote:

 Here, Tom assumed that you have your big matrix already loaded on one 
 machine. Now if you want to distribute it to slave nodes you will need to 
 broadcast it. I would expect this broadcasting to be done once at the 
 beginning of your algorithm, and the computation time will dominate the 
 overall execution time.
 
 On the other hand, a better way to deal with a huge matrix is to store the data 
 in HDFS and load data into each slave partition-by-partition. This is a 
 fundamental data processing pattern in the Spark/Hadoop world.
 If you opt to do this, you will have to use a suitable InputFormat to make sure 
 each partition has the right number of rows. 
 For example, if you are lucky and each HDFS partition has exactly n*50 rows, then 
 you can use rdd.mapPartitions(func), where func will take care of splitting the 
 n*50-row partition into n sub-matrices (see the sketch below).
 
 However, the HDFS TextInputFormat or SequenceFileInputFormat will not guarantee 
 that each partition has a certain number of rows. What you want is 
 NLineInputFormat, which I think currently has not been pulled into Spark yet.
 If everyone thinks this is needed, I can implement it quickly; it should be 
 pretty easy.
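 
 A rough sketch of the mapPartitions route from the spark-shell (where sc is 
 predefined), assuming each partition happens to hold a multiple of 50 rows and 
 each row is a line of whitespace-separated doubles:
 
    val rows = sc.textFile("hdfs:///path/to/matrix.txt")   // placeholder path
      .map(_.split("\\s+").map(_.toDouble))

    // Group every 50 consecutive rows of a partition into one 50-row sub-matrix.
    val subMatrices = rows.mapPartitions { it =>
      it.grouped(50).map(_.toArray)   // Iterator[Array[Array[Double]]]
    }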
 
 
 
 Michael (Bach) Bui, PhD,
 Senior Staff Architect, ADATAO Inc.
 www.adatao.com
 
 
 
 
 On Dec 20, 2013, at 12:38 PM, Aureliano Buendia buendia...@gmail.com wrote:
 
 
 
 
 On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek minnesota...@gmail.com wrote:
 Oh, I see.  I was thinking that there was a computational dependency on one 
 window to the next.  If the computations are independent, then I think Spark 
 can help you out quite a bit.
 
 I think you would want an RDD where each element is a window of your dense 
 matrix.  I'm not aware of a way to distribute the windows of the big matrix 
 in a way that doesn't involve broadcasting the whole thing.  You might have 
 to tweak some config options, but I think it would work straightaway.  I 
 would initialize the data structure like this:
 val matB = sc.broadcast(myBigDenseMatrix)
 val distributedChunks = sc.parallelize(0 until numWindows)
   .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
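 
 For concreteness, one possible getWindow, assuming myBigDenseMatrix is an 
 Array[Array[Double]] and each window is the full set of rows restricted to a 
 50-column slice (all names and sizes here are placeholders):
 
    val windowWidth = 50
    val windowStep = 50
    def getWindow(mat: Array[Array[Double]], windowID: Int): Array[Array[Double]] =
      mat.map(row => row.slice(windowID * windowStep,
                               windowID * windowStep + windowWidth))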
 
 Here broadcast is used instead of calling parallelize on myBigDenseMatrix. 
 Is it okay to broadcast a huge amount of data? Does sharing big data mean 
 a big network IO overhead compared to calling parallelize, or is this 
 overhead optimized due to the partitioning?
  
 
 Then just apply your matrix ops as a map on the RDD of windows. 
 
 You may have your own tool for dense matrix ops, but I would suggest Scala 
 Breeze. You'll have to use an old version of Breeze (current builds are for 
 2.10). Spark with Scala 2.10 is a little way off.
 
 
 On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia buendia...@gmail.com 
 wrote:
 
 
 
 On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek minnesota...@gmail.com wrote:
 If you use an RDD[Array[Double]] with a row decomposition of the matrix, you 
 can index windows of the rows all you want, but you're limited to 100 
 concurrent tasks.  You could use a column decomposition and access subsets 
 of the columns with a PartitionPruningRDD.  I have to say, though, if you're 
 doing dense matrix operations, they will be 100s of times faster on a shared 
 mem platform.  This particular matrix, at 800 MB could be a Breeze on a 
 single node.
  
 The computation for every submatrix is very expensive, it takes days on a 
 single node. I was hoping this can be reduced to hours or minutes with spark.
 
 Are you saying that spark is not suitable for this type of job?
 
 
 



Re: Continued performance issues on a small EC2 Spark cluster

2013-11-15 Thread Michael (Bach) Bui
Hi Gary,

What other frameworks are running on your Mesos cluster? 
If they are all Spark frameworks, another option you may want to consider (in 
order to improve your cluster utilization) is to let all of them share a single 
SparkContext.
We also experienced degraded performance while running multiple Spark 
frameworks on Mesos, mainly because there were a lot of non-preferred-location 
tasks.

Best,

~ mbbui

On Nov 15, 2013, at 11:30 AM, Gary Malouf malouf.g...@gmail.com wrote:

 Hi everyone,
 
 I want to share what helped us resolve the issue short term and also our 
 concerns longer term.
 
 Some Background:
 
 Many of our jobs that look at a few weeks of data have task counts around 
 3500+.  We went with a small cluster of 4 EC2 larges for Mesos+Spark in 
 production for now because our data sizes are small (3 files hourly on the 
 order of 10-20mb).   The idea is to get this steady and scale horizontally as 
 the data flow continues to go up.
 
 Our Problem:
 
 In short, the time to execute simple tasks on data of this size over 
 the past 2 weeks or so was not reasonable for us.  The jobs were taking well 
 over an hour to complete.  We were very concerned that as the hourly data 
 grows in size this will become completely unacceptable.
 
 The (Short Term) Solution:
 
 Thanks to Paco and Evan's input, we decided to switch Spark to run in coarse 
 grained mode for the time-being.  We saw an immediate 6-8x improvement on our 
 job runtime and we were able to complete our short term tasks.
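 
 For anyone hitting the same issue: in the Spark releases of that era, coarse-grained 
 Mesos mode is switched on with a system property set before the SparkContext is 
 constructed. A minimal sketch (master URL, app name and jar path are placeholders):
 
    import org.apache.spark.SparkContext

    object CoarseGrainedJob {
      def main(args: Array[String]) {
        // Must be set before the SparkContext is created.
        System.setProperty("spark.mesos.coarse", "true")

        // "mesos://mesos-master:5050" is a placeholder for the Mesos master URL.
        val sc = new SparkContext("mesos://mesos-master:5050", "CoarseGrainedJob",
          System.getenv("SPARK_HOME"), Seq("target/my-job.jar"))

        // ... job logic goes here ...
        sc.stop()
      }
    }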
 
 The downside we see to this is that it takes away many of the benefits of 
 running on Mesos - namely, the fine-grained resource management.
 
 The Longer Term Solution:
 
 We are looking at ways of tuning to reduce the number of Spark tasks being 
 generated overall.  We think if we can get this down enough, it justifies 
 taking the performance hit in fine-grained mode since the cluster resources 
 will now be allocated dynamically for multiple jobs.
 
 
 Hope this is helpful to people,
 
 Gary
 
 
 On Thu, Nov 14, 2013 at 11:47 AM, Gary Malouf malouf.g...@gmail.com wrote:
 I bring this up because the performance we are seeing is dreadful.  From CPU 
 usage, it appears the issue is the spark shell's CPU power.  We have increased 
 this node from an EC2 medium to an xl; we are seeing slightly better 
 performance but still not great.  
 
 My understanding of Spark was that most of the work should be done on the 
 slaves with just the results going back to the shell at the end if we do a 
 take.  It appears from what we see that the client is doing much more work 
 than expected.
 
 
 
 
 On Wed, Nov 13, 2013 at 10:40 PM, Gary Malouf malouf.g...@gmail.com wrote:
 Hi,
 
 We have an HDFS set up of a namenode and three datanodes, all on EC2 larges.  
 One of our data partitions basically has files that are fed from a few Flume 
 instances rolling hourly.  This equates to around three 4-8mb files per hour 
 right now.
 
 Our Mesos cluster consists of a Master and the three slave nodes colocated on 
 these EC2 larges as well (slaves - datanodes, mesos master - namenode).  
 Spark scheduled jobs are launched from spark shell ad-hoc today.
 
 The data is serialized protobuf messages in sequence files.  Our operations 
 typically consist of deserializing the data, grabbing a few primitive fields 
 out of the message and doing some maps/reduces.
 
 For grabbing on the order of 2 days of data this size, what would the 
 expected Spark performance be?  We are seeing simple maps and 'takes' on this 
 data taking on the order of 15 minutes.
 
 Thanks,
 
 Gary
 
 



Re: SPARK + YARN the general case

2013-11-15 Thread Michael (Bach) Bui
Tom, more on Shark-type applications on YARN.
In the current implementation, for the duration of a SparkContext's execution, 
YARN will give an unchanged set of nodes to the SparkContext, is that right?
If that is the case, IMO, it may not be the best architecture for Shark, 
because users may load data from nodes that are not in the given set of nodes. 
Am I right? 





On Nov 15, 2013, at 12:51 PM, Tom Graves tgraves...@yahoo.com wrote:

 Shark is not currently supported on YARN. There are 2 ways this could be 
 done that come to mind. One would be to run Shark as the application itself 
 that gets started on the application master in the current yarn-standalone 
 mode; the other is to use the yarn-client introduced in the spark-shell 
 pull request.  I saw some changes that went into Shark that were to support 
 running it along with the yarn-client pull request (101), but I haven't had 
 time to actually try these yet. 
 
 Tom
 
 
 On Friday, November 15, 2013 10:45 AM, Michael (Bach) Bui 
 free...@adatao.com wrote:
 Hi Tom,
 
 I have another question on SoY. It seems like the current implementation will 
 not support interactive types of applications like Shark, right?
 Thanks.
 
 
 
 On Nov 15, 2013, at 8:15 AM, Tom Graves tgraves...@yahoo.com wrote:
 
 Hey Bill,
 
 Currently, Spark on YARN only supports batch mode, where you submit your 
 job via the yarn Client.   Note that this will hook the Spark UI up to the 
 YARN ResourceManager web UI.  Is there something more you were looking for 
 than just finding the Spark web UI for various jobs?
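 
 For reference, a yarn-standalone submission in the 0.8.x docs looks roughly like 
 the following (the script name and all values are placeholders and varied a bit 
 between releases):
 
    SPARK_JAR=<path-to-spark-yarn-assembly.jar> ./spark-class org.apache.spark.deploy.yarn.Client \
      --jar your-app.jar \
      --class your.app.MainClass \
      --args <your app arguments> \
      --num-workers 3 \
      --master-memory 2g \
      --worker-memory 2g \
      --worker-cores 1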
 
 There is a pull request (101) to get spark shell working with YARN.
 
 Tom
 
 
 On Thursday, November 14, 2013 10:57 AM, Bill Sparks jspa...@cray.com 
 wrote:
 Sorry for the following question, but I just need a little clarity on 
 expectations of Spark using YARN. 
 
 Is it possible to use the spark-shell with YARN? Or is the only way to 
 submit a Spark job to YARN to write a Java application and submit it via 
 the yarn.Client application? 
 
 Also, is there a way of running the Spark master so that it can communicate 
 with YARN so that I can use the web UI for job tracking?
 
 Thanks,
   Bill