GraphX. How to remove vertex or edge?

2014-04-30 Thread Николай Кинаш
Hello.

How do I remove a vertex or an edge from a graph in GraphX?
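
GraphX (as of 0.9/1.0) does not expose explicit remove operations; the usual answer is to build a filtered subgraph. A minimal sketch, assuming a Graph[String, Int] named graph and a hypothetical vertex id 42L:

import org.apache.spark.graphx._

// Drop one vertex; edges touching it are dropped automatically.
val withoutVertex = graph.subgraph(vpred = (id, attr) => id != 42L)

// Keep only edges whose attribute is positive, i.e. "remove" the rest.
val withoutSomeEdges = graph.subgraph(epred = triplet => triplet.attr > 0)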


How to handle this situation: Huge File Shared by All maps and Each Computer Has one copy?

2014-04-30 Thread PengWeiPRC
Hi there,

I was wondering if somebody could give me some suggestions about how to
handle this situation:

  I have a Spark program that first reads a 6GB file locally (not as an RDD) and then
runs the map/reduce tasks. This 6GB file contains information that is shared by all
the map tasks. Previously, I handled it with Spark's broadcast function, roughly like this:
val globalFile = sc.broadcast(fileRead("filename"))   // broadcast handle; tasks read globalFile.value
rdd.map(ele => MapFunc(ele, globalFile.value))

  However, when running the Spark program on a cluster of multiple computers, I found
that the remote nodes waited forever for the broadcast of the global file. I also think
it would not be a good solution to have each map task load the global file by itself,
which would incur huge overhead.

  Actually, this global file already exists on every node of our cluster. The ideal
behavior would be for each node to read the file once from its local disk (and keep it
in memory), and then share that data with all the map/reduce tasks scheduled to that
node. So the global file is neither a broadcast variable, which is shared by all
map/reduce tasks, nor a private variable seen by only one map task. It is shared
node-wide: read once per node and shared by all the tasks mapped to that node.

Could anybody tell me how to program in Spark to handle it? Thanks so much.
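
One common pattern for exactly this (a sketch, not from the thread: the local path, the tab-separated format, and the two-argument MapFunc are assumptions) is to skip broadcasting and load the file at most once per executor JVM through a lazily initialized singleton, since the file already sits on every node's local disk:

object NodeLocalData {
  // Initialized lazily, at most once per executor JVM, from the node's local disk.
  lazy val data: Map[String, String] =
    scala.io.Source.fromFile("/local/path/to/global_file")
      .getLines()
      .map { line => val Array(k, v) = line.split("\t", 2); k -> v }
      .toMap
}

rdd.mapPartitions { iter =>
  val shared = NodeLocalData.data            // no broadcast; loaded on first use on each node
  iter.map(ele => MapFunc(ele, shared))
}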




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-handle-this-situation-Huge-File-Shared-by-All-maps-and-Each-Computer-Has-one-copy-tp5139.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: something about memory usage

2014-04-30 Thread wxhsdp
Hi Daniel, thanks for your help.

  I'm now running 1-core slaves, but I still can't work it out. The executor does the
tasks one by one: task0, task1, task2...

  How can I get the memory used by, say, task1, with so many threads running in the
background, plus GC? Using Runtime.getRuntime().freeMemory isn't accurate.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/something-about-memory-usage-tp5107p5137.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Patrick Wendell
This is a consequence of the way the Hadoop files API works. However,
you can (fairly easily) add code to just rename the file because it
will always produce the same filename.

(heavy use of pseudo code)

dir = "/some/dir"
rdd.coalesce(1).saveAsTextFile(dir)
f = new File(dir + "part-0")
f.moveTo("somewhere else")
dir.remove()

It might be cool to add a utility called `saveAsSingleFile` or
something that does this for you. In fact, we probably should have
called saveAsTextFile "saveAsTextFiles" to make it clearer...
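
A more concrete version of the pseudo code (a sketch, not an existing Spark API; the saveAsSingleFile name, the temporary directory argument, and the part-00000 file name produced by the Hadoop output format are assumptions):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def saveAsSingleFile(rdd: RDD[String], tmpDir: String, dest: String, sc: SparkContext): Unit = {
  rdd.coalesce(1).saveAsTextFile(tmpDir)                 // writes tmpDir/part-00000
  val fs = new Path(tmpDir).getFileSystem(sc.hadoopConfiguration)
  fs.rename(new Path(tmpDir, "part-00000"), new Path(dest))
  fs.delete(new Path(tmpDir), true)                      // recursively remove the leftover directory
}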

On Wed, Apr 30, 2014 at 2:00 PM, Peter  wrote:
> Thanks Nicholas, this is a bit of a shame, not very practical for log roll
> up for example when every output needs to be in it's own "directory".
> On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
>  wrote:
> Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
> coalesce(1), you move everything in the RDD to a single partition, which
> then gives you 1 output file.
> It will still be called part-0 or something like that because that's
> defined by the Hadoop API that Spark uses for reading to/writing from S3. I
> don't know of a way to change that.
>
>
> On Wed, Apr 30, 2014 at 2:47 PM, Peter  wrote:
>
> Ah, looks like RDD.coalesce(1) solves one part of the problem.
> On Wednesday, April 30, 2014 11:15 AM, Peter 
> wrote:
> Hi
>
> Playing around with Spark & S3, I'm opening multiple objects (CSV files)
> with:
>
> val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>
> so hfile is a RDD representing 10 objects that were "underneath" 2014-04-28.
> After I've sorted and otherwise transformed the content, I'm trying to write
> it back to a single object:
>
>
> sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>
> unfortunately this results in a "folder" named concatted.csv with 10 objects
> underneath, part-0 .. part-00010, corresponding to the 10 original
> objects loaded.
>
> How can I achieve the desired behaviour of putting a single object named
> concatted.csv ?
>
> I've tried 0.9.1 and 1.0.0-RC3.
>
> Thanks!
> Peter
>
>
>
>
>
>
>


same partition id means same location?

2014-04-30 Thread wxhsdp
Hi,

  I'm just reviewing "advanced spark features"; it's about the PageRank example.

  It says "any shuffle operation on two RDDs will take on the partitioner of one of
them, if one is set".

  So first we partition Links with a HashPartitioner, then we join Links and Ranks0.
According to the document, Ranks0 will take on the HashPartitioner. The following
reduceByKey operation also respects the HashPartitioner, so when we join Links and
Ranks1 there is no shuffle at all.

  Does that mean that partitions of different RDDs with the same id will go to exactly
the same location, even if the RDDs were originally located on different nodes?
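
A rough sketch of that PageRank-style pattern (toy data; the partition count is arbitrary):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair-RDD operations

val part = new HashPartitioner(8)
val links = sc.parallelize(Seq(1L -> Seq(2L, 3L), 2L -> Seq(1L)))
  .partitionBy(part)
  .cache()
val ranks0 = links.mapValues(_ => 1.0)                   // inherits links' partitioner
// links and ranks0 are co-partitioned, so the join does not re-shuffle links:
val contribs = links.join(ranks0).flatMap { case (_, (dests, rank)) =>
  dests.map(d => (d, rank / dests.size))
}
val ranks1 = contribs.reduceByKey(part, _ + _)           // same partitioner again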




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/same-partition-id-means-same-location-tp5136.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


CDH 5.0 and Spark 0.9.0

2014-04-30 Thread Paul Schooss
Hello,

So I was unable to run the following commands from the spark shell with CDH
5.0 and spark 0.9.0, see below.

Once I removed the property


<property>
  <name>io.compression.codec.lzo.class</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
  <final>true</final>
</property>


from the core-site.xml on the node, the spark commands worked. Is there a
specific setup I am missing?

scala> var log = sc.textFile("hdfs://jobs-ab-hnn1//input/core-site.xml")
14/04/30 22:43:16 INFO MemoryStore: ensureFreeSpace(78800) called with
curMem=150115, maxMem=308713881
14/04/30 22:43:16 INFO MemoryStore: Block broadcast_1 stored as values to
memory (estimated size 77.0 KB, free 294.2 MB)
14/04/30 22:43:16 WARN Configuration: mapred-default.xml:an attempt to
override final parameter: mapreduce.tasktracker.cache.local.size; Ignoring.
14/04/30 22:43:16 WARN Configuration: yarn-site.xml:an attempt to override
final parameter: mapreduce.output.fileoutputformat.compress.type; Ignoring.
14/04/30 22:43:16 WARN Configuration: hdfs-site.xml:an attempt to override
final parameter: mapreduce.map.output.compress.codec; Ignoring.
log: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12

scala> log.count()
14/04/30 22:43:03 WARN JobConf: The variable mapred.child.ulimit is no
longer used.
14/04/30 22:43:04 WARN Configuration: mapred-default.xml:an attempt to
override final parameter: mapreduce.tasktracker.cache.local.size; Ignoring.
14/04/30 22:43:04 WARN Configuration: yarn-site.xml:an attempt to override
final parameter: mapreduce.output.fileoutputformat.compress.type; Ignoring.
14/04/30 22:43:04 WARN Configuration: hdfs-site.xml:an attempt to override
final parameter: mapreduce.map.output.compress.codec; Ignoring.
java.lang.IllegalArgumentException: java.net.UnknownHostException:
jobs-a-hnn1
at
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
at
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237)
at
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:576)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:521)
at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:146)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:902)
at org.apache.spark.rdd.RDD.count(RDD.scala:720)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
at $iwC$$iwC$$iwC.<init>(<console>:20)
at $iwC$$iwC.<init>(<console>:22)
at $iwC.<init>(<console>:24)
at <init>(<console>:26)
at .<init>(<console>:30)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:795)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:840)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:752)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:600)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:607)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:610)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:935)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:883)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:883)
at
scala.tools.nsc.util.ScalaC

Re: Any advice for using big spark.cleaner.delay value in Spark Streaming?

2014-04-30 Thread Tathagata Das
Whatever is inside the mapPartitions gets executed on the workers. If that
mapPartitions function refers to a variable defined in the driver, then that
variable gets serialized and sent to the workers as well. So the hll
(defined in line 63) is an empty HyperLogLogMonoid that gets serialized
and sent to the workers to execute the mapPartitions in lines 67-69. In each
batch of data, each user id is converted to an HLL object (using the empty
hll object), and then the HLL objects from all the partitions are reduced
using the "+" in line 69 to a single HLL (i.e., an RDD containing one
element, which is the HLL of the batch).

The subsequent foreachRDD at line 73 gets that HLL from the workers to the
driver and merges it with the running globalHll.
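
A minimal illustration of that closure-serialization behavior (plain Sets standing in for the HLL objects; the data is made up):

val emptySet = Set.empty[String]                   // defined on the driver
val userIds = sc.parallelize(Seq("a", "b", "a", "c"))
val perPartition = userIds.mapPartitions { ids =>  // runs on the workers
  Iterator(ids.foldLeft(emptySet)(_ + _))          // emptySet arrived inside the serialized closure
}
val merged = perPartition.reduce(_ ++ _)           // one Set per partition, combined on the driver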

I agree, there is probably a better way/ more intuitive way to write this
example. :)

TD


On Wed, Apr 30, 2014 at 3:45 PM, buremba  wrote:

> Thanks for your reply. Sorry for the late response, I wanted to do some
> tests
> before writing back.
>
> The counting part works similar to your advice. I specify a minimum
> interval
> like 1 minute, in each hour, day etc. it sums all counters of the current
> children intervals.
>
> However when I want to "count unique visitors of the month" things get much
> more complex. I need to merge 30 sets which contains visitor id's and each
> of them has more than a few hundred thousands of elements. Merging sets may
> be still the best option rather than keeping another Set for last month
> though, however I'm not sure because when there are many intersections it
> may be inefficient.
>
> BTW, I have one more question. The HLL example in repository seems
> confusing
> to me. How Spark handles global variable usages in mapPartitions method?
> (
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala#L68
> )
> I'm also a newbie but I thought the map and mapPartitions methods are
> similar to Hadoop's map methods so when we run the example on a cluster how
> an external node reaches a global variable in a single node? Does Spark
> replicate HyperLogLogMonoid instances across the cluster?
>
> Thanks,
> Burak Emre
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Any-advice-for-using-big-spark-cleaner-delay-value-in-Spark-Streaming-tp4895p5131.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Strange lookup behavior. Possible bug?

2014-04-30 Thread Yadid Ayzenberg

Dear Sparkers,

Has anyone got any insight on this ? I am really stuck.

Yadid


On 4/28/14, 11:28 AM, Yadid Ayzenberg wrote:

Thanks for your answer.
I tried running on a single machine (master and worker on one host) and I get exactly
the same results. There is very little CPU activity on the machine in question. The
web UI shows a single task whose state is RUNNING; it will remain so indefinitely.

I have a single partition, and its size is 1626.2 MB

Currently the RDD has 200 elements, but I have tried it with 20 and 
the behavior is the same.

The key is of the form (0,52fb9aff3004f07d1a87c8ea), where the first number in the
tuple is always 0 and the second is some string that can appear more than once.


The RDD is created by using the newAPIHadoopRDD.

Any additional info I can provide?

Yadid




On 4/28/14 10:46 AM, Daniel Darabos wrote:
That is quite mysterious, and I do not think we have enough 
information to answer. JavaPairRDD.lookup() works 
fine on a remote Spark cluster:


$ MASTER=spark://localhost:7077 bin/spark-shell
scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(
  sc.makeRDD(0 until 10, 3).map(x => ((x % 3).toString, (x, x % 3))))

scala> rdd.lookup("1")
res0: java.util.List[(Int, Int)] = [(1,1), (4,1), (7,1)]

You suggest maybe the driver does not receive a message from an 
executor. I guess it is likely possible, though it has not happened 
to me. I would recommend running on a single machine in the 
standalone setup. Start the master and worker on the same machine, 
run the application there too. This should eliminate network 
configuration problems.


If you still see the issue, I'd check whether the task has really 
completed. What do you see on the web UI? Is the executor using CPU?


Good luck.




On Mon, Apr 28, 2014 at 2:35 AM, Yadid Ayzenberg wrote:


Can someone please suggest how I can move forward with this?
My spark version is 0.9.1.
The big challenge is that this issue is not recreated when
running in local mode. What could be the difference?

I would really appreciate any pointers, as currently the the job
just hangs.




On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:

Some additional information - maybe this rings a bell with
someone:

I suspect this happens when the lookup returns more than one
value.
For 0 and 1 values, the function behaves as you would expect.

Anyone ?



On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:

Hi All,

Im running a lookup on a JavaPairRDD.
When running on local machine - the lookup is
successfull. However, when running a standalone cluster
with the exact same dataset - one of the tasks never ends
(constantly in RUNNING status).
When viewing the worker log, it seems that the task has
finished successfully:

14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0
locally
14/04/25 13:40:38 INFO Executor: Serialized size of
result for 2 is 10896794
14/04/25 13:40:38 INFO Executor: Sending result for 2
directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs
indefinitely.

If I execute a count prior to the lookup, I get the correct
number, which suggests that the cluster is operating as expected.

The exact same scenario works with a different type of
key (Tuple2): JavaPairRDD.

Any ideas on how to debug this problem ?

Thanks,

Yadid










update of RDDs

2014-04-30 Thread narayanabhatla NarasimhaMurthy
In our application, we need distributed RDDs containing key-value maps. We have
operations that update RDDs by adding entries to the map, deleting entries from the
map, and updating the value part of the map. We also have map/reduce functions that
operate on the RDDs. The questions are the following:
1. Can RDDs be updated? If yes, what are the methods?
2. If we update RDDs, does it happen in place, or does it create new RDDs with almost
double the original RDD size (original + newly created RDD)?
Thank you very much.
N.N.Murthy
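
For what it's worth, a sketch of how such "updates" look in practice (toy key-value data): RDDs are immutable, so each operation returns a new, lazily built RDD rather than modifying the original in place.

import org.apache.spark.SparkContext._   // pair-RDD operations such as mapValues

val kv = sc.parallelize(Seq("a" -> 1, "b" -> 2))
val added   = kv.union(sc.parallelize(Seq("c" -> 3)))      // add entries
val removed = added.filter { case (k, _) => k != "b" }     // delete entries
val updated = removed.mapValues(_ + 10)                    // update the value part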



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/update-of-RDDs-tp5132.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Any advice for using big spark.cleaner.delay value in Spark Streaming?

2014-04-30 Thread buremba
Thanks for your reply. Sorry for the late response, I wanted to do some tests
before writing back.

The counting part works similarly to your advice: I specify a minimum interval
like 1 minute, and for each hour, day, etc. it sums all the counters of the current
children intervals.

However, when I want to "count unique visitors of the month" things get much
more complex. I need to merge 30 sets that contain visitor ids, each with more
than a few hundred thousand elements. Merging sets may still be the best option
rather than keeping another Set for the last month, but I'm not sure, because when
there are many intersections it may be inefficient.

BTW, I have one more question. The HLL example in the repository seems confusing
to me. How does Spark handle global variable usage in the mapPartitions method?
(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala#L68)

I'm also a newbie, but I thought the map and mapPartitions methods are similar to
Hadoop's map methods, so when we run the example on a cluster, how does an external
node reach a global variable that lives on a single node? Does Spark replicate
HyperLogLogMonoid instances across the cluster?

Thanks,
Burak Emre



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Any-advice-for-using-big-spark-cleaner-delay-value-in-Spark-Streaming-tp4895p5131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


My talk on "Spark: The Next Top (Compute) Model"

2014-04-30 Thread Dean Wampler
I meant to post this last week, but this is a talk I gave at the Philly ETE
conf. last week:

http://www.slideshare.net/deanwampler/spark-the-next-top-compute-model

Also here:

http://polyglotprogramming.com/papers/Spark-TheNextTopComputeModel.pdf

dean

-- 
Dean Wampler, Ph.D.
Typesafe
@deanwampler
http://typesafe.com
http://polyglotprogramming.com


[ANN]: Scala By The Bay Conference ( aka Silicon Valley Scala Symposium)

2014-04-30 Thread Chester Chen
Hi,
     This is not related to Spark, but I thought you might be interested: the second
SF Scala conference is coming this August. The SF Scala conference was called the
"Silicon Valley Scala Symposium" last year; from now on, it will be known as
"Scala By The Bay".

http://www.scalabythebay.org

-- watch that space for announcements and the CFP!

Chester

Re: NoSuchMethodError from Spark Java

2014-04-30 Thread Marcelo Vanzin
Hi,

One thing you can do is set the spark version your project depends on
to "1.0.0-SNAPSHOT" (make sure it matches the version of Spark you're
building); then before building your project, run "sbt publishLocal"
on the Spark tree.

On Wed, Apr 30, 2014 at 12:11 AM, wxhsdp  wrote:
> i fixed it.
>
> i make my sbt project depend on
> spark/trunk/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar
> and it works
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-from-Spark-Java-tp4937p5096.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Marcelo


Re: What is Seq[V] in updateStateByKey?

2014-04-30 Thread Tathagata Das
Yeah, I remember changing fold to sum in a few places, probably in
testsuites, but missed this example I guess.



On Wed, Apr 30, 2014 at 1:29 PM, Sean Owen  wrote:

> S is the previous count, if any. Seq[V] are potentially many new
> counts. All of them have to be added together to keep an accurate
> total.  It's as if the count were 3, and I tell you I've just observed
> 2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not
> 1 + 1.
>
>
> I butted in since I'd like to ask a different question about the same
> line of code. Why:
>
>   val currentCount = values.foldLeft(0)(_ + _)
>
> instead of
>
>   val currentCount = values.sum
>
> This happens a few places in the code. sum seems equivalent and likely
> quicker. Same with things like "filter(_ == 200).size" instead of
> "count(_ == 200)"... pretty trivial but hey.
>
>
> On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu
>  wrote:
> > Hi TD,
> >
> > Why does the example keep recalculating the count via fold?
> >
> > Wouldn’t it make more sense to get the last count in values Seq and add
> 1 to
> > it and save that as current count?
> >
> >
> >
> > From what Sean explained I understand that all values in Seq have the
> same
> > key. Then when a new value for that key is found it is added to this Seq
> > collection and the update function is called.
> >
> >
> >
> > Is my understanding correct?
>


Re: Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Peter
Thanks Nicholas, this is a bit of a shame; not very practical for log roll-up, for
example, when every output needs to be in its own "directory".
On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas 
 wrote:
 
Yes, saveAsTextFile() will give you 1 part per RDD partition. When you 
coalesce(1), you move everything in the RDD to a single partition, which then 
gives you 1 output file. 
It will still be called part-0 or something like that because that’s 
defined by the Hadoop API that Spark uses for reading to/writing from S3. I 
don’t know of a way to change that.



On Wed, Apr 30, 2014 at 2:47 PM, Peter  wrote:

Ah, looks like RDD.coalesce(1) solves one part of the problem.
>On Wednesday, April 30, 2014 11:15 AM, Peter  wrote:
> 
>Hi
>
>
>Playing around with Spark & S3, I'm opening multiple objects (CSV files) with:
>
>
>    val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>
>
>so hfile is a RDD representing 10 objects that were "underneath" 2014-04-28. 
>After I've sorted and otherwise transformed the content, I'm trying to write 
>it back to a single object:
>
>
>    
>sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>
>
>unfortunately this results in a "folder" named concatted.csv with 10 objects 
>underneath, part-0 .. part-00010, corresponding to the 10 original objects 
>loaded. 
>
>
>How can I achieve the desired behaviour of putting a single object named 
>concatted.csv ?
>
>
>I've tried 0.9.1 and 1.0.0-RC3. 
>
>
>Thanks!
>Peter
>
>
>
>
>
>

Re: What is Seq[V] in updateStateByKey?

2014-04-30 Thread Sean Owen
S is the previous count, if any. Seq[V] are potentially many new
counts. All of them have to be added together to keep an accurate
total.  It's as if the count were 3, and I tell you I've just observed
2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not
1 + 1.
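
A small worked sketch of that arithmetic, with made-up values:

val previous: Option[Int] = Some(3)         // S: the running count so far
val newValues: Seq[Int] = Seq(2, 5, 1)      // Seq[V]: counts arriving in this batch
val updated = Some(newValues.sum + previous.getOrElse(0))   // Some(11), i.e. 3 + (2 + 5 + 1)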


I butted in since I'd like to ask a different question about the same
line of code. Why:

  val currentCount = values.foldLeft(0)(_ + _)

instead of

  val currentCount = values.sum

This happens a few places in the code. sum seems equivalent and likely
quicker. Same with things like "filter(_ == 200).size" instead of
"count(_ == 200)"... pretty trivial but hey.


On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu
 wrote:
> Hi TD,
>
> Why does the example keep recalculating the count via fold?
>
> Wouldn’t it make more sense to get the last count in values Seq and add 1 to
> it and save that as current count?
>
>
>
> From what Sean explained I understand that all values in Seq have the same
> key. Then when a new value for that key is found it is added to this Seq
> collection and the update function is called.
>
>
>
> Is my understanding correct?


RE: What is Seq[V] in updateStateByKey?

2014-04-30 Thread Adrian Mocanu
Hi TD,
Why does the example keep recalculating the count via fold?
Wouldn’t it make more sense to get the last count in values Seq and add 1 to it 
and save that as current count?

From what Sean explained I understand that all values in Seq have the same key. 
Then when a new value for that key is found it is added to this Seq collection 
and the update function is called.

Is my understanding correct?

-Adrian

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: April-29-14 4:57 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: What is Seq[V] in updateStateByKey?

You may have already seen it, but I will mention it anyways. This example may 
help.
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala

Here the state is essentially a running count of the words seen. So the value type
(i.e., V) is Int (the count of a word in each batch) and the state type (i.e., S) is
also an Int (the running count). The updateFunction essentially sums the running
count with the new count to generate a new running count.
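
A minimal sketch of such an update function for Int counts (wordCounts below stands for a DStream[(String, Int)] and is not part of the quoted example):

val updateFunc = (newValues: Seq[Int], runningCount: Option[Int]) =>
  Some(newValues.sum + runningCount.getOrElse(0))
val runningCounts = wordCounts.updateStateByKey[Int](updateFunc)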

TD


On Tue, Apr 29, 2014 at 1:49 PM, Sean Owen <so...@cloudera.com> wrote:
The original DStream is of (K,V). This function creates a DStream of
(K,S). Each time slice brings one or more new V for each K. The old
state S (can be different from V!) for each K -- possibly non-existent
-- is updated in some way by a bunch of new V, to produce a new state
S -- which also might not exist anymore after update. That's why the
function is from a Seq[V], and an Option[S], to an Option[S].

If you RDD has value type V = Double then your function needs to
update state based on a new Seq[Double] at each time slice, since
Doubles are the new thing arriving for each key at each time slice.


On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu <amoc...@verticalscope.com> wrote:
> What is Seq[V] in updateStateByKey?
>
> Does this store the collected tuples of the RDD in a collection?
>
>
>
> Method signature:
>
> def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) =>
> Option[S] ): DStream[(K, S)]
>
>
>
> In my case I used Seq[Double] assuming a sequence of Doubles in the RDD; the
> moment I switched to a different type like Seq[(String, Double)] the code
> didn’t compile.
>
>
>
> -Adrian
>
>



Re: Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Nicholas Chammas
Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
coalesce(1), you move everything in the RDD to a single partition, which
then gives you 1 output file.

It will still be called part-0 or something like that because that’s
defined by the Hadoop API that Spark uses for reading to/writing from S3. I
don’t know of a way to change that.


On Wed, Apr 30, 2014 at 2:47 PM, Peter  wrote:

> Ah, looks like RDD.coalesce(1) solves one part of the problem.
>   On Wednesday, April 30, 2014 11:15 AM, Peter 
> wrote:
>  Hi
>
> Playing around with Spark & S3, I'm opening multiple objects (CSV files)
> with:
>
> val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>
> so hfile is a RDD representing 10 objects that were "underneath"
> 2014-04-28. After I've sorted and otherwise transformed the content, I'm
> trying to write it back to a single object:
>
>
> sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>
> unfortunately this results in a "folder" named concatted.csv with 10
> objects underneath, part-0 .. part-00010, corresponding to the 10
> original objects loaded.
>
> How can I achieve the desired behaviour of putting a single object named
> concatted.csv ?
>
> I've tried 0.9.1 and 1.0.0-RC3.
>
> Thanks!
> Peter
>
>
>
>
>


Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
I agree with you in general that as an API user I shouldn't be relying on
implementation code. However, without looking at the code, there is no way for me to
find out even whether map() keeps the row order. Without that knowledge, I'd need to
do a "sort" every time I need things in a certain order (and sort is really
expensive). On the other hand, if I can assume that, say, "filter" or "map" doesn't
shuffle the rows around, I can do the sort once and assume that the order is retained
through such operations, saving a lot of time on unnecessary sorts.

Mingyu

From:  Mark Hamstra 
Reply-To:  "user@spark.apache.org" 
Date:  Wednesday, April 30, 2014 at 11:36 AM
To:  "user@spark.apache.org" 
Subject:  Re: Union of 2 RDD's only returns the first one

Which is what you shouldn't be doing as an API user, since that
implementation code might change.  The documentation doesn't mention a row
ordering guarantee, so none should be assumed.

It is hard enough for us to correctly document all of the things that the
API does do.  We really shouldn't be forced into the expectation that we
will also fully document everything that the API doesn't do.


On Wed, Apr 30, 2014 at 11:13 AM, Mingyu Kim  wrote:
> Okay, that makes sense. It’d be great if this can be better documented at
> some point, because the only way to find out about the resulting RDD row
> order is by looking at the code.
> 
> Thanks for the discussion!
> 
> Mingyu
> 
> 
> 
> 
> On 4/29/14, 11:59 PM, "Patrick Wendell"  wrote:
> 
>> >I don't think we guarantee anywhere that union(A, B) will behave by
>> >concatenating the partitions, it just happens to be an artifact of the
>> >current implementation.
>> >
>> >rdd1 = [1,2,3]
>> >rdd2 = [1,4,5]
>> >
>> >rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
>> >rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it
>> >wouldn't violate the contract of union
>> >
>> >AFIAK the only guarentee is the resulting RDD will contain all elements.
>> >
>> >- Patrick
>> >
>> >On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim  wrote:
>>> >> Yes, that’s what I meant. Sure, the numbers might not be actually
>>> >>sorted,
>>> >> but the order of rows semantically are kept throughout non-shuffling
>>> >> transforms. I’m on board with you on union as well.
>>> >>
>>> >> Back to the original question, then, why is it important to coalesce to
>>> >>a
>>> >> single partition? When you union two RDDs, for example, rdd1 = [“a, b,
>>> >> c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
>>> >> rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with
>>> >>three
>>> >> lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from
>>> >>the
>>> >> two reds are concatenated.
>>> >>
>>> >> Mingyu
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On 4/29/14, 10:55 PM, "Patrick Wendell"  wrote:
>>> >>
 >>>If you call map() on an RDD it will retain the ordering it had before,
 >>>but that is not necessarily a correct sort order for the new RDD.
 >>>
 >>>var rdd = sc.parallelize([2, 1, 3]);
 >>>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
 >>>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
 >>>
 >>>Note that mapped is no longer sorted.
 >>>
 >>>When you union two RDD's together it will effectively concatenate the
 >>>two orderings, which is also not a valid sorted order on the new RDD:
 >>>
 >>>rdd1 = [1,2,3]
 >>>rdd2 = [1,4,5]
 >>>
 >>>rdd1.union(rdd2) = [1,2,3,1,4,5]
 >>>
 >>>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim  wrote:
>  Thanks for the quick response!
> 
>  To better understand it, the reason sorted RDD has a well-defined
> ordering
>  is because sortedRDD.getPartitions() returns the partitions in the
> right
>  order and each partition internally is properly sorted. So, if you
> have
> 
>  var rdd = sc.parallelize([2, 1, 3]);
>  var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>  var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
> 
>  Since mapValues doesn’t change the order of partitions not change the
>  order of rows within the partitions, I think “mapped” should have the
>  exact same order as “sorted”. Sure, if a transform involves
> shuffling,
> the
>  order will change. Am I mistaken? Is there an extra detail in
> sortedRDD
>  that guarantees a well-defined ordering?
> 
>  If it’s true that the order of partitions returned by
> RDD.getPartitions()
>  and the row orders within the partitions determine the row order, I’m
> not
>  sure why union doesn’t respect the order because union operation
> simply
>  concatenates the two lists of partitions from the two RDDs.
> 
>  Mingyu
> 
> 
> 
> 
>  On 4/

Re: Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Peter
Ah, looks like RDD.coalesce(1) solves one part of the problem.
On Wednesday, April 30, 2014 11:15 AM, Peter  wrote:
 
Hi

Playing around with Spark & S3, I'm opening multiple objects (CSV files) with:

    val hfile = sc.textFile("s3n://bucket/2014-04-28/")

so hfile is a RDD representing 10 objects that were "underneath" 2014-04-28. 
After I've sorted and otherwise transformed the content, I'm trying to write it 
back to a single object:

    
sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")

unfortunately this results in a "folder" named concatted.csv with 10 objects 
underneath, part-0 .. part-00010, corresponding to the 10 original objects 
loaded. 

How can I achieve the desired behaviour of putting a single object named 
concatted.csv ?

I've tried 0.9.1 and 1.0.0-RC3. 

Thanks!
Peter

Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mark Hamstra
Which is what you shouldn't be doing as an API user, since that
implementation code might change.  The documentation doesn't mention a row
ordering guarantee, so none should be assumed.

It is hard enough for us to correctly document all of the things that the
API does do.  We really shouldn't be forced into the expectation that we
will also fully document everything that the API doesn't do.


On Wed, Apr 30, 2014 at 11:13 AM, Mingyu Kim  wrote:

> Okay, that makes sense. It’d be great if this can be better documented at
> some point, because the only way to find out about the resulting RDD row
> order is by looking at the code.
>
> Thanks for the discussion!
>
> Mingyu
>
>
>
>
> On 4/29/14, 11:59 PM, "Patrick Wendell"  wrote:
>
> >I don't think we guarantee anywhere that union(A, B) will behave by
> >concatenating the partitions, it just happens to be an artifact of the
> >current implementation.
> >
> >rdd1 = [1,2,3]
> >rdd2 = [1,4,5]
> >
> >rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
> >rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it
> >wouldn't violate the contract of union
> >
> >AFIAK the only guarentee is the resulting RDD will contain all elements.
> >
> >- Patrick
> >
> >On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim  wrote:
> >> Yes, that’s what I meant. Sure, the numbers might not be actually
> >>sorted,
> >> but the order of rows semantically are kept throughout non-shuffling
> >> transforms. I’m on board with you on union as well.
> >>
> >> Back to the original question, then, why is it important to coalesce to
> >>a
> >> single partition? When you union two RDDs, for example, rdd1 = [“a, b,
> >> c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
> >> rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with
> >>three
> >> lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from
> >>the
> >> two reds are concatenated.
> >>
> >> Mingyu
> >>
> >>
> >>
> >>
> >> On 4/29/14, 10:55 PM, "Patrick Wendell"  wrote:
> >>
> >>>If you call map() on an RDD it will retain the ordering it had before,
> >>>but that is not necessarily a correct sort order for the new RDD.
> >>>
> >>>var rdd = sc.parallelize([2, 1, 3]);
> >>>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
> >>>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
> >>>
> >>>Note that mapped is no longer sorted.
> >>>
> >>>When you union two RDD's together it will effectively concatenate the
> >>>two orderings, which is also not a valid sorted order on the new RDD:
> >>>
> >>>rdd1 = [1,2,3]
> >>>rdd2 = [1,4,5]
> >>>
> >>>rdd1.union(rdd2) = [1,2,3,1,4,5]
> >>>
> >>>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim  wrote:
>  Thanks for the quick response!
> 
>  To better understand it, the reason sorted RDD has a well-defined
> ordering
>  is because sortedRDD.getPartitions() returns the partitions in the
> right
>  order and each partition internally is properly sorted. So, if you
> have
> 
>  var rdd = sc.parallelize([2, 1, 3]);
>  var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>  var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
> 
>  Since mapValues doesn’t change the order of partitions not change the
>  order of rows within the partitions, I think “mapped” should have the
>  exact same order as “sorted”. Sure, if a transform involves shuffling,
> the
>  order will change. Am I mistaken? Is there an extra detail in
> sortedRDD
>  that guarantees a well-defined ordering?
> 
>  If it’s true that the order of partitions returned by
> RDD.getPartitions()
>  and the row orders within the partitions determine the row order, I’m
> not
>  sure why union doesn’t respect the order because union operation
> simply
>  concatenates the two lists of partitions from the two RDDs.
> 
>  Mingyu
> 
> 
> 
> 
>  On 4/29/14, 10:25 PM, "Patrick Wendell"  wrote:
> 
> >You are right, once you sort() the RDD, then yes it has a well defined
> >ordering.
> >
> >But that ordering is lost as soon as you transform the RDD, including
> >if you union it with another RDD.
> >
> >On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim 
> >wrote:
> >> Hi Patrick,
> >>
> >> I¹m a little confused about your comment that RDDs are not ordered.
> >>As
> >>far
> >> as I know, RDDs keep list of partitions that are ordered and this is
> >>why I
> >> can call RDD.take() and get the same first k rows every time I call
> >>it
> >>and
> >> RDD.take() returns the same entries as RDD.map(Š).take() because map
> >> preserves the partition order. RDD order is also what allows me to
> >>get
> >>the
> >> top k out of RDD by doing RDD.sort().take().
> >>
> >> Am I misunderstanding it? Or, is it just when RDD is written to disk
> >>that
> >> the order is not wel

Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Peter
Hi

Playing around with Spark & S3, I'm opening multiple objects (CSV files) with:

    val hfile = sc.textFile("s3n://bucket/2014-04-28/")

so hfile is a RDD representing 10 objects that were "underneath" 2014-04-28. 
After I've sorted and otherwise transformed the content, I'm trying to write it 
back to a single object:

    
sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")

unfortunately this results in a "folder" named concatted.csv with 10 objects 
underneath, part-0 .. part-00010, corresponding to the 10 original objects 
loaded. 

How can I achieve the desired behaviour of putting a single object named 
concatted.csv ?

I've tried 0.9.1 and 1.0.0-RC3. 

Thanks!
Peter

Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
Okay, that makes sense. It’d be great if this can be better documented at
some point, because the only way to find out about the resulting RDD row
order is by looking at the code.

Thanks for the discussion!

Mingyu




On 4/29/14, 11:59 PM, "Patrick Wendell"  wrote:

>I don't think we guarantee anywhere that union(A, B) will behave by
>concatenating the partitions, it just happens to be an artifact of the
>current implementation.
>
>rdd1 = [1,2,3]
>rdd2 = [1,4,5]
>
>rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
>rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it
>wouldn't violate the contract of union
>
>AFIAK the only guarentee is the resulting RDD will contain all elements.
>
>- Patrick
>
>On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim  wrote:
>> Yes, that’s what I meant. Sure, the numbers might not be actually
>>sorted,
>> but the order of rows semantically are kept throughout non-shuffling
>> transforms. I’m on board with you on union as well.
>>
>> Back to the original question, then, why is it important to coalesce to
>>a
>> single partition? When you union two RDDs, for example, rdd1 = [“a, b,
>> c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
>> rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with
>>three
>> lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from
>>the
>> two reds are concatenated.
>>
>> Mingyu
>>
>>
>>
>>
>> On 4/29/14, 10:55 PM, "Patrick Wendell"  wrote:
>>
>>>If you call map() on an RDD it will retain the ordering it had before,
>>>but that is not necessarily a correct sort order for the new RDD.
>>>
>>>var rdd = sc.parallelize([2, 1, 3]);
>>>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>>>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
>>>
>>>Note that mapped is no longer sorted.
>>>
>>>When you union two RDD's together it will effectively concatenate the
>>>two orderings, which is also not a valid sorted order on the new RDD:
>>>
>>>rdd1 = [1,2,3]
>>>rdd2 = [1,4,5]
>>>
>>>rdd1.union(rdd2) = [1,2,3,1,4,5]
>>>
>>>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim  wrote:
 Thanks for the quick response!

 To better understand it, the reason sorted RDD has a well-defined
ordering
 is because sortedRDD.getPartitions() returns the partitions in the
right
 order and each partition internally is properly sorted. So, if you
have

 var rdd = sc.parallelize([2, 1, 3]);
 var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
 var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]

 Since mapValues doesn’t change the order of partitions not change the
 order of rows within the partitions, I think “mapped” should have the
 exact same order as “sorted”. Sure, if a transform involves shuffling,
the
 order will change. Am I mistaken? Is there an extra detail in
sortedRDD
 that guarantees a well-defined ordering?

 If it’s true that the order of partitions returned by
RDD.getPartitions()
 and the row orders within the partitions determine the row order, I’m
not
 sure why union doesn’t respect the order because union operation
simply
 concatenates the two lists of partitions from the two RDDs.

 Mingyu




 On 4/29/14, 10:25 PM, "Patrick Wendell"  wrote:

>You are right, once you sort() the RDD, then yes it has a well defined
>ordering.
>
>But that ordering is lost as soon as you transform the RDD, including
>if you union it with another RDD.
>
>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim 
>wrote:
>> Hi Patrick,
>>
>> I¹m a little confused about your comment that RDDs are not ordered.
>>As
>>far
>> as I know, RDDs keep list of partitions that are ordered and this is
>>why I
>> can call RDD.take() and get the same first k rows every time I call
>>it
>>and
>> RDD.take() returns the same entries as RDD.map(Š).take() because map
>> preserves the partition order. RDD order is also what allows me to
>>get
>>the
>> top k out of RDD by doing RDD.sort().take().
>>
>> Am I misunderstanding it? Or, is it just when RDD is written to disk
>>that
>> the order is not well preserved? Thanks in advance!
>>
>> Mingyu
>>
>>
>>
>>
>> On 1/22/14, 4:46 PM, "Patrick Wendell"  wrote:
>>
>>>Ah somehow after all this time I've never seen that!
>>>
>>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
>>>
>>>wrote:



 On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell

 wrote:
>
> What is the ++ operator here? Is this something you defined?


 No, it's an alias for union defined in RDD.scala:

 def ++(other: RDD[T]): RDD[T] = this.union(other)

>
>
> Another issue is that RDD's are not ordere

Re: processing s3n:// files in parallel

2014-04-30 Thread foundart
Thanks, Andrew.  As it turns out, the tasks were getting processed in parallel in
separate threads on the same node. Using the parallel collection of Hadoop files was
sufficient to trigger that, but because I expected the tasks to be spread across nodes
rather than across cores on a single node, I didn't see it right away.

val matches = hadoopFiles.par.map((hadoopFile) ...








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/processing-s3n-files-in-parallel-tp4989p5116.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: the spark configuage

2014-04-30 Thread Diana Carroll
I'm guessing your shell stopping when it attempts to connect to the RM is
not related to that warning.  You'll get that message out of the box from
Spark if you don't have HADOOP_HOME set correctly.  I'm using CDH 5.0
installed in default locations, and got rid of the warning by setting
HADOOP_HOME to /usr/lib/hadoop.  The stopping issue might be something
unrelated.

Diana


On Wed, Apr 30, 2014 at 3:58 AM, Sophia  wrote:

> Hi,
> when I configue spark, run the shell instruction:
> ./spark-shellit told me like this:
> WARN:NativeCodeLoader:Uable to load native-hadoop livrary for your
> builtin-java classes where applicable,when it connect to ResourceManager,it
> stopped. What should I DO?
> Wish your reply
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


new Washington DC Area Spark Meetup

2014-04-30 Thread Donna-M. Fernandez
Hi, all!  For those in the Washington DC area (DC/MD/VA), we just started a
new Spark Meetup.  We'd love for you to join!  -d

Here's the link: http://www.meetup.com/Washington-DC-Area-Spark-Interactive/


Description:

This is an interactive meetup for Washington DC, Virginia and Maryland
users, enthusiasts, and explorers of Apache Spark (www.spark-project.org).
Spark is the powerful open source Data processing framework that extends and
accelerates Hadoop; built around speed, ease of use, and sophisticated
analytics. This is a very interactive meetup where we will exchange ideas,
inspire and learn from each other, and bring the top minds and innovators
around big data and real-time solutions to the table. This meetup is meant
for both the business and technical community (both the curious and the
experts) interested in the "how" and the "why" of Sparks' capabilities. The
meetup will include introductions to the various Spark features and
functions, case studies from current users, best practices for deployment,
and speakers from the top technology leaders and vendors who are helping
craft Spark's roadmap for the future. Let's build this exciting Spark
community together!


Thanks,
Donna-M. Fernandez | VP of Operations & Services | MetiStream, Inc. |
do...@metistream.com | 202.642.3220 






Can a job running on a cluster read from a local file path ?

2014-04-30 Thread Shubhabrata
1) Can a job (a Python script) running on a standalone cluster read from a local
file path?
2) Does sc.addPyFile(path) create a directory, or does it only copy the file?
3) If the path contains a zip file, does it automatically get unzipped?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-a-job-running-on-a-cluster-read-from-a-local-file-path-tp5112.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Shuffle phase is very slow, any help, thx!

2014-04-30 Thread Daniel Darabos
So the problem is that 99 tasks are fast (< 1 second), but 1 task is really
slow (5+ hours), is that right? And your operation is graph.vertices.count?
That is odd, but it could be that this job includes running previous
transformations. How did you construct the graph?

On Tue, Apr 29, 2014 at 3:41 AM, gogototo  wrote:

> I has an application using grapx, and some phase is very slow.
>

That would look great on a T-shirt!

Stage Id | Description | Submitted | Duration ▴ | Tasks: Succeeded/Total | Shuffle Read | Shuffle Write
> 282 | reduce at VertexRDD.scala:91 | 2014/04/28 14:07:13 | 5.20 h | 100/100 | 3.8 MB
> 419 | zipPartitions at ReplicatedVertexView.scala:101 | 2014/04/28 22:18:37 | 5.14 h | 100/100 | 71.3 KB | 4.5 MB
>
> In it, you can see task info as below:
> 94  5758  SUCCESS  PROCESS_LOCAL  BP-YZH-2-5971.360buy.com  2014/04/28 14:07:13  54 ms   37.7 KB
> 71  5759  SUCCESS  PROCESS_LOCAL  BP-YZH-2-5978.360buy.com  2014/04/28 14:07:13  15 ms   38.7 KB
> 14  5760  SUCCESS  PROCESS_LOCAL  BP-YZH-2-5977.360buy.com  2014/04/28 14:07:16  585 ms  38.6 KB
> 91  5761  SUCCESS  PROCESS_LOCAL  BP-YZH-2-5977.360buy.com  2014/04/28 14:07:16  209 ms  38.3 KB
> 53  5762  SUCCESS  NODE_LOCAL     BP-YZH-2-5977.360buy.com  2014/04/28 14:07:19  5.20 h  40.8 s  39.6 KB
>
> And in the slow task, can see log:
> 14/04/29 09:30:10 INFO
> storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight:
> 50331648, minRequest: 10066329
> 14/04/29 09:30:10 INFO
> storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight:
> 50331648, minRequest: 10066329
> 14/04/29 09:30:10 INFO
> storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight:
> 50331648, minRequest: 10066329
> 14/04/29 09:30:10 INFO
> storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 100
> non-zero-bytes blocks out of 100 blocks
> 14/04/29 09:30:10 INFO
> storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 45 remote
> gets in  2 ms
>
>
> Why this? How to solve it? Many Thx!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-phase-is-very-slow-any-help-thx-tp5004.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: something about memory usage

2014-04-30 Thread Daniel Darabos
On Wed, Apr 30, 2014 at 1:52 PM, wxhsdp  wrote:

> Hi, guys
>
>   i want to do some optimizations of my spark codes. i use VisualVM to
> monitor the executor when run the app.
>   here's the snapshot:
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n5107/executor.png
> >
>
> from the snapshot, i can get the memory usage information about the
> executor, but the executor contains lots of tasks. is it possible to get
> the
> memory usage of one single task in JVM with GC running in the background?
>

I guess you could run 1-core slaves. That way they would only work on one
task at a time.

by the way, you can see every time when memory is consumed up to 90%, JVM
> does GC operation.
> i'am a little confused about that. i originally thought that 60% of the
> memory is kept for Spark's memory cache(i did not cache any RDDs in my
> application), so there was only 40% left for running the app.
>

The way I understand it, Spark does not have a tight control on the memory.
Your code running on the executor can easily use more than 40% of memory.
Spark only limits the memory used for RDD caches and shuffles. If its RDD
caches are full, taking up 60% of the heap, and your code takes up more
than 40% (after GC), the executor will die with OOM.

I suppose there is not much Spark could do about this. You cannot control
how much memory a function you call is allowed to use.
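
For the case where nothing is cached, one related knob (a sketch, assuming Spark 0.9.x properties; the master URL and app name are placeholders) is spark.storage.memoryFraction, which controls that 60% share:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("memory-profiling")
  .set("spark.storage.memoryFraction", "0.2")   // default is 0.6; leave more heap for user code
val sc = new SparkContext(conf)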


something about memory usage

2014-04-30 Thread wxhsdp
Hi guys,

  I want to do some optimization of my Spark code. I use VisualVM to monitor the
executor while running the app.
  Here's the snapshot:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n5107/executor.png>

From the snapshot I can get memory usage information about the executor, but the
executor contains lots of tasks. Is it possible to get the memory usage of one single
task in the JVM, with GC running in the background?

By the way, you can see that every time memory consumption reaches about 90%, the JVM
does a GC. I'm a little confused about that: I originally thought that 60% of the
memory is kept for Spark's memory cache (I did not cache any RDDs in my application),
so there was only 40% left for running the app.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/something-about-memory-usage-tp5107.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: the spark configuage

2014-04-30 Thread Andras Nemeth
On 30 Apr 2014 10:35, "Akhil Das"  wrote:
>
> Hi
>
> The reason you saw that warning is the native Hadoop library
$HADOOP_HOME/lib/native/libhadoop.so.1.0.0 was actually compiled on 32 bit.
>
> Anyway, it's just a warning, and won't impact Hadoop's functionalities.
>
> Here is the way if you do want to eliminate this warning, download the
source code of Hadoop and recompile libhadoop.so.1.0.0 on 64bit system,
then replace the 32bit one.
>
> Steps on how to recompile source code are included here for Ubuntu:
>
>
http://www.ercoppa.org/Linux-Compile-Hadoop-220-fix-Unable-to-load-native-hadoop-library.htm
>
http://www.csrdu.org/nauman/2014/01/23/geting-started-with-hadoop-2-2-0-building/
>
> Good luck.
>
>
> On Wed, Apr 30, 2014 at 1:28 PM, Sophia  wrote:
>>
>> Hi,
>> when I configue spark, run the shell instruction:
>> ./spark-shellit told me like this:
>> WARN:NativeCodeLoader:Uable to load native-hadoop livrary for your
>> builtin-java classes where applicable,when it connect to
ResourceManager,it
>> stopped. What should I DO?
>> Wish your reply
>>
>>
>>
>> --
>> View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>


Re: NoSuchMethodError from Spark Java

2014-04-30 Thread Andras Nemeth
On 30 Apr 2014 06:59, "Patrick Wendell"  wrote:
>
> The signature of this function was changed in spark 1.0... is there
> any chance that somehow you are actually running against a newer
> version of Spark?
>
> On Tue, Apr 29, 2014 at 8:58 PM, wxhsdp  wrote:
> > i met with the same question when update to spark 0.9.1
> > (svn checkout https://github.com/apache/spark/)
> >
> > Exception in thread "main" java.lang.NoSuchMethodError:
> >
org.apache.spark.SparkContext$.jarOfClass(Ljava/lang/Class;)Lscala/collection/Seq;
> > at
org.apache.spark.examples.GroupByTest$.main(GroupByTest.scala:38)
> > at org.apache.spark.examples.GroupByTest.main(GroupByTest.scala)
> >
> > sbt.buid:
> > name := "GroupByTest"
> >
> > version := "1.0"
> >
> > scalaVersion := "2.10.4"
> >
> > libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"
> >
> > resolvers += "Akka Repository" at "http://repo.akka.io/releases/";
> >
> > is there something need to modify?
> >
> >
> >
> >
> > --
> > View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-from-Spark-Java-tp4937p5076.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: the spark configuage

2014-04-30 Thread Rahul Singhal
Hi,

Just in case you already have the 64 bit version, the following works for me on 
spark 0.9.1

SPARK_LIBRARY_PATH=/opt/hadoop/lib/native/ ./bin/spark-shell

(where my libhadoop.so is present in /opt/hadoop/lib/native/)

Thanks,
Rahul Singhal

From: Akhil Das <ak...@sigmoidanalytics.com>
Reply-To: "user@spark.apache.org" <user@spark.apache.org>
Date: Wednesday 30 April 2014 2:06 PM
To: "user@spark.apache.org" <user@spark.apache.org>
Cc: "u...@spark.incubator.apache.org" <u...@spark.incubator.apache.org>
Subject: Re: the spark configuage

Hi

The reason you saw that warning is the native Hadoop library 
$HADOOP_HOME/lib/native/libhadoop.so.1.0.0 was actually compiled on 32 bit.

Anyway, it's just a warning, and won't impact Hadoop's functionalities.

Here is the way if you do want to eliminate this warning, download the source 
code of Hadoop and recompile libhadoop.so.1.0.0 on 64bit system, then replace 
the 32bit one.

Steps on how to recompile source code are included here for Ubuntu:

http://www.ercoppa.org/Linux-Compile-Hadoop-220-fix-Unable-to-load-native-hadoop-library.htm
http://www.csrdu.org/nauman/2014/01/23/geting-started-with-hadoop-2-2-0-building/

Good luck.

Thanks
Best Regards


On Wed, Apr 30, 2014 at 1:28 PM, Sophia <sln-1...@163.com> wrote:
Hi,
when I configure Spark and run the shell instruction
./spark-shell, it told me:
WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable. When it connects to the
ResourceManager, it stops. What should I do?
Looking forward to your reply



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: the spark configuage

2014-04-30 Thread Akhil Das
Hi

The reason you saw that warning is that the native Hadoop library
$HADOOP_HOME/lib/native/libhadoop.so.1.0.0 was compiled for a 32-bit system.

Anyway, it's just a warning, and won't impact Hadoop's functionality.

If you do want to eliminate this warning, download the Hadoop source code,
recompile libhadoop.so.1.0.0 on a 64-bit system, and replace the 32-bit one.

Steps on how to recompile source code are included here for Ubuntu:

http://www.ercoppa.org/Linux-Compile-Hadoop-220-fix-Unable-to-load-native-hadoop-library.htm
http://www.csrdu.org/nauman/2014/01/23/geting-started-with-hadoop-2-2-0-building/

Good luck.

Thanks
Best Regards


On Wed, Apr 30, 2014 at 1:28 PM, Sophia  wrote:

> Hi,
> when I configure Spark and run the shell instruction
> ./spark-shell, it told me:
> WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
> using builtin-java classes where applicable. When it connects to the
> ResourceManager, it stops. What should I do?
> Looking forward to your reply
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>



Re: Joining not-pair RDDs in Spark

2014-04-30 Thread jsantos
That's the approach I finally used.
Thanks for your help :-)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-not-pair-RDDs-in-Spark-tp5034p5099.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


the spark configuage

2014-04-30 Thread Sophia
Hi,
when I configure Spark and run the shell instruction
./spark-shell, it told me:
WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable. When it connects to the
ResourceManager, it stops. What should I do?
Looking forward to your reply



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Shuffle Spill Issue

2014-04-30 Thread Daniel Darabos
Whoops, you are right. Sorry for the misinformation. Indeed reduceByKey
just calls combineByKey:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] =
{
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

(I think I confused reduceByKey with groupByKey.)
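
For concreteness, here is a minimal sketch of that equivalence (the input path is hypothetical, and sc is the usual SparkContext as in the spark-shell): both forms do a map-side combine, so the shuffle should carry roughly one record per distinct word per partition rather than one record per occurrence.

val wordsPair = sc.textFile("hdfs:///some/input")   // hypothetical path
  .flatMap(_.split(" "))
  .map(word => (word, 1))

val viaReduce  = wordsPair.reduceByKey(_ + _)
val viaCombine = wordsPair.combineByKey(
  (v: Int) => v,                  // createCombiner
  (acc: Int, v: Int) => acc + v,  // mergeValue
  (a: Int, b: Int) => a + b)      // mergeCombiners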


On Wed, Apr 30, 2014 at 2:55 AM, Liu, Raymond  wrote:

> Hi Daniel
>
> Thanks for your reply. I think reduceByKey will also do a map-side
> combine, so the result is the same: for each partition, one entry per
> distinct word. In my case, with the Java serializer, the 240MB dataset
> yields around 70MB of shuffle data. Only the Shuffle Spill (Memory) is
> abnormal, and it sounds to me like it should not trigger at all. And, by
> the way, this behavior only occurs on the map side; on the reduce / shuffle
> fetch side, this strange behavior doesn't happen.
>
> Best Regards,
> Raymond Liu
>
> From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]
>
> I have no idea why shuffle spill is so large. But this might make it
> smaller:
>
> val addition = (a: Int, b: Int) => a + b
> val wordsCount = wordsPair.combineByKey(identity, addition, addition)
>
> This way only one entry per distinct word will end up in the shuffle for
> each partition, instead of one entry per word occurrence.
>
> On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond 
> wrote:
> Hi  Patrick
>
> I am just doing a simple word count; the data is generated by
> the Hadoop random text writer.
>
> This seems to me not quite related to compression. If I turn off
> compression on shuffle, the metrics look something like below for the smaller
> 240MB dataset.
>
>
> Executor ID  Address      Task Time  Tasks (Total/Failed/Succeeded)  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
> 10           sr437:48527  35 s       8 / 0 / 8                       0.0 B         2.5 MB         2.2 GB                  1291.2 KB
> 12           sr437:46077  34 s       8 / 0 / 8                       0.0 B         2.5 MB         1822.6 MB               1073.3 KB
> 13           sr434:37896  31 s       8 / 0 / 8                       0.0 B         2.4 MB         1099.2 MB               621.2 KB
> 15           sr438:52819  31 s       8 / 0 / 8                       0.0 B         2.5 MB         1898.8 MB               1072.6 KB
> 16           sr434:37103  32 s       8 / 0 / 8                       0.0 B         2.4 MB         1638.0 MB               1044.6 KB
>
>
> And the program is pretty simple:
>
> val files = sc.textFile(args(1))
> val words = files.flatMap(_.split(" "))
> val wordsPair = words.map(x => (x, 1))
>
> val wordsCount = wordsPair.reduceByKey(_ + _)
> val count = wordsCount.count()
>
> println("Number of words = " + count)
>
>
> Best Regards,
> Raymond Liu
>
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Could you explain more what your job is doing and what data types you are
> using? These numbers alone don't necessarily indicate something is wrong.
> The relationship between the in-memory and on-disk shuffle amount is
> definitely a bit strange; the data gets compressed when written to disk,
> but unless you have a weird dataset (e.g. all zeros) I wouldn't expect it
> to compress _that_ much.
>
> On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond 
> wrote:
> Hi
>
>
> I am running a simple word count program on a Spark standalone
> cluster. The cluster is made up of 6 nodes; each runs 4 workers, and each
> worker owns 10G of memory and 16 cores, for a total of 96 cores and 240G
> of memory. (It was also previously configured as 1 worker with 40G of
> memory on each node.)
>
> I ran a very small data set (2.4GB in total on HDFS) to confirm
> the problem here, as below:
>
> As you can read from the part of the task metrics below, I noticed
> that the shuffle spill metrics indicate that something is
> wrong.
>
> Executor ID  Address      Task Time  Tasks (Total/Failed/Succeeded)  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
> 0            sr437:42139  29 s       4 / 0 / 4                       0.0 B         4.3 MB         23.6 GB                 4.3 MB
> 1            sr433:46935  1.1 min    4 / 0 / 4                       0.0 B         4.2 MB         19.0 GB                 3.4 MB
> 10           sr436:53277  26 s       4 / 0 / 4                       0.0 B         4.3 MB         25.6 GB                 4.6 MB
> 11           sr437:58872  32 s       4 / 0 / 4                       0.0 B         4.3 MB         25.0 GB                 4.4 MB
> 12           sr435:48358  27 s       4 / 0 / 4                       0.0 B         4.3 MB         25.1 GB                 4.4 MB
>
>
> You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x
> the actual shuffle data and Shuffle Spill (Disk), and it also seems to
> me that the spill should not trigger at all, since the memory is not
> used up.
>
> To verify that, I further reduced the data size to 240MB in total.
>
> And here is the result:
>
>
> Executor ID  Address      Task Time  Tasks (Total/Failed/Succeeded)  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
> 0            sr437:50895  15 s       4 / 0 / 4                       0.0 B         703.0 KB       80.0 MB                 43.2 KB
> 1            sr433:50207  17 s       4

Re: NoSuchMethodError from Spark Java

2014-04-30 Thread wxhsdp
I fixed it. 

I made my sbt project depend on
spark/trunk/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar
and it works.
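
A hedged sketch of one way to express that dependency in build.sbt (the path below is hypothetical and version-specific); simply dropping the assembly jar into the project's lib/ directory also works, since sbt picks up unmanaged jars from there.

unmanagedJars in Compile += Attributed.blank(
  file("/path/to/spark/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar"))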



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-from-Spark-Java-tp4937p5096.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Patrick Wendell
I don't think we guarantee anywhere that union(A, B) will behave by
concatenating the partitions; it just happens to be an artifact of the
current implementation.

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it
wouldn't violate the contract of union

AFAIK the only guarantee is that the resulting RDD will contain all the elements.

- Patrick
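
For concreteness, a minimal runnable sketch of the point above (hypothetical values; sc is the usual SparkContext, and only the multiset of elements, not their order, is guaranteed):

val rdd1 = sc.parallelize(Seq(1, 2, 3), 1)
val rdd2 = sc.parallelize(Seq(1, 4, 5), 1)

val unioned = rdd1.union(rdd2)
// Today this happens to collect as Array(1, 2, 3, 1, 4, 5), but only the
// set of elements is part of union's contract, not their order.
println(unioned.collect().mkString(", "))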

On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim  wrote:
> Yes, that's what I meant. Sure, the numbers might not actually be sorted,
> but the order of rows is semantically kept throughout non-shuffling
> transforms. I'm on board with you on union as well.
>
> Back to the original question, then, why is it important to coalesce to a
> single partition? When you union two RDDs, for example, rdd1 = [“a, b,
> c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
> rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with three
> lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from the
> two RDDs are concatenated.
>
> Mingyu
>
>
>
>
> On 4/29/14, 10:55 PM, "Patrick Wendell"  wrote:
>
>>If you call map() on an RDD it will retain the ordering it had before,
>>but that is not necessarily a correct sort order for the new RDD.
>>
>>var rdd = sc.parallelize([2, 1, 3]);
>>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
>>
>>Note that mapped is no longer sorted.
>>
>>When you union two RDD's together it will effectively concatenate the
>>two orderings, which is also not a valid sorted order on the new RDD:
>>
>>rdd1 = [1,2,3]
>>rdd2 = [1,4,5]
>>
>>rdd1.union(rdd2) = [1,2,3,1,4,5]
>>
>>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim  wrote:
>>> Thanks for the quick response!
>>>
>>> To better understand it: the reason a sorted RDD has a well-defined
>>> ordering is that sortedRDD.getPartitions() returns the partitions in the
>>> right order and each partition internally is properly sorted. So, if you have
>>>
>>> var rdd = sc.parallelize([2, 1, 3]);
>>> var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>>> var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
>>>
>>> Since mapValues doesn't change the order of partitions nor the
>>> order of rows within the partitions, I think “mapped” should have the
>>> exact same order as “sorted”. Sure, if a transform involves shuffling, the
>>> order will change. Am I mistaken? Is there an extra detail in sortedRDD
>>> that guarantees a well-defined ordering?
>>>
>>> If it's true that the order of partitions returned by RDD.getPartitions()
>>> and the row orders within the partitions determine the row order, I'm not
>>> sure why union doesn't respect the order, because the union operation simply
>>> concatenates the two lists of partitions from the two RDDs.
>>>
>>> Mingyu
>>>
>>>
>>>
>>>
>>> On 4/29/14, 10:25 PM, "Patrick Wendell"  wrote:
>>>
You are right, once you sort() the RDD, then yes it has a well defined
ordering.

But that ordering is lost as soon as you transform the RDD, including
if you union it with another RDD.

On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim  wrote:
> Hi Patrick,
>
> I'm a little confused about your comment that RDDs are not ordered. As far
> as I know, RDDs keep a list of partitions that are ordered, and this is why I
> can call RDD.take() and get the same first k rows every time I call it, and
> RDD.take() returns the same entries as RDD.map(…).take() because map
> preserves the partition order. RDD order is also what allows me to get the
> top k out of an RDD by doing RDD.sort().take().
>
> Am I misunderstanding it? Or, is it just when the RDD is written to disk that
> the order is not well preserved? Thanks in advance!
>
> Mingyu
>
>
>
>
> On 1/22/14, 4:46 PM, "Patrick Wendell"  wrote:
>
>>Ah somehow after all this time I've never seen that!
>>
>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
>>
>>wrote:
>>>
>>>
>>>
>>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell
>>>
>>> wrote:

 What is the ++ operator here? Is this something you defined?
>>>
>>>
>>> No, it's an alias for union defined in RDD.scala:
>>>
>>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>>


 Another issue is that RDD's are not ordered, so when you union two
 together it doesn't have a well defined ordering.

 If you do want to do this you could coalesce into one partition,
then
 call MapPartitions and return an iterator that first adds your
header
 and then the rest of the file, then call saveAsTextFile. Keep in
mind
 this will only work if you coalesce into a single partition.
>>>
>>>
>>> Thank
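
For reference, a minimal sketch of the coalesce + mapPartitions approach described in the quoted advice above (the header text and output path are hypothetical, and sc is the usual SparkContext):

val rows   = sc.parallelize(Seq("1, 2, 3", "4, 5, 6"))
val header = "a, b, c"

val withHeader = rows
  .coalesce(1)                                  // a single partition, so the header appears exactly once
  .mapPartitions(it => Iterator(header) ++ it)  // prepend the header line to the data

withHeader.saveAsTextFile("hdfs:///some/output")  // hypothetical path; yields a single part file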