Re: DataFrame RDDs

2013-11-18 Thread Anwar Rizal
I had that in mind too when Miles Sabin presented Shapeless at Scala.IO
Paris last month.

If anybody would like to experiment with shapeless in Spark to create
something like an R data frame or an Incanter dataset, I would be happy to see
it and eventually help.

My feeling, however, is that shapeless moves fast (e.g., to my
understanding, the latest shapeless requires Scala 2.11), which may be a problem.
On Nov 19, 2013 12:46 AM, "andy petrella"  wrote:

> Maybe I'm wrong, but this use case could be a good fit for Shapeless' records.
>
> Shapeless' records are, so to say, like lisp's records but typed! In that
> sense, they're closer to Haskell's record notation, but imho less
> powerful, since the access will be based on a String (field name) for
> Shapeless whereas Haskell will use pure functions!
>
> Anyway, this documentation is self-explanatory and shows straightforwardly
> how we (maybe) could use them to simulate an R frame.
>
> Thinking out loud: when reading a csv file, for instance, what would be
> needed are
>  * a Read[T] for each column,
>  * fold'ling the list of columns by "reading" each and prepending the
> result (combined with the name with ->>) to an HList
>
> The gain would be that we should recover one helpful feature of R's frame
> which is:
>   R   :: frame$newCol = frame$post - frame$pre
>   // which adds a column to a frame
>   Shpls :: frame2 = frame + ("newCol" --> (frame("post") - frame("pre")))
> // type safe "difference" between ints for instance
>
> Of course, we're not recovering R's frame as is, because we're simply
> dealing with rows one by one, where a frame deals with the full table
> -- but in the case of Spark it would make no sense to mimic that, since
> we use RDDs for that :-D.
>
> I haven't experimented with this yet, but it'd be fun to try; don't know if
> someone is interested ^^
>
> Cheers
>
> andy
>
>
> On Fri, Nov 15, 2013 at 8:49 PM, Christopher Nguyen wrote:
>
>> Sure, Shay. Let's connect offline.
>>
>> Sent while mobile. Pls excuse typos etc.
>> On Nov 16, 2013 2:27 AM, "Shay Seng"  wrote:
>>
>>> Nice, any possibility of sharing this code in advance?
>>>
>>>
>>> On Fri, Nov 15, 2013 at 11:22 AM, Christopher Nguyen wrote:
>>>
 Shay, we've done this at Adatao, specifically a big data frame in RDD
 representation and subsetting/projections/data mining/machine learning
 algorithms on that in-memory table structure.

 We're planning to harmonize that with the MLBase work in the near
 future. Just a matter of prioritization on limited resources. If there's
 enough interest we'll accelerate that.

 Sent while mobile. Pls excuse typos etc.
 On Nov 16, 2013 1:11 AM, "Shay Seng"  wrote:

> Hi,
>
> Is there some way to get R-style Data.Frame data structures into RDDs?
> I've been using RDD[Seq[]] but this is getting quite error-prone and the
> code gets pretty hard to read especially after a few joins, maps etc.
>
> Rather than access columns by index, I would prefer to access them by
> name.
> e.g. instead of writing:
> myrdd.map(l => Seq(l(0), l(1), l(4), l(9)))
> I would prefer to write
> myrdd.map(l => DataFrame(l.id, l.entryTime, l.exitTime, l.cost))
>
> Also joins are particularly irritating. Currently I have to first
> construct a pair:
> somePairRdd.join(myrdd.map(l => ((l(1), l(2)), (l(0), l(1), l(2), l(3)))))
> Now I have to unzip away the join-key and remap the values into a seq
>
> instead I would rather write
> someDataFrame.join(myrdd , l=> l.entryTime && l.exitTime)
>
>
> The question is this:
> (1) I started writing a DataFrameRDD class that kept track of the
> column names and column values, and some optional attributes common to the
> entire dataframe. However I got a little muddled when trying to figure out
> what happens when a DataFrameRDD is chained with other operations and gets
> transformed to other types of RDDs. The Value part of the RDD is obvious,
> but I didn't know the best way to pass on the "column and attribute"
> portions of the DataFrame class.
>
> I googled around for some documentation on how to write RDDs, but only
> found a pptx slide presentation with very vague info. Is there a better
> source of info on how to write RDDs?
>
> (2) Even better than info on how to write RDDs, has anyone written an
> RDD that functions as a DataFrame? :-)
>
> tks
> shay
>

>>>
>


Re: DataFrame RDDs

2013-11-18 Thread Matei Zaharia
Interesting idea — in Scala you can also use the Dynamic type 
(http://hacking-scala.org/post/49051516694/introduction-to-type-dynamic) to 
allow dynamic properties. It has the same potential pitfalls as string names, 
but with nicer syntax.
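
A minimal, illustrative sketch of the Dynamic approach (the Row class and its
fields below are made up for the example, not part of Spark, and it assumes
Scala 2.10's Dynamic trait):

  import scala.language.dynamics

  class Row(fields: Map[String, Any]) extends Dynamic {
    // row.entryTime compiles to selectDynamic("entryTime"), so a typo only fails at runtime
    def selectDynamic(name: String): Any = fields(name)
  }

  val row = new Row(Map("id" -> "42", "entryTime" -> 10L, "exitTime" -> 20L))
  row.entryTime   // field-access syntax, string lookup underneath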

Matei

On Nov 18, 2013, at 3:45 PM, andy petrella  wrote:

> Maybe I'm wrong, but this use case could be a good fit for Shapeless' records.
> 
> Shapeless' records are, so to say, like lisp's records but typed! In that
> sense, they're closer to Haskell's record notation, but imho less
> powerful, since the access will be based on a String (field name) for Shapeless
> whereas Haskell will use pure functions!
> 
> Anyway, this documentation is self-explanatory and shows straightforwardly
> how we (maybe) could use them to simulate an R frame.
> 
> Thinking out loud: when reading a csv file, for instance, what would be 
> needed are 
>  * a Read[T] for each column, 
>  * fold'ling the list of columns by "reading" each and prepending the result 
> (combined with the name with ->>) to an HList
> 
> The gain would be that we should recover one helpful feature of R's frame 
> which is:
>   R   :: frame$newCol = frame$post - frame$pre
>// which adds a column to a frame
>   Shpls :: frame2 = frame + ("newCol" --> (frame("post") - frame("pre"))) 
> // type safe "difference" between ints for instance
>
> Of course, we're not recovering R's frame as is, because we're simply dealing
> with rows one by one, where a frame deals with the full table -- but in
> the case of Spark it would make no sense to mimic that, since we use RDDs
> for that :-D.
> 
> I haven't experimented with this yet, but it'd be fun to try; don't know if someone
> is interested ^^
> 
> Cheers
> 
> andy
> 
> 
> On Fri, Nov 15, 2013 at 8:49 PM, Christopher Nguyen  wrote:
> Sure, Shay. Let's connect offline.
> 
> Sent while mobile. Pls excuse typos etc.
> 
> On Nov 16, 2013 2:27 AM, "Shay Seng"  wrote:
> Nice, any possibility of sharing this code in advance? 
> 
> 
> On Fri, Nov 15, 2013 at 11:22 AM, Christopher Nguyen  wrote:
> Shay, we've done this at Adatao, specifically a big data frame in RDD 
> representation and subsetting/projections/data mining/machine learning 
> algorithms on that in-memory table structure.
> 
> We're planning to harmonize that with the MLBase work in the near future. 
> Just a matter of prioritization on limited resources. If there's enough 
> interest we'll accelerate that.
> 
> Sent while mobile. Pls excuse typos etc.
> 
> On Nov 16, 2013 1:11 AM, "Shay Seng"  wrote:
> Hi, 
> 
> Is there some way to get R-style Data.Frame data structures into RDDs? I've 
> been using RDD[Seq[]] but this is getting quite error-prone and the code gets 
> pretty hard to read especially after a few joins, maps etc. 
> 
> Rather than access columns by index, I would prefer to access them by name.
> e.g. instead of writing:
> myrdd.map(l => Seq(l(0), l(1), l(4), l(9)))
> I would prefer to write
> myrdd.map(l => DataFrame(l.id, l.entryTime, l.exitTime, l.cost))
> 
> Also joins are particularly irritating. Currently I have to first construct a 
> pair:
> somePairRdd.join(myrdd.map(l => ((l(1), l(2)), (l(0), l(1), l(2), l(3)))))
> Now I have to unzip away the join-key and remap the values into a seq
> 
> instead I would rather write
> someDataFrame.join(myrdd , l=> l.entryTime && l.exitTime)
> 
> 
> The question is this:
> (1) I started writing a DataFrameRDD class that kept track of the column 
> names and column values, and some optional attributes common to the entire 
> dataframe. However I got a little muddled when trying to figure out what 
> happens when a DataFrameRDD is chained with other operations and gets
> transformed to other types of RDDs. The Value part of the RDD is obvious, but 
> I didn't know the best way to pass on the "column and attribute" portions of 
> the DataFrame class.
> 
> I googled around for some documentation on how to write RDDs, but only found 
> a pptx slide presentation with very vague info. Is there a better source of 
> info on how to write RDDs? 
> 
> (2) Even better than info on how to write RDDs, has anyone written an RDD 
> that functions as a DataFrame? :-)
> 
> tks
> shay
> 
> 



Re: Joining files

2013-11-18 Thread Horia
It seems to me that what you want is the following procedure
- parse each file line by line
- generate key, value pairs
- join by key

I think the following should accomplish what you're looking for

val students = sc.textFile("./students.txt")   // mapping over this RDD already maps over lines
val courses = sc.textFile("./courses.txt")     // mapping over this RDD already maps over lines
val left = students.map( x => {
  val columns = x.split(",")                   // students.txt: student_id, course_id, score
  (columns(1), (columns(0), columns(2)))       // key by course_id
} )
val right = courses.map( x => {
  val columns = x.split(",")                   // courses.txt: course_id, course_title
  (columns(0), columns(1))                     // key by course_id
} )
val joined = left.join(right)


The major difference is selectively returning the fields which you actually
want to join, rather than all the fields. A secondary difference is
syntactic: you don't need a .map().map() since you can use a slightly more
complex function block as illustrated. I think Spark is smart enough to
optimize the .map().map() to basically what I've explicitly written...
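
One extra step, not in the original reply, to go from the joined RDD to the
requested (student_id, course_title, score) output (field names follow the
example schema above):

val result = joined.map { case (_, ((studentId, score), courseTitle)) =>
  (studentId, courseTitle, score)
}
result.saveAsTextFile("./joined_output")   // output path is just a placeholder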

Horia



On Mon, Nov 18, 2013 at 10:34 PM, Something Something <
mailinglist...@gmail.com> wrote:

> Was my question so dumb?  Or, is this not a good use case for Spark?
>
>
> On Sun, Nov 17, 2013 at 11:41 PM, Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I am a newbie to both Spark & Scala, but I've been working with
>> Hadoop/Pig for quite some time.
>>
>> We've quite a few ETL processes running in production that use Pig, but
>> now we're evaluating Spark to see if they would indeed run faster.
>>
>> A very common use case in our Pig script is joining a file containing
>> Facts to a file containing Dimension data.  The joins are of course, inner,
>> left & outer.
>>
>> I thought I would start simple.  Let's say I've 2 files:
>>
>> 1)  Students:  student_id, course_id, score
>> 2)  Course:  course_id, course_title
>>
>> We want to produce a file that contains:  student_id, course_title, score
>>
>> (Note:  This is a hypothetical case.  The real files have millions of
>> facts & thousands of dimensions)
>>
>> Would something like this work?  Note:  I did say I am a newbie ;)
>>
>> val students = sc.textFile("./students.txt")
>> val courses = sc.textFile("./courses.txt")
>> val s = students.map(x => x.split(','))
>> val left = students.map(x => x.split(',')).map(y => (y(1), y))
>> val right = courses.map(x => x.split(',')).map(y => (y(0), y))
>> val joined = left.join(right)
>>
>>
>> Any pointers in this regard would be greatly appreciated.  Thanks.
>>
>
>


Re: Joining files

2013-11-18 Thread Alex Boisvert
Yes it would work and fit spark nicely... Pretty typical I think.
On Nov 18, 2013 10:34 PM, "Something Something" 
wrote:

> Was my question so dumb?  Or, is this not a good use case for Spark?
>
>
> On Sun, Nov 17, 2013 at 11:41 PM, Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I am a newbie to both Spark & Scala, but I've been working with
>> Hadoop/Pig for quite some time.
>>
>> We've quite a few ETL processes running in production that use Pig, but
>> now we're evaluating Spark to see if they would indeed run faster.
>>
>> A very common use case in our Pig script is joining a file containing
>> Facts to a file containing Dimension data.  The joins are of course, inner,
>> left & outer.
>>
>> I thought I would start simple.  Let's say I've 2 files:
>>
>> 1)  Students:  student_id, course_id, score
>> 2)  Course:  course_id, course_title
>>
>> We want to produce a file that contains:  student_id, course_title, score
>>
>> (Note:  This is a hypothetical case.  The real files have millions of
>> facts & thousands of dimensions)
>>
>> Would something like this work?  Note:  I did say I am a newbie ;)
>>
>> val students = sc.textFile("./students.txt")
>> val courses = sc.textFile("./courses.txt")
>> val s = students.map(x => x.split(','))
>> val left = students.map(x => x.split(',')).map(y => (y(1), y))
>> val right = courses.map(x => x.split(',')).map(y => (y(0), y))
>> val joined = left.join(right)
>>
>>
>> Any pointers in this regard would be greatly appreciated.  Thanks.
>>
>
>


Re: Joining files

2013-11-18 Thread Something Something
Was my question so dumb?  Or, is this not a good use case for Spark?


On Sun, Nov 17, 2013 at 11:41 PM, Something Something <
mailinglist...@gmail.com> wrote:

> I am a newbie to both Spark & Scala, but I've been working with Hadoop/Pig
> for quite some time.
>
> We've quite a few ETL processes running in production that use Pig, but
> now we're evaluating Spark to see if they would indeed run faster.
>
> A very common use case in our Pig script is joining a file containing
> Facts to a file containing Dimension data.  The joins are of course, inner,
> left & outer.
>
> I thought I would start simple.  Let's say I've 2 files:
>
> 1)  Students:  student_id, course_id, score
> 2)  Course:  course_id, course_title
>
> We want to produce a file that contains:  student_id, course_title, score
>
> (Note:  This is a hypothetical case.  The real files have millions of
> facts & thousands of dimensions)
>
> Would something like this work?  Note:  I did say I am a newbie ;)
>
> val students = sc.textFile("./students.txt")
> val courses = sc.textFile("./courses.txt")
> val s = students.map(x => x.split(','))
> val left = students.map(x => x.split(',')).map(y => (y(1), y))
> val right = courses.map(x => x.split(',')).map(y => (y(0), y))
> val joined = left.join(right)
>
>
> Any pointers in this regard would be greatly appreciated.  Thanks.
>


[Advice works] Re: Can not get the expected output when running the BroadcastTest example program.

2013-11-18 Thread 杨强
Thanks, Aaron.
Your advice really works.

Does this mean that the collect() method pulls all the related data from the slave
nodes to the master node?




Sincerely

Yang, Qiang

From: Aaron Davidson
Sent: Tuesday, November 19, 2013, 12:38 PM
To: user; yangqiang
Subject: Re: Can not get the expected output when running the BroadcastTest example
program.
Assuming your cluster is actually working (e.g., other examples like SparkPi 
work), then the problem is probably that println() doesn't actually write 
output back to the driver; instead, it may just be outputting locally to each 
slave. You can test this by replacing lines 43 through 45 with:


  sc.parallelize(1 to 10, slices).map {
i => barr1.value.size
  }.collect().foreach(i => println(i))


which should gather the exact same data but ensure that the printlns actually 
occur on the driver.

Re: Can not get the expected output when running the BroadcastTest example program.

2013-11-18 Thread Aaron Davidson
Assuming your cluster is actually working (e.g., other examples like
SparkPi work), then the problem is probably that println() doesn't actually
write output back to the driver; instead, it may just be outputting locally
to each slave. You can test this by replacing lines 43 through 45 with:

  sc.parallelize(1 to 10, slices).map {
i => barr1.value.size
  }.collect().foreach(i => println(i))

which should gather the exact same data but ensure that the printlns
actually occur on the driver.

On Mon, Nov 18, 2013 at 5:59 PM, 杨强  wrote:

>  Hi, all.
> I'm using spark-0.8.0-incubating.
>
> I tried the example BroadcastTest in local mode.
> ./run-example org.apache.spark.examples.BroadcastTest local 1 2>/dev/null
> This works fine and get the result:
>  Iteration 0
> ===
> 100
> 100
> 100
> 100
> 100
> 100
> 100
> 100
> 100
> 100
> Iteration 1
> ===
> 100
> 100
> 100
> 100
> 100
> 100
> 100
> 100
> 100
> 100
>
> But when I run this program in the cluster(standalone mode) with:
> ./run-example org.apache.spark.examples.BroadcastTest spark://
> 172.16.1.39:7077 5 2>/dev/null
> This output is as follows:
>  Iteration 0
> ===
> Iteration 1
> ===
>
> I also tried command
> ./run-example org.apache.spark.examples.BroadcastTest spark://
> 172.16.1.39:7077 5
> but I did not find any error message.
>
> Hope someone can give me some advice. Thank you.
>
>
> The content of file etc/spark-env.sh is as follows:
>
>  export SCALA_HOME=/usr/lib/scala-2.9.3
> export SPARK_MASTER_IP=172.16.1.39
> export SPARK_MASTER_WEBUI_PORT=8090
> export SPARK_WORKER_WEBUI_PORT=8091
> export SPARK_WORKER_MEMORY=2G
>
> #export 
> SPARK_CLASSPATH=.:/home/spark-0.7.3/core/target/spark-core-assembly-0.7.3.jar:$SPACK_CLASSPATH
>
> export 
> SPARK_CLASSPATH=.:/home/hadoop/spark-0.8.0-incubating/conf:/home/hadoop/spark-0.8.0-incubating/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.0.1.jar:/home/hadoop/hadoop-1.0.1/conf
>
> --
>  Sincerely
>
> Yang, Qiang
>


Re: EC2 node submit jobs to separate Spark Cluster

2013-11-18 Thread Aaron Davidson
The main issue with running a spark-shell locally is that it orchestrates
the actual computation, so you want it to be "close" to the actual Worker
nodes for latency reasons. Running a spark-shell on EC2 in the same region
as the Spark cluster avoids this problem.

The error you're seeing seems to indicate a different issue. Check the
Master web UI (accessible on port 8080 at the master's IP address) to make
sure that Workers are successfully registered and they have the expected
amount of memory available to Spark. You can also check to see how much
memory your spark-shell is trying to get per executor. A couple common
problems are (1) an abandoned spark-shell is holding onto all of your
cluster's resources or (2) you've manually configured your spark-shell to
try to get more memory than your Workers have available. Both of these
should be visible in the web UI.


On Mon, Nov 18, 2013 at 5:00 PM, Matt Cheah  wrote:

>  Hi,
>
>  I'm working with an infrastructure that already has its own web server
> set up on EC2. I would like to set up a *separate* spark cluster on EC2
> with the scripts and have the web server submit jobs to this spark cluster.
>
> Is it possible to do this? I'm getting some errors running the spark
> shell from the web server: "Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory". I have heard that it's not possible for any
> local computer to connect to the spark cluster, but I was wondering if
> other EC2 nodes could have their firewalls configured to allow this.
>
>  We don't want to deploy the web server on the master node of the spark
> cluster.
>
>  Thanks,
>
>  -Matt Cheah
>
>
>


Re: debugging a Spark error

2013-11-18 Thread Aaron Davidson
Have you looked a the Spark executor logs? They're usually located in the
$SPARK_HOME/work/ directory. If you're running in a cluster, they'll be on
the individual slave nodes. These should hopefully reveal more information.


On Mon, Nov 18, 2013 at 3:42 PM, Chris Grier wrote:

> Hi,
>
> I'm trying to figure out what the problem is with a job that we are
> running on Spark 0.7.3. When we write out via saveAsTextFile we get an
> exception that doesn't reveal much:
>
> 13/11/18 15:06:19 INFO cluster.TaskSetManager: Loss was due to java.io.IOException
> java.io.IOException: Map failed
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:849)
> at spark.storage.DiskStore.getBytes(DiskStore.scala:86)
> at spark.storage.DiskStore.getValues(DiskStore.scala:92)
> at spark.storage.BlockManager.getLocal(BlockManager.scala:284)
> at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1027)
> at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1026)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at spark.storage.BlockFetcherIterator.<init>(BlockManager.scala:1026)
> at spark.storage.BlockManager.getMultiple(BlockManager.scala:478)
> at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:51)
> at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:10)
> at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
> at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:115)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
> at spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:115)
> at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
> at spark.RDD.iterator(RDD.scala:196)
> at spark.MappedValuesRDD.compute(PairRDDFunctions.scala:704)
> at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
> at spark.RDD.iterator(RDD.scala:196)
> at spark.FlatMappedValuesRDD.compute(PairRDDFunctions.scala:714)
> at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
> at spark.RDD.iterator(RDD.scala:196)
> at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
> at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
> at spark.RDD.iterator(RDD.scala:196)
> at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
> at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
> at spark.RDD.iterator(RDD.scala:196)
> at spark.scheduler.ResultTask.run(ResultTask.scala:77)
> at spark.executor.Executor$TaskRunner.run(Executor.scala:100)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:724)
>
> Any ideas?
>
> -Chris
>


Re: App master failed to find application jar in the master branch on YARN

2013-11-18 Thread Tom Graves
Sorry for the delay. What is the default filesystem on your HDFS setup?  It
looks like it's set to file: rather than hdfs://.  That is the only reason I can
think of that it's listing the directory as
file:/home/work/.sparkStaging/application_1384588058297_0056.  It's basically
just copying it locally rather than uploading to hdfs, and it's just trying to use
the local
file:/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar.
It generally would create that in hdfs so it is accessible on all the nodes.  Is
your /home/work nfs mounted on all the nodes?

You can find the default fs by looking at the Hadoop config files.  It is generally
in core-site.xml, specified by fs.default.name.
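
A quick way to check it from code if the config files aren't handy (a sketch,
assuming the Hadoop client jars and config directory are on the classpath):

  import org.apache.hadoop.conf.Configuration

  val conf = new Configuration()         // picks up core-site.xml from the classpath
  println(conf.get("fs.default.name"))   // older key
  println(conf.get("fs.defaultFS"))      // newer key on Hadoop 2.x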

It's pretty odd that it's erroring with file:// when you specified hdfs://.
When you tried hdfs://, did you upload both the spark jar and your client
jar (SparkAUC-assembly-0.1.jar)?  If not, try that and make sure to put hdfs://
on them when you export SPARK_JAR and specify the --jar option.


I'll try to reproduce the error tomorrow to see if a bug was introduced when I 
added the feature to run spark from HDFS.

Tom



On Monday, November 18, 2013 11:13 AM, guojc  wrote:
 
Hi Tom,
   I'm on Hadoop 2.0.5.  I can launch applications with the Spark 0.8 release normally.
However, when I switch to the git master branch version, with the application built
against it, I get the jar-not-found exception, and the same happens with the example
application. I have tried both the file:// protocol and the hdfs:// protocol, with the
jar in the local file system and in hdfs respectively, and even tried the jar list
parameter when creating the SparkContext.  The exception is slightly different for the
hdfs protocol and the local file path. My application launch command is

 
SPARK_JAR=/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
 /home/work/guojiacheng/spark/spark-class  org.apache.spark.deploy.yarn.Client 
--jar 
/home/work/guojiacheng/spark-auc/target/scala-2.9.3/SparkAUC-assembly-0.1.jar 
--class  myClass.SparkAUC --args -c --args yarn-standalone  --args -i --args 
hdfs://{hdfs_host}:9000/user/work/guojiacheng/data --args -m --args 
hdfs://{hdfs_host}:9000/user/work/guojiacheng/model_large --args -o --args 
hdfs://{hdfs_host}:9000/user/work/guojiacheng/score --num-workers 60  
--master-memory 6g --worker-memory 7g --worker-cores 1

And my build command is SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true 
sbt/sbt assembly

The only thing I can think of that might be related is that on each cluster node there
is an env variable SPARK_HOME pointing to a copy of the 0.8 version, and its bin folder
is in the PATH environment variable; the 0.9 version is not there.  It was something
left over from when the cluster was set up.  But I don't know whether it is related, as
my understanding is that the yarn version tries to distribute spark through yarn.

hdfs version error message:


         appDiagnostics: Application application_1384588058297_0056 failed 1 
times due to AM Container for appattempt_1384588058297_0056_01 exited with  
exitCode: -1000 due to: RemoteTrace: 
java.io.FileNotFoundException: File 
file:/home/work/.sparkStaging/application_1384588058297_0056/SparkAUC-assembly-0.1.jar
 does not exist
   
local version error message.
appDiagnostics: Application application_1384588058297_0066 failed 1 times due 
to AM Container for appattempt_1384588058297_0066_01 exited with  exitCode: 
-1000 due to: java.io.FileNotFoundException: File 
file:/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
 does not exist


Best Regards,
Jiacheng GUo




On Mon, Nov 18, 2013 at 10:34 PM, Tom Graves  wrote:

Hey Jiacheng Guo,
>
>
>do you have SPARK_EXAMPLES_JAR env variable set?  If you do, you have to add 
>the --addJars parameter to the yarn client and point to the spark examples 
>jar.  Or just unset SPARK_EXAMPLES_JAR env variable.
>
>
>You should only have to set SPARK_JAR env variable.  
>
>
>If that isn't the issue let me know the build command you used and hadoop 
>version, and your defaultFs or hadoop.
>
>
>Tom
>
>
>
>On Saturday, November 16, 2013 2:32 AM, guojc  wrote:
> 
>hi,
>   After reading about the exciting progress in consolidating shuffle, I'm
>eager to try out the latest master branch. However, upon launching the example
>application, the job failed with a prompt that the app master failed to find the
>target jar. appDiagnostics: Application application_1384588058297_0017 failed
>1 times due to AM Container for appattempt_1384588058297_0017_01 exited 
>with  exitCode: -1000 due to: java.io.FileNotFoundException: File 
>file:/${my_work_dir}/spark/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
> does not exist.
>
>
>  Is there any change on how to launch a yarn job now?
>
>
>Best Regards,
>Jiacheng Guo
>
>
>
>

Can not get the expected output when running the BroadcastTest example program.

2013-11-18 Thread 杨强
Hi, all.
I'm using spark-0.8.0-incubating.

I tried the example BroadcastTest in local mode.
./run-example org.apache.spark.examples.BroadcastTest local 1 2>/dev/null 
This works fine and get the result:
Iteration 0
===
100
100
100
100
100
100
100
100
100
100
Iteration 1
===
100
100
100
100
100
100
100
100
100
100

But when I run this program in the cluster(standalone mode) with:
./run-example org.apache.spark.examples.BroadcastTest spark://172.16.1.39:7077 
5 2>/dev/null 
This output is as follows:
Iteration 0
===
Iteration 1
===

I also tried command
./run-example org.apache.spark.examples.BroadcastTest spark://172.16.1.39:7077 5
but I did not find any error message.

Hope someone can give me some advice. Thank you.


The content of file etc/spark-env.sh is as follows:

export SCALA_HOME=/usr/lib/scala-2.9.3
export SPARK_MASTER_IP=172.16.1.39
export SPARK_MASTER_WEBUI_PORT=8090
export SPARK_WORKER_WEBUI_PORT=8091
export SPARK_WORKER_MEMORY=2G
#export 
SPARK_CLASSPATH=.:/home/spark-0.7.3/core/target/spark-core-assembly-0.7.3.jar:$SPACK_CLASSPATH
export 
SPARK_CLASSPATH=.:/home/hadoop/spark-0.8.0-incubating/conf:/home/hadoop/spark-0.8.0-incubating/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.0.1.jar:/home/hadoop/hadoop-1.0.1/conf




Sincerely

Yang, Qiang

EC2 node submit jobs to separate Spark Cluster

2013-11-18 Thread Matt Cheah
Hi,

I'm working with an infrastructure that already has its own web server set up 
on EC2. I would like to set up a separate spark cluster on EC2 with the scripts 
and have the web server submit jobs to this spark cluster.

Is it possible to do this? I'm getting some errors running the spark shell from
the web server: "Initial job has not accepted any resources;
check your cluster UI to ensure that workers are registered and have sufficient 
memory". I have heard that it's not possible for any local computer to connect 
to the spark cluster, but I was wondering if other EC2 nodes could have their 
firewalls configured to allow this.

We don't want to deploy the web server on the master node of the spark cluster.

Thanks,

-Matt Cheah




Re: DataFrame RDDs

2013-11-18 Thread andy petrella
Maybe I'm wrong, but this use case could be a good fit for Shapeless' records.

Shapeless' records are, so to say, like lisp's records but typed! In that
sense, they're closer to Haskell's record notation, but imho less
powerful, since the access will be based on a String (field name) for
Shapeless whereas Haskell will use pure functions!

Anyway, this documentation is self-explanatory and shows straightforwardly
how we (maybe) could use them to simulate an R frame.

Thinking out loud: when reading a csv file, for instance, what would be
needed are
 * a Read[T] for each column,
 * fold'ling the list of columns by "reading" each and prepending the
result (combined with the name with ->>) to an HList

The gain would be that we should recover one helpful feature of R's frame
which is:
  R   :: frame$newCol = frame$post - frame$pre
  // which adds a column to a frame
  Shpls :: frame2 = frame + ("newCol" ->> (frame("post") - frame("pre")))
  // type safe "difference" between ints for instance
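
A rough per-row sketch of that idea with shapeless 2.x-style records (imports
and exact operators may differ between shapeless versions, and this hasn't been
tried inside Spark):

  import shapeless._
  import syntax.singleton._   // provides ->>
  import record._             // provides typed access by field name

  // one parsed csv row as an extensible record; each field keeps its own type
  val row = ("post" ->> 42) :: ("pre" ->> 30) :: HNil

  // type-safe access by name: a misspelled field or a non-numeric column won't compile
  val diff = row("post") - row("pre")

  // the per-row equivalent of R's frame$newCol = frame$post - frame$pre
  val row2 = ("newCol" ->> diff) :: row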

Of course, we're not recovering R's frame as is, because we're simply
dealing with rows one by one, where a frame deals with the full table
-- but in the case of Spark it would make no sense to mimic that, since
we use RDDs for that :-D.

I haven't experimented with this yet, but it'd be fun to try; don't know if
someone is interested ^^

Cheers

andy


On Fri, Nov 15, 2013 at 8:49 PM, Christopher Nguyen  wrote:

> Sure, Shay. Let's connect offline.
>
> Sent while mobile. Pls excuse typos etc.
> On Nov 16, 2013 2:27 AM, "Shay Seng"  wrote:
>
>> Nice, any possibility of sharing this code in advance?
>>
>>
>> On Fri, Nov 15, 2013 at 11:22 AM, Christopher Nguyen wrote:
>>
>>> Shay, we've done this at Adatao, specifically a big data frame in RDD
>>> representation and subsetting/projections/data mining/machine learning
>>> algorithms on that in-memory table structure.
>>>
>>> We're planning to harmonize that with the MLBase work in the near
>>> future. Just a matter of prioritization on limited resources. If there's
>>> enough interest we'll accelerate that.
>>>
>>> Sent while mobile. Pls excuse typos etc.
>>> On Nov 16, 2013 1:11 AM, "Shay Seng"  wrote:
>>>
 Hi,

 Is there some way to get R-style Data.Frame data structures into RDDs?
 I've been using RDD[Seq[]] but this is getting quite error-prone and the
 code gets pretty hard to read especially after a few joins, maps etc.

 Rather than access columns by index, I would prefer to access them by
 name.
 e.g. instead of writing:
 myrdd.map(l => Seq(l(0), l(1), l(4), l(9)))
 I would prefer to write
 myrdd.map(l => DataFrame(l.id, l.entryTime, l.exitTime, l.cost))

 Also joins are particularly irritating. Currently I have to first
 construct a pair:
 somePairRdd.join(myrdd.map(l => ((l(1), l(2)), (l(0), l(1), l(2), l(3)))))
 Now I have to unzip away the join-key and remap the values into a seq

 instead I would rather write
 someDataFrame.join(myrdd , l=> l.entryTime && l.exitTime)


 The question is this:
 (1) I started writing a DataFrameRDD class that kept track of the
 column names and column values, and some optional attributes common to the
 entire dataframe. However I got a little muddled when trying to figure out
 what happens when a DataFrameRDD is chained with other operations and gets
 transformed to other types of RDDs. The Value part of the RDD is obvious,
 but I didn't know the best way to pass on the "column and attribute"
 portions of the DataFrame class.

 I googled around for some documentation on how to write RDDs, but only
 found a pptx slide presentation with very vague info. Is there a better
 source of info on how to write RDDs?

 (2) Even better than info on how to write RDDs, has anyone written an
 RDD that functions as a DataFrame? :-)

 tks
 shay

>>>
>>


debugging a Spark error

2013-11-18 Thread Chris Grier
Hi,

I'm trying to figure out what the problem is with a job that we are running
on Spark 0.7.3. When we write out via saveAsTextFile we get an exception
that doesn't reveal much:

13/11/18 15:06:19 INFO cluster.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Map failed
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:849)
at spark.storage.DiskStore.getBytes(DiskStore.scala:86)
at spark.storage.DiskStore.getValues(DiskStore.scala:92)
at spark.storage.BlockManager.getLocal(BlockManager.scala:284)
at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1027)
at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.storage.BlockFetcherIterator.<init>(BlockManager.scala:1026)
at spark.storage.BlockManager.getMultiple(BlockManager.scala:478)
at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:51)
at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:10)
at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:115)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
at spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:115)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.MappedValuesRDD.compute(PairRDDFunctions.scala:704)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.FlatMappedValuesRDD.compute(PairRDDFunctions.scala:714)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.scheduler.ResultTask.run(ResultTask.scala:77)
at spark.executor.Executor$TaskRunner.run(Executor.scala:100)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

Any ideas?

-Chris


Re: Spark & Avro in Scala

2013-11-18 Thread Matt Massie
Agree with Eugen that you should use Kryo.

But even better is to embed your Avro objects inside of Kryo. This allows
you to have the benefits of both Avro and Kryo.

Here's example code for using Avro with Kryo.

https://github.com/massie/adam/blob/master/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/serialization/AdamKryoRegistrator.scala

You need to register all of your Avro specific classes with Kryo and have it
use the AvroSerializer class to encode/decode them.

e.g.

class AdamKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // use the Avro-backed serializer for each generated specific class
    kryo.register(classOf[MySpecificAvroClass], new AvroSerializer[MySpecificAvroClass]())
  }
}
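
To make Spark pick up the registrator, it also needs to be wired in before the
SparkContext is created; a sketch using the 0.8-era system properties (the
package name below is a placeholder):

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "com.mycompany.adam.AdamKryoRegistrator")
val sc = new SparkContext("local", "avro-kryo-example")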










--
Matt Massie
UC, Berkeley AMPLab
Twitter: @matt_massie ,
@amplab
https://amplab.cs.berkeley.edu/


On Mon, Nov 18, 2013 at 10:45 AM, Eugen Cepoi  wrote:

> Hi Robert,
>
> The problem is that spark uses java serialization requiring serialized
> objects to implement Serializable, AvroKey doesn't.
> As a workaround you can try using kryo for the serialization.
>
> Eugen
>
>
> 2013/11/11 Robert Fink 
>
>> Hi,
>>
>> I am trying to get the following minimal Scala example work: Using Spark
>> to process Avro records. Here's my dummy Avro definition:
>>
>> {
>>   "namespace": "com.avrotest",
>>   "type": "record",
>>   "name": "AvroTest",
>>   "fields": [
>> {"name": "field1", "type": ["string", "null"]}
>>   ]
>> }
>>
>> I experiment with a simple job that creates three AvroTest objects,
>> writes them out to a file through a SparkContext, and then reads in the
>> thus generated Avro file and performs a simple grouping operation:
>>
>> //
>> -
>> import org.apache.spark.SparkContext._
>> import org.apache.avro.specific.SpecificDatumWriter
>> import org.apache.avro.file.DataFileWriter
>> import org.apache.avro._
>> import org.apache.avro.generic._
>> import org.apache.hadoop.mapreduce.Job
>> import com.avrotest.AvroTest
>> import java.io.File
>>
>> object SparkTest{
>>   def main(args: Array[String]) {
>>
>> def avrofile = "output.avro"
>> def sc = new SparkContext("local", "Simple App")
>> val job = new Job()
>>
>> val record1 = new AvroTest()
>> record1.setField1("value1")
>> val record2 = new AvroTest()
>> record2.setField1("value1")
>> val record3 = new AvroTest()
>> record3.setField1("value2")
>>
>> def userDatumWriter = new SpecificDatumWriter[AvroTest]()
>> val dataFileWriter = new DataFileWriter[AvroTest](userDatumWriter)
>>  def file = new File(avrofile)
>> dataFileWriter.create(record1.getSchema(), file)
>> dataFileWriter.append(record1)
>> dataFileWriter.append(record2)
>> dataFileWriter.append(record3)
>> dataFileWriter.close()
>>
>> def rdd = sc.newAPIHadoopFile(
>>   avrofile,
>>   classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[AvroTest]],
>>   classOf[org.apache.avro.mapred.AvroKey[AvroTest]],
>>   classOf[org.apache.hadoop.io.NullWritable],
>>   job.getConfiguration)
>> // rdd.foreach( x => println(x._1.datum.getField1) ) // Prints
>> value1, value1, value2
>> val numGroups= rdd.groupBy(x => x._1.datum.getField1).count()
>>   }
>> }
>> //
>> -
>>
>> I would expect numGroups==2 in the last step, because record1 and record2
>> share the getField1()=="value1", and record3 has getField1() == "value2".
>> However, the script fails to execute with the following error (see below).
>> Can anyone give me a hint what could be wrong in the above code, or post an
>> example of reading from an Avro file and performing some simple
>> computations on the retrieved objects? Thank you so much! Robert.
>>
>> 11650 [pool-109-thread-1] WARN
>> org.apache.avro.mapreduce.AvroKeyInputFormat - Reader schema was not set.
>> Use AvroJob.setInputKeySchema() if desired.
>> 11661 [pool-109-thread-1] INFO
>> org.apache.avro.mapreduce.AvroKeyInputFormat - Using a reader schema equal
>> to the writer schema.
>> 12293 [spark-akka.actor.default-dispatcher-5] INFO
>> org.apache.spark.scheduler.local.LocalTaskSetManager - Loss was due to
>> java.io.NotSerializableException
>> java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>> at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>> at
>> 

Re: Spark & Avro in Scala

2013-11-18 Thread Eugen Cepoi
Hi Robert,

The problem is that spark uses java serialization requiring serialized
objects to implement Serializable, AvroKey doesn't.
As a workaround you can try using kryo for the serialization.

Eugen


2013/11/11 Robert Fink 

> Hi,
>
> I am trying to get the following minimal Scala example work: Using Spark
> to process Avro records. Here's my dummy Avro definition:
>
> {
>   "namespace": "com.avrotest",
>   "type": "record",
>   "name": "AvroTest",
>   "fields": [
> {"name": "field1", "type": ["string", "null"]}
>   ]
> }
>
> I experiment with a simple job that creates three AvroTest objects, writes
> them out to a file through a SparkContext, and then reads in the thus
> generated Avro file and performs a simple grouping operation:
>
> //
> -
> import org.apache.spark.SparkContext._
> import org.apache.avro.specific.SpecificDatumWriter
> import org.apache.avro.file.DataFileWriter
> import org.apache.avro._
> import org.apache.avro.generic._
> import org.apache.hadoop.mapreduce.Job
> import com.avrotest.AvroTest
> import java.io.File
>
> object SparkTest{
>   def main(args: Array[String]) {
>
> def avrofile = "output.avro"
> def sc = new SparkContext("local", "Simple App")
> val job = new Job()
>
> val record1 = new AvroTest()
> record1.setField1("value1")
> val record2 = new AvroTest()
> record2.setField1("value1")
> val record3 = new AvroTest()
> record3.setField1("value2")
>
> def userDatumWriter = new SpecificDatumWriter[AvroTest]()
> val dataFileWriter = new DataFileWriter[AvroTest](userDatumWriter)
> def file = new File(avrofile)
> dataFileWriter.create(record1.getSchema(), file)
> dataFileWriter.append(record1)
> dataFileWriter.append(record2)
> dataFileWriter.append(record3)
> dataFileWriter.close()
>
> def rdd = sc.newAPIHadoopFile(
>   avrofile,
>   classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[AvroTest]],
>   classOf[org.apache.avro.mapred.AvroKey[AvroTest]],
>   classOf[org.apache.hadoop.io.NullWritable],
>   job.getConfiguration)
> // rdd.foreach( x => println(x._1.datum.getField1) ) // Prints value1,
> value1, value2
> val numGroups= rdd.groupBy(x => x._1.datum.getField1).count()
>   }
> }
> //
> -
>
> I would expect numGroups==2 in the last step, because record1 and record2
> share the getField1()=="value1", and record3 has getField1() == "value2".
> However, the script fails to execute with the following error (see below).
> Can anyone give me a hint what could be wrong in the above code, or post an
> example of reading from an Avro file and performing some simple
> computations on the retrieved objects? Thank you so much! Robert.
>
> 11650 [pool-109-thread-1] WARN
> org.apache.avro.mapreduce.AvroKeyInputFormat - Reader schema was not set.
> Use AvroJob.setInputKeySchema() if desired.
> 11661 [pool-109-thread-1] INFO
> org.apache.avro.mapreduce.AvroKeyInputFormat - Using a reader schema equal
> to the writer schema.
> 12293 [spark-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.local.LocalTaskSetManager - Loss was due to
> java.io.NotSerializableException
> java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
> at
> org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:109)
> at
> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
> at
> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
> at
> org.apache.spark.scheduler.Shu

Re: code review - splitting columns

2013-11-18 Thread Tom Vacek
This is in response to your question about something in the API that
already does this.  You might want to keep your eye on MLI (
http://www.mlbase.org), which is a columnar table written for machine
learning but applicable to a lot of problems.  It's not perfect right now.


On Fri, Nov 15, 2013 at 7:56 PM, Aaron Davidson  wrote:

> Regarding only your last point, you could always split backwards to avoid
> having to worry about updated indices (i.e., split the highest index column
> first). But if you're additionally worried about efficiency, a combined
> approach could make more sense to avoid making two full passes on the data.
>
> Otherwise, I don't see anything particularly amiss here, but I'm no expert.
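
A sketch of that split-backwards ordering, reusing the splitColumn helper and
test data quoted below (untested, just to show the index bookkeeping going away):

  // split the right-most target column first, so earlier indices stay valid
  testRDD = splitColumn(testRDD, 1, 2, _.split(" ").toList)     // original column 1 ("a b")
  testRDD = splitColumn(testRDD, 0, 2, _.split("\\.").toList)   // original column 0 ("1.2")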
>
>
> On Wed, Nov 13, 2013 at 3:00 PM, Philip Ogren wrote:
>
>> Hi Spark community,
>>
>> I learned a lot the last time I posted some elementary Spark code here.
>>  So, I thought I would do it again.  Someone politely tell me offline if
>> this is noise or unfair use of the list!  I acknowledge that this borders
>> on asking Scala 101 questions
>>
>> I have an RDD[List[String]] corresponding to columns of data and I want
>> to split one of the columns using some arbitrary function and return an RDD
>> updated with the new columns.  Here is the code I came up with.
>>
>> def splitColumn(columnsRDD: RDD[List[String]], columnIndex: Int,
>> numSplits: Int, splitFx: String => List[String]): RDD[List[String]] = {
>>
>> def insertColumns(columns: List[String]) : List[String] = {
>>   val split = columns.splitAt(columnIndex)
>>   val left = split._1
>>   val splitColumn = split._2.head
>>   val splitColumns = splitFx(splitColumn).padTo(numSplits,
>> "").take(numSplits)
>>   val right = split._2.tail
>>   left ++ splitColumns ++ right
>> }
>>
>> columnsRDD.map(columns => insertColumns(columns))
>>   }
>>
>> Here is a simple test that demonstrates the behavior:
>>
>>   val spark = new SparkContext("local", "test spark")
>>   val testStrings = List(List("1.2", "a b"), List("3.4", "c d e"),
>> List("5.6", "f"))
>>   var testRDD: RDD[List[String]] = spark.parallelize(testStrings)
>>   testRDD = splitColumn(testRDD, 0, 2, _.split("\\.").toList)
>>   testRDD = splitColumn(testRDD, 2, 2, _.split(" ").toList) //Line 5
>>   val actualStrings = testRDD.collect.toList
>>   assertEquals(4, actualStrings(0).length)
>>   assertEquals("1, 2, a, b", actualStrings(0).mkString(", "))
>>   assertEquals(4, actualStrings(1).length)
>>   assertEquals("3, 4, c, d", actualStrings(1).mkString(", "))
>>   assertEquals(4, actualStrings(2).length)
>>   assertEquals("5, 6, f, ", actualStrings(2).mkString(", "))
>>
>>
>> My first concern about this code is that I'm missing out on something
>> that does exactly this in the API.  This seems like such a common use case
>> that I would not be surprised if there's a readily available way to do this.
>>
>> I'm a little uncertain about the typing of splitColumn - i.e. the first
>> parameter and the return value.  It seems like a general solution wouldn't
>> require every column to be a String value.  I'm also annoyed that line 5 in
>> the test code requires that I use an updated index to split what was
>> originally the second column.  This suggests that perhaps I should split
>> all the columns that need splitting in one function call - but it seems
>> like doing that would require an unwieldy function signature.
>>
>> Any advice or insight is appreciated!
>>
>> Thanks,
>> Philip
>>
>
>


Re: App master failed to find application jar in the master branch on YARN

2013-11-18 Thread guojc
Hi Tom,
   I'm on Hadoop 2.0.5.  I can launch applications with the Spark 0.8 release
normally. However, when I switch to the git master branch version, with the application
built against it, I get the jar-not-found exception, and the same happens with the
example application. I have tried both the file:// protocol and the hdfs:// protocol,
with the jar in the local file system and in hdfs respectively, and even tried the jar
list parameter when creating the SparkContext.  The exception is slightly different for
the hdfs protocol and the local file path. My application launch command is

 
SPARK_JAR=/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
/home/work/guojiacheng/spark/spark-class
 org.apache.spark.deploy.yarn.Client --jar
/home/work/guojiacheng/spark-auc/target/scala-2.9.3/SparkAUC-assembly-0.1.jar
--class  myClass.SparkAUC --args -c --args yarn-standalone  --args -i
--args hdfs://{hdfs_host}:9000/user/work/guojiacheng/data --args -m --args
hdfs://{hdfs_host}:9000/user/work/guojiacheng/model_large --args -o --args
hdfs://{hdfs_host}:9000/user/work/guojiacheng/score --num-workers 60
 --master-memory 6g --worker-memory 7g --worker-cores 1

And my build command is SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true
sbt/sbt assembly

The only thing I can think of that might be related is that on each cluster node there
is an env variable SPARK_HOME pointing to a copy of the 0.8 version, and its bin
folder is in the PATH environment variable; the 0.9 version is not there.  It was
something left over from when the cluster was set up.  But I don't know whether it
is related, as my understanding is that the yarn version tries to distribute spark
through yarn.

hdfs version error message:

 appDiagnostics: Application application_1384588058297_0056 failed
1 times due to AM Container for appattempt_1384588058297_0056_01 exited
with  exitCode: -1000 due to: RemoteTrace:
java.io.FileNotFoundException: File
file:/home/work/.sparkStaging/application_1384588058297_0056/SparkAUC-assembly-0.1.jar
does not exist

local version error message.
appDiagnostics: Application application_1384588058297_0066 failed 1 times
due to AM Container for appattempt_1384588058297_0066_01 exited with
 exitCode: -1000 due to: java.io.FileNotFoundException: File
file:/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
does not exist

Best Regards,
Jiacheng GUo



On Mon, Nov 18, 2013 at 10:34 PM, Tom Graves  wrote:

> Hey Jiacheng Guo,
>
> do you have SPARK_EXAMPLES_JAR env variable set?  If you do, you have to
> add the --addJars parameter to the yarn client and point to the spark
> examples jar.  Or just unset SPARK_EXAMPLES_JAR env variable.
>
> You should only have to set SPARK_JAR env variable.
>
> If that isn't the issue let me know the build command you used and hadoop
> version, and your defaultFs or hadoop.
>
> Tom
>
>
>   On Saturday, November 16, 2013 2:32 AM, guojc  wrote:
>  hi,
>After reading about the exciting progress in consolidating shuffle, I'm
> eager to try out the latest master branch. However, upon launching the
> example application, the job failed with a prompt that the app master failed
> to find the target jar. appDiagnostics: Application
> application_1384588058297_0017 failed 1 times due to AM Container for
> appattempt_1384588058297_0017_01 exited with  exitCode: -1000 due to:
> java.io.FileNotFoundException: File
> file:/${my_work_dir}/spark/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
> does not exist.
>
>   Is there any change on how to launch a yarn job now?
>
> Best Regards,
> Jiacheng Guo
>
>
>
>


Fast Data Processing with Spark

2013-11-18 Thread R. Revert
Hello

Has anyone read the Fast Data Processing with Spark book (
http://www.amazon.com/Fast-Processing-Spark-Holden-Karau/dp/1782167064/ref=sr_1_1?ie=UTF8&qid=1384791032&sr=8-1&keywords=fast+spark+data+processing
)?

Any reviews or opinions about the material?
I'm thinking of buying the book so I can learn more about Spark, but I didn't
see any comments.

Thanks



*#__*
*Atte.*
*Rafael R.*


Re: foreachPartition in Java

2013-11-18 Thread Yadid Ayzenberg

Great, I will use mapPartitions instead.
Thanks for the advice,

Yadid


On 11/17/13 8:13 PM, Aaron Davidson wrote:
Also, in general, you can work around shortcomings in the Java API by 
converting to a Scala RDD (using JavaRDD's rdd() method). The API 
tends to be much clunkier since you have to jump through some hoops to 
talk to a Scala API in Java, though. In this case, JavaRDD's 
mapPartition() method will likely be the cleanest solution as Patrick 
said.
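
For reference, the pattern Patrick suggests looks roughly like this in the Scala
API (the connection helpers here are made up; the Java version would go through
JavaRDD's mapPartitions the same way):

rdd.mapPartitions { iter =>
  val conn = createDbConnection()   // expensive, non-serializable; built once per partition
  iter.foreach(record => conn.insert(record))
  conn.close()
  Iterator.empty                    // nothing to return, we only want the side effect
}.count()                           // an action is still needed to force evaluation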



On Sun, Nov 17, 2013 at 5:03 PM, Patrick Wendell wrote:


Can you just call mapPartitions and ignore the result?

- Patrick

On Sun, Nov 17, 2013 at 4:45 PM, Yadid Ayzenberg  wrote:
> Hi,
>
> According to the API, foreachPartition() is not yet implemented
in Java.
> Are there any workarounds to get the same functionality ?
> I have a non serializable DB connection and instantiating it is
pretty
> expensive, so I prefer to do it on a per partition basis.
>
> thanks,
> Yadid
>






TeraSort on Spark

2013-11-18 Thread Rivera, Dario
Hello spark community.
I wanted to ask if any work has been done on porting TeraSort (Tera 
Gen/Sort/Validate) from Hadoop to Spark on EC2/EMR
I am looking for some guidance on lessons learned from this or similar efforts 
as we are trying to do some benchmarking on some of the newer EC2 instances to 
determine how to optimize in-memory processing of these instances with Spark 
for some of AWS' customers looking to move to Spark for their data processing 
workloads.

Any guidance the community can provide on this effort is greatly appreciated!

Thanks,

Dario Rivera
Solutions Architect
Cell: 571-205-2731
Email: dar...@amazon.com


Re: App master failed to find application jar in the master branch on YARN

2013-11-18 Thread Tom Graves
Hey Jiacheng Guo,

do you have SPARK_EXAMPLES_JAR env variable set?  If you do, you have to add 
the --addJars parameter to the yarn client and point to the spark examples jar. 
 Or just unset SPARK_EXAMPLES_JAR env variable.

You should only have to set SPARK_JAR env variable.  

If that isn't the issue let me know the build command you used and hadoop 
version, and your defaultFs or hadoop.

Tom



On Saturday, November 16, 2013 2:32 AM, guojc  wrote:
 
hi,
   After reading about the exciting progress in consolidating shuffle, I'm eager
to try out the latest master branch. However, upon launching the example
application, the job failed with a prompt that the app master failed to find the
target jar. appDiagnostics: Application application_1384588058297_0017 failed 1
times due to AM Container for appattempt_1384588058297_0017_01 exited with  
exitCode: -1000 due to: java.io.FileNotFoundException: File 
file:/${my_work_dir}/spark/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
 does not exist.

  Is there any change on how to launch a yarn job now?

Best Regards,
Jiacheng Guo

Re: interesting question on quora

2013-11-18 Thread Koert Kuipers
The core of Hadoop is currently HDFS + MapReduce. The more appropriate
question is whether it will become HDFS + Spark. So will Spark overtake
MapReduce as the dominant computational engine? It's a very serious
candidate for that, I think. It can do many things MapReduce cannot do, and
has an awesome API.

It's missing a few things to truly replace MapReduce:
* handling data that does not fit in memory per key/reducer
* security support (integration with HDFS authorization/authentication)
* scalability??? (has Spark been tested on 1000 machines?)


On Mon, Nov 18, 2013 at 12:38 AM, jamal sasha  wrote:

> I found this interesting question on quora.. and thought of sharing here.
> https://www.quora.com/Apache-Hadoop/Will-spark-ever-overtake-hadoop
> So.. is spark missing any capabilty?
>
>


How to efficiently manage resources across a cluster and avoid GC overhead exceeded errors?

2013-11-18 Thread ioannis.deligiannis
Hi,

I have a cluster of 20 servers, each having 24 cores and 30GB of RAM allocated 
to Spark. Spark runs in a STANDALONE mode.
I am trying to load some 200+GB files and cache the rows using ".cache()".

What I would like to do is the following: (ATM from the scala console)
-Evenly load the files across the 20 servers (preferably using all 20*24 cores 
for the load)
-Verify that data are loaded as NODE_LOCAL
Looking into the :4040 console, I see in some runs a lot of NODE_LOCAL but in
others a lot of ANY. Is there a way to identify what a TID shown as ANY is doing?

If I allocate less than ~double the memory I need, I get an OutOfMemory error.

If I use the textFile(int) parameter, i.e. sc.textFile("hdfs://...", 20),
then the error goes away.
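
For illustration, a sketch of that kind of over-partitioned load (the path and
the partition count are placeholders, not from the original message):

val lines = sc.textFile("hdfs://namenode:8020/data/big-files", 20 * 24 * 2)  // more splits than total cores
lines.cache()
lines.count()                       // forces the load and populates the cache
println(lines.partitions.size)      // confirm how many partitions were actually created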

On the other hand, if I allocate enough memory, I can see from the admin 
console that some of my workers have too much load and some other less than 
half. I understand that I could use a partitioner to balance my data but I 
wouldn't expect an OOME if nodes are significantly under-used. Am I missing 
something?

Thanks,

Ioannis Deligiannis




Re: PySpark script works itself, but fails when called from other script

2013-11-18 Thread Andrei
I've tried adding task.py to pyFiles during SparkContext creation and it
worked perfectly. Thanks for your help!

If you need some more information for further investigation, here's what
I've noticed. Without explicitly adding the file to the SparkContext, only
functions that are defined in the main module run by PySpark can be passed to
distributed jobs. E.g. if I define myfunc() in runner.py (and run
runner.py), it works pretty well. But if I define myfunc() in task.py (and
still run runner.py), it fails as I've described above. I've posted the stderr
from the failed executor here , but essentially
it just says that the Python worker crashed without any reference to the cause.
For the sake of completeness, here's also the console output.

To make it clear: all these errors occur only in my initial setup, adding
"task.py" to SparkContext fixes it anyway. Hope this helps.

Thanks,
Andrei





On Sat, Nov 16, 2013 at 2:12 PM, Andrei  wrote:

> Hi,
>
> thanks for your replies. I'm out of office now, so I will check it out on
> Monday morning, but guess about serialization/deserialization looks
> plausible.
>
> Thanks,
> Andrei
>
>
> On Sat, Nov 16, 2013 at 11:11 AM, Jey Kottalam wrote:
>
>> Hi Andrei,
>>
>> Could you please post the stderr logfile from the failed executor? You
>> can find this in the "work" subdirectory of the worker that had the failed
>> task. You'll need the executor id to find the corresonding stderr file.
>>
>> Thanks,
>> -Jey
>>
>>
>> On Friday, November 15, 2013, Andrei wrote:
>>
>>> I have 2 Python modules/scripts - task.py and runner.py. First one
>>> (task.py) is a little Spark job and works perfectly well by itself.
>>> However, when called from runner.py with exactly the same arguments, it
>>> fails with only useless message (both - in terminal and worker logs).
>>>
>>> org.apache.spark.SparkException: Python worker exited unexpectedly
>>> (crashed)
>>>
>>> Below there's code for both - task.py and runner.py:
>>>
>>> task.py
>>> ---
>>>
>>> #!/usr/bin/env pyspark
>>> from __future__ import print_function
>>> from pyspark import SparkContext
>>>
>>> def process(line):
>>> return line.strip()
>>>
>>> def main(spark_master, path):
>>> sc = SparkContext(spark_master, 'My Job')
>>> rdd = sc.textFile(path)
>>> rdd = rdd.map(process) # this line causes troubles when called
>>> from runner.py
>>> count = rdd.count()
>>> print(count)
>>>
>>> if __name__ == '__main__':
>>> main('spark://spark-master-host:7077',
>>> 'hdfs://hdfs-namenode-host:8020/path/to/file.log')
>>>
>>>
>>> runner.py
>>> -
>>>
>>> #!/usr/bin/env pyspark
>>>
>>> import task
>>>
>>> if __name__ == '__main__':
>>> task.main('spark://spark-master-host:7077',
>>>'hdfs://hdfs-namenode-host:8020/path/to/file.log')
>>>
>>>
>>> ---
>>>
>>> So, what's the difference between calling PySpark-enabled script
>>> directly and as Python module? What are good rules for writing multi-module
>>> Python programs with Spark?
>>>
>>> Thanks,
>>> Andrei
>>>
>>
>