Re: How to set environment variable for a spark job

2014-03-25 Thread Sourav Chandra
Did you try to access the variables in the worker using System.getenv(...),
and did it fail?
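For example, a quick check along these lines (just a sketch, assuming a
SparkContext named sc; the output ends up in the executors' stdout logs):

sc.parallelize(1 to 4, 4).foreachPartition { _ =>
  // print the executor-side value of the variables that were set
  println("ORACLE_HOME=" + System.getenv("ORACLE_HOME"))
  println("LD_LIBRARY_PATH=" + System.getenv("LD_LIBRARY_PATH"))
}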


On Wed, Mar 26, 2014 at 11:42 AM, santhoma wrote:

> I tried it, it did not work
>
>  conf.setExecutorEnv("ORACLE_HOME", orahome)
>  conf.setExecutorEnv("LD_LIBRARY_PATH", ldpath)
>
> Any idea how to set it using java.library.path ?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-environment-variable-for-a-spark-job-tp3180p3241.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>



-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


RDD Collect returns empty arrays

2014-03-25 Thread gaganbm
I am getting strange behavior with the RDDs.

All I want is to persist the RDD contents in a single file. 

saveAsTextFile() saves them as multiple text files, one per partition. So
I tried rdd.coalesce(1, true).saveAsTextFile(). This fails with the
exception:

org.apache.spark.SparkException: Job aborted: Task 75.0:0 failed 1 times
(most recent failure: Exception failure: java.lang.IllegalStateException:
unread block data) 

Then I tried collecting the RDD contents in an array, and writing the array
to the file manually. Again, that fails. It is giving me empty arrays, even
when data is there.

/**The below saves the data in multiple text files. So data is there for
sure **/
rdd.saveAsTextFile(resultDirectory)
/**The below simply prints size 0 for all the RDDs in a stream. Why ?! **/
val arr = rdd.collect
println("SIZE of RDD " + rdd.id + " " + arr.size)

Kindly help! I am clueless on how to proceed.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Collect-returns-empty-arrays-tp3242.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to set environment variable for a spark job

2014-03-25 Thread santhoma
I tried it, it did not work

 conf.setExecutorEnv("ORACLE_HOME", orahome)
 conf.setExecutorEnv("LD_LIBRARY_PATH", ldpath)

Any idea how to set it using java.library.path ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-environment-variable-for-a-spark-job-tp3180p3241.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


ALS memory limits

2014-03-25 Thread Debasish Das
Hi,

For our use cases we are looking at 20M x 1M matrices, which is in a similar
range to what is outlined in the paper here:

http://sandeeptata.blogspot.com/2012/12/sparkler-large-scale-matrix.html

Does the exponential runtime growth in Spark ALS, as outlined in that blog,
still exist in recommendation.ALS?

I am running a Spark cluster of 10 nodes with around 1 TB of total memory
and 80 cores.

With rank = 50, the memory requirement for ALS should be 20M x 50 doubles on
every worker, which is around 8 GB.
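(For reference, the arithmetic behind that estimate: 20e6 rows x 50 factors
x 8 bytes per double = 8e9 bytes, i.e. roughly 8 GB, or about 7.5 GiB.)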

Even if both factor matrices are cached in memory I should be bounded by
~9 GB, but even with 32 GB per worker I see GC errors...

I am debugging the scalability and memory requirements of the algorithm
further but any insights will be very helpful...

Also there are two other issues:

1. If GC errors are hit, that worker JVM goes down and I have to restart it
manually. Is this expected?

2. When I try to make use of all 80 cores on the cluster I get some issues
related to a java.io.FileNotFoundException on /tmp/. Is there some OS limit
on how many cores can simultaneously access /tmp from a process?

Thanks.
Deb

On Sun, Mar 16, 2014 at 2:20 PM, Sean Owen  wrote:

> Good point -- there's been another optimization for ALS in HEAD (
> https://github.com/apache/spark/pull/131), but yes the better place to
> pick up just essential changes since 0.9.0 including the previous one is
> the 0.9 branch.
>
> --
> Sean Owen | Director, Data Science | London
>
>
> On Sun, Mar 16, 2014 at 2:18 PM, Patrick Wendell wrote:
>
>> Sean - was this merged into the 0.9 branch as well (it seems so based
>> on the message from rxin). If so it might make sense to try out the
>> head of branch-0.9 as well. Unless there are *also* other changes
>> relevant to this in master.
>>
>> - Patrick
>>
>> On Sun, Mar 16, 2014 at 12:24 PM, Sean Owen  wrote:
>> > You should simply use a snapshot built from HEAD of
>> github.com/apache/spark
>> > if you can. The key change is in MLlib and with any luck you can just
>> > replace that bit. See the PR I referenced.
>> >
>> > Sure with enough memory you can get it to run even with the memory
>> issue,
>> > but it could be hundreds of GB at your scale. Not sure I take the point
>> > about the JVM; you can give it 64GB of heap and executors can use that
>> much,
>> > sure.
>> >
>> > You could reduce the number of features a lot to work around it too, or
>> > reduce the input size. (If anyone saw my blog post about StackOverflow
>> and
>> > ALS -- that's why I snuck in a relatively paltry 40 features and pruned
>> > questions with <4 tags :) )
>> >
>> > I don't think jblas has anything to do with it per se, and the
>> allocation
>> > fails in Java code, not native code. This should be exactly what that
>> PR I
>> > mentioned fixes.
>> >
>> > --
>> > Sean Owen | Director, Data Science | London
>> >
>> >
>> > On Sun, Mar 16, 2014 at 11:48 AM, Debasish Das <
>> debasish.da...@gmail.com>
>> > wrote:
>> >>
>> >> Thanks Sean...let me get the latest code..do you know which PR was it ?
>> >>
>> >> But will the executors run fine with say 32 gb or 64 gb of memory ?
>> Does
>> >> not JVM shows up issues when the max memory goes beyond certain
>> limit...
>> >>
>> >> Also the failure is due to GC limits from jblas...and I was thinking
>> that
>> >> jblas is going to call native malloc right ? May be 64 gb is not a big
>> deal
>> >> then...I will try increasing to 32 and then 64...
>> >>
>> >> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead
>> >> limit exceeded)
>> >>
>> >> org.jblas.DoubleMatrix.<init>(DoubleMatrix.java:323)
>> >> org.jblas.DoubleMatrix.zeros(DoubleMatrix.java:471)
>> >> org.jblas.DoubleMatrix.zeros(DoubleMatrix.java:476)
>> >> com.verizon.bigdata.mllib.recommendation.ALSQR$$anonfun$17.apply(ALSQR.scala:366)
>> >> com.verizon.bigdata.mllib.recommendation.ALSQR$$anonfun$17.apply(ALSQR.scala:366)
>> >> scala.Array$.fill(Array.scala:267)
>> >> com.verizon.bigdata.mllib.recommendation.ALSQR.updateBlock(ALSQR.scala:366)
>> >> com.verizon.bigdata.mllib.recommendation.ALSQR$$anonfun$com$verizon$bigdata$mllib$recommendation$ALSQR$$updateFeatures$2.apply(ALSQR.scala:346)
>> >> com.verizon.bigdata.mllib.recommendation.ALSQR$$anonfun$com$verizon$bigdata$mllib$recommendation$ALSQR$$updateFeatures$2.apply(ALSQR.scala:345)
>> >> org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:32)
>> >> org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:32)
>> >> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> >> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:149)
>> >> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:147)
>> >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:147)
>> >> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:242)
>> >> org.apache.spark.rdd.RDD.iterator(RDD.scala:23

Re: Building Spark 0.9.x for CDH5 with mrv1 installation (Protobuf 2.5 upgrade)

2014-03-25 Thread Patrick Wendell
I'm not sure exactly how your cluster is configured. But as far as I can
tell Cloudera's MR1 CDH5 dependencies are against Hadoop 2.3. I'd just find
the exact CDH version you have and link against the `mr1` version of their
published dependencies in that version.

So I think you want "2.3.0-mr1-cdh5.0.0"
https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/hadoop/hadoop-client/2.3.0-mr1-cdh5.0.0/

The full list of Cloudera versions is here:
https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/hadoop/hadoop-client/
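In an sbt build that would look roughly like the following (a sketch only;
the coordinates are read off the repository listing above, so double-check
them against the exact CDH release you are running):

// build.sbt
resolvers += "cloudera-repos" at
  "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"    % "0.9.0-incubating",
  "org.apache.hadoop" %  "hadoop-client" % "2.3.0-mr1-cdh5.0.0"
)

Depending on how you deploy, you may also need to exclude the hadoop-client
that spark-core pulls in transitively so that the CDH version wins.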



On Tue, Mar 25, 2014 at 6:42 PM, Gary Malouf  wrote:

> Today, our cluster setup is as follows:
>
> Mesos 0.15,
> CDH 4.2.1-MRV1,
> Spark 0.9-pre-scala-2.10 off master build targeted at appropriate CDH4
> version
>
>
> We are looking to upgrade all of these in order to get protobuf 2.5
> working properly.  The question is, which 'Hadoop version build' of Spark
> 0.9 is compatible with the HDFS from Hadoop 2.2 and Cloudera's CDH5 MRV1
> installation?  Is there one?
>


Re: rdd.saveAsTextFile problem

2014-03-25 Thread gaganbm
Hi Folks,

Is this issue resolved ? If yes, could you please throw some light on how to
fix this ?

I am facing the same problem during writing to text files.

When I do 

stream.foreachRDD(rdd => {
  rdd.saveAsTextFile(<"Some path">)
})

This works fine for me. But it creates multiple text files for each
partition within an RDD.

So I tried with coalesce option to merge my results in a single file for
each RDD as :

stream.foreachRDD(rdd => {
  rdd.coalesce(1, true).saveAsTextFile(<"Some path">)
})

This fails with :
org.apache.spark.SparkException: Job aborted: Task 75.0:0 failed 1 times
(most recent failure: Exception failure: java.lang.IllegalStateException:
unread block data)

I am using Spark Streaming 0.9.0

Any clue what's going wrong when using coalesce?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-saveAsTextFile-problem-tp176p3238.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark executor memory & relationship with worker threads

2014-03-25 Thread Sai Prasanna
Hi All,
Does the number of worker threads bear any relationship to the executor
memory setting?
I have 16 GB of RAM and an 8-core processor. I had set SPARK_MEM to 12g
and was running locally with the default of 1 thread.
So this means at most one executor can be scheduled on a node at any point
in time. If I increase the number of worker threads to, say, 4, do I need
to reduce SPARK_MEM to 3g, or not?

Is there any performance difference between running in the interactive Spark
shell versus a non-interactive standalone Spark app, apart from build time
(sbt...)?

THANKS !!


any distributed cache mechanism available in spark ?

2014-03-25 Thread santhoma
I have been writing map-reduce jobs on Hadoop using Pig, and am now trying to
migrate to Spark.

My cluster consists of multiple nodes, and the jobs depend on a native
library (.so files).
In Hadoop and Pig I could distribute the files across nodes using the
"-files" or "-archive" option, but I could not find any similar mechanism
for Spark.

Can someone please explain the best way to distribute dependent files
across nodes?
I have seen SparkContext.addFile(), but it looks like this will copy big
files every time, for every job.
Moreover, I am not sure if addFile() can automatically unzip archive files.
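For reference, the addFile pattern looks roughly like this (only a sketch;
the path and library name are hypothetical, and SparkFiles.get simply
returns the node-local path of the copy that was shipped with the job):

import org.apache.spark.SparkFiles

sc.addFile("hdfs:///libs/libnative.so")   // hypothetical path

sc.parallelize(1 to 4, 4).foreachPartition { _ =>
  // load the node-local copy of the distributed file on each executor
  System.load(SparkFiles.get("libnative.so"))
}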

thanks in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/any-distributed-cache-mechanism-available-in-spark-tp3236.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: [BLOG] : Shark on Cassandra

2014-03-25 Thread Matei Zaharia
Very cool, thanks for posting this!

Matei

On Mar 25, 2014, at 6:18 PM, Brian O'Neill  wrote:

> As promised, here is that followup post for those looking to get started with 
> Shark against Cassandra:
> http://brianoneill.blogspot.com/2014/03/shark-on-cassandra-w-cash-interrogating.html
> 
> Again -- thanks to Rohit and the team at TupleJump.  Great work.
> 
> -brian
> 
> -- 
> Brian ONeill
> CTO, Health Market Science (http://healthmarketscience.com)
> mobile:215.588.6024
> blog: http://brianoneill.blogspot.com/
> twitter: @boneill42



Re: Writing RDDs to HDFS

2014-03-25 Thread Ognen Duzlevski
Well, my long running app has 512M per executor on a 16 node cluster 
where each machine has 16G of RAM. I could not run a second application 
until I restricted the spark.cores.max. As soon as I restricted the 
cores, I am able to run a second job at the same time.


Ognen

On 3/24/14, 7:46 PM, Yana Kadiyska wrote:

Ognen, can you comment if you were actually able to run two jobs
concurrently with just restricting spark.cores.max? I run Shark on the
same cluster and was not able to see a standalone job get in (since
Shark is a "long running" job) until I restricted both spark.cores.max
_and_ spark.executor.memory. Just curious if I did something wrong.

On Mon, Mar 24, 2014 at 7:48 PM, Ognen Duzlevski
 wrote:

Just so I can close this thread (in case anyone else runs into this stuff) -
I did sleep through the basics of Spark ;). The answer on why my job is in
waiting state (hanging) is here:
http://spark.incubator.apache.org/docs/latest/spark-standalone.html#resource-scheduling


Ognen

On 3/24/14, 5:01 PM, Diana Carroll wrote:

Ongen:

I don't know why your process is hanging, sorry.  But I do know that the way
saveAsTextFile works is that you give it a path to a directory, not a file.
The "file" is saved in multiple parts, corresponding to the partitions.
(part-0, part-1 etc.)

(Presumably it does this because it allows each partition to be saved on the
local disk, to minimize network traffic.  It's how Hadoop works, too.)
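A small sketch of that behaviour (host, port and path are hypothetical): the
path you pass becomes a directory of part files, and sc.textFile can read
the whole directory back.

val r = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8))
r.saveAsTextFile("hdfs://namenode:9000/user/test/output")
// creates output/part-00000, output/part-00001, ... (one file per partition)

val readBack = sc.textFile("hdfs://namenode:9000/user/test/output")
println(readBack.count())   // 8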




On Mon, Mar 24, 2014 at 5:00 PM, Ognen Duzlevski 
wrote:

Is someRDD.saveAsTextFile("hdfs://ip:port/path/final_filename.txt")
supposed to work? Meaning, can I save files to the HDFS fs this way?

I tried:

val r = sc.parallelize(List(1,2,3,4,5,6,7,8))
r.saveAsTextFile("hdfs://ip:port/path/file.txt")

and it is just hanging. At the same time on my HDFS it created file.txt
but as a directory which has subdirectories (the final one is empty).

Thanks!
Ognen


Re: tracking resource usage for spark-shell commands

2014-03-25 Thread Mayur Rustagi
Time taken is shown in the Spark shell web UI (hosted on port 4040).
Memory used is shown in terms of RDD storage, and how much shuffle data
was written and read during the process is also highlighted there.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Tue, Mar 25, 2014 at 6:04 AM, Bharath Bhushan wrote:

> Is there a way to see the resource usage of each spark-shell command — say
> time taken and memory used?
> I checked the WebUI of spark-shell and of the master and I don’t see any
> such breakdown. I see the time taken in the INFO logs but nothing about
> memory usage. It would also be nice to track the time taken in the
> spark-shell web UI.
>
> —
> Thanks


Re: [bug?] streaming window unexpected behaviour

2014-03-25 Thread Tathagata Das
You can probably do it in a simpler but sort of hacky way!

If your window size is W and sliding interval S, you can do some math to
figure out how many of the first windows are actually partial windows. It's
probably math.ceil(W/S). So in a windowDStream.foreachRDD() you can
increment a global counter to count how many RDDs have been generated and
ignore the first few RDDs.

windowDStream.foreachRDD { rdd =>
  Global.counter += 1   // Global is some driver-side object holding a counter
  if (Global.counter < math.ceil(W.toDouble / S)) {
    // ignore: still one of the initial partial windows
  } else {
    // do something awesome
  }
}


On Tue, Mar 25, 2014 at 7:29 AM, Adrian Mocanu wrote:

> Let me rephrase that,
> Do you think it is possible to use an accumulator to skip the first few
> incomplete RDDs?
>
> -Original Message-
> From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
> Sent: March-25-14 9:57 AM
> To: user@spark.apache.org
> Cc: u...@spark.incubator.apache.org
> Subject: RE: [bug?] streaming window unexpected behaviour
>
> Thanks TD!
> Is it possible to perhaps add another window method that doesn't
> generate partial windows? Or, is it possible to remove the first few
> partial windows? I'm thinking of using an accumulator to count how many
> windows there are.
>
> -A
>
> -Original Message-
> From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
> Sent: March-24-14 6:55 PM
> To: user@spark.apache.org
> Cc: u...@spark.incubator.apache.org
> Subject: Re: [bug?] streaming window unexpected behaviour
>
> Yes, I believe that is current behavior. Essentially, the first few RDDs
> will be partial windows (assuming window duration > sliding interval).
>
> TD
>
>
> On Mon, Mar 24, 2014 at 1:12 PM, Adrian Mocanu 
> wrote:
> > I have what I would call unexpected behaviour when using window on a
> stream.
> >
> > I have 2 windowed streams with a 5s batch interval. One window stream
> > is (5s,5s)=smallWindow and the other (10s,5s)=bigWindow
> >
> > What I've noticed is that the 1st RDD produced by bigWindow is
> > incorrect and is of the size 5s not 10s. So instead of waiting 10s and
> > producing 1 RDD with size 10s, Spark produced the 1st 10s RDD of size 5s.
> >
> > Why is this happening? To me it looks like a bug; Matei or TD can you
> > verify that this is correct behaviour?
> >
> >
> >
> >
> >
> > I have the following code
> >
> > val ssc = new StreamingContext(conf, Seconds(5))
> >
> >
> >
> > val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
> >
> > val bigWindowStream = ssc.queueStream(bigWindowRddQueue)
> >
> >
> >
> > val smallWindow = smallWindowReshapedStream.window(Seconds(5),
> > Seconds(5))
> >
> >   .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
> >
> > val bigWindow = bigWindowReshapedStream.window(Seconds(10),
> > Seconds(5))
> >
> > .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
> >
> >
> >
> > -Adrian
> >
> >
>


Re: Implementation problem with Streaming

2014-03-25 Thread Mayur Rustagi
Two good benefits of Streaming:
1. It maintains windows as you move across time, removing & adding monads as
you move through the window.
2. It connects with streaming systems like Kafka to import data as it comes
in & processes it.

You don't seem to need either of these features; you would be better off
using Spark with crontab maybe :), serializing your object to HDFS if it's
huge, or maintaining it in memory.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Tue, Mar 25, 2014 at 2:04 PM, Sanjay Awatramani wrote:

> Hi,
>
> I had initially thought of a streaming approach to solve my problem, but I
> am stuck at a few places and want an opinion on whether this problem is
> suitable for streaming, or whether it is better to stick to basic Spark.
>
> Problem: I get chunks of log files in a folder and need to do some
> analysis on them on an hourly interval, eg. 11.00 to 11.59. The file chunks
> may or may not come in real time and there can be breaks between subsequent
> chunks.
>
> pseudocode:
> While{
>   CheckForFile(localFolder)
>   CopyToHDFS()
>   RDDfile=read(fileFromHDFS)
>   RDDHour=RDDHour.union.RDDfile.filter(keyHour=currentHr)
>   if(RDDHour.keys().contains(currentHr+1) //next Hr has come, so current
> Hr should be complete
>   {
>   RDDHour.process()
>   deleteFileFromHDFS()
>   RDDHour.empty()
>   currentHr++
>   }
> }
>
> If I use streaming, I face the following problems:
> 1) Inability to keep a Java Variable (currentHr) in the driver which can
> be used across batches.
> 2) The input files may come with a break, for eg. 10.00 - 10.30 comes,
> then a break for 4 hours. If I use streaming, then I can't process the
> 10.00 - 10.30 batch as its incomplete, and the 1 hour DStream window for
> the 10.30 - 11.00 file will have previous RDD as empty as nothing was
> received in the preceding 4 hours. Basically Streaming takes file time as
> input and not the time inside the file content.
> 3) no control on deleting file from HDFS as the program runs in a
> SparkStreamingContext loop
>
> Any ideas on overcoming the above limitations or whether streaming is
> suitable for such kind of problem or not, will be helpful.
>
> Regards,
> Sanjay
>


Building Spark 0.9.x for CDH5 with mrv1 installation (Protobuf 2.5 upgrade)

2014-03-25 Thread Gary Malouf
Today, our cluster setup is as follows:

Mesos 0.15,
CDH 4.2.1-MRV1,
Spark 0.9-pre-scala-2.10 off master build targeted at appropriate CDH4
version


We are looking to upgrade all of these in order to get protobuf 2.5 working
properly.  The question is, which 'Hadoop version build' of Spark 0.9 is
compatible with the HDFS from Hadoop 2.2 and Cloudera's CDH5 MRV1
installation?  Is there one?


Re: Spark Streaming ZeroMQ Java Example

2014-03-25 Thread Tathagata Das
Unfortunately there isn't one right now. But it is probably not too hard to
start with the JavaNetworkWordCount example and use ZeroMQUtils in the same
way as the Scala ZeroMQWordCount example. Basically you have to change the
line that creates the input stream so that it creates a ZeroMQ stream rather
than a socketStream. Refer to the ZeroMQUtils.createStream docs for more
details.

Would be great if you can contribute it back ;)

TD




On Mon, Mar 24, 2014 at 11:28 PM, goofy real wrote:

> Is there a ZeroMQWordCount Java sample code?
>
>
> https://github.com/apache/incubator-spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
>


[BLOG] : Shark on Cassandra

2014-03-25 Thread Brian O'Neill
As promised, here is that followup post for those looking to get started
with Shark against Cassandra:
http://brianoneill.blogspot.com/2014/03/shark-on-cassandra-w-cash-interrogating.html

Again -- thanks to Rohit and the team at TupleJump.  Great work.

-brian

-- 
Brian ONeill
CTO, Health Market Science (http://healthmarketscience.com)
mobile:215.588.6024
blog: http://brianoneill.blogspot.com/
twitter: @boneill42


[BLOG] Shark on Cassandra

2014-03-25 Thread Brian O'Neill
As promised, here is that follow-up post for those looking to get started
with Shark against Cassandra:

-- 
Brian ONeill
CTO, Health Market Science (http://healthmarketscience.com)
mobile:215.588.6024
blog: http://brianoneill.blogspot.com/
twitter: @boneill42


Re: Spark 0.9.0-incubation + Apache Hadoop 2.2.0 + YARN encounter Compression codec com.hadoop.compression.lzo.LzoCodec not found

2014-03-25 Thread alee526
You can try to add the following to your shell:


In bin/compute-classpath.sh, append the LZO JAR from MapReduce:

CLASSPATH=$CLASSPATH:$HADOOP_HOME/share/hadoop/mapreduce/lib/hadoop-lzo.jar
export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:$HADOOP_HOME/lib/native/
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/


In bin/spark-class, before the JAVA_OPTS:
Add the following:

SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:$HADOOP_HOME/lib/native/

This fixed my problem.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-0-9-0-incubation-Apache-Hadoop-2-2-0-YARN-encounter-Compression-codec-com-hadoop-compression-ld-tp2793p3226.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1

2014-03-25 Thread giive chen
Hi

I am quite a beginner in Spark and I had a similar issue last week. I don't
know if my issue is the same as yours. I found that my program's jar
contained protobuf, and when I removed this dependency from my program's
pom.xml and rebuilt my program, it worked.

Here is how I solved my own issue.

Environment:

Spark 0.9, HDFS (Hadoop 2.3), Scala 2.10. My Spark is the Hadoop 2 / HDP2
prebuilt version from http://spark.apache.org/downloads.html. I didn't build
Spark myself.

Problem :

I used the word count program from the Spark 0.9 examples folder to read my
HDFS file, which lives on Hadoop 2.3. The running command is "./bin/run-example
org.apache.spark.examples.WordCount".
It shows "Caused by: java.lang.VerifyError". I searched a lot on the web but
could not find any workable solution.

How I Solve my issue

I found that if I use Spark 0.9's spark-shell, it can read the HDFS file
without this problem. But if I use the run-example command, it shows the
java.lang.VerifyError.
I think the main reason is that these two commands (spark-shell and
run-example) have different classpaths.

Run-example's classpath is
$SPARK_HOME/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar::$SPARK_HOME/conf:$SPARK_HOME/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar

Spark-shell's classpath is
:$SPARK_HOME/conf:$SPARK_HOME/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar

The classpath difference is
$SPARK_HOME/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar,
which is built by the example project. When I looked into this jar file, I
found that it contains two protobuf versions, and I don't know where they
came from. I removed all dependencies from my example pom.xml and left only
one dependency, "spark-core".
I rebuilt it and it succeeded.

I don't know if my issue is the same as yours. I hope it can help.

Wisely Chen



On Wed, Mar 26, 2014 at 7:10 AM, Patrick Wendell  wrote:

> Starting with Spark 0.9 the protobuf dependency we use is shaded and
> cannot interfere with other protobuf libraries including those in
> Hadoop. Not sure what's going on in this case. Would someone who is
> having this problem post exactly how they are building spark?
>
> - Patrick
>
> On Fri, Mar 21, 2014 at 3:49 PM, Aureliano Buendia 
> wrote:
> >
> >
> >
> > On Tue, Mar 18, 2014 at 12:56 PM, Ognen Duzlevski
> >  wrote:
> >>
> >>
> >> On 3/18/14, 4:49 AM, dmpou...@gmail.com wrote:
> >>>
> >>> On Sunday, 2 March 2014 19:19:49 UTC+2, Aureliano Buendia  wrote:
> 
>  Is there a reason for spark using the older akka?
> 
> 
> 
> 
>  On Sun, Mar 2, 2014 at 1:53 PM, 1esha  wrote:
> 
>  The problem is in akka remote. It contains files compiled with 2.4.*.
>  When
> 
>  you run it with 2.5.* in classpath it fails like above.
> 
> 
> 
>  Looks like moving to akka 2.3 will solve this issue. Check this issue
> -
> 
> 
> 
> https://www.assembla.com/spaces/akka/tickets/3154-use-protobuf-version-2-5-0#/activity/ticket
> :
> 
> 
>  Is the solution to exclude the  2.4.*. dependency on protobuf or will
>  thi produce more complications?
> >>
> >> I am not sure I remember what the context was around this but I run
> 0.9.0
> >> with hadoop 2.2.0 just fine.
> >
> >
> > The problem is that spark depends on an older version of akka, which
> depends
> > on an older version of protobuf (2.4).
> >
> > This means people cannot use protobuf 2.5 with spark.
> >
> >>
> >> Ognen
> >
> >
>


RE: Using an external jar in the driver, in yarn-standalone mode.

2014-03-25 Thread Andrew Lee
Hi Julien,
ADD_JAR doesn't work on the command line. I checked spark-class, and I
couldn't find any Bash code bringing the ADD_JAR variable into the CLASSPATH.
Were you able to print out the properties and environment variables from the
web GUI?
localhost:4040
This should give you an idea of what is included in the current Spark shell.
bin/spark-shell invokes bin/spark-class, and I don't see ADD_JAR in
bin/spark-class either.
Hi Sandy,
Does Spark automatically deploy the JAR for you into the DFS cache if Spark is
running in cluster mode? I haven't gotten that far yet, deploying my own
one-time JAR for testing; I've just set up a local cluster for practice.

Date: Tue, 25 Mar 2014 23:13:58 +0100
Subject: Re: Using an external jar in the driver, in yarn-standalone mode.
From: julien.ca...@gmail.com
To: user@spark.apache.org

Thanks for your answer.
I am using bin/spark-class  org.apache.spark.deploy.yarn.Client --jar myjar.jar 
--class myclass ...

myclass in myjar.jar contains a main that initializes a SparkContext in 
yarn-standalone mode.

Then I am using some code that uses myotherjar.jar, but I do not execute it
using the Spark context or an RDD, so my understanding is that it is not
executed on the YARN slaves, only on the YARN master.

I found no way to make my code able to find myotherjar.jar. The CLASSPATH is
set by Spark (or YARN?) before my code is executed on the YARN master; it is
not set by me. It seems that the idea is to set SPARK_CLASSPATH and/or ADD_JAR
and then these jars become automatically available on the YARN master, but it
did not work for me.

I also tried to use sc.addJar; it did not work either, but anyway it seems
clear that this is used for dependencies in the code executed on the slaves,
not on the master. Tell me if I am wrong.








2014-03-25 21:11 GMT+01:00 Nathan Kronenfeld :

by 'use ... my main program' I presume you mean you have a main function in a 
class file you want to use as your entry point.

SPARK_CLASSPATH, ADD_JAR, etc add your jars in on the master and the workers... 
but they don't on the client.
For that, you're just using ordinary, everyday java/scala - so it just has to 
be on the normal java classpath.
Could that be your issue?
  -Nathan




On Tue, Mar 25, 2014 at 2:18 PM, Sandy Ryza  wrote:


Hi Julien,
Have you called SparkContext#addJars?
-Sandy



On Tue, Mar 25, 2014 at 10:05 AM, Julien Carme  wrote:



Hello,



I have been struggling for ages to use an external jar in my spark driver 
program, in yarn-standalone mode. I just want to use in my main program, 
outside the calls to spark functions, objects that are defined in another jar.




I tried to set SPARK_CLASSPATH, ADD_JAR, I tried to use --addJar in the 
spark-class arguments, I always end up with a "Class not found exception" when 
I want to use classes defined in my jar.




Any ideas?




Thanks a lot,




-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,

Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238

Email:  nkronenf...@oculusinfo.com



  

RE: Spark 0.9.1 - How to run bin/spark-class with my own hadoop jar files?

2014-03-25 Thread Andrew Lee
Hi Paul,
I got it sorted out.
The problem is that the JARs are built into the assembly JARs when you run
sbt/sbt clean assembly
What I did is:sbt/sbt clean package
This will only give you the small JARs. The next steps is to update the 
CLASSPATH in the bin/compute-classpath.sh script manually, appending all the 
JARs.
With :
sbt/sbt assembly
We can't introduce our own Hadoop patch since it will always pull from Maven 
repo, unless we hijack the repository path, or do a 'mvn install' locally. This 
is more of a hack I think.


Date: Tue, 25 Mar 2014 15:23:08 -0700
Subject: Re: Spark 0.9.1 - How to run bin/spark-class with my own hadoop jar 
files?
From: paulmscho...@gmail.com
To: user@spark.apache.org

Andrew, 
I ran into the same problem and eventually settled on just running the jars
directly with java. Since we use sbt to build our jars, we had all the
dependencies built into the jar itself, so there was no need for random
classpaths.


On Tue, Mar 25, 2014 at 1:47 PM, Andrew Lee  wrote:




Hi All,
I'm getting the following error when I execute start-master.sh, which also
invokes spark-class at the end.

Failed to find Spark assembly in /root/spark/assembly/target/scala-2.10/
You need to build Spark with 'sbt/sbt assembly' before running this program.


After digging into the code, I see the CLASSPATH is hardcoded with 
"spark-assembly.*hadoop.*.jar".

In bin/spark-class :


if [ ! -f "$FWDIR/RELEASE" ]; then
  # Exit if the user hasn't compiled Spark
  num_jars=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar" | wc -l)
  jars_list=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar")
  if [ "$num_jars" -eq "0" ]; then
    echo "Failed to find Spark assembly in $FWDIR/assembly/target/scala-$SCALA_VERSION/" >&2
    echo "You need to build Spark with 'sbt/sbt assembly' before running this program." >&2
    exit 1
  fi
  if [ "$num_jars" -gt "1" ]; then
    echo "Found multiple Spark assembly jars in $FWDIR/assembly/target/scala-$SCALA_VERSION:" >&2
    echo "$jars_list"
    echo "Please remove all but one jar."
    exit 1
  fi
fi


Is there any reason why this is only grabbing spark-assembly.*hadoop.*.jar ? I 
am trying to run Spark that links to my own version of Hadoop under 
/opt/hadoop23/, 

and I use 'sbt/sbt clean package' to build the package without the Hadoop jar. 
What is the correct way to link to my own Hadoop jar?





  


  

Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1

2014-03-25 Thread Patrick Wendell
Starting with Spark 0.9 the protobuf dependency we use is shaded and
cannot interfere with other protobuf libraries including those in
Hadoop. Not sure what's going on in this case. Would someone who is
having this problem post exactly how they are building spark?

- Patrick

On Fri, Mar 21, 2014 at 3:49 PM, Aureliano Buendia  wrote:
>
>
>
> On Tue, Mar 18, 2014 at 12:56 PM, Ognen Duzlevski
>  wrote:
>>
>>
>> On 3/18/14, 4:49 AM, dmpou...@gmail.com wrote:
>>>
>>> On Sunday, 2 March 2014 19:19:49 UTC+2, Aureliano Buendia  wrote:

 Is there a reason for spark using the older akka?




 On Sun, Mar 2, 2014 at 1:53 PM, 1esha  wrote:

 The problem is in akka remote. It contains files compiled with 2.4.*.
 When

 you run it with 2.5.* in classpath it fails like above.



 Looks like moving to akka 2.3 will solve this issue. Check this issue -


 https://www.assembla.com/spaces/akka/tickets/3154-use-protobuf-version-2-5-0#/activity/ticket:


 Is the solution to exclude the  2.4.*. dependency on protobuf or will
 thi produce more complications?
>>
>> I am not sure I remember what the context was around this but I run 0.9.0
>> with hadoop 2.2.0 just fine.
>
>
> The problem is that spark depends on an older version of akka, which depends
> on an older version of protobuf (2.4).
>
> This means people cannot use protobuf 2.5 with spark.
>
>>
>> Ognen
>
>


Re: Spark 0.9.1 - How to run bin/spark-class with my own hadoop jar files?

2014-03-25 Thread Paul Schooss
Andrew,

I ran into the same problem and eventually settled on just running the jars
directly with java. Since we use sbt to build our jars, we had all the
dependencies built into the jar itself, so there was no need for random
classpaths.


On Tue, Mar 25, 2014 at 1:47 PM, Andrew Lee  wrote:

> Hi All,
>
> I'm getting the following error when I execute start-master.sh which also
> invokes spark-class at the end.
>
> Failed to find Spark assembly in /root/spark/assembly/target/scala-2.10/
>
> You need to build Spark with 'sbt/sbt assembly' before running this
> program.
>
>
> After digging into the code, I see the CLASSPATH is hardcoded with "
> spark-assembly.*hadoop.*.jar".
>
> In bin/spark-class :
>
>
> if [ ! -f "$FWDIR/RELEASE" ]; then
>
>   # Exit if the user hasn't compiled Spark
>
>   num_jars=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep
> "spark-assembly.*hadoop.*.jar" | wc -l)
>
>   jars_list=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep
> "spark-assembly.*hadoop.*.jar")
>
>   if [ "$num_jars" -eq "0" ]; then
>
> echo "Failed to find Spark assembly in
> $FWDIR/assembly/target/scala-$SCALA_VERSION/" >&2
>
> echo "You need to build Spark with 'sbt/sbt assembly' before running
> this program." >&2
>
> exit 1
>
>   fi
>
>   if [ "$num_jars" -gt "1" ]; then
>
> echo "Found multiple Spark assembly jars in
> $FWDIR/assembly/target/scala-$SCALA_VERSION:" >&2
>
> echo "$jars_list"
>
> echo "Please remove all but one jar."
>
> exit 1
>
>   fi
>
> fi
>
>
> Is there any reason why this is only grabbing spark-assembly.*hadoop.*.jar?
> I am trying to run Spark that links to my own version of Hadoop under
> /opt/hadoop23/,
>
> and I use 'sbt/sbt clean package' to build the package without the Hadoop
> jar. What is the correct way to link to my own Hadoop jar?
>
>
>
>


Re: Using an external jar in the driver, in yarn-standalone mode.

2014-03-25 Thread Julien Carme
Thanks for your answer.

I am using
bin/spark-class  org.apache.spark.deploy.yarn.Client --jar myjar.jar
--class myclass ...

myclass in myjar.jar contains a main that initializes a SparkContext in
yarn-standalone mode.

Then I am using some code that uses myotherjar.jar, but I do not execute it
using the Spark context or an RDD, so my understanding is that it is not
executed on the YARN slaves, only on the YARN master.

I found no way to make my code able to find myotherjar.jar. The CLASSPATH
is set by Spark (or YARN?) before my code is executed on the YARN master; it
is not set by me. It seems that the idea is to set SPARK_CLASSPATH and/or
ADD_JAR and then these jars become automatically available on the YARN
master, but it did not work for me.

I also tried to use sc.addJar; it did not work either, but anyway it seems
clear that this is used for dependencies in the code executed on the
slaves, not on the master. Tell me if I am wrong.






2014-03-25 21:11 GMT+01:00 Nathan Kronenfeld :

> by 'use ... my main program' I presume you mean you have a main function
> in a class file you want to use as your entry point.
>
> SPARK_CLASSPATH, ADD_JAR, etc add your jars in on the master and the
> workers... but they don't on the client.
> For that, you're just using ordinary, everyday java/scala - so it just has
> to be on the normal java classpath.
>
> Could that be your issue?
>
>   -Nathan
>
>
>
> On Tue, Mar 25, 2014 at 2:18 PM, Sandy Ryza wrote:
>
>> Hi Julien,
>>
>> Have you called SparkContext#addJars?
>>
>> -Sandy
>>
>>
>> On Tue, Mar 25, 2014 at 10:05 AM, Julien Carme wrote:
>>
>>> Hello,
>>>
>>> I have been struggling for ages to use an external jar in my spark
>>> driver program, in yarn-standalone mode. I just want to use in my main
>>> program, outside the calls to spark functions, objects that are defined in
>>> another jar.
>>>
>>> I tried to set SPARK_CLASSPATH, ADD_JAR, I tried to use --addJar in the
>>> spark-class arguments, I always end up with a "Class not found exception"
>>> when I want to use classes defined in my jar.
>>>
>>> Any ideas?
>>>
>>> Thanks a lot,
>>>
>>
>>
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com
>


Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1

2014-03-25 Thread Gary Malouf
Can anyone verify the claims from Aureliano regarding the Akka dependency
protobuf collision?  Our team has a major need to upgrade to protobuf 2.5.0
up the pipe and Spark seems to be the blocker here.


On Fri, Mar 21, 2014 at 6:49 PM, Aureliano Buendia wrote:

>
>
>
> On Tue, Mar 18, 2014 at 12:56 PM, Ognen Duzlevski <
> og...@plainvanillagames.com> wrote:
>
>>
>> On 3/18/14, 4:49 AM, dmpou...@gmail.com wrote:
>>
>>> On Sunday, 2 March 2014 19:19:49 UTC+2, Aureliano Buendia  wrote:
>>>
 Is there a reason for spark using the older akka?




 On Sun, Mar 2, 2014 at 1:53 PM, 1esha  wrote:

 The problem is in akka remote. It contains files compiled with 2.4.*.
 When

 you run it with 2.5.* in classpath it fails like above.



 Looks like moving to akka 2.3 will solve this issue. Check this issue -

 https://www.assembla.com/spaces/akka/tickets/3154-use-
 protobuf-version-2-5-0#/activity/ticket:


 Is the solution to exclude the  2.4.*. dependency on protobuf or will
 thi produce more complications?

>>> I am not sure I remember what the context was around this but I run
>> 0.9.0 with hadoop 2.2.0 just fine.
>>
>
> The problem is that spark depends on an older version of akka, which
> depends on an older version of protobuf (2.4).
>
> This means people cannot use protobuf 2.5 with spark.
>
>
>> Ognen
>>
>
>


Re: spark executor/driver log files management

2014-03-25 Thread Tathagata Das
The logs from the executor are redirected to stdout only because there is a
default log4j.properties that is configured to do so. If you put your own
log4j.properties with a rolling file appender on the classpath (refer to the
Spark docs for that), all the logs will get redirected to separate files
that will get rolled over. So even though there will still be a stdout file,
not much will get printed in it. The side effect of this modification is
that you won't be able to see the logs from the Spark web UI, as that only
shows stdout.




On Mon, Mar 24, 2014 at 10:48 PM, Sourav Chandra <
sourav.chan...@livestream.com> wrote:

> Hi TD,
>
> I thought about that but was not sure whether this will have any impact in
> spark UI/ Executor runner as it redirects stream to stderr/stdout. But
> ideally it should not as it will fetch the log record from stderr file
> (which is latest)..
>
> Is my understanding correct?
>
> Thanks,
> Sourav
>
>
> On Tue, Mar 25, 2014 at 3:26 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> You can use RollingFileAppenders in log4j.properties.
>>
>> http://logging.apache.org/log4j/extras/apidocs/org/apache/log4j/rolling/RollingFileAppender.html
>>
>> You can have other scripts delete old logs.
>>
>> TD
>>
>>
>> On Mon, Mar 24, 2014 at 12:20 AM, Sourav Chandra <
>> sourav.chan...@livestream.com> wrote:
>>
>>> Hi,
>>>
>>> I have few questions regarding log file management in spark:
>>>
>>> 1. Currently I have not found any way to modify the log file name for
>>> executors/drivers. It's hardcoded as stdout and stderr. Also there is no log
>>> rotation.
>>>
>>> In case of streaming application this will grow forever and become
>>> unmanageable. Is there any way to overcome this?
>>>
>>> Thanks,
>>> --
>>>
>>> Sourav Chandra
>>>
>>> Senior Software Engineer
>>>
>>> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>>>
>>> sourav.chan...@livestream.com
>>>
>>> o: +91 80 4121 8723
>>>
>>> m: +91 988 699 3746
>>>
>>> skype: sourav.chandra
>>>
>>> Livestream
>>>
>>> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
>>> Block, Koramangala Industrial Area,
>>>
>>> Bangalore 560034
>>>
>>> www.livestream.com
>>>
>>
>>
>
>
> --
>
> Sourav Chandra
>
> Senior Software Engineer
>
> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>
> sourav.chan...@livestream.com
>
> o: +91 80 4121 8723
>
> m: +91 988 699 3746
>
> skype: sourav.chandra
>
> Livestream
>
> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
> Block, Koramangala Industrial Area,
>
> Bangalore 560034
>
> www.livestream.com
>


Spark 0.9.1 - How to run bin/spark-class with my own hadoop jar files?

2014-03-25 Thread Andrew Lee
Hi All,
I'm getting the following error when I execute start-master.sh which also 
invokes spark-class at the end.








Failed to find Spark assembly in /root/spark/assembly/target/scala-2.10/
You need to build Spark with 'sbt/sbt assembly' before running this program.
After digging into the code, I see the CLASSPATH is hardcoded with
"spark-assembly.*hadoop.*.jar".

In bin/spark-class:

if [ ! -f "$FWDIR/RELEASE" ]; then
  # Exit if the user hasn't compiled Spark
  num_jars=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar" | wc -l)
  jars_list=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar")
  if [ "$num_jars" -eq "0" ]; then
    echo "Failed to find Spark assembly in $FWDIR/assembly/target/scala-$SCALA_VERSION/" >&2
    echo "You need to build Spark with 'sbt/sbt assembly' before running this program." >&2
    exit 1
  fi
  if [ "$num_jars" -gt "1" ]; then
    echo "Found multiple Spark assembly jars in $FWDIR/assembly/target/scala-$SCALA_VERSION:" >&2
    echo "$jars_list"
    echo "Please remove all but one jar."
    exit 1
  fi
fi

Is there any reason why this is only grabbing spark-assembly.*hadoop.*.jar? I
am trying to run Spark that links to my own version of Hadoop under
/opt/hadoop23/, and I use 'sbt/sbt clean package' to build the package without
the Hadoop jar. What is the correct way to link to my own Hadoop jar?


  

Static ports for fileserver and httpbroadcast in Spark driver

2014-03-25 Thread Guillermo Cabrera2
Hi:

I am setting up a Spark 0.9.0 cluster over multiple hosts using Docker. I 
use a combination of /etc/hosts editing and port mapping to handle correct 
routing between Spark Master and Worker containers. My issue arises when I 
try to do any operation involving a textFile (hdfs or local) in the Spark 
cluster from a Spark Shell (in another container):

14/03/25 18:56:00 WARN TaskSetManager: Lost TID 1 (task 0.0:1)
14/03/25 18:56:01 WARN TaskSetManager: Loss was due to 
java.net.NoRouteToHostException
java.net.NoRouteToHostException: No route to host
at java.net.PlainSocketImpl.socketConnect(Native Method)

I currently set the spark.driver.port and spark.driver.host properties for
my Spark driver; however, the fileserver and httpBroadcast server are
given random ports. I am currently not mapping these last two ports, so they
are inaccessible in the cluster.

Is there a way to also make these static ports without having to modify 
source?

All the best,
Gui

Re: Using an external jar in the driver, in yarn-standalone mode.

2014-03-25 Thread Nathan Kronenfeld
by 'use ... my main program' I presume you mean you have a main function in
a class file you want to use as your entry point.

SPARK_CLASSPATH, ADD_JAR, etc add your jars in on the master and the
workers... but they don't on the client.
For that, you're just using ordinary, everyday java/scala - so it just has
to be on the normal java classpath.

Could that be your issue?

  -Nathan



On Tue, Mar 25, 2014 at 2:18 PM, Sandy Ryza  wrote:

> Hi Julien,
>
> Have you called SparkContext#addJars?
>
> -Sandy
>
>
> On Tue, Mar 25, 2014 at 10:05 AM, Julien Carme wrote:
>
>> Hello,
>>
>> I have been struggling for ages to use an external jar in my spark driver
>> program, in yarn-standalone mode. I just want to use in my main program,
>> outside the calls to spark functions, objects that are defined in another
>> jar.
>>
>> I tried to set SPARK_CLASSPATH, ADD_JAR, I tried to use --addJar in the
>> spark-class arguments, I always end up with a "Class not found exception"
>> when I want to use classes defined in my jar.
>>
>> Any ideas?
>>
>> Thanks a lot,
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Running a task once on each executor

2014-03-25 Thread Christopher Nguyen
Deenar, the singleton pattern I'm suggesting would look something like this:

public class TaskNonce {

  private transient boolean mIsAlreadyDone;

  private static transient TaskNonce mSingleton = new TaskNonce();

  private transient Object mSyncObject = new Object();

  public static TaskNonce getSingleton() { return mSingleton; }

  public void doThisOnce() {
    if (mIsAlreadyDone) return;
    synchronized (mSyncObject) {
      if (mIsAlreadyDone) return;  // re-check inside the lock
      mIsAlreadyDone = true;
      // ... do the one-time work here
    }
  }
}

which you would invoke as TaskNonce.getSingleton().doThisOnce() from within
the map closure. If you're using the Spark Java API, you can put all this
code in the mapper class itself.
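In Scala the invocation might look like this (just a sketch; "rdd" stands
for whatever RDD your job already operates on, and count() is only there to
force evaluation):

rdd.mapPartitions { iter =>
  TaskNonce.getSingleton().doThisOnce()  // runs at most once per executor JVM
  iter
}.count()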

There is no need to require one-row RDD partitions to achieve what you
want, if I understand your problem statement correctly.



--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Tue, Mar 25, 2014 at 11:07 AM, deenar.toraskar wrote:

> Christopher
>
> It is once per JVM. TaskNonce would meet my needs. I guess if I want it
> once
> per thread, then a ThreadLocal would do the same.
>
> But how do I invoke TaskNonce, what is the best way to generate a RDD to
> ensure that there is one element per executor.
>
> Deenar
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-tp3203p3208.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: ClassCastException when using saveAsTextFile

2014-03-25 Thread Niko Stahl
Ok, so I've been able to narrow down the problem to this specific case:

def toCsv(userTuple: String) = {"a,b,c"}
val dataTemp = Array("line1", "line2")
val dataTempDist = sc.parallelize(dataTemp)
val usersFormatted = dataTempDist.map(toCsv)
usersFormatted.saveAsTextFile("hdfs://" + masterDomain + ":9000/user/root/"
+  "test_dir")

Even this simple mapping gives me a java.lang.ClassCastException. Sorry, my
knowledge of Scala is very rudimentary.

Thanks,
Niko


On Tue, Mar 25, 2014 at 5:55 PM, Niko Stahl  wrote:

> Hi,
>
> I'm trying to save an RDD to HDFS with the saveAsTextFile method on my ec2
> cluster and am encountering the following exception (the app is called
> GraphTest):
>
> Exception failure: java.lang.ClassCastException: cannot assign instance of
> GraphTest$$anonfun$3 to field org.apache.spark.rdd.MappedRDD.f of type
> scala.Function1 in instance of org.apache.spark.rdd.MappedRDD
>
> The RDD is simply a list of strings. Strangely enough the same sequence of
> commands when executed in the Spark shell does not cause the above error.
> Any thoughts on what might be going on here?
>
> Thanks,
> Niko
>


Re: Using an external jar in the driver, in yarn-standalone mode.

2014-03-25 Thread Sandy Ryza
Hi Julien,

Have you called SparkContext#addJars?

-Sandy


On Tue, Mar 25, 2014 at 10:05 AM, Julien Carme wrote:

> Hello,
>
> I have been struggling for ages to use an external jar in my spark driver
> program, in yarn-standalone mode. I just want to use in my main program,
> outside the calls to spark functions, objects that are defined in another
> jar.
>
> I tried to set SPARK_CLASSPATH, ADD_JAR, I tried to use --addJar in the
> spark-class arguments, I always end up with a "Class not found exception"
> when I want to use classes defined in my jar.
>
> Any ideas?
>
> Thanks a lot,
>


Re: Akka error with largish job (works fine for smaller versions)

2014-03-25 Thread Nathan Kronenfeld
After digging deeper, I realized all the workers ran out of memory, giving
an hs_error.log file in /tmp/jvm- with the header:

# Native memory allocation (malloc) failed to allocate 2097152 bytes for
committing reserved memory.
# Possible reasons:
#   The system is out of physical RAM or swap space
#   In 32 bit mode, the process size limit was hit
# Possible solutions:
#   Reduce memory load on the system
#   Increase physical memory or swap space
#   Check if swap backing store is full
#   Use 64 bit Java on a 64 bit OS
#   Decrease Java heap size (-Xmx/-Xms)
#   Decrease number of Java threads
#   Decrease Java thread stack sizes (-Xss)
#   Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.
#
#  Out of Memory Error (os_linux.cpp:2761), pid=31426, tid=139549745604352
#
# JRE version: OpenJDK Runtime Environment (7.0_51-b02) (build
1.7.0_51-mockbuild_2014_01_15_01_39-b00)
# Java VM: OpenJDK 64-Bit Server VM (24.45-b08 mixed mode linux-amd64 )



We have 3 workers, each assigned 200G for Spark.
The dataset is ~250G.

All I'm doing is data.map(r => (getKey(r),
r)).sortByKey().map(_._2).coalesce(n).saveAsTextFile(), where n is the
original number of files in the dataset.
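(A self-contained sketch of that pipeline, with hypothetical paths and a
hypothetical getKey, for anyone following along:)

import org.apache.spark.SparkContext._   // for sortByKey on pair RDDs

val data = sc.textFile("hdfs://namenode:8020/data/input")    // hypothetical path
def getKey(r: String): String = r.split('\t')(0)             // hypothetical key extractor
val n = data.partitions.length   // the original number of files/partitions

data.map(r => (getKey(r), r))
  .sortByKey()
  .map(_._2)
  .coalesce(n)
  .saveAsTextFile("hdfs://namenode:8020/data/output")        // hypothetical path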

This worked fine under spark 0.8.1, with the same setup; I haven't changed
this code since upgrading to 0.9.0.

I took a look at a worker's memory before it ran out, using jmap and jhat;
they indicated file handles as the biggest memory user (which I guess makes
sense for a sort) - but the total was nowhere close to 200g, so I find
their output somewhat suspect.



On Tue, Mar 25, 2014 at 6:59 AM, Andrew Ash  wrote:

> Possibly one of your executors is in the middle of a large stop-the-world
> GC and doesn't respond to network traffic during that period?  If you
> shared some information about how each node in your cluster is set up (heap
> size, memory, CPU, etc) that might help with debugging.
>
> Andrew
>
>
> On Mon, Mar 24, 2014 at 9:13 PM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
>
>> What does this error mean:
>>
>> @hadoop-s2.oculus.local:45186]: Error [Association failed with
>> [akka.tcp://spark@hadoop-s2.oculus.local:45186]] [
>> akka.remote.EndpointAssociationException: Association failed with
>> [akka.tcp://spark@hadoop-s2.oculus.local:45186]
>> Caused by:
>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>> Connection refused: hadoop-s2.oculus.loca\
>> l/192.168.0.47:45186
>> ]
>>
>> ?
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenf...@oculusinfo.com
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Running a task once on each executor

2014-03-25 Thread deenar.toraskar
Christopher 

It is once per JVM. TaskNonce would meet my needs. I guess if I want it once
per thread, then a ThreadLocal would do the same.

But how do I invoke TaskNonce, what is the best way to generate a RDD to
ensure that there is one element per executor.

Deenar 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-tp3203p3208.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Implementation problem with Streaming

2014-03-25 Thread Sanjay Awatramani
Hi,

I had initially thought of a streaming approach to solve my problem, but I am
stuck at a few places and want an opinion on whether this problem is suitable
for streaming, or whether it is better to stick to basic Spark.

Problem: I get chunks of log files in a folder and need to do some analysis on 
them on an hourly interval, eg. 11.00 to 11.59. The file chunks may or may not 
come in real time and there can be breaks between subsequent chunks.

pseudocode:
While{
  CheckForFile(localFolder)
  CopyToHDFS()
  RDDfile=read(fileFromHDFS)
  RDDHour=RDDHour.union.RDDfile.filter(keyHour=currentHr)
  if(RDDHour.keys().contains(currentHr+1) //next Hr has come, so current Hr 
should be complete
  {
      RDDHour.process()
      deleteFileFromHDFS()
      RDDHour.empty()
      currentHr++
  }
}

If I use streaming, I face the following problems:
1) Inability to keep a Java Variable (currentHr) in the driver which can be 
used across batches.
2) The input files may come with a break, for eg. 10.00 - 10.30 comes, then a 
break for 4 hours. If I use streaming, then I can't process the 10.00 - 10.30 
batch as its incomplete, and the 1 hour DStream window for the 10.30 - 11.00 
file will have previous RDD as empty as nothing was received in the preceding 4 
hours. Basically Streaming takes file time as input and not the time inside the 
file content. 
3) no control on deleting file from HDFS as the program runs in a 
SparkStreamingContext loop

Any ideas on overcoming the above limitations or whether streaming is suitable 
for such kind of problem or not, will be helpful.

Regards,
Sanjay

ClassCastException when using saveAsTextFile

2014-03-25 Thread Niko Stahl
Hi,

I'm trying to save an RDD to HDFS with the saveAsTextFile method on my ec2
cluster and am encountering the following exception (the app is called
GraphTest):

Exception failure: java.lang.ClassCastException: cannot assign instance of
GraphTest$$anonfun$3 to field org.apache.spark.rdd.MappedRDD.f of type
scala.Function1 in instance of org.apache.spark.rdd.MappedRDD

The RDD is simply a list of strings. Strangely enough the same sequence of
commands when executed in the Spark shell does not cause the above error.
Any thoughts on what might be going on here?

Thanks,
Niko


Re: Running a task once on each executor

2014-03-25 Thread Christopher Nguyen
Deenar, when you say "just once", have you defined "across multiple "
(e.g., across multiple threads in the same JVM on the same machine)? In
principle you can have multiple executors on the same machine.

In any case, assuming it's the same JVM, have you considered using a
singleton that maintains done/not-done state, that is invoked by each of
the instances (TaskNonce.getSingleton().doThisOnce()) ? You can, e.g., mark
the state boolean "transient" to prevent it from going through serdes.



--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Tue, Mar 25, 2014 at 10:03 AM, deenar.toraskar wrote:

> Hi
>
> Is there a way in Spark to run a function on each executor just once? I
> have a couple of use cases.
>
> a) I use an external library that is a singleton. It keeps some global
> state
> and provides some functions to manipulate it (e.g. reclaim memory. etc.) .
> I
> want to check the global state of this library on each executor.
>
> b) To get jvm stats or instrumentation on each executor.
>
> Currently I have a crude way of achieving something similar: I just run a
> map on a large RDD that is hash partitioned, but this does not guarantee
> that the function runs just once.
>
> Deenar
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-tp3203.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Using an external jar in the driver, in yarn-standalone mode.

2014-03-25 Thread Julien Carme
Hello,

I have been struggling for ages to use an external jar in my spark driver
program, in yarn-standalone mode. I just want to use in my main program,
outside the calls to spark functions, objects that are defined in another
jar.

I tried to set SPARK_CLASSPATH, ADD_JAR, I tried to use --addJar in the
spark-class arguments, I always end up with a "Class not found exception"
when I want to use classes defined in my jar.

Any ideas?

Thanks a lot,


Running a task once on each executor

2014-03-25 Thread deenar.toraskar
Hi

Is there a way in Spark to run a function on each executor just once. I have
a couple of use cases. 

a) I use an external library that is a singleton. It keeps some global state
and provides some functions to manipulate it (e.g. reclaim memory. etc.) . I
want to check the global state of this library on each executor. 

b) To get jvm stats or instrumentation on each executor.

Currently I have a crude way of achieving something similar, I just run a
map on a large RDD that is hash partitioned, this does not guarantee that
the job would run just once.
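
For reference, a sketch of that crude approach (untested; sc is an existing
SparkContext and the element/partition counts are arbitrary):

// touch many partitions so that, with high probability, the function runs on every
// executor, though possibly several times per executor JVM, hence the "crude"
val stats = sc.parallelize(1 to 10000, 1000).mapPartitions { _ =>
  Iterator((java.net.InetAddress.getLocalHost.getHostName, Runtime.getRuntime.freeMemory))
}.collect()

stats.groupBy(_._1).foreach { case (host, samples) => println(host + " -> " + samples.size) }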

Deenar



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-tp3203.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: [bug?] streaming window unexpected behaviour

2014-03-25 Thread Adrian Mocanu
Let me rephrase that: do you think it is possible to use an accumulator to skip the 
first few incomplete RDDs?
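
For what it's worth, a plain driver-side counter may be enough, since foreachRDD
bodies run on the driver. A sketch (untested; bigWindow is the windowed stream from
the code quoted below and process() is a hypothetical stand-in for the real work):

import org.apache.spark.streaming.Seconds

val winDur = Seconds(10)
val slideDur = Seconds(5)
// with window(winDur, slideDur), the first winDur/slideDur - 1 outputs are the partial ones
val toSkip = (winDur.milliseconds / slideDur.milliseconds - 1).toInt

var seen = 0
bigWindow.foreachRDD { rdd =>
  if (seen >= toSkip) {
    process(rdd)  // only full windows reach here
  }
  seen += 1
}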

-Original Message-
From: Adrian Mocanu [mailto:amoc...@verticalscope.com] 
Sent: March-25-14 9:57 AM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: RE: [bug?] streaming window unexpected behaviour

Thanks TD!
Is it possible to perhaps add another window method that doesn't generate 
partial windows? Or is it possible to remove the first few partial windows? 
I'm thinking of using an accumulator to count how many windows there are.

-A

-Original Message-
From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: March-24-14 6:55 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: [bug?] streaming window unexpected behaviour

Yes, I believe that is current behavior. Essentially, the first few RDDs will 
be partial windows (assuming window duration > sliding interval).

TD


On Mon, Mar 24, 2014 at 1:12 PM, Adrian Mocanu  
wrote:
> I have what I would call unexpected behaviour when using window on a stream.
>
> I have 2 windowed streams with a 5s batch interval. One window stream 
> is (5s,5s)=smallWindow and the other (10s,5s)=bigWindow
>
> What I've noticed is that the 1st RDD produced by bigWindow is 
> incorrect and is of the size 5s not 10s. So instead of waiting 10s and 
> producing 1 RDD with size 10s, Spark produced the 1st 10s RDD of size 5s.
>
> Why is this happening? To me it looks like a bug; Matei or TD can you 
> verify that this is correct behaviour?
>
>
>
>
>
> I have the following code
>
> val ssc = new StreamingContext(conf, Seconds(5))
>
>
>
> val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
>
> val bigWindowStream = ssc.queueStream(bigWindowRddQueue)
>
>
>
> val smallWindow = smallWindowReshapedStream.window(Seconds(5),
> Seconds(5))
>
>   .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
>
> val bigWindow = bigWindowReshapedStream.window(Seconds(10),
> Seconds(5))
>
> .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
>
>
>
> -Adrian
>
>


Re: K-means faster on Mahout then on Spark

2014-03-25 Thread Egor Pahomov
Mahout uses MapReduce and launches one MR job on every iteration, so it worked as predicted.
My question is more about why Spark was so slow. I will try
MEMORY_AND_DISK_SER.


2014-03-25 17:58 GMT+04:00 Suneel Marthi :

> Mahout does have a kmeans which can be executed in mapreduce and iterative
> modes.
>
> Sent from my iPhone
>
> On Mar 25, 2014, at 9:25 AM, Prashant Sharma  wrote:
>
> I think Mahout uses FuzzyKmeans, which is different algorithm and it is
> not iterative.
>
> Prashant Sharma
>
>
> On Tue, Mar 25, 2014 at 6:50 PM, Egor Pahomov wrote:
>
>> Hi, I'm running benchmark, which compares Mahout and SparkML. For now I
>> have next results for k-means:
>> Number of iterations= 10, number of elements = 1000, mahouttime= 602,
>> spark time = 138
>> Number of iterations= 40, number of elements = 1000, mahouttime=
>> 1917, spark time = 330
>> Number of iterations= 70, number of elements = 1000, mahouttime=
>> 3203, spark time = 388
>> Number of iterations= 10, number of elements = 1, mahouttime=
>> 1235, spark time = 2226
>> Number of iterations= 40, number of elements = 1, mahouttime=
>> 2755, spark time = 6388
>> Number of iterations= 70, number of elements = 1, mahouttime=
>> 4107, spark time = 10967
>> Number of iterations= 10, number of elements = 10, mahouttime=
>> 7070, spark time = 25268
>>
>> Time in seconds. It runs on Yarn cluster with about 40 machines. Elements
>> for clusterization are randomly created. When I changed persistence level
>> from Memory to Memory_and_disk, on big data spark started to work faster.
>>
>> What am I missing?
>>
>> See my benchmarking code in attachment.
>>
>>
>> --
>>
>>
>>
>> Sincerely yours,
>> Egor Pakhomov
>> Scala Developer, Yandex
>>
>
>


-- 



Sincerely yours,
Egor Pakhomov
Scala Developer, Yandex


Re: K-means faster on Mahout then on Spark

2014-03-25 Thread Suneel Marthi
Mahout does have a kmeans which can be executed in mapreduce and iterative 
modes.

Sent from my iPhone

> On Mar 25, 2014, at 9:25 AM, Prashant Sharma  wrote:
> 
> I think Mahout uses FuzzyKmeans, which is different algorithm and it is not 
> iterative. 
> 
> Prashant Sharma
> 
> 
>> On Tue, Mar 25, 2014 at 6:50 PM, Egor Pahomov  wrote:
>> Hi, I'm running benchmark, which compares Mahout and SparkML. For now I have 
>> next results for k-means:
>> Number of iterations= 10, number of elements = 1000, mahouttime= 602, 
>> spark time = 138
>> Number of iterations= 40, number of elements = 1000, mahouttime= 1917, 
>> spark time = 330
>> Number of iterations= 70, number of elements = 1000, mahouttime= 3203, 
>> spark time = 388
>> Number of iterations= 10, number of elements = 1, mahouttime= 1235, 
>> spark time = 2226
>> Number of iterations= 40, number of elements = 1, mahouttime= 2755, 
>> spark time = 6388
>> Number of iterations= 70, number of elements = 1, mahouttime= 4107, 
>> spark time = 10967
>> Number of iterations= 10, number of elements = 10, mahouttime= 7070, 
>> spark time = 25268
>> 
>> Time in seconds. It runs on Yarn cluster with about 40 machines. Elements 
>> for clusterization are randomly created. When I changed persistence level 
>> from Memory to Memory_and_disk, on big data spark started to work faster.
>> 
>> What am I missing?
>> 
>> See my benchmarking code in attachment.
>> 
>> 
>> -- 
>> Sincerely yours
>> Egor Pakhomov
>> Scala Developer, Yandex
> 


Re: K-means faster on Mahout then on Spark

2014-03-25 Thread Prashant Sharma
I think Mahout uses FuzzyKmeans, which is a different algorithm and is not
iterative.

Prashant Sharma


On Tue, Mar 25, 2014 at 6:50 PM, Egor Pahomov wrote:

> Hi, I'm running benchmark, which compares Mahout and SparkML. For now I
> have next results for k-means:
> Number of iterations= 10, number of elements = 1000, mahouttime= 602,
> spark time = 138
> Number of iterations= 40, number of elements = 1000, mahouttime= 1917,
> spark time = 330
> Number of iterations= 70, number of elements = 1000, mahouttime= 3203,
> spark time = 388
> Number of iterations= 10, number of elements = 1, mahouttime=
> 1235, spark time = 2226
> Number of iterations= 40, number of elements = 1, mahouttime=
> 2755, spark time = 6388
> Number of iterations= 70, number of elements = 1, mahouttime=
> 4107, spark time = 10967
> Number of iterations= 10, number of elements = 10, mahouttime=
> 7070, spark time = 25268
>
> Time in seconds. It runs on Yarn cluster with about 40 machines. Elements
> for clusterization are randomly created. When I changed persistence level
> from Memory to Memory_and_disk, on big data spark started to work faster.
>
> What am I missing?
>
> See my benchmarking code in attachment.
>
>
> --
>
>
>
> Sincerely yours,
> Egor Pakhomov
> Scala Developer, Yandex
>


RE: [bug?] streaming window unexpected behaviour

2014-03-25 Thread Adrian Mocanu
Thanks TD!
Is it possible to perhaps add another window method that doesn't generate 
partial windows? Or is it possible to remove the first few partial windows? 
I'm thinking of using an accumulator to count how many windows there are.

-A

-Original Message-
From: Tathagata Das [mailto:tathagata.das1...@gmail.com] 
Sent: March-24-14 6:55 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: [bug?] streaming window unexpected behaviour

Yes, I believe that is current behavior. Essentially, the first few RDDs will 
be partial windows (assuming window duration > sliding interval).

TD


On Mon, Mar 24, 2014 at 1:12 PM, Adrian Mocanu  
wrote:
> I have what I would call unexpected behaviour when using window on a stream.
>
> I have 2 windowed streams with a 5s batch interval. One window stream 
> is (5s,5s)=smallWindow and the other (10s,5s)=bigWindow
>
> What I've noticed is that the 1st RDD produced by bigWindow is 
> incorrect and is of the size 5s not 10s. So instead of waiting 10s and 
> producing 1 RDD with size 10s, Spark produced the 1st 10s RDD of size 5s.
>
> Why is this happening? To me it looks like a bug; Matei or TD can you 
> verify that this is correct behaviour?
>
>
>
>
>
> I have the following code
>
> val ssc = new StreamingContext(conf, Seconds(5))
>
>
>
> val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
>
> val bigWindowStream = ssc.queueStream(bigWindowRddQueue)
>
>
>
> val smallWindow = smallWindowReshapedStream.window(Seconds(5), 
> Seconds(5))
>
>   .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
>
> val bigWindow = bigWindowReshapedStream.window(Seconds(10), 
> Seconds(5))
>
> .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
>
>
>
> -Adrian
>
>


Re: K-means faster on Mahout then on Spark

2014-03-25 Thread Guillaume Pitel (eXenSa)
Maybe with "MEMORY_ONLY", Spark has to recompute the RDD several times because 
it doesn't fit in memory. That makes things run slower.

As a general safe rule, use MEMORY_AND_DISK_SER
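
In code that is just a different StorageLevel on the input RDD. A minimal sketch
(untested), mirroring the input line from the benchmark attached to the original
message, where spark and inputDataPath are the names used in that attachment:

import org.apache.spark.storage.StorageLevel

val input = spark.textFile(inputDataPath.toString)
  .map(_.split(" ").map(_.toDouble))
  .persist(StorageLevel.MEMORY_AND_DISK_SER)  // serialized blocks spill to disk instead of being recomputed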



Guillaume Pitel - Président d'eXenSa 

Prashant Sharma  a écrit :

>I think Mahout uses FuzzyKmeans, which is different algorithm and it is not 
>iterative. 
>
>
>Prashant Sharma
>
>
>
>On Tue, Mar 25, 2014 at 6:50 PM, Egor Pahomov  wrote:
>
>Hi, I'm running benchmark, which compares Mahout and SparkML. For now I have 
>next results for k-means:
>Number of iterations= 10, number of elements = 1000, mahouttime= 602, 
>spark time = 138
>Number of iterations= 40, number of elements = 1000, mahouttime= 1917, 
>spark time = 330
>Number of iterations= 70, number of elements = 1000, mahouttime= 3203, 
>spark time = 388
>Number of iterations= 10, number of elements = 1, mahouttime= 1235, 
>spark time = 2226
>Number of iterations= 40, number of elements = 1, mahouttime= 2755, 
>spark time = 6388
>Number of iterations= 70, number of elements = 1, mahouttime= 4107, 
>spark time = 10967
>Number of iterations= 10, number of elements = 10, mahouttime= 7070, 
>spark time = 25268
>
>Time in seconds. It runs on Yarn cluster with about 40 machines. Elements for 
>clusterization are randomly created. When I changed persistence level from 
>Memory to Memory_and_disk, on big data spark started to work faster.
>
>What am I missing?
>
>See my benchmarking code in attachment.
>
>
>
>-- 
>
>Sincerely yours
>Egor Pakhomov
>Scala Developer, Yandex
>
>


K-means faster on Mahout then on Spark

2014-03-25 Thread Egor Pahomov
Hi, I'm running benchmark, which compares Mahout and SparkML. For now I
have next results for k-means:
Number of iterations= 10, number of elements = 1000, mahouttime= 602,
spark time = 138
Number of iterations= 40, number of elements = 1000, mahouttime= 1917,
spark time = 330
Number of iterations= 70, number of elements = 1000, mahouttime= 3203,
spark time = 388
Number of iterations= 10, number of elements = 1, mahouttime= 1235,
spark time = 2226
Number of iterations= 40, number of elements = 1, mahouttime= 2755,
spark time = 6388
Number of iterations= 70, number of elements = 1, mahouttime= 4107,
spark time = 10967
Number of iterations= 10, number of elements = 10, mahouttime=
7070, spark time = 25268

Time in seconds. It runs on Yarn cluster with about 40 machines. Elements
for clusterization are randomly created. When I changed persistence level
from Memory to Memory_and_disk, on big data spark started to work faster.

What am I missing?

See my benchmarking code in attachment.


-- 



Sincerely yours,
Egor Pakhomov
Scala Developer, Yandex

package ru.yandex.spark.examples

import scala.util.Random
import scala.collection.mutable
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
import ru.yandex.spark.benchmark.Job
import org.apache.mahout.common.distance.EuclideanDistanceMeasure
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.{LoggerFactory, Logger}
import org.apache.spark.storage.StorageLevel

object KMeansBenchMark {

  private final val log: Logger = LoggerFactory.getLogger(this.getClass)

  val benchPath: Path = new Path("/tmp/benchmark")
  val inputDataPath: Path = new Path("/tmp/benchmark/testdata")
  val outputDataPath: Path = new Path("/tmp/benchmark/output")

  val configuration = new Configuration()
  val fs = FileSystem.get(FileSystem.getDefaultUri(configuration), configuration)

  def main(args: Array[String]) {

type MahoutTime = Long
type SparkTime = Long
type NumberOfIterations = Int
type NumberOfElements = Long

val result = new mutable.MutableList[(NumberOfIterations, NumberOfElements, MahoutTime, SparkTime)]


System.setProperty("SPARK_YARN_APP_JAR", SparkContext.jarOfClass(this.getClass).head)
System.setProperty("SPARK_JAR", SparkContext.jarOfClass(SparkContext.getClass).head)
System.setProperty("spark.driver.port", "49014")

val conf = new SparkConf()
conf.setAppName("serp-api")
conf.setMaster("yarn-client")
conf.set("spark.httpBroadcast.port", "35660")
conf.set("spark.fileserver.port", "35661")
conf.setJars(SparkContext.jarOfClass(this.getClass))


val numbers = List(1000L, 1L, 10L, 10L)

for (numberOfElements: NumberOfElements <- numbers) {
  for (numberOfIterations: NumberOfIterations <- 10 until 80 by 30) {
println(s"- ${numberOfElements} ${numberOfIterations}")
prepareData(numberOfElements)

val sparkStart = System.currentTimeMillis()
val spark = new SparkContext(conf)
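// storage level of the benchmark input; the discussion above compares MEMORY_ONLY with MEMORY_AND_DISK(_SER), while this attachment uses DISK_ONLY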
val input = spark.textFile(inputDataPath.toString).map(s => s.split(" ").map(number => number.toDouble)).persist(StorageLevel.DISK_ONLY)
KMeans.train(input, 10, numberOfIterations, 1, KMeans.RANDOM).clusterCenters
spark.stop()
val sparkEnd = System.currentTimeMillis()

val mahaoutStart = System.currentTimeMillis()
Job.run(configuration, inputDataPath, outputDataPath, new EuclideanDistanceMeasure, 10, 0.5, numberOfIterations)
val mahaoutEnd = System.currentTimeMillis()

val mahaoutTime: MahoutTime = (mahaoutEnd - mahaoutStart) / 1000
val sparkTime: SparkTime = (sparkEnd - sparkStart) / 1000
result += ((numberOfIterations, numberOfElements, mahaoutTime, sparkTime))
for (i <- result) {
  log.info(s"Number of iterations= ${i._1}, number of elements = ${i._2}, mahouttime= ${i._3}, spark time = ${i._4}")
}
for (i <- result) {
  println(s"Number of iterations= ${i._1}, number of elements = ${i._2}, mahouttime= ${i._3}, spark time = ${i._4}")
}
  }
}

  }

  def prepareData(numberOfElements: Long) = {
fs.delete(benchPath, true)
fs.mkdirs(benchPath)
val output = fs.create(inputDataPath)
for (i <- 0L until numberOfElements) {
  output.writeBytes(nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + "\n")
}
output.close()
  }

  def nextRandom = {
Random.nextGaussian() * 10e5 - Random.nextInt(10) * 10e4
  }

}


Re: tuple as keys in pyspark show up reversed

2014-03-25 Thread Friso van Vollenhoven
OK, forget about this question. It was a nasty, one character typo in my
own code (sorting by rating instead of item at one point).
Best,
Friso


On Tue, Mar 25, 2014 at 1:53 PM, Friso van Vollenhoven <
f.van.vollenho...@gmail.com> wrote:

> Hi,
>
> I have an example where I use a tuple of (int,int) in Python as key for a
> RDD. When I do a reduceByKey(...), sometimes the tuples turn up with the
> two int's reversed in order (which is problematic, as the ordering is part
> of the key).
>
> Here is a ipython notebook that has some code and demonstrates this issue:
> http://nbviewer.ipython.org/urls/dl.dropboxusercontent.com/u/5812021/test-on-cluster.ipynb?create=1
>
> Here's the long story... I am doing collaborative filtering using cosine
> similarity on Spark using Python. Not because I need it, but it seemed like
> a appropriately simple but useful exercise to get started with Spark.
>
> I am seeing a difference in outcomes between running locally and running
> on a cluster.
>
> My approach is this:
>
>- given a dataset containing tuples of (user, item, rating)
>- group by user
>- for each user flatMap into tuples of ((item, item), (rating,
>rating)) for each combination of two items that that user has seen
>- then map into a structure containing ((item, item), (rating *
>rating, left rating ^ 2, right rating ^ 2, 1)
>- then reduceByKey, summing up the values in the tuples column-wise;
>this gives a tuple of (sum of rating product, sum of left rating squares,
>sum of right rating square, co-occurrence count) which can be used to
>calculate cosine similarity.
>- map into (item,item), (similarity, count)
>- this should result in a dataset that looks like: (item, item),
>(similarity, count)
>
> (In the notebook I leave out the final step of converting the sums into a
> cosine similarity.)
>
> When I do this for a artificial dataset of 10 users with each 70 items
> rated that are exactly the same for each user (so 7M ratings in total where
> the user,item matrix is dense), I would expect the resulting dataset to
> have 2415 tuples (== 70*69 / 2, the number of co-occurrences that exist
> amongst 70 items that each user has rated) and I would expect the
> co-occurrence count for each item,item pair to be 10, as there are 100K
> users.
>
> When I run my code locally, the above assumptions work out, but when I run
> on a small cluster (4 workers, on AWS), the numbers are way off. This
> happens because of the reversed tuples.
>
> Where am I going wrong?
>
>
> Thanks for any pointers, cheers,
> Friso
>
>


tuple as keys in pyspark show up reversed

2014-03-25 Thread Friso van Vollenhoven
Hi,

I have an example where I use a tuple of (int,int) in Python as key for a
RDD. When I do a reduceByKey(...), sometimes the tuples turn up with the
two int's reversed in order (which is problematic, as the ordering is part
of the key).

Here is an IPython notebook that has some code and demonstrates this issue:
http://nbviewer.ipython.org/urls/dl.dropboxusercontent.com/u/5812021/test-on-cluster.ipynb?create=1

Here's the long story... I am doing collaborative filtering using cosine
similarity on Spark using Python. Not because I need it, but it seemed like
an appropriately simple but useful exercise to get started with Spark.

I am seeing a difference in outcomes between running locally and running on
a cluster.

My approach is this:

   - given a dataset containing tuples of (user, item, rating)
   - group by user
   - for each user flatMap into tuples of ((item, item), (rating, rating))
   for each combination of two items that that user has seen
   - then map into a structure containing ((item, item), (rating * rating,
   left rating ^ 2, right rating ^ 2, 1)
   - then reduceByKey, summing up the values in the tuples column-wise;
   this gives a tuple of (sum of rating product, sum of left rating squares,
   sum of right rating square, co-occurrence count) which can be used to
   calculate cosine similarity.
   - map into (item,item), (similarity, count)
   - this should result in a dataset that looks like: (item, item),
   (similarity, count)

(In the notebook I leave out the final step of converting the sums into a
cosine similarity.)

When I do this for an artificial dataset of 10 users, each rating the same 70 items
(so 7M ratings in total, where the user,item matrix is dense), I would expect
the resulting dataset to
have 2415 tuples (== 70*69 / 2, the number of co-occurrences that exist
amongst 70 items that each user has rated) and I would expect the
co-occurrence count for each item,item pair to be 10, as there are 100K
users.

When I run my code locally, the above assumptions work out, but when I run
on a small cluster (4 workers, on AWS), the numbers are way off. This
happens because of the reversed tuples.

Where am I going wrong?


Thanks for any pointers, cheers,
Friso


Re: Change print() in JavaNetworkWordCount

2014-03-25 Thread Sourav Chandra
You can extend DStream and override the print() method, then create your
own DStream extending from it.
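
If subclassing feels heavy, a sketch of the same idea using foreachRDD and take(n)
(untested; shown in Scala, the Java API has the analogous hooks):

import org.apache.spark.streaming.dstream.DStream

def printFirst[T](stream: DStream[T], n: Int): Unit = {
  stream.foreachRDD { rdd =>
    println("-------------------------------------------")
    rdd.take(n).foreach(println)   // n replaces the hard-coded 10 of print()
  }
}

// e.g. printFirst(wordCounts, 50) instead of wordCounts.print()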


On Tue, Mar 25, 2014 at 6:07 PM, Eduardo Costa Alfaia <
e.costaalf...@unibs.it> wrote:

> Hi Guys,
> I think that I already did this question, but I don't remember if anyone
> has answered me. I would like changing in the function print() the quantity
> of words and the frequency number that are sent to driver's screen. The
> default value is 10.
>
> Anyone could help me with this?
>
> Best Regards
>
> --
> Informativa sulla Privacy: http://www.unibs.it/node/8155
>



-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Change print() in JavaNetworkWordCount

2014-03-25 Thread Eduardo Costa Alfaia

Hi Guys,
I think I already asked this question, but I don't remember whether anyone 
has answered me. I would like to change, in the print() function, the 
number of words and the frequency values that are sent to the driver's 
screen. The default value is 10.


Anyone could help me with this?

Best Regards

--
Informativa sulla Privacy: http://www.unibs.it/node/8155


Worker Threads Vs Spark Executor Memory

2014-03-25 Thread Annamalai, Sai IN BLR STS
Hi All,

1)  Does the number of worker threads bear any relationship to the executor 
memory setting?
I have 16 GB of RAM and an 8-core processor. I had set SPARK_MEM to 12g and 
was running locally with the default of 1 thread.
So this means at most one executor per node can be scheduled at any 
point in time.
If I increase the number of worker threads to, say, 4, do I need to reduce 
SPARK_MEM to 3g, or not?
2)  Is there any performance difference between running in the interactive 
Spark shell and a non-interactive standalone Spark application, apart from build 
time (sbt...)?


Thanks !!





Re: Pig on Spark

2014-03-25 Thread lalit1303
Hi,

I have been following Aniket's spork github repository.
https://github.com/aniket486/pig
I have done all the changes mentioned in recently modified pig-spark file.

I am using:
hadoop 2.0.5 alpha
spark-0.8.1-incubating
mesos 0.16.0

##PIG variables
export *HADOOP_CONF_DIR*=$HADOOP_INSTALL/etc/hadoop
export *SPARK_YARN_APP_JAR*=/home/ubuntu/pig/pig-withouthadoop.jar
export *SPARK_JAVA_OPTS*=" -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/heap.dump"
export
*SPARK_JAR*=/home/ubuntu/spark/assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar
export *SPARK_MASTER*=yarn-client
export *SPARK_HOME*=/home/ubuntu/spark
export *SPARK_JARS*=/home/ubuntu/pig/contrib/piggybank/java/piggybank.jar
export
*PIG_CLASSPATH*=${SPARK_JAR}:${SPARK_JARS}:/home/ubuntu/mesos/build/src/mesos-0.16.0.jar:/home/ubuntu/pig/pig-withouthadoop.jar
export *SPARK_PIG_JAR*=/home/ubuntu/pig/pig-withouthadoop.jar


This works fine in MapReduce and local mode, but while running in Spark
mode I am facing the following error. The error comes after the job is submitted
and run on the YARN master.
Can you please tell me how to proceed?

###error
message

*ERROR 2998*: *Unhandled internal error. class
org.apache.spark.util.InnerClosureFinder* has interface
org.objectweb.asm.ClassVisitor as super class

*java.lang.IncompatibleClassChangeError*: class
org.apache.spark.util.InnerClosureFinder has interface
org.objectweb.asm.ClassVisitor as super class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
at
org.apache.spark.util.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:87)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:107)
at org.apache.spark.SparkContext.clean(SparkContext.scala:970)
at org.apache.spark.rdd.RDD.map(RDD.scala:246)
at
org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter.convert(LoadConverter.java:68)
at
org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter.convert(LoadConverter.java:38)
at
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:212)
at
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:201)
at
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:201)
at
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.launchPig(SparkLauncher.java:125)
at org.apache.pig.PigServer.launchPlan(PigServer.java:1328)
at 
org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1310)
at org.apache.pig.PigServer.storeEx(PigServer.java:993)
at org.apache.pig.PigServer.store(PigServer.java:957)
at org.apache.pig.PigServer.openIterator(PigServer.java:870)
at 
org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:729)
at
org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:370)
at
org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:194)
at
org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:170)
at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:84)
at org.apache.pig.Main.run(Main.java:609)
at org.apache.pig.Main.main(Main.java:158)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pig-on-Spark-tp2367p3187.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Akka error with largish job (works fine for smaller versions)

2014-03-25 Thread Andrew Ash
Possibly one of your executors is in the middle of a large stop-the-world
GC and doesn't respond to network traffic during that period?  If you
shared some information about how each node in your cluster is set up (heap
size, memory, CPU, etc) that might help with debugging.

Andrew


On Mon, Mar 24, 2014 at 9:13 PM, Nathan Kronenfeld <
nkronenf...@oculusinfo.com> wrote:

> What does this error mean:
>
> @hadoop-s2.oculus.local:45186]: Error [Association failed with
> [akka.tcp://spark@hadoop-s2.oculus.local:45186]] [
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://spark@hadoop-s2.oculus.local:45186]
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: hadoop-s2.oculus.loca\
> l/192.168.0.47:45186
> ]
>
> ?
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com
>


Re: Re: Re: RDD usage

2014-03-25 Thread hequn cheng
Hi, I wrote a program to test this. A non-idempotent "compute" function in
foreach does change the values in the RDD. It may look a little crazy to do so,
since modifying the RDD makes it impossible to keep the RDD fault-tolerant in
Spark :)



2014-03-25 11:11 GMT+08:00 林武康 :

>  Hi hequn, I dug into the source of Spark a bit deeper and got some
> ideas. Firstly, immutability is a feature of RDDs but not a hard rule; there
> are ways to change it, for example an RDD with a non-idempotent "compute"
> function, though it is really a bad design to make that function
> non-idempotent, given the uncontrollable side effects. I agree with Mark that
> foreach can modify the elements of an RDD, but we should avoid this because
> it will affect all the RDDs generated from the changed RDD and make the whole
> process inconsistent and unstable.
>
> These are some rough opinions on the immutability of RDDs; a fuller discussion
> can make it clearer. Any ideas?
>  --
> From: hequn cheng 
> Sent: 2014/3/25 10:40
> To: user@spark.apache.org
> Subject: Re: Re: RDD usage
>
>  First question:
> If you save your modified RDD like this:
> points.foreach(p=>p.y = another_value).collect() or
> points.foreach(p=>p.y = another_value).saveAsTextFile(...)
> the modified RDD will be materialized and this will not use any worker's
> memory.
> If you have more transformations after the map(), Spark will pipeline
> all transformations and build a DAG. Very little memory will be used in
> this stage and the memory will be freed soon.
> Only cache() will persist your RDD in memory for a long time.
> Second question:
> Once an RDD is created, it cannot be changed, due to the immutability
> feature. You can only create a new RDD from the existing RDD or from the file
> system.
>
>
> 2014-03-25 9:45 GMT+08:00 林武康 :
>
>>  Hi hequn, a related question: does that mean the memory usage will be
>> doubled? And furthermore, if the compute function of an RDD is not
>> idempotent, will the RDD change while the job is running?
>>  --
>> From: hequn cheng 
>> Sent: 2014/3/25 9:35
>> To: user@spark.apache.org
>> Subject: Re: RDD usage
>>
>>  points.foreach(p=>p.y = another_value) will return a new modified RDD.
>>
>>
>> 2014-03-24 18:13 GMT+08:00 Chieh-Yen :
>>
>>>  Dear all,
>>>
>>> I have a question about the usage of RDD.
>>> I implemented a class called AppDataPoint, it looks like:
>>>
>>> case class AppDataPoint(input_y : Double, input_x : Array[Double])
>>> extends Serializable {
>>>   var y : Double = input_y
>>>   var x : Array[Double] = input_x
>>>   ..
>>> }
>>> Furthermore, I created the RDD by the following function.
>>>
>>> def parsePoint(line: String): AppDataPoint = {
>>>   /* Some related works for parsing */
>>>   ..
>>> }
>>>
>>> Assume the RDD called "points":
>>>
>>> val lines = sc.textFile(inputPath, numPartition)
>>> var points = lines.map(parsePoint _).cache()
>>>
>>> The question is that, I tried to modify the value of this RDD, the
>>> operation is:
>>>
>>> points.foreach(p=>p.y = another_value)
>>>
>>> The operation is workable.
>>> There isn't any warning or error message shown by the system, and
>>> the results are right.
>>> I wonder whether this kind of modification of an RDD is a correct and in fact
>>> workable design.
>>> The usage docs say that RDDs are immutable; is there any suggestion?
>>>
>>> Thanks a lot.
>>>
>>> Chieh-Yen Lin
>>>
>>
>>
>


Re: Shark does not give any results with SELECT count(*) command

2014-03-25 Thread Praveen R
Hi Qingyang Li,

Shark-0.9.0 uses a patched version of hive-0.11 and using 
configuration/metastore of hive-0.12 could be incompatible.

May I know the reason you are using the hive-site.xml from the previous Hive 
version (to reuse an existing metastore?). Otherwise, you might just leave 
hive-site.xml essentially blank. Something like this:

<?xml version="1.0"?>
<configuration>
</configuration>

In any case you could run ./bin/shark-withdebug for any errors.

Regards,
Praveen

On 25-Mar-2014, at 1:49 pm, qingyang li  wrote:

> Reopening this thread because I encountered this problem again.
> Here is my environment:
> scala 2.10.3
> spark 0.9.0, standalone mode
> shark 0.9.0, built from source by myself
> hive hive-shark-0.11
> I have copied hive-site.xml from my Hadoop cluster; its Hive version is 
> 0.12. After copying, I deleted some attributes from hive-site.xml.
> 
> When I run select count(*) from xxx, there is no result and no error output.
> 
> Can someone give me some suggestions on how to debug this?
> 
> 
> 
> 
> 
> 2014-03-20 11:27 GMT+08:00 qingyang li :
> I have found the cause. My problem was:
> the format of the slaves file was not correct, so the tasks were only run on the master.
> 
> Explaining it here to help anyone else who encounters a similar problem.
> 
> 
> 2014-03-20 9:57 GMT+08:00 qingyang li :
> 
> Hi, i install spark0.9.0 and shark0.9 on 3 nodes , when i run select * from 
> src , i can get result, but when i run select count(*) from src or select * 
> from src limit 1,  there is no result output.
> 
> i have found similiar problem on google groups:
> https://groups.google.com/forum/#!searchin/spark-users/Shark$20does$20not$20give$20any$20results$20with$20SELECT$20command/spark-users/oKMBPBWim0U/_hbDCi4m-xUJ
> but , there is no solution on it.
> 
> Does anyone encounter such problem?
> 
> 



tracking resource usage for spark-shell commands

2014-03-25 Thread Bharath Bhushan
Is there a way to see the resource usage of each spark-shell command — say time 
taken and memory used?
I checked the WebUI of spark-shell and of the master and I don’t see any such 
breakdown. I see the time taken in the INFO logs but nothing about memory 
usage. It would also be nice to track the time taken in the spark-shell web UI.

—
Thanks

Re: N-Fold validation and RDD partitions

2014-03-25 Thread Jaonary Rabarisoa
There is also a "randomSplit" method in the latest version of spark
https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
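
A minimal sketch of k-fold splitting with randomSplit (untested; assumes a build
recent enough to have it, an RDD called data, and a hypothetical evaluate(train, test)
returning a Double):

val k = 5
val folds = data.randomSplit(Array.fill(k)(1.0 / k), seed = 42L)

val metrics = for (i <- 0 until k) yield {
  val test  = folds(i)
  val train = folds.zipWithIndex.filter(_._2 != i).map(_._1).reduce(_ union _)
  evaluate(train, test)
}
val meanMetric = metrics.sum / k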


On Tue, Mar 25, 2014 at 1:21 AM, Holden Karau  wrote:

> There is also https://github.com/apache/spark/pull/18 against the current
> repo which may be easier to apply.
>
>
> On Fri, Mar 21, 2014 at 8:58 AM, Hai-Anh Trinh  wrote:
>
>> Hi Jaonary,
>>
>> You can find the code for k-fold CV in
>> https://github.com/apache/incubator-spark/pull/448. I have not find the
>> time to resubmit the pull to latest master.
>>
>>
>> On Fri, Mar 21, 2014 at 8:46 PM, Sanjay Awatramani > > wrote:
>>
>>> Hi Jaonary,
>>>
>>> I believe the n folds should be mapped into n Keys in spark using a map
>>> function. You can reduce the returned PairRDD and you should get your
>>> metric.
>>> I don't understand partitions fully, but from whatever I understand of
>>> it, they aren't required in your scenario.
>>>
>>> Regards,
>>> Sanjay
>>>
>>>
>>>   On Friday, 21 March 2014 7:03 PM, Jaonary Rabarisoa 
>>> wrote:
>>>   Hi
>>>
>>> I need to partition my data represented as RDD into n folds and run
>>> metrics computation in each fold and finally compute the means of my
>>> metrics overall the folds.
>>> Does spark can do the data partition out of the box or do I need to
>>> implement it myself. I know that RDD has a partitions method and
>>> mapPartitions but I really don't understand the purpose and the meaning of
>>> partition here.
>>>
>>>
>>>
>>> Cheers,
>>>
>>> Jaonary
>>>
>>>
>>>
>>
>>
>>  --
>> Hai-Anh Trinh | Senior Software Engineer | http://adatao.com/
>> http://www.linkedin.com/in/haianh
>>
>>
>
>
> --
> Cell : 425-233-8271
>


Re: How to set environment variable for a spark job

2014-03-25 Thread Sourav Chandra
You can pass them in the environment map used to create spark context.
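
Concretely, one of the SparkContext constructors takes an environment map that is
applied to the executor processes; a minimal sketch (untested, master URL, paths and
jar name are assumptions):

import org.apache.spark.SparkContext

val env = Map(
  "ORACLE_HOME"     -> "/u01/app/oracle/product/11.2.0",
  "LD_LIBRARY_PATH" -> "/u01/app/oracle/product/11.2.0/lib"
)

val sc = new SparkContext("spark://master:7077", "oracle-job",
  System.getenv("SPARK_HOME"), Seq("target/my-job.jar"), env)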


On Tue, Mar 25, 2014 at 2:29 PM, santhoma  wrote:

> Hello
>
> I have a requirement to set some env values for my spark jobs.
> Does anyone know how to set them? Specifically following variables:
>
> 1) ORACLE_HOME
> 2) LD_LIBRARY_PATH
>
> thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-environment-variable-for-a-spark-job-tp3180.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>



-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


How to set environment variable for a spark job

2014-03-25 Thread santhoma
Hello

I have a requirement to set some env values for my spark jobs.
Does anyone know how to set them? Specifically following variables:

1) ORACLE_HOME 
2) LD_LIBRARY_PATH

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-environment-variable-for-a-spark-job-tp3180.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


graph.persist error

2014-03-25 Thread moxing
Hi,
I am dealing with a graph consisting of 20 million nodes and 2 billion
edges. When I want to persist the graph then an exception throw :
   Caused by: java.lang.UnsupportedOperationException: Cannot change storage
level of an RDD after it was already assigned a leve
Here is my code:
   def main(args: Array[String]) {
if (args.length == 0) {
  System.err.println("Usage: Graph_on_Spark [master] ")
  System.exit(1)
}
val sc = new SparkContext(args(0), "Graph_on_Spark",
  System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val hdfspath = ""
var userRDD = sc.textFile(…)
var edgeRDD:RDD[Edge[String]] = sc.textFile(…)
for( no <- 1 to 4){
val vertexfile = sc.textFile(…)
userRDD = userRDD.union( vertexfile.map{… } )
val edgefile = sc.textFile(…)
edgeRDD = edgeRDD.union( …)
}
val graph = Graph(userRDD,edgeRDD,"Empty")
println(graph.vertices.count)
println(graph.edges.count)
println("graph form success")
val initialgraph = graph.persist(storage.StorageLevel.DISK_ONLY)

I don’t have no operation such as cache or persist before. 
Another question, when execute code below, would get a exception:
 Exception failure: java.lang.ArrayIndexOutOfBoundsException

   while(i < maxIter){
println("Iteration")
println(g.vertices.count)
val newVerts = g.vertices.innerJoin(messages)(pregel_vprog)
g = g.outerJoinVertices(newVerts) { (vid,old,newOpt) =>
newOpt.getOrElse((old._1,"")) }
println(g.vertices.count)
messages =
g.mapReduceTriplets[String](pregel_sendMsg,pregel_mergeFunc,Some((newVerts,activeDir)))
println(g.vertices.count)
i += 1
 }
   



Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/graph-persist-error-tp3179.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Java API - Serialization Issue

2014-03-25 Thread santhoma
This worked great. Thanks a lot



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Java-API-Serialization-Issue-tp1460p3178.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Shark does not give any results with SELECT count(*) command

2014-03-25 Thread qingyang li
Reopening this thread because I encountered this problem again.
Here is my environment:
scala 2.10.3
spark 0.9.0, standalone mode
shark 0.9.0, built from source by myself
hive hive-shark-0.11
I have copied hive-site.xml from my Hadoop cluster; its Hive version is
0.12. After copying, I deleted some attributes from hive-site.xml.

When I run select count(*) from xxx, there is no result and no error output.

Can someone give me some suggestions on how to debug this?





2014-03-20 11:27 GMT+08:00 qingyang li :

> have found the cause , my problem is :
> the style of file salves is not correct, so the task only be run on master.
>
> explain here to help other guy who also encounter similiar problem.
>
>
> 2014-03-20 9:57 GMT+08:00 qingyang li :
>
> Hi, i install spark0.9.0 and shark0.9 on 3 nodes , when i run select *
>> from src , i can get result, but when i run select count(*) from src or
>> select * from src limit 1,  there is no result output.
>>
>> i have found similiar problem on google groups:
>>
>> https://groups.google.com/forum/#!searchin/spark-users/Shark$20does$20not$20give$20any$20results$20with$20SELECT$20command/spark-users/oKMBPBWim0U/_hbDCi4m-xUJ
>> but , there is no solution on it.
>>
>> Does anyone encounter such problem?
>>
>
>