Help required to deploy code to Standalone Cluster

2013-10-01 Thread purav aggarwal
Hi,

I am trying to deploy my code (a jar) to a standalone cluster and nothing has
worked for me so far.
- LocalMachine = build machine (Mac)
- Cluster = 1 master and 1 slave, with over 90 GB of memory (CentOS)
Observations:
1. I can run the code on my local machine by passing "local" as the master
argument to the SparkContext.
2. I can run the SparkPi example from my LocalMachine using "./run-example
org.apache.spark.examples.SparkPi spark://master:7077", and I can see the
jar (spark-examples-assembly-0.8.0-SNAPSHOT.jar) deployed to the slave's work
folder and the job being done.
3. Step 2 behaves the same when executed on the master machine using
"./run-example org.apache.spark.examples.SparkPi spark://master:7077"

4. Now I have written my own Spark code in Scala. How do I deploy my jar to
the cluster to run the computation?
a. Running "/libs/spark/sbt/sbt run" from the project directory results in an
incessant "cluster.ClusterScheduler: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered"

5. I want to keep the build machine separate from the cluster master and
slave.
6. The SparkContext in my code looks like this -
val sc = new SparkContext("spark://master:7077", "Simple Job",
  "$SPARK_HOME", List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))

Any ideas on how to solve this one?

Regards
Purav


Drop functionality with Spark RDD

2013-11-23 Thread purav aggarwal
Hi,

I have a use case where I read data from files and need to drop a certain
number of lines (unwanted data) before I begin processing.

I implemented it as -

  /**
   * Returns an RDD with the first n elements dropped.
   */
  def drop(num: Int): RDD[T] = {
    if (num <= 0)
      return this
    val toBeDropped = sc.makeRDD(this.take(num))
    this.subtract(toBeDropped)
  }

Is the implementation okay?
If yes, does it make sense to incorporate it into the Spark code base, since
most Scala collections have similar drop functionality?

One important point to note: the returned RDD might not maintain the order in
which the original RDD was constructed. Firing a subsequent drop, or any other
order-sensitive query, on the returned RDD will give unpredictable results.
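
For illustration, a minimal usage sketch, assuming drop() is exposed through an
implicit wrapper rather than patched into RDD itself (the wrapper, the input
file name and the header size below are hypothetical):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class DropOps[T: ClassManifest](rdd: RDD[T]) {
  // Same approach as above: build an RDD of the first num elements and subtract it.
  def drop(num: Int): RDD[T] =
    if (num <= 0) rdd
    else rdd.subtract(rdd.context.makeRDD(rdd.take(num)))
}

object DropExample {
  implicit def toDropOps[T: ClassManifest](rdd: RDD[T]) = new DropOps(rdd)

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Drop Example")
    val lines = sc.textFile("data.txt")   // hypothetical input file
    val body  = lines.drop(3)             // skip a 3-line header
    // Caveat from above: subtract() reshuffles, so `body` may not preserve
    // the original line order.
    println(body.count())
    sc.stop()
  }
}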


Re: Spark heap issues

2013-12-05 Thread purav aggarwal
Try allocating some more resources to your application.
You seem to be using 512 MB for your worker node (you can verify that from the
master UI).

Try putting the following settings into your code and see if it helps -

System.setProperty("spark.executor.memory", "15g")   // Will allocate more memory
System.setProperty("spark.akka.frameSize", "2000")
System.setProperty("spark.akka.threads", "16")        // Dependent upon the number of cores on your worker machine
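
As a rough sketch of where these settings would sit (the master URL matches the
standalone setup quoted below; the values are illustrative, and
spark.executor.memory must fit within the memory the worker actually has):

import org.apache.spark.SparkContext

object TunedApp {
  def main(args: Array[String]) {
    // These system properties are read when the SparkContext is constructed,
    // so they must be set before `new SparkContext(...)`.
    System.setProperty("spark.executor.memory", "15g") // must not exceed the worker's memory
    System.setProperty("spark.akka.frameSize", "2000")
    System.setProperty("spark.akka.threads", "16")

    val sc = new SparkContext("spark://localhost:7077", "Simple App")
    // ... rest of the job ...
    sc.stop()
  }
}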


On Fri, Dec 6, 2013 at 1:06 AM, learner1014 all wrote:

> Hi,
>
> Trying to do a join operation on an RDD, my input is pipe delimited data
> and there are 2 files.
> One file is 24MB and the other file is 285MB.
> Setup being used is the single node (server) setup: SPARK_MEM set to 512m
>
> Master
> /pkg/java/jdk1.7.0_11/bin/java -cp
> :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
> -Dspark.boundedMemoryCache.memoryFraction=0.4
> -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
> -Djava.library.path= -Xms512m -Xmx512m
> org.apache.spark.deploy.master.Master --ip localhost --port 7077
> --webui-port 8080
>
> Worker
> /pkg/java/jdk1.7.0_11/bin/java -cp
> :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
> -Dspark.boundedMemoryCache.memoryFraction=0.4
> -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
> -Djava.library.path= -Xms512m -Xmx512m
> org.apache.spark.deploy.worker.Worker spark://localhost:7077
>
>
> App
> /pkg/java/jdk1.7.0_11/bin/java -cp
> :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/spark-0.8.0-incubating-bin-cdh4/core/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/repl/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/mllib/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/bagel/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/streaming/target/scala-2.9.3/test-classes
> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
> -Dspark.boundedMemoryCache.memoryFraction=0.4
> -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
> -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend
> akka://spark@localhost:33024/user/StandaloneScheduler 1 localhost 4
>
>
> Here is the code
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.storage.StorageLevel
>
> object SimpleApp {
>
>   def main (args: Array[String]) {
>
>
> System.setProperty("spark.local.dir","/spark-0.8.0-incubating-bin-cdh4/tmp");
>   System.setProperty("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>   System.setProperty("spark.akka.timeout", "30")  //in seconds
>
>   val dataFile2 = "/tmp_data/data1.txt"
>   val dataFile1 = "/tmp_data/data2.txt"
>   val sc = new SparkContext("spark://localhost:7077", "Simple App",
> "/spark-0.8.0-incubating-bin-cdh4",
>   List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
>
>   val data10 = sc.textFile(dataFile1, 128)
>   val data11 = data10.map(x => x.split("|"))
>   val data12 = data11.map( x  =>  (x(1).toInt -> x) )
>
>
>   val data20 = sc.textFile(dataFile2, 128)
>   val data21 = data20.map(x => x.split("|"))
>   val data22 = data21.map(x => (x(1).toInt -> x))
>
>
>   val data3 = data12.join(data22, 128)
>   val data4 = data3.distinct(4)
>   val numAs = data10.count()
>   val numBs = data20.count()
>   val numCs = data3.count()
>   val numDs = data4.count()
>   println("Lines in 1: %s, Lines in 2: %s Lines in 3: %s Lines in 4:
> %s".format(numAs, numBs, numCs, numDs))
>   data4.foreach(println)
> }
>
> I see the following errors
> 13/12/04 10:53:55 WARN storage.BlockManagerMaster: Error sending message
> to BlockManagerMaster in 1 attempts
> java.util.concurrent.TimeoutException: Futures timed out after [1]
> milliseconds
> at akka.dispatch.DefaultPromise.ready(Future.scala:870)
> at akka.dispatch.DefaultPromise.result(Future.scala:874)
> at akka.dispatch.Await$.result(Future.scala:74)
>
> and
> 13/12/04 10:53:55 ERROR executor.Executor: Exception in task ID 517
> java.lang.OutOfMemoryError: Java heap space
> at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:282)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:262)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject

Re: more complex analytics

2014-02-11 Thread purav aggarwal
sc.parallelize(inputList)
  .map(x => ((x._1, x._3), x._2))
  .reduceByKey(_ + _)

You need to understand what happens when you say .map(x => (x, 1)):
"For every x (which is a tuple of 3 fields in your case), you map it to a
pair with key = x and value = 1."
In .map(x => ((x._1, x._3), x._2)) you instead set the key to your first and
third fields and the value to your second field.
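
A self-contained sketch of the above (the .toInt conversion is an assumption,
added so the second field is summed numerically rather than concatenated as a
string):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object KeyedSum {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Keyed Sum")

    val inputList = List(
      ("name", "1", "11134"), ("name", "2", "11134"),
      ("name", "1", "11130"), ("name2", "1", "11133"))

    sc.parallelize(inputList)
      .map(x => ((x._1, x._3), x._2.toInt)) // key = (field1, field3), value = field2 as Int
      .reduceByKey(_ + _)                   // sum the values per key
      .foreach(println)                     // e.g. ((name,11134),3)

    sc.stop()
  }
}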


On Tue, Feb 11, 2014 at 8:37 PM, Adrian Mocanu wrote:

>  Hi
>
> Are there any examples on how to do any other operation apart from
> counting in spark via map then reduceByKey.
>
> It's pretty straight forward to do counts but how do I add in my own
> function (say conditional sum based on tuple fields or moving average)?
>
>
>
> Here's my count example so we have some code to work with
>
>
>
> val inputList= List(
> ("name","1","11134"),("name","2","11134"),("name","1","11130"),
> ("name2","1","11133") )
>
> sc.parallelize( inputList )
>
> .map(x => (x,1) )
>
> .reduceByKey(sumTuples)
>
> .foreach(x=>println(x))
>
>
>
> How would I add up field 2 from tuples which have fields "name" and the
> last field the same.
>
> In my example the result I want is:
>
> "name","1+2","11134"
>
> "name","1","11130"
>
> "name2","1","11133"
>
>
>
> Thanks
>
> -A
>


Re: Connecting an Application to the Cluster

2014-02-17 Thread purav aggarwal
Your local machine simply submits your job (in the form of a jar) to the
cluster.
The master node is where the SparkContext object is created, a DAG of your job
is formed, and tasks (stages) are assigned to different workers, which are not
aware of anything but computing the tasks assigned to them.


On Mon, Feb 17, 2014 at 10:07 PM, David Thomas  wrote:

> Where is the SparkContext object created then? On my local machine or on
> the master node in the cluster?
>
>
> On Mon, Feb 17, 2014 at 4:17 AM, Nhan Vu Lam Chi wrote:
>
>> Your local app will be called "driver program", which creates jobs and
>> submits them to the cluster for running.
>>
>>
>> On Mon, Feb 17, 2014 at 9:19 AM, David Thomas wrote:
>>
>>> From docs<https://spark.incubator.apache.org/docs/latest/spark-standalone.html>:
>>>
>>>
>>> *Connecting an Application to the Cluster*
>>> *To run an application on the Spark cluster, simply pass the spark://IP:PORT
>>> URL of the master as to the SparkContext constructor.*
>>>
>>> Could someone enlighten me on what happens if I run the app, from say,
>>> Eclipse on my local machine, but use the url of the master node which is on
>>> cloud. What role does my local JVM play then?
>>>
>>
>>
>


Re: Connecting an Application to the Cluster

2014-02-17 Thread purav aggarwal
The data would get aggregated on the master node.
Since the JVM for the application is invoked from your local machine (the
Spark driver), I think you might be able to print it on your console.
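
A minimal sketch of what that would look like (the master URL is a placeholder;
collect() returns the RDD's elements as a local array to the JVM that created
the SparkContext, so the println below runs there):

import org.apache.spark.SparkContext

object CollectExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("spark://master:7077", "Collect Example")

    // collect() brings the results back to this (driver) JVM as an Array,
    // so the output appears on the submitting machine's console.
    val doubled: Array[Int] = sc.parallelize(1 to 10).map(_ * 2).collect()
    doubled.foreach(println)

    sc.stop()
  }
}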


On Mon, Feb 17, 2014 at 10:24 PM, David Thomas  wrote:

> So if I do a spark action, say, collect, will I be able to see the result
> on my local console? Or would it be only available only on the cluster
> master?
>
>
> On Mon, Feb 17, 2014 at 9:50 AM, purav aggarwal <
> puravaggarwal...@gmail.com> wrote:
>
>> Your local machine simply submits your job (in the form of a jar) to the
>> cluster.
>> The master node is where the SparkContext object is created, a DAG of your
>> job is formed, and tasks (stages) are assigned to different workers, which
>> are not aware of anything but computing the tasks assigned to them.
>>
>>
>> On Mon, Feb 17, 2014 at 10:07 PM, David Thomas wrote:
>>
>>> Where is the SparkContext object created then? On my local machine or on
>>> the master node in the cluster?
>>>
>>>
>>> On Mon, Feb 17, 2014 at 4:17 AM, Nhan Vu Lam Chi wrote:
>>>
>>>> Your local app will be called "driver program", which creates jobs and
>>>> submits them to the cluster for running.
>>>>
>>>>
>>>> On Mon, Feb 17, 2014 at 9:19 AM, David Thomas wrote:
>>>>
>>>>> From docs<https://spark.incubator.apache.org/docs/latest/spark-standalone.html>:
>>>>>
>>>>>
>>>>> *Connecting an Application to the Cluster*
>>>>> *To run an application on the Spark cluster, simply pass the spark://IP:PORT
>>>>> URL of the master as to the SparkContext constructor.*
>>>>>
>>>>> Could someone enlighten me on what happens if I run the app, from say,
>>>>> Eclipse on my local machine, but use the url of the master node which is
>>>>> on cloud. What role does my local JVM play then?
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Connecting an Application to the Cluster

2014-02-17 Thread purav aggarwal
Sorry for the incorrect information. Where can I pick up these
architectural/design concepts for Spark?
I seem to have misunderstood the responsibilities of the master and the
driver.


On Mon, Feb 17, 2014 at 10:51 PM, Michael (Bach) Bui wrote:

> Spark has the concepts of Driver and Master.
>
> The Driver is the Spark program that you run on your local machine.
> The SparkContext resides in the driver together with the DAG scheduler.
> The Master is responsible for managing cluster resources, e.g. giving the
> Driver the workers that it needs. The Master can be either the Mesos master
> (for a Mesos cluster), the Spark master (for a Spark standalone cluster), or
> the ResourceManager (for a Hadoop cluster).
> Given the resources assigned by the Master, the Driver uses the DAG to assign
> tasks to workers.
>
> So yes, the results of Spark's actions will be sent back to the driver, which
> is your local console.
>
> On Feb 17, 2014, at 10:54 AM, David Thomas  wrote:
>
> So if I do a spark action, say, collect, will I be able to see the result
> on my local console? Or would it be only available only on the cluster
> master?
>
>
> On Mon, Feb 17, 2014 at 9:50 AM, purav aggarwal <
> puravaggarwal...@gmail.com> wrote:
>
>> Your local machine simply submits your job (in the form of a jar) to the
>> cluster.
>> The master node is where the SparkContext object is created, a DAG of your
>> job is formed, and tasks (stages) are assigned to different workers, which
>> are not aware of anything but computing the tasks assigned to them.
>>
>>
>> On Mon, Feb 17, 2014 at 10:07 PM, David Thomas wrote:
>>
>>> Where is the SparkContext object created then? On my local machine or on
>>> the master node in the cluster?
>>>
>>>
>>> On Mon, Feb 17, 2014 at 4:17 AM, Nhan Vu Lam Chi wrote:
>>>
>>>> Your local app will be called "driver program", which creates jobs and
>>>> submits them to the cluster for running.
>>>>
>>>>
>>>> On Mon, Feb 17, 2014 at 9:19 AM, David Thomas wrote:
>>>>
>>>>> From docs<https://spark.incubator.apache.org/docs/latest/spark-standalone.html>:
>>>>>
>>>>>
>>>>> *Connecting an Application to the Cluster*
>>>>> *To run an application on the Spark cluster, simply pass the spark://IP:PORT
>>>>> URL of the master as to the SparkContext constructor.*
>>>>>
>>>>> Could someone enlighten me on what happens if I run the app, from say,
>>>>> Eclipse on my local machine, but use the url of the master node which is
>>>>> on cloud. What role does my local JVM play then?
>>>>>
>>>>
>>>>
>>>
>>
>
>