Re: Which is more efficient : first join three RDDs and then do filtering or vice versa?

2015-03-13 Thread shahab
Thanks, it makes sense.

On Thursday, March 12, 2015, Daniel Siegmann 
wrote:

> Join causes a shuffle (sending data across the network). I expect it will
> be better to filter before you join, so you reduce the amount of data which
> is sent across the network.
>
> Note this would be true for *any* transformation which causes a shuffle.
> It would not be true if you're combining RDDs with union, since that
> doesn't cause a shuffle.
>
> On Thu, Mar 12, 2015 at 11:04 AM, shahab  > wrote:
>
>> Hi,
>>
>> Probably this question has already been answered somewhere in the mailing
>> list, but I couldn't find it. Sorry for posting this again.
>>
>> I need to join and apply filtering on three different RDDs, and I just
>> wonder which of the following alternatives is more efficient:
>> 1- first join all three RDDs and then do the filtering on the resulting
>> joined RDD, or
>> 2- apply filtering on each individual RDD and then join the resulting RDDs
>>
>>
>> Or perhaps there is no difference, due to lazy evaluation and Spark's
>> underlying optimisation?
>>
>> best,
>> /Shahab
>>
>
>


Re: Which is more efficient : first join three RDDs and then do filtering or vice versa?

2015-03-12 Thread Daniel Siegmann
Join causes a shuffle (sending data across the network). I expect it will
be better to filter before you join, so you reduce the amount of data which
is sent across the network.

Note this would be true for *any* transformation which causes a shuffle. It
would not be true if you're combining RDDs with union, since that doesn't
cause a shuffle.
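
For illustration, a minimal sketch of the two orderings (assuming rdd1,
rdd2, rdd3 are RDD[(Int, String)] and p1, p2, p3 are String => Boolean
predicates; all names are made up, not from this thread):

// 1) Join first, then filter: every record is shuffled for the joins.
val joinThenFilter = rdd1.join(rdd2).join(rdd3)
  .filter { case (_, ((v1, v2), v3)) => p1(v1) && p2(v2) && p3(v3) }

// 2) Filter first, then join: only surviving records cross the network.
val filterThenJoin = rdd1.filter { case (_, v) => p1(v) }
  .join(rdd2.filter { case (_, v) => p2(v) })
  .join(rdd3.filter { case (_, v) => p3(v) })

Both produce the same result when the predicates look at single records
only, but the second version shuffles less data.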

On Thu, Mar 12, 2015 at 11:04 AM, shahab  wrote:

> Hi,
>
> Probably this question has already been answered somewhere in the mailing
> list, but I couldn't find it. Sorry for posting this again.
>
> I need to join and apply filtering on three different RDDs, and I just
> wonder which of the following alternatives is more efficient:
> 1- first join all three RDDs and then do the filtering on the resulting
> joined RDD, or
> 2- apply filtering on each individual RDD and then join the resulting RDDs
>
>
> Or perhaps there is no difference, due to lazy evaluation and Spark's
> underlying optimisation?
>
> best,
> /Shahab
>


Which is more efficient : first join three RDDs and then do filtering or vice versa?

2015-03-12 Thread shahab
Hi,

Probably this question has already been answered somewhere in the mailing
list, but I couldn't find it. Sorry for posting this again.

I need to join and apply filtering on three different RDDs, and I just
wonder which of the following alternatives is more efficient:
1- first join all three RDDs and then do the filtering on the resulting
joined RDD, or
2- apply filtering on each individual RDD and then join the resulting RDDs


Or perhaps there is no difference, due to lazy evaluation and Spark's
underlying optimisation?

best,
/Shahab


Re: Is there a limit to the number of RDDs in a Spark context?

2015-03-12 Thread Juan Rodríguez Hortalá
Hi,

It's been some time since my last message on the subject of using many RDDs
in a Spark job, but I have just encountered the same problem again. The
thing is that I have an RDD of time-tagged data that I want to 1) divide
into windows according to a timestamp field; 2) compute KMeans for each
time window; 3) collect the results in an RDD of pairs that contains the
centroids for each time window. For 1) I generate an RDD of pairs where the
key is an id for the time window, but for 2) I run into the problem that
KMeans from MLlib only accepts an RDD, and I cannot call it from
aggregateByKey. I think this is a reusability problem for any algorithm in
MLlib based on passing an RDD, if we want to apply the algorithm
independently to several groups of data. So the only approaches I can
imagine are:

a) Generate an RDD per time window, which is easy to do but doesn't work,
because it's easy to end up with thousands of windows and hence thousands
of RDDs, which freezes the Spark scheduler, as seen in my previous messages

b) Collect the set of ids for the time windows in the driver, and traverse
that set by generating an RDD per window, calling KMeans, and then storing
the results with an export action (see the sketch after this paragraph). I
will try that now and I think it could work, because only one RDD per
window will be alive at a time. The point here is that we avoid creating an
RDD with a lineage depending on a thousand RDDs, like in the collecting
phase 3) of a). But that implies a sequential execution of the KMeans
computations, which is a waste of resources: imagine I have a cluster with
200 machines, I can compute each call to KMeans on 5 machines in 10
minutes, and I have 1000 windows to compute and hence 1000 calls to KMeans;
by sequencing the KMeans computations I would have 195 idle machines and a
running time of 10 minutes * 1000 windows. Maybe this could be overcome by
having not 1 RDD but m RDDs, for some number m that doesn't freeze the
Spark scheduler, but I think that's not a very clean workaround. Also, this
makes it very difficult to reuse this computation of KMeans by window in a
bigger program, because I'm not able to get an RDD with a key per window id
and the centroids in the values. The only way I imagine I could do that is
by storing the pairs in a database during the export actions, and then
loading all the results into a single RDD, but I would prefer to do
everything inside Spark if possible.
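
For reference, a minimal sketch of approach b); the RDD layout and the
names (tagged, k, iters) are illustrative assumptions, not code from this
thread:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// `tagged` is assumed to be an RDD[(Long, Vector)] of (windowId, features).
def kmeansPerWindow(tagged: RDD[(Long, Vector)], k: Int,
                    iters: Int): Map[Long, Array[Vector]] = {
  // The set of window ids is assumed small enough to collect in the driver.
  val windowIds = tagged.keys.distinct().collect()
  windowIds.map { id =>
    // Only one filtered RDD is alive per loop iteration.
    val window = tagged.filter(_._1 == id).values
    id -> KMeans.train(window, k, iters).clusterCenters
  }.toMap
}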

Maybe I'm missing something here, any idea would be appreciated.

Thanks in advance for your help,

Greetings,

Juan Rodriguez


2015-02-18 20:23 GMT+01:00 Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com>:

> Hi Sean,
>
> Thanks a lot for your answer. That explains it, as I was creating
> thousands of RDDs, so I guess the communication overhead was the reason why
> the Spark job was freezing. After changing the code to use RDDs of pairs
> and aggregateByKey it works just fine, and quite fast.
>
> Again, thanks a lot for your help.
>
> Greetings,
>
> Juan
>
> 2015-02-18 15:35 GMT+01:00 Sean Owen :
>
>> At some level, enough RDDs creates hundreds of thousands of tiny
>> partitions of data each of which creates a task for each stage. The
>> raw overhead of all the message passing can slow things down a lot. I
>> would not design something to use an RDD per key. You would generally
>> use key by some value you want to divide and filter on, and then use a
>> *ByKey to do your work.
>>
>> You don't work with individual RDDs this way, but usually that's good
>> news. You usually have a lot more flexibility operating just in pure
>> Java / Scala to do whatever you need inside your function.
>>
>> On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá
>>  wrote:
>> > Hi Paweł,
>> >
>> > Thanks a lot for your answer. I finally got the program to work by using
>> > aggregateByKey, but I was wondering why creating thousands of RDDs
>> doesn't
>> > work. I think that could be interesting for using methods that work on
>> RDDs
>> > like for example JavaDoubleRDD.stats() (
>> >
>> http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29
>> ).
>> > If the groups are small then I can chain groupBy(), collect(),
>> parallelize()
>> > and stats(), but that is quite inefficient because it implies moving
>> data to
>> > and from the driver, and also doesn't scale to big groups; on the other
>> hand
>> > if I use aggregateByKey or a similar function then I cannot use stats()
>> so I
>> > have to reimplement it. In general I was looking for a way to reuse
>> other
>> > functions that I have that work on RDDs, for using them on groups

Re: RDDs

2015-03-03 Thread Manas Kar
The above is a great example using threads.
Does anyone have an example using a Scala/Akka Future to do the same?
I am looking for an example like that which uses an Akka Future and does
something if the Future times out.
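
Not an answer from the thread, but a minimal sketch of that idea with plain
scala.concurrent Futures (logData and the 60-second timeout are
illustrative; an Akka ask-style timeout would look similar):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Two Spark jobs on the same cached RDD, launched concurrently.
val countA: Future[Long] = Future { logData.filter(_.contains("a")).count() }
val countB: Future[Long] = Future { logData.filter(_.contains("b")).count() }

try {
  println("Lines with a: " + Await.result(countA, 60.seconds))
  println("Lines with b: " + Await.result(countB, 60.seconds))
} catch {
  case _: java.util.concurrent.TimeoutException =>
    println("a job did not finish within the timeout")
}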

On Tue, Mar 3, 2015 at 9:16 AM, Manas Kar 
wrote:

> The above is a great example using threads.
> Does anyone have an example using a Scala/Akka Future to do the same?
> I am looking for an example like that which uses an Akka Future and does
> something if the Future times out.
>
> On Tue, Mar 3, 2015 at 7:00 AM, Kartheek.R 
> wrote:
>
>> Hi TD,
>> "You can always run two jobs on the same cached RDD, and they can run in
>> parallel (assuming you launch the 2 jobs from two different threads)"
>>
>> Is this a correct way to launch jobs from two different threads?
>>
>> val threadA = new Thread(new Runnable {
>>   def run() {
>>   for(i<- 0 until end)
>>   {
>> val numAs = logData.filter(line => line.contains("a"))
>> println("Lines with a: %s".format(numAs.count))
>>   }
>>  }
>> })
>>
>>val threadB = new Thread(new Runnable {
>>   def run() {
>>   for(i<- 0 until end)
>>   {
>> val numBs = logData.filter(line => line.contains("b"))
>> println("Lines with b: %s".format(numBs.count))
>>   }
>>   }
>> })
>>
>>
>>
>>
>>
>


Re: RDDs

2015-03-03 Thread Manas Kar
The above is a great example using threads.
Does anyone have an example using a Scala/Akka Future to do the same?
I am looking for an example like that which uses an Akka Future and does
something if the Future times out.

On Tue, Mar 3, 2015 at 7:00 AM, Kartheek.R  wrote:

> Hi TD,
> "You can always run two jobs on the same cached RDD, and they can run in
> parallel (assuming you launch the 2 jobs from two different threads)"
>
> Is this a correct way to launch jobs from two different threads?
>
> val threadA = new Thread(new Runnable {
>   def run() {
>   for(i<- 0 until end)
>   {
> val numAs = logData.filter(line => line.contains("a"))
> println("Lines with a: %s".format(numAs.count))
>   }
>  }
> })
>
>val threadB = new Thread(new Runnable {
>   def run() {
>   for(i<- 0 until end)
>   {
> val numBs = logData.filter(line => line.contains("b"))
> println("Lines with b: %s".format(numBs.count))
>   }
>   }
> })
>
>
>
>
>


Re: RDDs

2015-03-03 Thread Kartheek.R
Hi TD,
"You can always run two jobs on the same cached RDD, and they can run in
parallel (assuming you launch the 2 jobs from two different threads)"

Is this a correct way to launch jobs from two different threads?

val threadA = new Thread(new Runnable {
  def run() {
    for (i <- 0 until end) {
      val numAs = logData.filter(line => line.contains("a"))
      println("Lines with a: %s".format(numAs.count))
    }
  }
})

val threadB = new Thread(new Runnable {
  def run() {
    for (i <- 0 until end) {
      val numBs = logData.filter(line => line.contains("b"))
      println("Lines with b: %s".format(numBs.count))
    }
  }
})







Re: Columnar-Oriented RDDs

2015-03-01 Thread Koert Kuipers
Hey,
I do not have any statistics. I just wanted to show it can be done, and
left it at that. The memory usage should be predictable: the benefit comes
from using arrays for primitive types. Accessing the data row-wise means
re-assembling the rows from the columnar data, which I have not tried to
profile or optimize at all yet, but there will surely be some overhead
compared to a row-oriented RDD.
Also, the format relies on compile-time types (which is what allows the use
of arrays for primitive types).
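
As a much-simplified illustration of that idea (not the actual
spark-columnar code): one partition of (Int, Double) rows held as one
primitive array per column, with row access re-assembling tuples:

// Columns become primitive arrays (int[] and double[] on the JVM), which
// is where the memory savings come from.
class ColumnarPartition(rows: Array[(Int, Double)]) {
  val col1: Array[Int]    = rows.map(_._1)
  val col2: Array[Double] = rows.map(_._2)
  // Row-wise access pays the re-assembly overhead mentioned above.
  def row(i: Int): (Int, Double) = (col1(i), col2(i))
  def size: Int = col1.length
}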

On Sun, Mar 1, 2015 at 6:33 AM, Night Wolf  wrote:

> Thanks for the comments guys.
>
> Parquet is awesome. My question with using Parquet for on-disk storage is:
> how should I load that into memory as a Spark RDD, cache it, and keep it
> in a columnar format?
>
> I know I can use Spark SQL with Parquet, which is awesome. But as soon as I
> step out of SQL we have problems, as the data effectively gets converted
> back to a row-oriented format.
>
> @Koert - that looks really exciting. Do you have any statistics on memory
> and scan performance?
>
>
> On Saturday, February 14, 2015, Koert Kuipers  wrote:
>
>> i wrote a proof of concept to automatically store any RDD of tuples or
>> case classes in columnar format using arrays (and strongly typed, so you get
>> the benefit of primitive arrays). see:
>> https://github.com/tresata/spark-columnar
>>
>> On Fri, Feb 13, 2015 at 3:06 PM, Michael Armbrust > > wrote:
>>
>>> Shark's in-memory code was ported to Spark SQL and is used by default
>>> when you run .cache on a SchemaRDD or CACHE TABLE.
>>>
>>> I'd also look at parquet which is more efficient and handles nested data
>>> better.
>>>
>>> On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'd like to build/use column oriented RDDs in some of my Spark code. A
>>>> normal Spark RDD is stored as row oriented object if I understand
>>>> correctly.
>>>>
>>>> I'd like to leverage some of the advantages of a columnar memory
>>>> format. Shark (used to) and SparkSQL uses a columnar storage format using
>>>> primitive arrays for each column.
>>>>
>>>> I'd be interested to know more about this approach and how I could
>>>> build my own custom columnar-oriented RDD which I can use outside of Spark
>>>> SQL.
>>>>
>>>> Could anyone give me some pointers on where to look to do something
>>>> like this, either from scratch or using whats there in the SparkSQL libs or
>>>> elsewhere. I know Evan Chan in a presentation made mention of building a
>>>> custom RDD of column-oriented blocks of data.
>>>>
>>>> Cheers,
>>>> ~N
>>>>
>>>
>>>
>>


Re: Columnar-Oriented RDDs

2015-03-01 Thread Night Wolf
Thanks for the comments, guys.

Parquet is awesome. My question with using Parquet for on-disk storage is:
how should I load that into memory as a Spark RDD, cache it, and keep it in
a columnar format?

I know I can use Spark SQL with Parquet, which is awesome. But as soon as I
step out of SQL we have problems, as the data effectively gets converted
back to a row-oriented format.

@Koert - that looks really exciting. Do you have any statistics on memory
and scan performance?

On Saturday, February 14, 2015, Koert Kuipers  wrote:

> i wrote a proof of concept to automatically store any RDD of tuples or
> case classes in columnar format using arrays (and strongly typed, so you get
> the benefit of primitive arrays). see:
> https://github.com/tresata/spark-columnar
>
> On Fri, Feb 13, 2015 at 3:06 PM, Michael Armbrust  > wrote:
>
>> Shark's in-memory code was ported to Spark SQL and is used by default
>> when you run .cache on a SchemaRDD or CACHE TABLE.
>>
>> I'd also look at parquet which is more efficient and handles nested data
>> better.
>>
>> On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf > > wrote:
>>
>>> Hi all,
>>>
>>> I'd like to build/use column oriented RDDs in some of my Spark code. A
>>> normal Spark RDD is stored as row oriented object if I understand
>>> correctly.
>>>
>>> I'd like to leverage some of the advantages of a columnar memory format.
>>> Shark (used to) and SparkSQL uses a columnar storage format using primitive
>>> arrays for each column.
>>>
>>> I'd be interested to know more about this approach and how I could build
>>> my own custom columnar-oriented RDD which I can use outside of Spark SQL.
>>>
>>> Could anyone give me some pointers on where to look to do something like
>>> this, either from scratch or using whats there in the SparkSQL libs or
>>> elsewhere. I know Evan Chan in a presentation made mention of building a
>>> custom RDD of column-oriented blocks of data.
>>>
>>> Cheers,
>>> ~N
>>>
>>
>>
>


Re: Iterating on RDDs

2015-02-27 Thread Vijayasarathy Kannan
As you suggested, I saved the grouped RDD and persisted it in memory before
the iterations begin. The performance seems to be much better now.

My previous comment that the run times doubled was based on a wrong
observation.

Thanks.


On Fri, Feb 27, 2015 at 10:27 AM, Vijayasarathy Kannan 
wrote:

> Thanks.
>
> I tried persist() on the RDD. The runtimes appear to have doubled now
> (without persist() it was ~7s per iteration and now it's ~15s). I am running
> standalone Spark on an 8-core machine.
> Any thoughts on why the increase in runtime?
>
> On Thu, Feb 26, 2015 at 4:27 PM, Imran Rashid 
> wrote:
>
>>
>> val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
>> // or whatever persistence makes more sense for you ...
>> while(true) {
>>   val res = grouped.flatMap(F)
>>   res.collect.foreach(func)
>>   if(criteria)
>>  break
>> }
>>
>> On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan 
>> wrote:
>>
>>> Hi,
>>>
>>> I have the following use case.
>>>
>>> (1) I have an RDD of edges of a graph (say R).
>>> (2) do a groupBy on R (by say source vertex) and call a function F on
>>> each group.
>>> (3) collect the results from Fs and do some computation
>>> (4) repeat the above steps until some criteria is met
>>>
>>> In (2), the groups are always going to be the same (since R is grouped
>>> by source vertex).
>>>
>>> Question:
>>> Is R distributed every iteration (when in (2)) or is it distributed only
>>> once when it is created?
>>>
>>> A sample code snippet is below.
>>>
>>> while(true) {
>>>   val res = R.groupBy[VertexId](G).flatMap(F)
>>>   res.collect.foreach(func)
>>>   if(criteria)
>>>  break
>>> }
>>>
>>> Since the groups remain the same, what is the best way to go about
>>> implementing the above logic?
>>>
>>
>>
>


Re: Iterating on RDDs

2015-02-27 Thread Vijayasarathy Kannan
Thanks.

I tried persist() on the RDD. The runtimes appear to have doubled now
(without persist() it was ~7s per iteration and now it's ~15s). I am running
standalone Spark on an 8-core machine.
Any thoughts on why the increase in runtime?

On Thu, Feb 26, 2015 at 4:27 PM, Imran Rashid  wrote:

>
> val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
> // or whatever persistence makes more sense for you ...
> while(true) {
>   val res = grouped.flatMap(F)
>   res.collect.foreach(func)
>   if(criteria)
>  break
> }
>
> On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan 
> wrote:
>
>> Hi,
>>
>> I have the following use case.
>>
>> (1) I have an RDD of edges of a graph (say R).
>> (2) do a groupBy on R (by say source vertex) and call a function F on
>> each group.
>> (3) collect the results from Fs and do some computation
>> (4) repeat the above steps until some criteria is met
>>
>> In (2), the groups are always going to be the same (since R is grouped by
>> source vertex).
>>
>> Question:
>> Is R distributed every iteration (when in (2)) or is it distributed only
>> once when it is created?
>>
>> A sample code snippet is below.
>>
>> while(true) {
>>   val res = R.groupBy[VertexId](G).flatMap(F)
>>   res.collect.foreach(func)
>>   if(criteria)
>>  break
>> }
>>
>> Since the groups remain the same, what is the best way to go about
>> implementing the above logic?
>>
>
>


Re: Iterating on RDDs

2015-02-26 Thread Imran Rashid
val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
// or whatever persistence makes more sense for you ...
while(true) {
  val res = grouped.flatMap(F)
  res.collect.foreach(func)
  if(criteria)
 break
}

On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan 
wrote:

> Hi,
>
> I have the following use case.
>
> (1) I have an RDD of edges of a graph (say R).
> (2) do a groupBy on R (by say source vertex) and call a function F on each
> group.
> (3) collect the results from Fs and do some computation
> (4) repeat the above steps until some criteria is met
>
> In (2), the groups are always going to be the same (since R is grouped by
> source vertex).
>
> Question:
> Is R distributed every iteration (when in (2)) or is it distributed only
> once when it is created?
>
> A sample code snippet is below.
>
> while(true) {
>   val res = R.groupBy[VertexId](G).flatMap(F)
>   res.collect.foreach(func)
>   if(criteria)
>  break
> }
>
> Since the groups remain the same, what is the best way to go about
> implementing the above logic?
>


Iterating on RDDs

2015-02-26 Thread Vijayasarathy Kannan
Hi,

I have the following use case.

(1) I have an RDD of edges of a graph (say R).
(2) do a groupBy on R (by say source vertex) and call a function F on each
group.
(3) collect the results from Fs and do some computation
(4) repeat the above steps until some criterion is met

In (2), the groups are always going to be the same (since R is grouped by
source vertex).

Question:
Is R distributed every iteration (when in (2)) or is it distributed only
once when it is created?

A sample code snippet is below.

while(true) {
  val res = R.groupBy[VertexId](G).flatMap(F)
  res.collect.foreach(func)
  if(criteria)
 break
}

Since the groups remain the same, what is the best way to go about
implementing the above logic?


Re: Spark Streaming - Collecting RDDs into array in the driver program

2015-02-25 Thread Tobias Pfeiffer
Hi,

On Thu, Feb 26, 2015 at 11:24 AM, Thanigai Vellore <
thanigai.vell...@gmail.com> wrote:

> It appears that the function immediately returns even before the
> foreachrdd stage is executed. Is that possible?
>
Sure, that's exactly what happens. foreachRDD() schedules a computation, it
does not perform it. Maybe your streaming application would not ever
terminate, but still the function needs to return, right?

If you remove the toArray(), you will return a reference to the ArrayBuffer
that will be appended to over time. You can then, in a different thread,
check the contents of that ArrayBuffer as processing happens, or wait until
processing ends.
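
A minimal sketch of that suggestion (the synchronized block is an addition
for safe cross-thread reads, and the DStream element type is assumed):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.dstream.DStream

def collectTop(topSearches: DStream[(String, Int)]): ArrayBuffer[(String, Int)] = {
  val summary = new ArrayBuffer[(String, Int)]()
  topSearches.foreachRDD { rdd =>
    // Runs once per batch after ssc.start(); appends to the shared buffer.
    summary.synchronized { summary ++= rdd.collect() }
  }
  summary  // a live reference that fills up as batches are processed
}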

Tobias


Re: Spark Streaming - Collecting RDDs into array in the driver program

2015-02-25 Thread Thanigai Vellore
I didn't include the complete driver code, but I do run the streaming
context from the main program which calls this function. Again, I can print
the RDD elements within the foreachRDD block, but the array that is
returned is always empty. It appears that the function returns immediately,
even before the foreachRDD stage is executed. Is that possible?
On Feb 25, 2015 5:41 PM, "Tathagata Das"  wrote:

> You are just setting up the computation here using foreachRDD. You have not
> even run the streaming context to get any data.
>
>
> On Wed, Feb 25, 2015 at 2:21 PM, Thanigai Vellore <
> thanigai.vell...@gmail.com> wrote:
>
>> I have this function in the driver program which collects the result from
>> rdds (in a stream) into an array and return. However, even though the RDDs
>> (in the dstream) have data, the function is returning an empty array...What
>> am I doing wrong?
>>
>> I can print the RDD values inside the foreachRDD call but the array is
>> always empty.
>>
>> def runTopFunction() : Array[(String, Int)] = {
>> val topSearches = some function
>> val summary = new ArrayBuffer[(String,Int)]()
>> topSearches.foreachRDD(rdd => {
>> summary ++= rdd.collect()
>> })
>>
>> return summary.toArray
>> }
>>
>>
>


Re: Spark Streaming - Collecting RDDs into array in the driver program

2015-02-25 Thread Tathagata Das
You are just setting up the computation here using foreachRDD. You have not
even run the streaming context to get any data.
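
In other words (assuming ssc is your StreamingContext), nothing happens
until:

ssc.start()             // actually runs the computation set up by foreachRDD
ssc.awaitTermination()  // block while batches are processed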


On Wed, Feb 25, 2015 at 2:21 PM, Thanigai Vellore <
thanigai.vell...@gmail.com> wrote:

> I have this function in the driver program which collects the result from
> rdds (in a stream) into an array and return. However, even though the RDDs
> (in the dstream) have data, the function is returning an empty array...What
> am I doing wrong?
>
> I can print the RDD values inside the foreachRDD call but the array is
> always empty.
>
> def runTopFunction() : Array[(String, Int)] = {
> val topSearches = some function
> val summary = new ArrayBuffer[(String,Int)]()
> topSearches.foreachRDD(rdd => {
> summary ++= rdd.collect()
> })
>
> return summary.toArray
> }
>
>


Spark Streaming - Collecting RDDs into array in the driver program

2015-02-25 Thread Thanigai Vellore
I have this function in the driver program which collects the results from
RDDs (in a stream) into an array and returns it. However, even though the
RDDs (in the DStream) have data, the function is returning an empty
array... What am I doing wrong?

I can print the RDD values inside the foreachRDD call but the array is
always empty.

def runTopFunction(): Array[(String, Int)] = {
  val topSearches = some function
  val summary = new ArrayBuffer[(String, Int)]()
  topSearches.foreachRDD(rdd => {
    summary ++= rdd.collect()  // appends each batch; foreachRDD only schedules this
  })

  return summary.toArray
}


Re: Executors dropping all memory stored RDDs?

2015-02-24 Thread Thomas Gerber
I have a strong suspicion that it was caused by a full disk on the executor.
I am not sure whether the executor was supposed to recover from it that way.

I cannot be sure about it, as I should have had enough disk space, but I
think I had some data skew which could have led some executors to run out
of disk.

So, in case someone else notices a behavior like this, make sure you check
your cluster monitor (like Ganglia).

On Wed, Jan 28, 2015 at 5:40 PM, Thomas Gerber 
wrote:

> Hello,
>
> I am storing RDDs with the MEMORY_ONLY_SER Storage Level, during the run
> of a big job.
>
> At some point during the job, I went to the Executors page, and saw that
> 80% of my executors did not have stored RDDs anymore (executors.png). On
> the storage page, everything seems "there" (storage.png).
>
> But if I look at a given RDD (RDD_83.png), although it tells me on top
> that all 100 partitions are cached, when I look at the details, only 17 are
> actually stored (RDD_83_partitions), all on the 20% of executors that still
> had stored RDDs based on the Executors page.
>
> So I wonder:
> 1. Are those RDD still cached (in which case, we have a small reporting
> error), or not?
> 2. If not, what could cause an executor to drop its memory-stored RDD
> blocks?
>
> I guess a restart of an executor? When I compare an executor that seems to
> have dropped blocks vs one that has not:
> *** their
> *spark-hadoop-org.apache.spark.deploy.worker.Worker-1-ip-XX-XX-XX-XX.ec2.internal.out*
> content look the same
> *** they both have the same etime in ps (so, I guess no restart?)
> *** didn't see anything in the app log in the work folder (but it is
> large, so I might have missed it)
>
> Also, I must mention that the cluster was doing a lot of GCs, which might
> be a cause of the trouble.
>
> I would appreciate any pointer.
> Thomas
>
>


Re: Is there a limit to the number of RDDs in a Spark context?

2015-02-18 Thread Juan Rodríguez Hortalá
Hi Sean,

Thanks a lot for your answer. That explains it, as I was creating thousands
of RDDs, so I guess the communication overhead was the reason why the Spark
job was freezing. After changing the code to use RDDs of pairs and
aggregateByKey it works just fine, and quite fast.

Again, thanks a lot for your help.

Greetings,

Juan

2015-02-18 15:35 GMT+01:00 Sean Owen :

> At some level, enough RDDs creates hundreds of thousands of tiny
> partitions of data each of which creates a task for each stage. The
> raw overhead of all the message passing can slow things down a lot. I
> would not design something to use an RDD per key. You would generally
> use key by some value you want to divide and filter on, and then use a
> *ByKey to do your work.
>
> You don't work with individual RDDs this way, but usually that's good
> news. You usually have a lot more flexibility operating just in pure
> Java / Scala to do whatever you need inside your function.
>
> On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá
>  wrote:
> > Hi Paweł,
> >
> > Thanks a lot for your answer. I finally got the program to work by using
> > aggregateByKey, but I was wondering why creating thousands of RDDs
> doesn't
> > work. I think that could be interesting for using methods that work on
> RDDs
> > like for example JavaDoubleRDD.stats() (
> >
> http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29
> ).
> > If the groups are small then I can chain groupBy(), collect(),
> parallelize()
> > and stats(), but that is quite inefficient because it implies moving
> data to
> > and from the driver, and also doesn't scale to big groups; on the other
> hand
> > if I use aggregateByKey or a similar function then I cannot use stats()
> so I
> > have to reimplement it. In general I was looking for a way to reuse other
> > functions that I have that work on RDDs, for using them on groups of
> data in
> > a RDD, because I don't see a how to directly apply them to each of the
> > groups in a pair RDD.
> >
> > Again, thanks a lot for your answer,
> >
> > Greetings,
> >
> > Juan Rodriguez
> >
> >
> >
> >
> > 2015-02-18 14:56 GMT+01:00 Paweł Szulc :
> >>
> >> Maybe you can omit using grouping all together with groupByKey? What is
> >> your next step after grouping elements by key? Are you trying to reduce
> >> values? If so then I would recommend using some reducing functions like
> for
> >> example reduceByKey or aggregateByKey. Those will first reduce value for
> >> each key locally on each node before doing actual IO over the network.
> There
> >> will also be no grouping phase so you will not run into memory issues.
> >>
> >> Please let me know if that helped
> >>
> >> Pawel Szulc
> >> @rabbitonweb
> >> http://www.rabbitonweb.com
> >>
> >>
> >> On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá
> >>  wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm writing a Spark program where I want to divide a RDD into different
> >>> groups, but the groups are too big to use groupByKey. To cope with
> that,
> >>> since I know in advance the list of keys for each group, I build a map
> from
> >>> the keys to the RDDs that result from filtering the input RDD to get
> the
> >>> records for the corresponding key. This works when I have a small
> number of
> >>> keys, but for big number of keys (tens of thousands) the execution gets
> >>> stuck, without issuing any new Spark stage. I suspect the reason is
> that the
> >>> Spark scheduler is not able to handle so many RDDs. Does it make
> sense? I'm
> >>> rewriting the program to use a single RDD of pairs, with cached
> partitions,
> >>> but I wanted to be sure I understand the problem here.
> >>>
> >>> Thanks a lot in advance,
> >>>
> >>> Greetings,
> >>>
> >>> Juan Rodriguez
> >>
> >>
> >
>


Re: Is there a limit to the number of RDDs in a Spark context?

2015-02-18 Thread Sean Owen
At some level, enough RDDs creates hundreds of thousands of tiny
partitions of data each of which creates a task for each stage. The
raw overhead of all the message passing can slow things down a lot. I
would not design something to use an RDD per key. You would generally
key by some value you want to divide and filter on, and then use a
*ByKey operation to do your work.

You don't work with individual RDDs this way, but usually that's good
news. You usually have a lot more flexibility operating just in pure
Java / Scala to do whatever you need inside your function.
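
For instance, an illustrative sketch of computing per-key statistics (the
kind of thing stats() would give you) with aggregateByKey instead of one
RDD per key; `data` is an assumed RDD[(String, Double)]:

val zero = (0L, 0.0, 0.0)  // (count, sum, sum of squares)
val statsByKey = data.aggregateByKey(zero)(
  (acc, v) => (acc._1 + 1, acc._2 + v, acc._3 + v * v),  // fold one value in
  (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3)      // merge partials
).mapValues { case (n, s, s2) =>
  val mean = s / n
  (n, mean, s2 / n - mean * mean)  // (count, mean, variance)
}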

On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá
 wrote:
> Hi Paweł,
>
> Thanks a lot for your answer. I finally got the program to work by using
> aggregateByKey, but I was wondering why creating thousands of RDDs doesn't
> work. I think that could be interesting for using methods that work on RDDs
> like for example JavaDoubleRDD.stats() (
> http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29).
> If the groups are small then I can chain groupBy(), collect(), parallelize()
> and stats(), but that is quite inefficient because it implies moving data to
> and from the driver, and also doesn't scale to big groups; on the other hand
> if I use aggregateByKey or a similar function then I cannot use stats() so I
> have to reimplement it. In general I was looking for a way to reuse other
> functions that I have that work on RDDs, for using them on groups of data in
> an RDD, because I don't see how to directly apply them to each of the
> groups in a pair RDD.
>
> Again, thanks a lot for your answer,
>
> Greetings,
>
> Juan Rodriguez
>
>
>
>
> 2015-02-18 14:56 GMT+01:00 Paweł Szulc :
>>
>> Maybe you can omit using grouping all together with groupByKey? What is
>> your next step after grouping elements by key? Are you trying to reduce
>> values? If so then I would recommend using some reducing functions like for
>> example reduceByKey or aggregateByKey. Those will first reduce value for
>> each key locally on each node before doing actual IO over the network. There
>> will also be no grouping phase so you will not run into memory issues.
>>
>> Please let me know if that helped
>>
>> Pawel Szulc
>> @rabbitonweb
>> http://www.rabbitonweb.com
>>
>>
>> On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá
>>  wrote:
>>>
>>> Hi,
>>>
>>> I'm writing a Spark program where I want to divide a RDD into different
>>> groups, but the groups are too big to use groupByKey. To cope with that,
>>> since I know in advance the list of keys for each group, I build a map from
>>> the keys to the RDDs that result from filtering the input RDD to get the
>>> records for the corresponding key. This works when I have a small number of
>>> keys, but for big number of keys (tens of thousands) the execution gets
>>> stuck, without issuing any new Spark stage. I suspect the reason is that the
>>> Spark scheduler is not able to handle so many RDDs. Does it make sense? I'm
>>> rewriting the program to use a single RDD of pairs, with cached partitions,
>>> but I wanted to be sure I understand the problem here.
>>>
>>> Thanks a lot in advance,
>>>
>>> Greetings,
>>>
>>> Juan Rodriguez
>>
>>
>




Re: Is there a limit to the number of RDDs in a Spark context?

2015-02-18 Thread Juan Rodríguez Hortalá
Hi Paweł,

Thanks a lot for your answer. I finally got the program to work by using
aggregateByKey, but I was wondering why creating thousands of RDDs doesn't
work. I think that could be interesting for using methods that work on RDDs
like for example JavaDoubleRDD.stats() (
http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29).
If the groups are small then I can chain groupBy(), collect(),
parallelize() and stats(), but that is quite inefficient because it implies
moving data to and from the driver, and also doesn't scale to big groups;
on the other hand if I use aggregateByKey or a similar function then I
cannot use stats() so I have to reimplement it. In general I was looking
for a way to reuse other functions that I have that work on RDDs, for using
them on groups of data in an RDD, because I don't see how to directly
apply them to each of the groups in a pair RDD.

Again, thanks a lot for your answer,

Greetings,

Juan Rodriguez




2015-02-18 14:56 GMT+01:00 Paweł Szulc :

> Maybe you can omit using grouping all together with groupByKey? What is
> your next step after grouping elements by key? Are you trying to reduce
> values? If so then I would recommend using some reducing functions like for
> example reduceByKey or aggregateByKey. Those will first reduce value for
> each key locally on each node before doing actual IO over the network.
> There will also be no grouping phase so you will not run into memory issues.
>
> Please let me know if that helped
>
> Pawel Szulc
> @rabbitonweb
> http://www.rabbitonweb.com
>
>
> On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm writing a Spark program where I want to divide a RDD into different
>> groups, but the groups are too big to use groupByKey. To cope with that,
>> since I know in advance the list of keys for each group, I build a map from
>> the keys to the RDDs that result from filtering the input RDD to get the
>> records for the corresponding key. This works when I have a small number of
>> keys, but for big number of keys (tens of thousands) the execution gets
>> stuck, without issuing any new Spark stage. I suspect the reason is that
>> the Spark scheduler is not able to handle so many RDDs. Does it make sense?
>> I'm rewriting the program to use a single RDD of pairs, with cached
>> partitions, but I wanted to be sure I understand the problem here.
>>
>> Thanks a lot in advance,
>>
>> Greetings,
>>
>> Juan Rodriguez
>>
>
>


Re: Is there a limit to the number of RDDs in a Spark context?

2015-02-18 Thread Paweł Szulc
Maybe you can omit grouping with groupByKey altogether? What is your next
step after grouping elements by key? Are you trying to reduce values? If
so, then I would recommend using reducing functions such as reduceByKey or
aggregateByKey. Those will first reduce the values for each key locally on
each node before doing actual IO over the network (see the sketch below).
There will also be no grouping phase, so you will not run into memory issues.
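
An illustrative comparison, with `pairs` assumed to be an RDD[(String, Int)]:

// groupByKey ships every individual value across the network, then sums.
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines map-side first: at most one partial sum per key and
// partition crosses the network.
val viaReduce = pairs.reduceByKey(_ + _)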

Please let me know if that helped

Pawel Szulc
@rabbitonweb
http://www.rabbitonweb.com


On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> I'm writing a Spark program where I want to divide a RDD into different
> groups, but the groups are too big to use groupByKey. To cope with that,
> since I know in advance the list of keys for each group, I build a map from
> the keys to the RDDs that result from filtering the input RDD to get the
> records for the corresponding key. This works when I have a small number of
> keys, but for big number of keys (tens of thousands) the execution gets
> stuck, without issuing any new Spark stage. I suspect the reason is that
> the Spark scheduler is not able to handle so many RDDs. Does it make sense?
> I'm rewriting the program to use a single RDD of pairs, with cached
> partitions, but I wanted to be sure I understand the problem here.
>
> Thanks a lot in advance,
>
> Greetings,
>
> Juan Rodriguez
>


Re: Creating RDDs from within foreachPartition() [Spark-Streaming]

2015-02-18 Thread Sean Owen
You can't use RDDs inside RDDs. RDDs are managed from the driver, and
functions like foreachPartition execute things on the remote executors.

You can write code that simply saves whatever you want to ES directly.
There is not necessarily a need to use RDDs for that.
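
For example, an illustrative sketch using the elasticsearch-hadoop
connector (assuming org.elasticsearch.spark is on the classpath; the
"events/event" index/type target is made up):

import org.elasticsearch.spark._  // adds saveToEs to RDDs

stream.foreachRDD { rdd =>
  rdd.map { case (cid, sid, ts) =>
    Map("cid" -> cid, "sid" -> sid, "ts" -> ts)  // one ES document per record
  }.saveToEs("events/event")
}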

On Wed, Feb 18, 2015 at 11:36 AM, t1ny  wrote:
> Hi all,
>
> I am trying to create RDDs from within /rdd.foreachPartition()/ so I can
> save these RDDs to ElasticSearch on the fly :
>
> stream.foreachRDD(rdd => {
> rdd.foreachPartition {
>   iterator => {
> val sc = rdd.context
> iterator.foreach {
>   case (cid, sid, ts) => {
>
> [...]
>
> sc.makeRDD(...).saveToEs(...) <- *throws a
> NullPointerException (sc is null)*
>   }
> }
>   }
> }
> }
>
> Unfortunately this doesn't work as I can't seem to be able to access the
> SparkContext from anywhere within /foreachPartition()/. The code above
> throws a NullPointerException, but if I change it to ssc.sc.makeRDD (where
> ssc is the StreamingContext object created in the main function, outside of
> /foreachPartition/) then I get a NotSerializableException.
>
> What is the correct way to do this ?
>
>
>




Creating RDDs from within foreachPartition() [Spark-Streaming]

2015-02-18 Thread t1ny
Hi all,

I am trying to create RDDs from within rdd.foreachPartition() so I can
save these RDDs to Elasticsearch on the fly:

stream.foreachRDD(rdd => {
  rdd.foreachPartition { iterator =>
    val sc = rdd.context
    iterator.foreach {
      case (cid, sid, ts) => {

        [...]

        sc.makeRDD(...).saveToEs(...)  // <- throws a NullPointerException (sc is null)
      }
    }
  }
})

Unfortunately this doesn't work, as I can't seem to be able to access the
SparkContext from anywhere within foreachPartition(). The code above
throws a NullPointerException, but if I change it to ssc.sc.makeRDD (where
ssc is the StreamingContext object created in the main function, outside of
foreachPartition) then I get a NotSerializableException.

What is the correct way to do this ?






Is there a limit to the number of RDDs in a Spark context?

2015-02-18 Thread Juan Rodríguez Hortalá
Hi,

I'm writing a Spark program where I want to divide an RDD into different
groups, but the groups are too big to use groupByKey. To cope with that,
since I know in advance the list of keys for each group, I build a map from
the keys to the RDDs that result from filtering the input RDD to get the
records for the corresponding key (sketched below). This works when I have
a small number of keys, but for a big number of keys (tens of thousands)
the execution gets stuck, without issuing any new Spark stage. I suspect
the reason is that the Spark scheduler is not able to handle so many RDDs.
Does that make sense? I'm rewriting the program to use a single RDD of
pairs, with cached partitions, but I wanted to be sure I understand the
problem here.
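
A minimal sketch of the pattern just described (names are illustrative):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// One filtered RDD per key -- fine for a handful of keys, but with tens of
// thousands of keys the scheduler gets overwhelmed, as described above.
def splitByKey[K, V: ClassTag](data: RDD[(K, V)], keys: Seq[K]): Map[K, RDD[V]] =
  keys.map(k => k -> data.filter(_._1 == k).map(_._2)).toMap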

Thanks a lot in advance,

Greetings,

Juan Rodriguez


Re: Shuffle on joining two RDDs

2015-02-16 Thread Davies Liu
This will be fixed by https://github.com/apache/spark/pull/4629

On Fri, Feb 13, 2015 at 10:43 AM, Imran Rashid  wrote:
> yeah I thought the same thing at first too, I suggested something equivalent
> w/ preservesPartitioning = true, but that isn't enough.  the join is done by
> union-ing the two transformed rdds, which is very different from the way it
> works under the hood in scala to enable narrow dependencies.  It really
> needs a bigger change to pyspark.  I filed this issue:
> https://issues.apache.org/jira/browse/SPARK-5785
>
> (and the somewhat related issue about documentation:
> https://issues.apache.org/jira/browse/SPARK-5786)
>
> partitioning should still work in pyspark, you still need some notion of
> distributing work, and the pyspark functions have a partitionFunc to decide
> that.  But, I am not an authority on pyspark, so perhaps there are more
> holes I'm not aware of ...
>
> Imran
>
> On Fri, Feb 13, 2015 at 8:36 AM, Karlson  wrote:
>>
>> In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38,
>> wouldn't it help to change the lines
>>
>> vs = rdd.map(lambda (k, v): (k, (1, v)))
>> ws = other.map(lambda (k, v): (k, (2, v)))
>>
>> to
>>
>> vs = rdd.mapValues(lambda v: (1, v))
>> ws = other.mapValues(lambda v: (2, v))
>>
>> ?
>> As I understand, this would preserve the original partitioning.
>>
>>
>>
>> On 2015-02-13 12:43, Karlson wrote:
>>>
>>> Does that mean partitioning does not work in Python? Or does this only
>>> affect joining?
>>>
>>> On 2015-02-12 19:27, Davies Liu wrote:
>>>>
>>>> The feature works as expected in Scala/Java, but not implemented in
>>>> Python.
>>>>
>>>> On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid 
>>>> wrote:
>>>>>
>>>>> I wonder if the issue is that these lines just need to add
>>>>> preservesPartitioning = true
>>>>> ?
>>>>>
>>>>> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>>>>>
>>>>> I am getting the feeling this is an issue w/ pyspark
>>>>>
>>>>>
>>>>> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid 
>>>>> wrote:
>>>>>>
>>>>>>
>>>>>> ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.
>>>>>> It
>>>>>> could be that pyspark doesn't properly support narrow dependencies, or
>>>>>> maybe
>>>>>> you need to be more explicit about the partitioner.  I am looking into
>>>>>> the
>>>>>> pyspark api but you might have some better guesses here than I
>>>>>> thought.
>>>>>>
>>>>>> My suggestion to do
>>>>>>
>>>>>> joinedRdd.getPartitions.foreach{println}
>>>>>>
>>>>>> was just to see if the partition was a NarrowCoGroupSplitDep or a
>>>>>> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
>>>>>> fields
>>>>>> are hidden deeper inside and are not user-visible.  But I think a
>>>>>> better way
>>>>>> (in scala, anyway) is to look at rdd.dependencies.  its a little
>>>>>> tricky,
>>>>>> though, you need to look deep into the lineage (example at the end).
>>>>>>
>>>>>> Sean -- yes it does require both RDDs have the same partitioner, but
>>>>>> that
>>>>>> should happen naturally if you just specify the same number of
>>>>>> partitions,
>>>>>> you'll get equal HashPartitioners.  There is a little difference in
>>>>>> the
>>>>>> scala & python api that I missed here.  For partitionBy in scala, you
>>>>>> actually need to specify the partitioner, but not in python.  However
>>>>>> I
>>>>>> thought it would work like groupByKey, which does just take an int.
>>>>>>
>>>>>>
>>>>>> Here's a code example in scala -- not sure what is available from
>>>>>> python.
>>>>>> Hopefully somebody knows a simpler way to confirm narrow
>>>>>> dependencies??
>>>>>>
>>>>>>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x ->
>>>>>>>

Re: Columnar-Oriented RDDs

2015-02-13 Thread Koert Kuipers
I wrote a proof of concept to automatically store any RDD of tuples or case
classes in columnar format using arrays (and strongly typed, so you get the
benefit of primitive arrays). See:
https://github.com/tresata/spark-columnar

On Fri, Feb 13, 2015 at 3:06 PM, Michael Armbrust 
wrote:

> Shark's in-memory code was ported to Spark SQL and is used by default when
> you run .cache on a SchemaRDD or CACHE TABLE.
>
> I'd also look at parquet which is more efficient and handles nested data
> better.
>
> On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf 
> wrote:
>
>> Hi all,
>>
>> I'd like to build/use column oriented RDDs in some of my Spark code. A
>> normal Spark RDD is stored as row oriented object if I understand
>> correctly.
>>
>> I'd like to leverage some of the advantages of a columnar memory format.
>> Shark (used to) and SparkSQL uses a columnar storage format using primitive
>> arrays for each column.
>>
>> I'd be interested to know more about this approach and how I could build
>> my own custom columnar-oriented RDD which I can use outside of Spark SQL.
>>
>> Could anyone give me some pointers on where to look to do something like
>> this, either from scratch or using whats there in the SparkSQL libs or
>> elsewhere. I know Evan Chan in a presentation made mention of building a
>> custom RDD of column-oriented blocks of data.
>>
>> Cheers,
>> ~N
>>
>
>


Re: Columnar-Oriented RDDs

2015-02-13 Thread Michael Armbrust
Shark's in-memory code was ported to Spark SQL and is used by default when
you run .cache on a SchemaRDD or CACHE TABLE.

I'd also look at parquet which is more efficient and handles nested data
better.

On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf  wrote:

> Hi all,
>
> I'd like to build/use column oriented RDDs in some of my Spark code. A
> normal Spark RDD is stored as row oriented object if I understand
> correctly.
>
> I'd like to leverage some of the advantages of a columnar memory format.
> Shark (used to) and SparkSQL uses a columnar storage format using primitive
> arrays for each column.
>
> I'd be interested to know more about this approach and how I could build
> my own custom columnar-oriented RDD which I can use outside of Spark SQL.
>
> Could anyone give me some pointers on where to look to do something like
> this, either from scratch or using whats there in the SparkSQL libs or
> elsewhere. I know Evan Chan in a presentation made mention of building a
> custom RDD of column-oriented blocks of data.
>
> Cheers,
> ~N
>


Re: Shuffle on joining two RDDs

2015-02-13 Thread Imran Rashid
Yeah, I thought the same thing at first too; I suggested something
equivalent w/ preservesPartitioning = true, but that isn't enough. The
join is done by union-ing the two transformed RDDs, which is very different
from the way it works under the hood in Scala to enable narrow
dependencies. It really needs a bigger change to pyspark. I filed this
issue: https://issues.apache.org/jira/browse/SPARK-5785

(and the somewhat related issue about documentation:
https://issues.apache.org/jira/browse/SPARK-5786)

partitioning should still work in pyspark, you still need some notion of
distributing work, and the pyspark functions have a partitionFunc to decide
that.  But, I am not an authority on pyspark, so perhaps there are more
holes I'm not aware of ...

Imran

On Fri, Feb 13, 2015 at 8:36 AM, Karlson  wrote:

> In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38,
> wouldn't it help to change the lines
>
> vs = rdd.map(lambda (k, v): (k, (1, v)))
> ws = other.map(lambda (k, v): (k, (2, v)))
>
> to
>
> vs = rdd.mapValues(lambda v: (1, v))
> ws = other.mapValues(lambda v: (2, v))
>
> ?
> As I understand, this would preserve the original partitioning.
>
>
>
> On 2015-02-13 12:43, Karlson wrote:
>
>> Does that mean partitioning does not work in Python? Or does this only
>> affect joining?
>>
>> On 2015-02-12 19:27, Davies Liu wrote:
>>
>>> The feature works as expected in Scala/Java, but not implemented in
>>> Python.
>>>
>>> On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid 
>>> wrote:
>>>
>>>> I wonder if the issue is that these lines just need to add
>>>> preservesPartitioning = true
>>>> ?
>>>>
>>>> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>>>>
>>>> I am getting the feeling this is an issue w/ pyspark
>>>>
>>>>
>>>> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid 
>>>> wrote:
>>>>
>>>>>
>>>>> ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.
>>>>> It
>>>>> could be that pyspark doesn't properly support narrow dependencies, or
>>>>> maybe
>>>>> you need to be more explicit about the partitioner.  I am looking into
>>>>> the
>>>>> pyspark api but you might have some better guesses here than I thought.
>>>>>
>>>>> My suggestion to do
>>>>>
>>>>> joinedRdd.getPartitions.foreach{println}
>>>>>
>>>>> was just to see if the partition was a NarrowCoGroupSplitDep or a
>>>>> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
>>>>> fields
>>>>> are hidden deeper inside and are not user-visible.  But I think a
>>>>> better way
>>>>> (in scala, anyway) is to look at rdd.dependencies.  its a little
>>>>> tricky,
>>>>> though, you need to look deep into the lineage (example at the end).
>>>>>
>>>>> Sean -- yes it does require both RDDs have the same partitioner, but
>>>>> that
>>>>> should happen naturally if you just specify the same number of
>>>>> partitions,
>>>>> you'll get equal HashPartitioners.  There is a little difference in the
>>>>> scala & python api that I missed here.  For partitionBy in scala, you
>>>>> actually need to specify the partitioner, but not in python.  However I
>>>>> thought it would work like groupByKey, which does just take an int.
>>>>>
>>>>>
>>>>> Here's a code example in scala -- not sure what is available from
>>>>> python.
>>>>> Hopefully somebody knows a simpler way to confirm narrow dependencies??
>>>>>
>>>>>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>>>>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>>>>> scala> d.partitioner == d2.partitioner
>>>>>> res2: Boolean = true
>>>>>> val joined = d.join(d2)
>>>>>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
>>>>>> val badJoined = d.join(d3)
>>>>>>
>>>>>> d.setName("d")
>>>>>> d2.setName("d2")
>>>>>> d3.setName("d3")
>

Columnar-Oriented RDDs

2015-02-13 Thread Night Wolf
Hi all,

I'd like to build/use column-oriented RDDs in some of my Spark code. A
normal Spark RDD is stored as a row-oriented object, if I understand
correctly.

I'd like to leverage some of the advantages of a columnar memory format.
Shark used to, and Spark SQL does, use a columnar storage format with
primitive arrays for each column.

I'd be interested to know more about this approach and how I could build my
own custom columnar-oriented RDD which I can use outside of Spark SQL.

Could anyone give me some pointers on where to look to do something like
this, either from scratch or using what's there in the SparkSQL libs or
elsewhere? I know Evan Chan made mention in a presentation of building a
custom RDD of column-oriented blocks of data.

Cheers,
~N


Re: Shuffle on joining two RDDs

2015-02-13 Thread Karlson
In 
https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, 
wouldn't it help to change the lines


vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))

to

vs = rdd.mapValues(lambda v: (1, v))
ws = other.mapValues(lambda v: (2, v))

?
As I understand, this would preserve the original partitioning.


On 2015-02-13 12:43, Karlson wrote:

Does that mean partitioning does not work in Python? Or does this only
affect joining?

On 2015-02-12 19:27, Davies Liu wrote:
The feature works as expected in Scala/Java, but not implemented in 
Python.


On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid  
wrote:

I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?

https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ pyspark


On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid  
wrote:


ah, sorry I am not too familiar w/ pyspark, sorry I missed that 
part.  It
could be that pyspark doesn't properly support narrow dependencies, 
or maybe
you need to be more explicit about the partitioner.  I am looking 
into the
pyspark api but you might have some better guesses here than I 
thought.


My suggestion to do

joinedRdd.getPartitions.foreach{println}

was just to see if the partition was a NarrowCoGroupSplitDep or a
ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, 
those fields
are hidden deeper inside and are not user-visible.  But I think a 
better way
(in scala, anyway) is to look at rdd.dependencies.  its a little 
tricky,

though, you need to look deep into the lineage (example at the end).

Sean -- yes it does require both RDDs have the same partitioner, but 
that
should happen naturally if you just specify the same number of 
partitions,
you'll get equal HashPartitioners.  There is a little difference in 
the
scala & python api that I missed here.  For partitionBy in scala, 
you
actually need to specify the partitioner, but not in python.  
However I

thought it would work like groupByKey, which does just take an int.


Here's a code example in scala -- not sure what is available from 
python.
Hopefully somebody knows a simpler way to confirm narrow 
dependencies??


val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)

scala> d.partitioner == d2.partitioner
res2: Boolean = true

val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")

// unfortunately, just looking at the immediate dependencies of joined &
// badJoined is misleading, b/c join actually creates
// one more step after the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] =
List(org.apache.spark.OneToOneDependency@74751ac8)
// even with the join that does require a shuffle, we still see a
// OneToOneDependency, but that's just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] =
List(org.apache.spark.OneToOneDependency@1cf356cc)

// so let's make a helper function to get all the dependencies recursively
def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}

// full dependencies of the good join
scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)

// full dependencies of the bad join -- notice the ShuffleDependency!
scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.

Re: Shuffle on joining two RDDs

2015-02-13 Thread Karlson
Does that mean partitioning does not work in Python? Or does this only 
affect joining?


On 2015-02-12 19:27, Davies Liu wrote:
The feature works as expected in Scala/Java, but not implemented in 
Python.


On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid  
wrote:

I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?

https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ pyspark


On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid  
wrote:


ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. 
 It
could be that pyspark doesn't properly support narrow dependencies, 
or maybe
you need to be more explicit about the partitioner.  I am looking 
into the
pyspark api but you might have some better guesses here than I 
thought.


My suggestion to do

joinedRdd.getPartitions.foreach{println}

was just to see if the partition was a NarrowCoGroupSplitDep or a
ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those 
fields
are hidden deeper inside and are not user-visible.  But I think a 
better way
(in scala, anyway) is to look at rdd.dependencies.  it's a little 
tricky,

though, you need to look deep into the lineage (example at the end).

Sean -- yes it does require both RDDs have the same partitioner, but 
that
should happen naturally if you just specify the same number of 
partitions,
you'll get equal HashPartitioners.  There is a little difference in 
the

scala & python api that I missed here.  For partitionBy in scala, you
actually need to specify the partitioner, but not in python.  However 
I

thought it would work like groupByKey, which does just take an int.


Here's a code example in scala -- not sure what is available from 
python.
Hopefully somebody knows a simpler way to confirm narrow 
dependencies??


val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> 
x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> 
x}.groupByKey(64)

scala> d.partitioner == d2.partitioner
res2: Boolean = true
val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> 
x}.groupByKey(100)

val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")


//unfortunately, just looking at the immediate dependencies of 
joined &

badJoined is misleading, b/c join actually creates
// one more step after the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] =
List(org.apache.spark.OneToOneDependency@74751ac8)
//even with the join that does require a shuffle, we still see a
OneToOneDependency, but that's just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] =
List(org.apache.spark.OneToOneDependency@1cf356cc)






 //so lets make a helper function to get all the dependencies 
recursively


def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}


//full dependencies of the good join

scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at
:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at
:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at
:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at
:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at
:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at
:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at
:12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at
:12,org.apache.spark.OneToOneDependency@36b5f6f2)



//full dependencies of the bad join -- notice the ShuffleDependency!

scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at
:16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at
:16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at
:16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at
:16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at
:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at
:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at
:12,org.apache.spark.ShuffleDependency@d794984)
(MappedRDD[11] at map at
:12,org.apache.spark.OneToOneDependency@15c98005)




On Thu, Feb 12, 2015 at 10:05 AM, Karlson  
wrote:


Hi Imran,

thanks for your quick reply.

Actually I am doing this:

rddA = rddA.partitionBy(n).cache()
rddB = rddB.partitionBy(n).cache()

followed by

rddA.count()
rddB.count()

then joinedRDD = rddA.join(rddB)

I thought that the 

Re: Shuffle on joining two RDDs

2015-02-12 Thread Davies Liu
The feature works as expected in Scala/Java, but not implemented in Python.

On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid  wrote:
> I wonder if the issue is that these lines just need to add
> preservesPartitioning = true
> ?
>
> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>
> I am getting the feeling this is an issue w/ pyspark
>
>
> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid  wrote:
>>
>> ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.  It
>> could be that pyspark doesn't properly support narrow dependencies, or maybe
>> you need to be more explicit about the partitioner.  I am looking into the
>> pyspark api but you might have some better guesses here than I thought.
>>
>> My suggestion to do
>>
>> joinedRdd.getPartitions.foreach{println}
>>
>> was just to see if the partition was a NarrowCoGroupSplitDep or a
>> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those fields
>> are hidden deeper inside and are not user-visible.  But I think a better way
>> (in scala, anyway) is to look at rdd.dependencies.  it's a little tricky,
>> though, you need to look deep into the lineage (example at the end).
>>
>> Sean -- yes it does require both RDDs have the same partitioner, but that
>> should happen naturally if you just specify the same number of partitions,
>> you'll get equal HashPartitioners.  There is a little difference in the
>> scala & python api that I missed here.  For partitionBy in scala, you
>> actually need to specify the partitioner, but not in python.  However I
>> thought it would work like groupByKey, which does just take an int.
>>
>>
>> Here's a code example in scala -- not sure what is available from python.
>> Hopefully somebody knows a simpler way to confirm narrow dependencies??
>>
>>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>> scala> d.partitioner == d2.partitioner
>>> res2: Boolean = true
>>> val joined = d.join(d2)
>>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
>>> val badJoined = d.join(d3)
>>>
>>> d.setName("d")
>>> d2.setName("d2")
>>> d3.setName("d3")
>>> joined.setName("joined")
>>> badJoined.setName("badJoined")
>>>
>>>
>>> //unfortunately, just looking at the immediate dependencies of joined &
>>> badJoined is misleading, b/c join actually creates
>>> // one more step after the shuffle
>>> scala> joined.dependencies
>>> res20: Seq[org.apache.spark.Dependency[_]] =
>>> List(org.apache.spark.OneToOneDependency@74751ac8)
>>> //even with the join that does require a shuffle, we still see a
>>> OneToOneDependency, but that's just a simple flatMap step
>>> scala> badJoined.dependencies
>>> res21: Seq[org.apache.spark.Dependency[_]] =
>>> List(org.apache.spark.OneToOneDependency@1cf356cc)
>>
>>
>>
>>>
>>>  //so lets make a helper function to get all the dependencies recursively
>>>
>>> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>>>   val deps = rdd.dependencies
>>>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
>>> }
>>>
>>>
>>> //full dependencies of the good join
>>>
>>> scala> flattenDeps(joined).foreach{println}
>>> (joined FlatMappedValuesRDD[9] at join at
>>> :16,org.apache.spark.OneToOneDependency@74751ac8)
>>> (MappedValuesRDD[8] at join at
>>> :16,org.apache.spark.OneToOneDependency@623264af)
>>> (CoGroupedRDD[7] at join at
>>> :16,org.apache.spark.OneToOneDependency@5a704f86)
>>> (CoGroupedRDD[7] at join at
>>> :16,org.apache.spark.OneToOneDependency@37514cd)
>>> (d ShuffledRDD[3] at groupByKey at
>>> :12,org.apache.spark.ShuffleDependency@7ba8a080)
>>> (MappedRDD[2] at map at
>>> :12,org.apache.spark.OneToOneDependency@7bc172ec)
>>> (d2 ShuffledRDD[6] at groupByKey at
>>> :12,org.apache.spark.ShuffleDependency@5960236d)
>>> (MappedRDD[5] at map at
>>> :12,org.apache.spark.OneToOneDependency@36b5f6f2)
>>>
>>>
>>>
>>> //full dependencies of the bad join -- notice the ShuffleDependency!
>>>
>>> scala> flattenDeps(badJoined).foreach{println}
>>> (badJoined FlatMappedValuesRDD[15] a

Re: Shuffle on joining two RDDs

2015-02-12 Thread Imran Rashid
I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?

https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ pyspark
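
For reference, the Scala side exposes that flag on mapPartitions directly. A
minimal sketch (the pair-shaped rdd here is assumed):

// keep the existing partitioner while re-tagging values, analogous to the
// pyspark change suggested above
val tagged = rdd.mapPartitions(
  iter => iter.map { case (k, v) => (k, (1, v)) },
  preservesPartitioning = true)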


On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid  wrote:

> ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.  It
> could be that pyspark doesn't properly support narrow dependencies, or
> maybe you need to be more explicit about the partitioner.  I am looking
> into the pyspark api but you might have some better guesses here than I
> thought.
>
> My suggestion to do
>
> joinedRdd.getPartitions.foreach{println}
>
> was just to see if the partition was a NarrowCoGroupSplitDep or a
> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
> fields are hidden deeper inside and are not user-visible.  But I think a
> better way (in scala, anyway) is to look at rdd.dependencies.  it's a little
> tricky, though, you need to look deep into the lineage (example at the end).
>
> Sean -- yes it does require both RDDs have the same partitioner, but that
> should happen naturally if you just specify the same number of partitions,
> you'll get equal HashPartitioners.  There is a little difference in the
> scala & python api that I missed here.  For partitionBy in scala, you
> actually need to specify the partitioner, but not in python.  However I
> thought it would work like groupByKey, which does just take an int.
>
>
> Here's a code example in scala -- not sure what is available from python.
> Hopefully somebody knows a simpler way to confirm narrow dependencies??
>
> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>> scala> d.partitioner == d2.partitioner
>> res2: Boolean = true
>> val joined = d.join(d2)
>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
>> val badJoined = d.join(d3)
>>
>> d.setName("d")
>> d2.setName("d2")
>> d3.setName("d3")
>> joined.setName("joined")
>> badJoined.setName("badJoined")
>
>
>> //unfortunately, just looking at the immediate dependencies of joined &
>> badJoined is misleading, b/c join actually creates
>> // one more step after the shuffle
>> scala> joined.dependencies
>> res20: Seq[org.apache.spark.Dependency[_]] =
>> List(org.apache.spark.OneToOneDependency@74751ac8)
>> //even with the join that does require a shuffle, we still see a
>> OneToOneDependency, but that's just a simple flatMap step
>> scala> badJoined.dependencies
>> res21: Seq[org.apache.spark.Dependency[_]] =
>> List(org.apache.spark.OneToOneDependency@1cf356cc)
>>
>
>
>
>>  //so lets make a helper function to get all the dependencies recursively
>>
> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>>   val deps = rdd.dependencies
>>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
>> }
>>
>>
>> //full dependencies of the good join
>
> scala> flattenDeps(joined).foreach{println}
>> (joined FlatMappedValuesRDD[9] at join at
>> :16,org.apache.spark.OneToOneDependency@74751ac8)
>> (MappedValuesRDD[8] at join at
>> :16,org.apache.spark.OneToOneDependency@623264af)
>>
>> *(CoGroupedRDD[7] at join at
>> :16,org.apache.spark.OneToOneDependency@5a704f86)*(CoGroupedRDD[7]
>> at join at :16,org.apache.spark.OneToOneDependency@37514cd)
>> (d ShuffledRDD[3] at groupByKey at
>> :12,org.apache.spark.ShuffleDependency@7ba8a080)
>> (MappedRDD[2] at map at
>> :12,org.apache.spark.OneToOneDependency@7bc172ec)
>> (d2 ShuffledRDD[6] at groupByKey at
>> :12,org.apache.spark.ShuffleDependency@5960236d)
>> (MappedRDD[5] at map at
>> :12,org.apache.spark.OneToOneDependency@36b5f6f2)
>>
>>
>
>> //full dependencies of the bad join -- notice the ShuffleDependency!
>
> scala> flattenDeps(badJoined).foreach{println}
>> (badJoined FlatMappedValuesRDD[15] at join at
>> :16,org.apache.spark.OneToOneDependency@1cf356cc)
>> (MappedValuesRDD[14] at join at
>> :16,org.apache.spark.OneToOneDependency@5dea4db)
>>
>> *(CoGroupedRDD[13] at join at
>> :16,org.apache.spark.ShuffleDependency@5c1928df)*(CoGroupedRDD[13]
>> at join at :16,org.apache.spark.OneToOneDependency@77ca77b5)
>> (d ShuffledRDD[3] at groupByKey at
>> :12,org.apache.spark.ShuffleDependency@7ba8a080)
>> (MappedRDD[2] at map at
>> :12,org.apache.spark.OneToOneDependency@7bc172ec)
>> (d3 Shuffl

Re: Shuffle on joining two RDDs

2015-02-12 Thread Imran Rashid
ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.  It
could be that pyspark doesn't properly support narrow dependencies, or
maybe you need to be more explicit about the partitioner.  I am looking
into the pyspark api but you might have some better guesses here than I
thought.

My suggestion to do

joinedRdd.getPartitions.foreach{println}

was just to see if the partition was a NarrowCoGroupSplitDep or a
ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
fields are hidden deeper inside and are not user-visible.  But I think a
better way (in scala, anyway) is to look at rdd.dependencies.  it's a little 
tricky, though, you need to look deep into the lineage (example at the end).

Sean -- yes it does require both RDDs have the same partitioner, but that
should happen naturally if you just specify the same number of partitions,
you'll get equal HashPartitioners.  There is a little difference in the
scala & python api that I missed here.  For partitionBy in scala, you
actually need to specify the partitioner, but not in python.  However I
thought it would work like groupByKey, which does just take an int.


Here's a code example in scala -- not sure what is available from python.
Hopefully somebody knows a simpler way to confirm narrow dependencies??

val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
> scala> d.partitioner == d2.partitioner
> res2: Boolean = true
> val joined = d.join(d2)
> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
> val badJoined = d.join(d3)
>
> d.setName("d")
> d2.setName("d2")
> d3.setName("d3")
> joined.setName("joined")
> badJoined.setName("badJoined")


> //unfortunately, just looking at the immediate dependencies of joined &
> badJoined is misleading, b/c join actually creates
> // one more step after the shuffle
> scala> joined.dependencies
> res20: Seq[org.apache.spark.Dependency[_]] =
> List(org.apache.spark.OneToOneDependency@74751ac8)
> //even with the join that does require a shuffle, we still see a
> OneToOneDependency, but that's just a simple flatMap step
> scala> badJoined.dependencies
> res21: Seq[org.apache.spark.Dependency[_]] =
> List(org.apache.spark.OneToOneDependency@1cf356cc)
>



>  //so lets make a helper function to get all the dependencies recursively
>
def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>   val deps = rdd.dependencies
>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
> }
>
>
> //full dependencies of the good join

scala> flattenDeps(joined).foreach{println}
> (joined FlatMappedValuesRDD[9] at join at
> :16,org.apache.spark.OneToOneDependency@74751ac8)
> (MappedValuesRDD[8] at join at
> :16,org.apache.spark.OneToOneDependency@623264af)
>
> *(CoGroupedRDD[7] at join at
> :16,org.apache.spark.OneToOneDependency@5a704f86)*(CoGroupedRDD[7]
> at join at :16,org.apache.spark.OneToOneDependency@37514cd)
> (d ShuffledRDD[3] at groupByKey at
> :12,org.apache.spark.ShuffleDependency@7ba8a080)
> (MappedRDD[2] at map at
> :12,org.apache.spark.OneToOneDependency@7bc172ec)
> (d2 ShuffledRDD[6] at groupByKey at
> :12,org.apache.spark.ShuffleDependency@5960236d)
> (MappedRDD[5] at map at
> :12,org.apache.spark.OneToOneDependency@36b5f6f2)
>
>

> //full dependencies of the bad join -- notice the ShuffleDependency!

scala> flattenDeps(badJoined).foreach{println}
> (badJoined FlatMappedValuesRDD[15] at join at
> :16,org.apache.spark.OneToOneDependency@1cf356cc)
> (MappedValuesRDD[14] at join at
> :16,org.apache.spark.OneToOneDependency@5dea4db)
>
> *(CoGroupedRDD[13] at join at
> :16,org.apache.spark.ShuffleDependency@5c1928df)*(CoGroupedRDD[13]
> at join at :16,org.apache.spark.OneToOneDependency@77ca77b5)
> (d ShuffledRDD[3] at groupByKey at
> :12,org.apache.spark.ShuffleDependency@7ba8a080)
> (MappedRDD[2] at map at
> :12,org.apache.spark.OneToOneDependency@7bc172ec)
> (d3 ShuffledRDD[12] at groupByKey at
> :12,org.apache.spark.ShuffleDependency@d794984)
> (MappedRDD[11] at map at
> :12,org.apache.spark.OneToOneDependency@15c98005)



On Thu, Feb 12, 2015 at 10:05 AM, Karlson  wrote:

> Hi Imran,
>
> thanks for your quick reply.
>
> Actually I am doing this:
>
> rddA = rddA.partitionBy(n).cache()
> rddB = rddB.partitionBy(n).cache()
>
> followed by
>
> rddA.count()
> rddB.count()
>
> then joinedRDD = rddA.join(rddB)
>
> I thought that the count() would force the evaluation, so any subsequent
> joins would be shuffleless. I was wrong about the shuffle amounts however.
> The shuffle write is

Re: Shuffle on joining two RDDs

2015-02-12 Thread Karlson

Hi,

I believe that partitionBy will use the same (default) partitioner on 
both RDDs.


On 2015-02-12 17:12, Sean Owen wrote:

Doesn't this require that both RDDs have the same partitioner?

On Thu, Feb 12, 2015 at 3:48 PM, Imran Rashid  
wrote:

Hi Karlson,

I think your assumptions are correct -- that join alone shouldn't 
require

any shuffling.  But it's possible you are getting tripped up by lazy
evaluation of RDDs.  After you do your partitionBy, are you sure those 
RDDs
are actually materialized & cached somewhere?  eg., if you just did 
this:


val rddA = someData.partitionBy(N)
val rddB = someOtherData.partitionBy(N)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() //or any other action

then the partitioning isn't actually getting run until you do the 
join.  So
though the join itself can happen without partitioning, 
joinedRdd.count()
will trigger the evaluation of rddA & rddB which will require 
shuffles.

Note that even if you have some intervening action on rddA & rddB that
shuffles them, unless you persist the result, you will need to 
reshuffle

them for the join.

If this doesn't help explain things, for debugging

joinedRdd.getPartitions.foreach{println}

this is getting into the weeds, but at least this will tell us whether 
or
not you are getting narrow dependencies, which would avoid the 
shuffle.

(Does anyone know of a simpler way to check this?)

hope this helps,
Imran




On Thu, Feb 12, 2015 at 9:25 AM, Karlson  wrote:


Hi All,

using Pyspark, I create two RDDs (one with about 2M records (~200MB), 
the

other with about 8M records (~2GB)) of the format (key, value).

I've done a partitionBy(num_partitions) on both RDDs and verified 
that
both RDDs have the same number of partitions and that equal keys 
reside on

the same partition (via mapPartitionsWithIndex).

Now I'd expect that for a join on the two RDDs no shuffling is 
necessary.
Looking at the Web UI under http://driver:4040 however reveals that 
that

assumption is false.

In fact I am seeing shuffle writes of about 200MB and reads of about 
50MB.


What's the explanation for that behaviour? Where am I wrong with my
assumption?

Thanks in advance,

Karlson

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle on joining two RDDs

2015-02-12 Thread Sean Owen
Doesn't this require that both RDDs have the same partitioner?

On Thu, Feb 12, 2015 at 3:48 PM, Imran Rashid  wrote:
> Hi Karlson,
>
> I think your assumptions are correct -- that join alone shouldn't require
> any shuffling.  But it's possible you are getting tripped up by lazy
> evaluation of RDDs.  After you do your partitionBy, are you sure those RDDs
> are actually materialized & cached somewhere?  eg., if you just did this:
>
> val rddA = someData.partitionBy(N)
> val rddB = someOtherData.partitionBy(N)
> val joinedRdd = rddA.join(rddB)
> joinedRdd.count() //or any other action
>
> then the partitioning isn't actually getting run until you do the join.  So
> though the join itself can happen without partitioning, joinedRdd.count()
> will trigger the evaluation of rddA & rddB which will require shuffles.
> Note that even if you have some intervening action on rddA & rddB that
> shuffles them, unless you persist the result, you will need to reshuffle
> them for the join.
>
> If this doesn't help explain things, for debugging
>
> joinedRdd.getPartitions.foreach{println}
>
> this is getting into the weeds, but at least this will tell us whether or
> not you are getting narrow dependencies, which would avoid the shuffle.
> (Does anyone know of a simpler way to check this?)
>
> hope this helps,
> Imran
>
>
>
>
> On Thu, Feb 12, 2015 at 9:25 AM, Karlson  wrote:
>>
>> Hi All,
>>
>> using Pyspark, I create two RDDs (one with about 2M records (~200MB), the
>> other with about 8M records (~2GB)) of the format (key, value).
>>
>> I've done a partitionBy(num_partitions) on both RDDs and verified that
>> both RDDs have the same number of partitions and that equal keys reside on
>> the same partition (via mapPartitionsWithIndex).
>>
>> Now I'd expect that for a join on the two RDDs no shuffling is necessary.
>> Looking at the Web UI under http://driver:4040 however reveals that that
>> assumption is false.
>>
>> In fact I am seeing shuffle writes of about 200MB and reads of about 50MB.
>>
>> What's the explanation for that behaviour? Where am I wrong with my
>> assumption?
>>
>> Thanks in advance,
>>
>> Karlson
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle on joining two RDDs

2015-02-12 Thread Karlson

Hi Imran,

thanks for your quick reply.

Actually I am doing this:

rddA = rddA.partitionBy(n).cache()
rddB = rddB.partitionBy(n).cache()

followed by

rddA.count()
rddB.count()

then joinedRDD = rddA.join(rddB)

I thought that the count() would force the evaluation, so any subsequent 
joins would be shuffleless. I was wrong about the shuffle amounts 
however. The shuffle write is actually 2GB (i.e. the size of the bigger 
RDD) while there is no shuffle read. A joinedRdd.count() does a shuffle 
read of about 1GB in size, though.


The getPartitions-method does not exist on the resulting RDD (I am using 
the Python API). There is however foreachPartition(). What is the line


joinedRdd.getPartitions.foreach{println}

supposed to do?

Thank you,

Karlson

PS: Sorry for sending this twice, I accidentally did not reply to the 
mailing list first.



On 2015-02-12 16:48, Imran Rashid wrote:

Hi Karlson,

I think your assumptions are correct -- that join alone shouldn't 
require

any shuffling.  But it's possible you are getting tripped up by lazy
evaluation of RDDs.  After you do your partitionBy, are you sure those 
RDDs
are actually materialized & cached somewhere?  eg., if you just did 
this:


val rddA = someData.partitionBy(N)
val rddB = someOtherData.partitionBy(N)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() //or any other action

then the partitioning isn't actually getting run until you do the join. 
 So
though the join itself can happen without partitioning, 
joinedRdd.count()

will trigger the evaluation of rddA & rddB which will require shuffles.
Note that even if you have some intervening action on rddA & rddB that
shuffles them, unless you persist the result, you will need to 
reshuffle

them for the join.

If this doesn't help explain things, for debugging

joinedRdd.getPartitions.foreach{println}

this is getting into the weeds, but at least this will tell us whether 
or

not you are getting narrow dependencies, which would avoid the shuffle.
 (Does anyone know of a simpler way to check this?)

hope this helps,
Imran




On Thu, Feb 12, 2015 at 9:25 AM, Karlson  wrote:


Hi All,

using Pyspark, I create two RDDs (one with about 2M records (~200MB), 
the

other with about 8M records (~2GB)) of the format (key, value).

I've done a partitionBy(num_partitions) on both RDDs and verified that
both RDDs have the same number of partitions and that equal keys 
reside on

the same partition (via mapPartitionsWithIndex).

Now I'd expect that for a join on the two RDDs no shuffling is 
necessary.
Looking at the Web UI under http://driver:4040 however reveals that 
that

assumption is false.

In fact I am seeing shuffle writes of about 200MB and reads of about 
50MB.


What's the explanation for that behaviour? Where am I wrong with my
assumption?

Thanks in advance,

Karlson

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle on joining two RDDs

2015-02-12 Thread Imran Rashid
Hi Karlson,

I think your assumptions are correct -- that join alone shouldn't require
any shuffling.  But it's possible you are getting tripped up by lazy
evaluation of RDDs.  After you do your partitionBy, are you sure those RDDs
are actually materialized & cached somewhere?  eg., if you just did this:

val rddA = someData.partitionBy(N)
val rddB = someOtherData.partitionBy(N)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() //or any other action

then the partitioning isn't actually getting run until you do the join.  So
though the join itself can happen without partitioning, joinedRdd.count()
will trigger the evaluation of rddA & rddB which will require shuffles.
Note that even if you have some intervening action on rddA & rddB that
shuffles them, unless you persist the result, you will need to reshuffle
them for the join.
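
A minimal sketch of that pattern in a spark-shell session (the 8-partition
HashPartitioner is an arbitrary choice):

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(8)
val rddA = sc.parallelize(1 to 1000).map(x => (x, x)).partitionBy(part).persist()
val rddB = sc.parallelize(500 to 1500).map(x => (x, x)).partitionBy(part).persist()

// materialize (and cache) both sides once, so the shuffles happen here...
rddA.count()
rddB.count()

// ...and the join of the co-partitioned, persisted RDDs needs no new shuffle
val joined = rddA.join(rddB)
joined.count()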

If this doesn't help explain things, for debugging

joinedRdd.getPartitions.foreach{println}

this is getting into the weeds, but at least this will tell us whether or
not you are getting narrow dependencies, which would avoid the shuffle.
 (Does anyone know of a simpler way to check this?)

hope this helps,
Imran




On Thu, Feb 12, 2015 at 9:25 AM, Karlson  wrote:

> Hi All,
>
> using Pyspark, I create two RDDs (one with about 2M records (~200MB), the
> other with about 8M records (~2GB)) of the format (key, value).
>
> I've done a partitionBy(num_partitions) on both RDDs and verified that
> both RDDs have the same number of partitions and that equal keys reside on
> the same partition (via mapPartitionsWithIndex).
>
> Now I'd expect that for a join on the two RDDs no shuffling is necessary.
> Looking at the Web UI under http://driver:4040 however reveals that that
> assumption is false.
>
> In fact I am seeing shuffle writes of about 200MB and reads of about 50MB.
>
> What's the explanation for that behaviour? Where am I wrong with my
> assumption?
>
> Thanks in advance,
>
> Karlson
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Shuffle on joining two RDDs

2015-02-12 Thread Karlson

Hi All,

using Pyspark, I create two RDDs (one with about 2M records (~200MB), 
the other with about 8M records (~2GB)) of the format (key, value).


I've done a partitionBy(num_partitions) on both RDDs and verified that 
both RDDs have the same number of partitions and that equal keys reside 
on the same partition (via mapPartitionsWithIndex).


Now I'd expect that for a join on the two RDDs no shuffling is 
necessary. Looking at the Web UI under http://driver:4040 however 
reveals that that assumption is false.


In fact I am seeing shuffle writes of about 200MB and reads of about 
50MB.


What's the explanation for that behaviour? Where am I wrong with my 
assumption?


Thanks in advance,

Karlson

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Zipping RDDs of equal size not possible

2015-02-05 Thread Niklas Wilcke
Hi Xiangrui,

I'm sorry, I didn't notice your mail.
What I did is a workaround that only works for my special case.
It does not scale and only works for small data sets but that is fine
for me so far.

Kind Regards,
Niklas

  def securlyZipRdds[A, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B]):
RDD[(A, B)] = {
val rdd1Repartitioned = rdd1.repartition(1)
val rdd2Repartitioned = rdd2.repartition(1)
val (rdd1Balanced, rdd2Balanced) =
balanceRddSizes(rdd1Repartitioned, rdd2Repartitioned)
rdd1Balanced.zip(rdd2Balanced)
  }

  def balanceRddSizes[A, B](rdd1: RDD[A], rdd2: RDD[B]): (RDD[A],
RDD[B]) = {
val rdd1count = rdd1.count()
val rdd2count = rdd2.count()
val difference = math.abs(rdd1count - rdd2count).toInt
if (rdd1count > rdd2count) {
  (removeRandomElements(rdd1, difference), rdd2)
} else if (rdd2count > rdd1count) {
  (rdd1, removeRandomElements(rdd2, difference))
} else {
  (rdd1, rdd2)
}
  }

  def removeRandomElements[A](rdd: RDD[A], numberOfElements: Int):
RDD[A] = {
val sample: Array[A] = rdd.takeSample(false, numberOfElements)
val set: Set[A] = Set(sample: _*)
rdd.filter(x => if (set.contains(x)) false else true)
  }
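
For comparison, a sketch of a variant that keeps the work distributed instead
of collapsing to one partition, under the assumption that pairing elements by
their zipWithIndex position (rather than by a random sample) is acceptable:

import scala.reflect.ClassTag
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def zipViaJoin[A: ClassTag, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B]): RDD[(A, B)] = {
  val n = math.min(rdd1.count(), rdd2.count())
  // index each element and drop the overhang on the longer side
  val left  = rdd1.zipWithIndex().map(_.swap).filter(_._1 < n)
  val right = rdd2.zipWithIndex().map(_.swap).filter(_._1 < n)
  // joining on the index avoids the single-partition bottleneck
  left.join(right).values
}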

On 10.01.2015 06:56, Xiangrui Meng wrote:
> "sample 2 * n tuples, split them into two parts, balance the sizes of
> these parts by filtering some tuples out"
> 
> How do you guarantee that the two RDDs have the same size?
> 
> -Xiangrui
> 
> On Fri, Jan 9, 2015 at 3:40 AM, Niklas Wilcke
> <1wil...@informatik.uni-hamburg.de> wrote:
>> Hi Spark community,
>>
>> I have a problem with zipping two RDDs of the same size and same number of
>> partitions.
>> The error message says that zipping is only allowed on RDDs which are
>> partitioned into chunks of exactly the same sizes.
>> How can I assure this? My workaround at the moment is to repartition both
>> RDDs to only one partition but that obviously
>> does not scale.
>>
>> This problem originates from my problem to draw n random tuple pairs (Tuple,
>> Tuple) from an RDD[Tuple].
>> What I do is to sample 2 * n tuples, split them into two parts, balance the
>> sizes of these parts
>> by filtering some tuples out and zipping them together.
>>
>> I would appreciate to read better approaches for both problems.
>>
>> Thanks in advance,
>> Niklas

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: New combination-like RDD based on two RDDs

2015-02-04 Thread dash
Problem solved. A simple join will do the work

val prefix = new PairRDDFunctions[Int, Set[Int]](sc.parallelize(List((9,
Set(4)), (1,Set(3)), (2,Set(5)), (2,Set(4)))))

val suffix = sc.parallelize(List((1, Set(1)), (2, Set(6)), (2, Set(5)), (2,
Set(7))))

prefix.join(suffix).collect().foreach(println)
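
For reference, join is an inner join, so it keeps only keys present on both
sides and emits every per-key pairing. The output should look roughly like
this (ordering not guaranteed; key 9 is dropped because it has no match):

(1,(Set(3),Set(1)))
(2,(Set(5),Set(6)))
(2,(Set(5),Set(5)))
(2,(Set(5),Set(7)))
(2,(Set(4),Set(6)))
(2,(Set(4),Set(5)))
(2,(Set(4),Set(7)))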





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/New-combination-like-RDD-based-on-two-RDDs-tp21508p21511.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: New combination-like RDD based on two RDDs

2015-02-04 Thread Jim Green
You should use join:
val rdd1 = sc.parallelize(List((1,(3)), (2,(5)), (3,(6))))
val rdd2 = sc.parallelize(List((2,(1)), (2,(3)), (3,(9))))

rdd1.join(rdd2).collect
res0: Array[(Int, (Int, Int))] = Array((2,(5,1)), (2,(5,3)), (3,(6,9)))

Please see my cheat sheet at
* 3.14 join(otherDataset, [numTasks])*
http://www.openkb.info/2015/01/scala-on-spark-cheatsheet.html


On Wed, Feb 4, 2015 at 3:52 PM, dash  wrote:

> Hey Spark gurus! Sorry for the confusing title. I do not know the exact
> description of my problem, if you know please tell me so I can change it
> :-)
>
> Say I have two RDDs right now, and they are
>
> val rdd1 = sc.parallelize(List((1,(3)), (2,(5)), (3,(6))))
> val rdd2 = sc.parallelize(List((2,(1)), (2,(3)), (3,(9))))
>
> I want to combine rdd1 and rdd2 to get rdd3 which looks like
>
> List((1,(3)), (2,(5,1)), (2,(5,3)), (3, (6,9)))
>
> The order in _._2 does not matter, so you can treat it as a Set.
>
> I tried to use zip, but since there is no guarantee that the length of rdd1
> and rdd2 will be the same I do not know if it is doable.
>
> Also I looked into PairedRDD, some people use union operation on two RDDs
> and then apply a map function on it. Since I want all combinations
> according
> to _._1, I do not know how to achieve it by union and map.
>
> Thanks in advance!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/New-combination-like-RDD-based-on-two-RDDs-tp21508.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


New combination-like RDD based on two RDDs

2015-02-04 Thread dash
Hey Spark gurus! Sorry for the confusing title. I do not know the exact
description of my problem, if you know please tell me so I can change it :-)

Say I have two RDDs right now, and they are

val rdd1 = sc.parallelize(List((1,(3)), (2,(5)), (3,(6))))
val rdd2 = sc.parallelize(List((2,(1)), (2,(3)), (3,(9))))

I want to combine rdd1 and rdd2 to get rdd3 which looks like

List((1,(3)), (2,(5,1)), (2,(5,3)), (3, (6,9)))

The order in _._2 does not matter, so you can treat it as a Set.

I tried to use zip, but since there is no guarantee that the length of rdd1
and rdd2 will be the same I do not know if it is doable.

Also I looked into PairedRDD, some people use union operation on two RDDs
and then apply a map function on it. Since I want all combinations according
to _._1, I do not know how to achieve it by union and map.

Thanks in advance!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/New-combination-like-RDD-based-on-two-RDDs-tp21508.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Joining piped RDDs

2015-02-04 Thread Pavel Velikhov
Hi!

  We have a common use case with Spark - we go out to some database, e.g.
Cassandra, crunch through all of its data, but along the RDD pipeline we use a
pipe operator to some script. All the data before the pipe has some unique IDs, 
but inside the pipe everything is lost.

  The only current solution we have is to format the data into the pipe, so it 
includes the ids, and then restore it all in a map after the pipe.
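
That workaround, as a minimal sketch. Assumptions: the records are
tab-separated strings in an RDD[String] called data, and ./script.sh is a
hypothetical script that echoes the leading id field back unchanged:

val withIds = data.zipWithIndex().map(_.swap)     // (id, record)

val piped = withIds
  .map { case (id, rec) => s"$id\t$rec" }         // format the id into the pipe
  .pipe("./script.sh")
  .map { line =>
    val Array(id, rest) = line.split("\t", 2)     // restore the id afterwards
    (id.toLong, rest)
  }

val joinedBack = withIds.join(piped)              // ids line up again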

  However it would be much nicer if we could just join/zip back the output of 
the pipe. However we can’t cache the RDDs, so it would be nice to have a 
forkRDD of some sort that only keeps the last partition in cache (since we’re 
guaranteed that there’ll be a zip later on and the dataflow will be 
synchronized). Or maybe we can already do this in Spark?

Thank you,
Pavel Velikhov
Chief Science Officer
TopRater
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Trying to find where Spark persists RDDs when run with YARN

2015-01-18 Thread Sean Owen
These will be under the working directory of the YARN container
running the executor. I don't have it handy, but I think it will also be
a "spark-local" or similar directory.

On Sun, Jan 18, 2015 at 2:50 PM, Hemanth Yamijala  wrote:
> Hi,
>
> I am trying to find where Spark persists RDDs when we call the persist() api
> and run under YARN. This is purely for understanding...
>
> In my driver program, I wait indefinitely, so as to avoid any clean up
> problems.
>
> In the actual job, I roughly do the following:
>
> JavaRDD<String> lines = context.textFile(args[0]);
> lines.persist(StorageLevel.DISK_ONLY());
> lines.collect();
>
> When run with local executor, I can see that the files (like rdd_1_0) are
> persisted under directories like
> /var/folders/mt/51srrjc15wl3n829qkgnh2dmgp/T/spark-local-20150118201458-6147/15.
>
> Where similarly can I find these under Yarn ?
>
> Thanks
> hemanth

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Trying to find where Spark persists RDDs when run with YARN

2015-01-18 Thread Hemanth Yamijala
Hi,

I am trying to find where Spark persists RDDs when we call the persist()
API and run under YARN. This is purely for understanding...

In my driver program, I wait indefinitely, so as to avoid any clean up
problems.

In the actual job, I roughly do the following:

JavaRDD<String> lines = context.textFile(args[0]);
lines.persist(StorageLevel.DISK_ONLY());
lines.collect();

When run with local executor, I can see that the files (like rdd_1_0) are
persisted under directories like
/var/folders/mt/51srrjc15wl3n829qkgnh2dmgp/T/spark-local-20150118201458-6147/15.

Where similarly can I find these under Yarn ?

Thanks
hemanth


Re: Zipping RDDs of equal size not possible

2015-01-09 Thread Xiangrui Meng
"sample 2 * n tuples, split them into two parts, balance the sizes of
these parts by filtering some tuples out"

How do you guarantee that the two RDDs have the same size?

-Xiangrui

On Fri, Jan 9, 2015 at 3:40 AM, Niklas Wilcke
<1wil...@informatik.uni-hamburg.de> wrote:
> Hi Spark community,
>
> I have a problem with zipping two RDDs of the same size and same number of
> partitions.
> The error message says that zipping is only allowed on RDDs which are
> partitioned into chunks of exactly the same sizes.
> How can I assure this? My workaround at the moment is to repartition both
> RDDs to only one partition but that obviously
> does not scale.
>
> This problem originates from my problem to draw n random tuple pairs (Tuple,
> Tuple) from an RDD[Tuple].
> What I do is to sample 2 * n tuples, split them into two parts, balance the
> sizes of these parts
> by filtering some tuples out and zipping them together.
>
> I would appreciate to read better approaches for both problems.
>
> Thanks in advance,
> Niklas

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Zipping RDDs of equal size not possible

2015-01-09 Thread Niklas Wilcke
Hi Spark community,

I have a problem with zipping two RDDs of the same size and same number
of partitions.
The error message says that zipping is only allowed on RDDs which are
partitioned into chunks of exactly the same sizes.
How can I ensure this? My workaround at the moment is to repartition
both RDDs to only one partition but that obviously
does not scale.

This problem originates from my problem to draw n random tuple pairs
(Tuple, Tuple) from an RDD[Tuple].
What I do is to sample 2 * n tuples, split them into two parts, balance
the sizes of these parts
by filtering some tuples out and zipping them together.

I would appreciate to read better approaches for both problems.

Thanks in advance,
Niklas


Re: Join RDDs with DStreams

2015-01-08 Thread Akhil Das
Here's how you do it:

val joined_stream = myStream.transform((x: RDD[(String, String)]) => {
  val prdd = new PairRDDFunctions[String, String](x)
  prdd.join(myRDD)
})




Thanks
Best Regards

On Thu, Jan 8, 2015 at 10:20 PM, Asim Jalis  wrote:

> Is there a way to join non-DStream RDDs with DStream RDDs?
>
> Here is the use case. I have a lookup table stored in HDFS that I want to
> read as an RDD. Then I want to join it with the RDDs that are coming in
> through the DStream. How can I do this?
>
> Thanks.
>
> Asim
>


Re: Join RDDs with DStreams

2015-01-08 Thread Gerard Maas
You are looking for dstream.transform(rdd => rdd.join(otherRdd))

The docs contain an example on how to use transform.

https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams

-kr, Gerard.

On Thu, Jan 8, 2015 at 5:50 PM, Asim Jalis  wrote:

> Is there a way to join non-DStream RDDs with DStream RDDs?
>
> Here is the use case. I have a lookup table stored in HDFS that I want to
> read as an RDD. Then I want to join it with the RDDs that are coming in
> through the DStream. How can I do this?
>
> Thanks.
>
> Asim
>


Join RDDs with DStreams

2015-01-08 Thread Asim Jalis
Is there a way to join non-DStream RDDs with DStream RDDs?

Here is the use case. I have a lookup table stored in HDFS that I want to
read as an RDD. Then I want to join it with the RDDs that are coming in
through the DStream. How can I do this?

Thanks.

Asim


Re: Strange DAG scheduling behavior on currently dependent RDDs

2015-01-07 Thread Corey Nolet
I asked this question too soon. I am caching off a bunch of RDDs in a
TrieMap so that our framework can wire them together, and the locking was
not completely correct. It was therefore at times creating multiple new RDDs
instead of using the cached versions, which produced completely separate
lineages.

What's strange is that this bug only surfaced when I updated Spark.
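
For anyone hitting the same thing, a sketch of one way to make the
lookup-or-create step race-safe (the cache and helper names here are
illustrative, not from the original framework):

import scala.collection.concurrent.TrieMap
import org.apache.spark.rdd.RDD

val rddCache = TrieMap.empty[String, RDD[_]]

// putIfAbsent is atomic on TrieMap, so two threads asking for the same key
// end up sharing one cached RDD (one lineage) instead of each building their own
def cached[T](key: String)(build: => RDD[T]): RDD[T] = {
  val built = build.cache()
  rddCache.putIfAbsent(key, built).getOrElse(built).asInstanceOf[RDD[T]]
}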

On Wed, Jan 7, 2015 at 9:12 AM, Corey Nolet  wrote:

> We just updated to Spark 1.2.0 from Spark 1.1.0. We have a small framework
> that we've been developing that connects various different RDDs together
> based on some predefined business cases. After updating to 1.2.0, some of
> the concurrency expectations about how the stages within jobs are executed
> have changed quite significantly.
>
> Given 3 RDDs:
>
> RDD1 = inputFromHDFS().groupBy().sortBy().etc().cache()
> RDD2 = RDD1.outputToFile
> RDD3 = RDD1.groupBy().outputToFile
>
> In Spark 1.1.0, we expected RDD1 to be scheduled based on the first stage
> encountered (RDD2's outputToFile or RDD3's groupBy()) and then for RDD2 and
> RDD3 to both block waiting for RDD1 to complete and cache- at which point
> RDD2 and RDD3 both use the cached version to complete their work.
>
> Spark 1.2.0 seems to schedule two (be it concurrently running) stages for
> each of RDD1's stages (inputFromHDFS, groupBy(), sortBy(), etc() will each
> get run twice). It does not look like there is any sharing of the results
> between these jobs.
>
> Are we doing something wrong? Is there a setting that I'm not
> understanding somewhere?
>


Re: How to merge a RDD of RDDs into one uber RDD

2015-01-07 Thread Raghavendra Pandey
Yup, I meant union only.

On Wed, Jan 7, 2015, 16:19 Sean Owen  wrote:

> I think you mean union(). Yes, you could also simply make an RDD for each
> file, and use SparkContext.union() to put them together.
>
> On Wed, Jan 7, 2015 at 9:51 AM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> You can also use join function of rdd. This is actually kind of append
>> function that adds up all the rdds and creates one uber rdd.
>>
>> On Wed, Jan 7, 2015, 14:30 rkgurram  wrote:
>>
>>> Thank you for the response, sure will try that out.
>>>
>>> Currently I changed my code such that the first map "files.map" to
>>> "files.flatMap", which I guess will do similar what you are saying, it
>>> gives
>>> me a List[] of elements (in this case LabeledPoints, I could also do
>>> RDDs)
>>> which I then turned into a mega RDD. The current problem seems to be
>>> gone, I
>>> no longer get the NPE but further down I am getting a indexOutOfBounds,
>>> so
>>> trying to figure out if the original problem is manifesting itself as a
>>> new
>>> one.
>>>
>>>
>>> Regards
>>> -Ravi
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-
>>> uber-RDD-tp20986p21012.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


Strange DAG scheduling behavior on currently dependent RDDs

2015-01-07 Thread Corey Nolet
We just updated to Spark 1.2.0 from Spark 1.1.0. We have a small framework
that we've been developing that connects various different RDDs together
based on some predefined business cases. After updating to 1.2.0, some of
the concurrency expectations about how the stages within jobs are executed
have changed quite significantly.

Given 3 RDDs:

RDD1 = inputFromHDFS().groupBy().sortBy().etc().cache()
RDD2 = RDD1.outputToFile
RDD3 = RDD1.groupBy().outputToFile

In Spark 1.1.0, we expected RDD1 to be scheduled based on the first stage
encountered (RDD2's outputToFile or RDD3's groupBy()) and then for RDD2 and
RDD3 to both block waiting for RDD1 to complete and cache- at which point
RDD2 and RDD3 both use the cached version to complete their work.

Spark 1.2.0 seems to schedule two (be it concurrently running) stages for
each of RDD1's stages (inputFromHDFS, groupBy(), sortBy(), etc() will each
get run twice). It does not look like there is any sharing of the results
between these jobs.

Are we doing something wrong? Is there a setting that I'm not understanding
somewhere?


Re: How to merge a RDD of RDDs into one uber RDD

2015-01-07 Thread Sean Owen
I think you mean union(). Yes, you could also simply make an RDD for each
file, and use SparkContext.union() to put them together.

On Wed, Jan 7, 2015 at 9:51 AM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> You can also use join function of rdd. This is actually kind of append
> function that adds up all the rdds and creates one uber rdd.
>
> On Wed, Jan 7, 2015, 14:30 rkgurram  wrote:
>
>> Thank you for the response, sure will try that out.
>>
>> Currently I changed my code such that the first map "files.map" to
>> "files.flatMap", which I guess will do similar what you are saying, it
>> gives
>> me a List[] of elements (in this case LabeledPoints, I could also do RDDs)
>> which I then turned into a mega RDD. The current problem seems to be
>> gone, I
>> no longer get the NPE but further down I am getting a indexOutOfBounds, so
>> trying to figure out if the original problem is manifesting itself as a
>> new
>> one.
>>
>>
>> Regards
>> -Ravi
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-
>> uber-RDD-tp20986p21012.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: How to merge a RDD of RDDs into one uber RDD

2015-01-07 Thread Raghavendra Pandey
You can also use the join function of rdd. This is actually a kind of append
function that adds up all the rdds and creates one uber rdd.

On Wed, Jan 7, 2015, 14:30 rkgurram  wrote:

> Thank you for the response, sure will try that out.
>
> Currently I changed my code such that the first map "files.map" to
> "files.flatMap", which I guess will do similar what you are saying, it
> gives
> me a List[] of elements (in this case LabeledPoints, I could also do RDDs)
> which I then turned into a mega RDD. The current problem seems to be gone,
> I
> no longer get the NPE but further down I am getting a indexOutOfBounds, so
> trying to figure out if the original problem is manifesting itself as a new
> one.
>
>
> Regards
> -Ravi
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-
> uber-RDD-tp20986p21012.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to merge a RDD of RDDs into one uber RDD

2015-01-07 Thread rkgurram
Thank you for the response, sure will try that out.

Currently I changed my code so that the first map "files.map" becomes
"files.flatMap", which I guess does something similar to what you are saying:
it gives me a List[] of elements (in this case LabeledPoints; I could also do
RDDs) which I then turned into a mega RDD. The current problem seems to be
gone, I no longer get the NPE, but further down I am getting an
IndexOutOfBounds, so I am trying to figure out if the original problem is
manifesting itself as a new one.


Regards
-Ravi




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-uber-RDD-tp20986p21012.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to merge a RDD of RDDs into one uber RDD

2015-01-06 Thread k.tham
An RDD cannot contain elements of type RDD (i.e., you can't nest RDDs within
RDDs; in fact, I don't think it makes any sense).

I suggest that rather than having an RDD of file names, you collect those file
name strings back onto the driver as a Scala array of file names, and then
from there make an array of RDDs, which you can fold over and merge.
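
A minimal sketch of that approach, assuming the file names start out in an
RDD[String] called fileNamesRdd:

val fileNames: Array[String] = fileNamesRdd.collect()   // back to the driver
val rdds = fileNames.map(name => sc.textFile(name))
val uber = sc.union(rdds)   // or fold: rdds.reduce(_ union _)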



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-uber-RDD-tp20986p21007.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Cannot see RDDs in Spark UI

2015-01-06 Thread Andrew Ash
Hi Manoj,

I've noticed that the storage tab only shows RDDs that have been cached.
Did you call .cache() or .persist() on any of the RDDs?

Andrew

On Tue, Jan 6, 2015 at 6:48 PM, Manoj Samel 
wrote:

> Hi,
>
> I create a bunch of RDDs, including schema RDDs. When I run the program
> and go to UI on xxx:4040, the storage tab does not shows any RDDs.
>
>
> Spark version is 1.1.1 (Hadoop 2.3)
>
> Any thoughts?
>
> Thanks,
>


Cannot see RDDs in Spark UI

2015-01-06 Thread Manoj Samel
Hi,

I create a bunch of RDDs, including schema RDDs. When I run the program and
go to the UI on xxx:4040, the Storage tab does not show any RDDs.


Spark version is 1.1.1 (Hadoop 2.3)

Any thoughts?

Thanks,


Exception after changing RDDs

2014-12-23 Thread kai.lu
Hi All,

We are getting an exception after adding one RDD to another RDD.

We first declared an empty RDD "A", then received a new DStream "B" from
Kafka; for each RDD in the DStream "B", we kept adding them to the existing
RDD "A". The error happened when we were trying to use the updated RDD "A".

Could anybody give some hints about what's going wrong here? Thanks!

*Sample Code :*

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("newAPP")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

// Declare an empty RDD for A
@volatile var rddA: RDD[A] = ssc.sparkContext.emptyRDD[A]

// Receive A and B data from Kafka
val kafkaMessages = KafkaUtils.createStream(ssc, "localhost:2181",
  "CONSUMERGROUP", Map("TOPIC" -> 1)).map(_._2)...
val dstreamA: DStream[A] = kafkaMessages.map(msg =>
  JSONConverter.fromJSON[A](msg)).flatMap(_.toOption)...
val dstreamB: DStream[B] = kafkaMessages.map(msg =>
  JSONConverter.fromJSON[B](msg)).flatMap(_.toOption)...

// Update RDD A from DStream A
dstreamA.foreachRDD(rdd => rddA = rddA ++ rdd)

// Join DStream B with the existing RDD A
val results: DStream[(A, B)] = dstreamB.transform(rdd => {
  val id_A: RDD[(String, A)] = rddA.keyBy(_.id)
  val id_B: RDD[(String, B)] = rdd.keyBy(_.id)
  val rddA_B: RDD[(A, B)] = id_A.join(id_B).map { case (id, (a, b)) =>
    (a, b)
  }
  rddA_B
})


From my observation, the exception happens when we were doing the
rddA.keyBy(_.id)

*Exception Output :*

14/12/23 15:35:23 WARN storage.BlockManager: Block input-0-1419377723400
already exists on this machine; not re-adding it
14/12/23 15:35:24 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/12/23 15:35:34 WARN storage.BlockManager: Block input-0-1419377734600
already exists on this machine; not re-adding it
14/12/23 15:35:37 ERROR scheduler.DAGSchedulerActorSupervisor:
eventProcesserActor failed; shutting down SparkContext
org.apache.spark.SparkException: Attempted to use BlockRDD[5] at BlockRDD at
ReceiverInputDStream.scala:69 after its blocks have been removed!
at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
at 
org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
at
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:216)
at
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:216)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:215)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1297)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1307)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1306)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1304)
at scala.collection.immutable.List.foreach(List.scala:318)
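
A sketch of one common way around this class of error: the BlockRDDs behind a
receiver stream are cleared after each batch, so the accumulated RDD has to be
persisted, and its lineage periodically truncated, before that happens. This
assumes the checkpoint directory set via ssc.checkpoint in the sample code
above:

dstreamA.foreachRDD { rdd =>
  rddA = rddA.union(rdd).persist()
  rddA.checkpoint()   // cut the lineage back to the checkpoint files
  rddA.count()        // force materialization before the old blocks are dropped
}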



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Exception-after-changing-RDDs-tp20841.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: UNION two RDDs

2014-12-22 Thread Jerry Lam
Hi Sean and Madhu,

Thank you for the explanation. I really appreciate it.

Best Regards,

Jerry


On Fri, Dec 19, 2014 at 4:50 AM, Sean Owen  wrote:

> coalesce actually changes the number of partitions. Unless the
> original RDD had just 1 partition, coalesce(1) will make an RDD with 1
> partition that is larger than the original partitions, of course.
>
> I don't think the question is about ordering of things within an
> element of the RDD?
>
> If the original RDD was sorted, and so has a defined ordering, then it
> will be preserved. Otherwise I believe you do not have any guarantees
> about ordering. In practice, you may find that you still encounter the
> elements in the same order after coalesce(1), although I am not sure
> that is even true.
>
> union() is the same story; unless the RDDs are sorted I don't think
> there are guarantees. However I'm almost certain that in practice, as
> it happens now, A's elements would come before B's after a union, if
> you did traverse them.
>
> On Fri, Dec 19, 2014 at 5:41 AM, madhu phatak 
> wrote:
> > Hi,
> > coalesce is an operation which changes no of records in a partition. It
> will
> > not touch ordering with in a row AFAIK.
> >
> > On Fri, Dec 19, 2014 at 2:22 AM, Jerry Lam  wrote:
> >>
> >> Hi Spark users,
> >>
> >> I wonder if val resultRDD = RDDA.union(RDDB) will always have records in
> >> RDDA before records in RDDB.
> >>
> >> Also, will resultRDD.coalesce(1) change this ordering?
> >>
> >> Best Regards,
> >>
> >> Jerry
> >
> >
> >
> > --
> > Regards,
> > Madhukara Phatak
> > http://www.madhukaraphatak.com
>


Re: spark-shell bug with RDDs and case classes?

2014-12-19 Thread Sean Owen
AFAIK it's a known issue of some sort in the Scala REPL, which is what
the Spark REPL is. The PR that was closed was just adding tests to
show it's a bug. I don't know if there is any workaround now.

On Fri, Dec 19, 2014 at 7:21 PM, Jay Hutfles  wrote:
> Found a problem in the spark-shell, but can't confirm that it's related to
> open issues on Spark's JIRA page.  I was wondering if anyone could help
> identify if this is an issue or if it's already being addressed.
>
> Test:  (in spark-shell)
> case class Person(name: String, age: Int)
> val peopleList = List(Person("Alice", 35), Person("Bob", 47),
> Person("Alice", 35), Person("Bob", 15))
> val peopleRDD = sc.parallelize(peopleList)
> assert(peopleList.distinct.size == peopleRDD.distinct.count)
>
>
> At first I thought it was related to issue SPARK-2620
> (https://issues.apache.org/jira/browse/SPARK-2620), which says case classes
> can't be used as keys in spark-shell due to how case classes are compiled by
> the REPL.  It lists .reduceByKey, .groupByKey and .distinct as being
> affected.  But the associated pull request for adding tests to cover this
> (https://github.com/apache/spark/pull/1588) was closed.
>
> Is this something I just have to live with when using the REPL?  Or is this
> covered by something bigger that's being addressed?
>
> Thanks in advance
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-bug-with-RDDs-and-case-classes-tp20789.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>




spark-shell bug with RDDs and case classes?

2014-12-19 Thread Jay Hutfles
Found a problem in the spark-shell, but can't confirm that it's related to
open issues on Spark's JIRA page.  I was wondering if anyone could help
identify if this is an issue or if it's already being addressed.

Test:  (in spark-shell)
case class Person(name: String, age: Int)
val peopleList = List(Person("Alice", 35), Person("Bob", 47),
Person("Alice", 35), Person("Bob", 15))
val peopleRDD = sc.parallelize(peopleList)
assert(peopleList.distinct.size == peopleRDD.distinct.count) 


At first I thought it was related to issue SPARK-2620
(https://issues.apache.org/jira/browse/SPARK-2620), which says case classes
can't be used as keys in spark-shell due to how case classes are compiled by
the REPL.  It lists .reduceByKey, .groupByKey and .distinct as being
affected.  But the associated pull request for adding tests to cover this
(https://github.com/apache/spark/pull/1588) was closed.  

Is this something I just have to live with when using the REPL?  Or is this
covered by something bigger that's being addressed?

Thanks in advance



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-bug-with-RDDs-and-case-classes-tp20789.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: UNION two RDDs

2014-12-19 Thread Sean Owen
coalesce actually changes the number of partitions. Unless the
original RDD had just 1 partition, coalesce(1) will make an RDD with 1
partition that is larger than the original partitions, of course.

I don't think the question is about ordering of things within an
element of the RDD?

If the original RDD was sorted, and so has a defined ordering, then it
will be preserved. Otherwise I believe you do not have any guarantees
about ordering. In practice, you may find that you still encounter the
elements in the same order after coalesce(1), although I am not sure
that is even true.

union() is the same story; unless the RDDs are sorted I don't think
there are guarantees. However I'm almost certain that in practice, as
it happens now, A's elements would come before B's after a union, if
you did traverse them.

On Fri, Dec 19, 2014 at 5:41 AM, madhu phatak  wrote:
> Hi,
> coalesce is an operation which changes no of records in a partition. It will
> not touch ordering with in a row AFAIK.
>
> On Fri, Dec 19, 2014 at 2:22 AM, Jerry Lam  wrote:
>>
>> Hi Spark users,
>>
>> I wonder if val resultRDD = RDDA.union(RDDB) will always have records in
>> RDDA before records in RDDB.
>>
>> Also, will resultRDD.coalesce(1) change this ordering?
>>
>> Best Regards,
>>
>> Jerry
>
>
>
> --
> Regards,
> Madhukara Phatak
> http://www.madhukaraphatak.com




Re: UNION two RDDs

2014-12-18 Thread madhu phatak
Hi,
coalesce is an operation which changes the number of records in a partition. It
will not touch ordering within a row, AFAIK.

On Fri, Dec 19, 2014 at 2:22 AM, Jerry Lam  wrote:
>
> Hi Spark users,
>
> I wonder if val resultRDD = RDDA.union(RDDB) will always have records in
> RDDA before records in RDDB.
>
> Also, will resultRDD.coalesce(1) change this ordering?
>
> Best Regards,
>
> Jerry
>


-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


UNION two RDDs

2014-12-18 Thread Jerry Lam
Hi Spark users,

I wonder if val resultRDD = RDDA.union(RDDB) will always have records in
RDDA before records in RDDB.

Also, will resultRDD.coalesce(1) change this ordering?

Best Regards,

Jerry
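
For what it's worth, if the ordering matters, a minimal sketch of a more
explicit approach (my own example, not from the replies): index the elements
so the order no longer depends on union/coalesce behavior.

val a = sc.parallelize(Seq("a1", "a2")).zipWithIndex().map { case (v, i) => ((0, i), v) }
val b = sc.parallelize(Seq("b1", "b2")).zipWithIndex().map { case (v, i) => ((1, i), v) }
// Sorting on (rddIndex, elementIndex) guarantees all of A's records before B's.
val ordered = a.union(b).sortByKey().values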


Re: RDDs being cleaned too fast

2014-12-16 Thread Harihar Nahak
RDD.persist() can be useful here.

On 11 December 2014 at 14:34, ankits [via Apache Spark User List] <
ml-node+s1001560n20613...@n3.nabble.com> wrote:
>
> I'm using spark 1.1.0 and am seeing persisted RDDs being cleaned up too
> fast. How can i inspect the size of RDD in memory and get more information
> about why it was cleaned up. There should be more than enough memory
> available on the cluster to store them, and by default, the
> spark.cleaner.ttl is infinite, so I want more information about why this is
> happening and how to prevent it.
>
> Spark just logs this when removing RDDs:
>
> [2014-12-11 01:19:34,006] INFO  spark.storage.BlockManager [] [] -
> Removing RDD 33
> [2014-12-11 01:19:34,010] INFO  pache.spark.ContextCleaner []
> [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
> [2014-12-11 01:19:34,012] INFO  spark.storage.BlockManager [] [] -
> Removing RDD 33
> [2014-12-11 01:19:34,016] INFO  pache.spark.ContextCleaner []
> [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613.html
>


-- 
Regards,
Harihar Nahak
BigData Developer
Wynyard
Email:hna...@wynyardgroup.com | Extn: 8019




-
--Harihar
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613p20738.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
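
A minimal sketch of the persist suggestion (my own illustration; someRdd is a
hypothetical stand-in for the RDD being cleaned up):

import org.apache.spark.storage.StorageLevel

val pinned = someRdd.persist(StorageLevel.MEMORY_AND_DISK)  // spill to disk instead of dropping
pinned.count()      // materialize once so later jobs reuse the stored blocks
// Keep the 'pinned' reference alive on the driver: the ContextCleaner only
// uncaches RDDs that have gone out of scope.
pinned.unpersist()  // release explicitly when done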

Re: RDDs being cleaned too fast

2014-12-11 Thread Ranga
I was having similar issues with my persistent RDDs. After some digging
around, I noticed that the partitions were not balanced evenly across the
available nodes. After a "repartition", the RDD was spread evenly across
all available memory. Not sure if that is something that would help your
use-case though.
You could also increase the spark.storage.memoryFraction if that is an
option.


- Ranga

On Wed, Dec 10, 2014 at 10:23 PM, Aaron Davidson  wrote:

> The ContextCleaner uncaches RDDs that have gone out of scope on the
> driver. So it's possible that the given RDD is no longer reachable in your
> program's control flow, or else it'd be a bug in the ContextCleaner.
>
> On Wed, Dec 10, 2014 at 5:34 PM, ankits  wrote:
>
>> I'm using spark 1.1.0 and am seeing persisted RDDs being cleaned up too
>> fast.
>> How can i inspect the size of RDD in memory and get more information about
>> why it was cleaned up. There should be more than enough memory available
>> on
>> the cluster to store them, and by default, the spark.cleaner.ttl is
>> infinite, so I want more information about why this is happening and how
>> to
>> prevent it.
>>
>> Spark just logs this when removing RDDs:
>>
>> [2014-12-11 01:19:34,006] INFO  spark.storage.BlockManager [] [] -
>> Removing
>> RDD 33
>> [2014-12-11 01:19:34,010] INFO  pache.spark.ContextCleaner []
>> [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
>> [2014-12-11 01:19:34,012] INFO  spark.storage.BlockManager [] [] -
>> Removing
>> RDD 33
>> [2014-12-11 01:19:34,016] INFO  pache.spark.ContextCleaner []
>> [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Locking for shared RDDs

2014-12-11 Thread Tathagata Das
Aditya, I think you have the mental model of Spark Streaming a little
off the mark. Unlike traditional streaming systems, where any kind of
state is mutable, Spark Streaming is designed on Spark's immutable RDDs.
Streaming data is received and divided into immutable blocks, which then
form immutable RDDs, and then transformations form new immutable RDDs.
It's best that you first read the Spark paper and then the Spark
Streaming paper to understand the model. Once you understand that, you will
realize that since everything is immutable, the question of
consistency does not even arise :)

TD

On Mon, Dec 8, 2014 at 9:44 PM, Raghavendra Pandey
 wrote:
> You don't need to worry about locks as such as one thread/worker is
> responsible exclusively for one partition of the RDD. You can use
> Accumulator variables that spark provides to get the state updates.
>
>
> On Mon Dec 08 2014 at 8:14:28 PM aditya.athalye 
> wrote:
>>
>> I am relatively new to Spark. I am planning to use Spark Streaming for my
>> OLAP use case, but I would like to know how RDDs are shared between
>> multiple
>> workers.
>> If I need to constantly compute some stats on the streaming data,
>> presumably
>> shared state would have to updated serially by different spark workers. Is
>> this managed by Spark automatically or does the application need to ensure
>> distributed locks are acquired?
>>
>> Thanks
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Locking-for-shared-RDDs-tp20578.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>




Re: RDDs being cleaned too fast

2014-12-10 Thread Aaron Davidson
The ContextCleaner uncaches RDDs that have gone out of scope on the driver.
So it's possible that the given RDD is no longer reachable in your
program's control flow, or else it'd be a bug in the ContextCleaner.

On Wed, Dec 10, 2014 at 5:34 PM, ankits  wrote:

> I'm using spark 1.1.0 and am seeing persisted RDDs being cleaned up too
> fast.
> How can i inspect the size of RDD in memory and get more information about
> why it was cleaned up. There should be more than enough memory available on
> the cluster to store them, and by default, the spark.cleaner.ttl is
> infinite, so I want more information about why this is happening and how to
> prevent it.
>
> Spark just logs this when removing RDDs:
>
> [2014-12-11 01:19:34,006] INFO  spark.storage.BlockManager [] [] - Removing
> RDD 33
> [2014-12-11 01:19:34,010] INFO  pache.spark.ContextCleaner []
> [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
> [2014-12-11 01:19:34,012] INFO  spark.storage.BlockManager [] [] - Removing
> RDD 33
> [2014-12-11 01:19:34,016] INFO  pache.spark.ContextCleaner []
> [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RDDs being cleaned too fast

2014-12-10 Thread ankits
I'm using Spark 1.1.0 and am seeing persisted RDDs being cleaned up too fast.
How can I inspect the size of an RDD in memory and get more information about
why it was cleaned up? There should be more than enough memory available on
the cluster to store them, and by default, the spark.cleaner.ttl is
infinite, so I want more information about why this is happening and how to
prevent it.

Spark just logs this when removing RDDs:

[2014-12-11 01:19:34,006] INFO  spark.storage.BlockManager [] [] - Removing
RDD 33
[2014-12-11 01:19:34,010] INFO  pache.spark.ContextCleaner []
[akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
[2014-12-11 01:19:34,012] INFO  spark.storage.BlockManager [] [] - Removing
RDD 33
[2014-12-11 01:19:34,016] INFO  pache.spark.ContextCleaner []
[akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Caching RDDs with shared memory - bug or feature?

2014-12-09 Thread insperatum
If all RDD elements within a partition contain pointers to a single shared
object, Spark persists as expected when the RDD is small. However, if the
RDD has more than *200 elements*, then Spark reports requiring much more
memory than it actually does. This becomes a problem for large RDDs, as
Spark refuses to persist even though it can. Is this a bug or is there a
feature that I'm missing?
Cheers, Luke

val n = ???
class Elem(val s: Array[Int])
val rdd = sc.parallelize(Seq(1)).mapPartitions( _ => {
    val sharedArray = Array.ofDim[Int](10000000)   // should require ~40MB
    (1 to n).toIterator.map(_ => new Elem(sharedArray))
}).cache().count()   // force computation

For n = 100: MemoryStore: Block rdd_1_0 stored as values in memory
(estimated size *38.1 MB*, free 898.7 MB)
For n = 200: MemoryStore: Block rdd_1_0 stored as values in memory
(estimated size *38.2 MB*, free 898.7 MB)
For n = 201: MemoryStore: Block rdd_1_0 stored as values in memory
(estimated size *76.7 MB*, free 860.2 MB)
For n = 5000: MemoryStore: *Not enough space to cache rdd_1_0 in memory!*
(computed 781.3 MB so far)

Note: For medium sized n (where n>200 but spark can still cache), the actual
application memory still stays where it should - Spark just seems to vastly
overreport how much memory it's using. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Caching-RDDs-with-shared-memory-bug-or-feature-tp20596.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Locking for shared RDDs

2014-12-08 Thread Raghavendra Pandey
You don't need to worry about locks as such, since one thread/worker is
exclusively responsible for one partition of the RDD. You can use the
Accumulator variables that Spark provides to get the state updates.

On Mon Dec 08 2014 at 8:14:28 PM aditya.athalye 
wrote:

> I am relatively new to Spark. I am planning to use Spark Streaming for my
> OLAP use case, but I would like to know how RDDs are shared between
> multiple
> workers.
> If I need to constantly compute some stats on the streaming data,
> presumably
> shared state would have to updated serially by different spark workers. Is
> this managed by Spark automatically or does the application need to ensure
> distributed locks are acquired?
>
> Thanks
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Locking-for-shared-RDDs-tp20578.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
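
A minimal sketch of the accumulator suggestion above (my own example, using
the Spark 1.x accumulator API):

val errorCount = sc.accumulator(0)             // defined on the driver
sc.parallelize(1 to 1000).foreach { x =>
  if (x % 100 == 0) errorCount += 1            // workers add concurrently, no locks needed
}
println(errorCount.value)                      // the aggregated value is read on the driver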


Locking for shared RDDs

2014-12-08 Thread aditya.athalye
I am relatively new to Spark. I am planning to use Spark Streaming for my
OLAP use case, but I would like to know how RDDs are shared between multiple
workers. 
If I need to constantly compute some stats on the streaming data, presumably
shared state would have to be updated serially by different Spark workers. Is
this managed by Spark automatically or does the application need to ensure
distributed locks are acquired?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Locking-for-shared-RDDs-tp20578.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Running two different Spark jobs vs multi-threading RDDs

2014-12-06 Thread Aaron Davidson
You can actually submit multiple jobs to a single SparkContext in different
threads. In the case you mentioned with 2 stages having a common parent,
both will wait for the parent stage to complete and then the two will
execute in parallel, sharing the cluster resources.

Solutions that submit multiple applications are also reasonable, but then
you have to manage the job dependencies yourself.

On Sat, Dec 6, 2014 at 8:41 AM, Corey Nolet  wrote:

> Reading the documentation a little more closely, I'm using the wrong
> terminology. I'm using stages to refer to what spark is calling a job. I
> guess application (more than one spark context) is what I'm asking about
> On Dec 5, 2014 5:19 PM, "Corey Nolet"  wrote:
>
>> I've read in the documentation that RDDs can be run concurrently when
>> submitted in separate threads. I'm curious how the scheduler would handle
>> propagating these down to the tasks.
>>
>> I have 3 RDDs:
>> - one RDD which loads some initial data, transforms it and caches it
>> - two RDDs which use the cached RDD to provide reports
>>
>> I'm trying to figure out how the resources will be scheduled to perform
>> these stages if I were to concurrently run the two RDDs that depend on the
>> first RDD. Would the two RDDs run sequentially? Will they both run @ the
>> same time and be smart about how they are caching?
>>
>> Would this be a time when I'd want to use Tachyon instead and run this as
>> 2 separate physical jobs: one to place the shared data in the RAMDISK and
>> one to run the two dependent RDDs concurrently? Or would it even be best in
>> that case to run 3 completely separate jobs?
>>
>> We're planning on using YARN so there's 2 levels of scheduling going on.
>> We're trying to figure out the best way to utilize the resources so that we
>> are fully saturating the system and making sure there's constantly work
>> being done rather than anything spinning gears waiting on upstream
>> processing to occur (in mapreduce, we'd just submit a ton of jobs and have
>> them wait in line).
>>
>
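
A minimal sketch of submitting jobs from separate threads against one
SparkContext (my own illustration; the input path is hypothetical):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val base = sc.textFile("hdfs:///data/input").cache()   // shared parent RDD
base.count()                                           // materialize the cache once

// Each Future submits its job from its own thread; the scheduler can run
// them concurrently, and both reuse the cached parent.
val errors = Future { base.filter(_.contains("ERROR")).count() }
val bytes  = Future { base.map(_.length.toLong).reduce(_ + _) }

println(Await.result(errors, Duration.Inf))
println(Await.result(bytes, Duration.Inf))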


Re: Running two different Spark jobs vs multi-threading RDDs

2014-12-06 Thread Corey Nolet
Reading the documentation a little more closely, I'm using the wrong
terminology. I'm using stages to refer to what Spark is calling a job. I
guess application (more than one SparkContext) is what I'm asking about.
On Dec 5, 2014 5:19 PM, "Corey Nolet"  wrote:
On Dec 5, 2014 5:19 PM, "Corey Nolet"  wrote:

> I've read in the documentation that RDDs can be run concurrently when
> submitted in separate threads. I'm curious how the scheduler would handle
> propagating these down to the tasks.
>
> I have 3 RDDs:
> - one RDD which loads some initial data, transforms it and caches it
> - two RDDs which use the cached RDD to provide reports
>
> I'm trying to figure out how the resources will be scheduled to perform
> these stages if I were to concurrently run the two RDDs that depend on the
> first RDD. Would the two RDDs run sequentially? Will they both run @ the
> same time and be smart about how they are caching?
>
> Would this be a time when I'd want to use Tachyon instead and run this as
> 2 separate physical jobs: one to place the shared data in the RAMDISK and
> one to run the two dependent RDDs concurrently? Or would it even be best in
> that case to run 3 completely separate jobs?
>
> We're planning on using YARN so there's 2 levels of scheduling going on.
> We're trying to figure out the best way to utilize the resources so that we
> are fully saturating the system and making sure there's constantly work
> being done rather than anything spinning gears waiting on upstream
> processing to occur (in mapreduce, we'd just submit a ton of jobs and have
> them wait in line).
>


Running two different Spark jobs vs multi-threading RDDs

2014-12-05 Thread Corey Nolet
I've read in the documentation that RDDs can be run concurrently when
submitted in separate threads. I'm curious how the scheduler would handle
propagating these down to the tasks.

I have 3 RDDs:
- one RDD which loads some initial data, transforms it and caches it
- two RDDs which use the cached RDD to provide reports

I'm trying to figure out how the resources will be scheduled to perform
these stages if I were to concurrently run the two RDDs that depend on the
first RDD. Would the two RDDs run sequentially? Will they both run @ the
same time and be smart about how they are caching?

Would this be a time when I'd want to use Tachyon instead and run this as 2
separate physical jobs: one to place the shared data in the RAMDISK and one
to run the two dependent RDDs concurrently? Or would it even be best in
that case to run 3 completely separate jobs?

We're planning on using YARN so there's 2 levels of scheduling going on.
We're trying to figure out the best way to utilize the resources so that we
are fully saturating the system and making sure there's constantly work
being done rather than anything spinning gears waiting on upstream
processing to occur (in mapreduce, we'd just submit a ton of jobs and have
them wait in line).


RE: Determination of number of RDDs

2014-12-04 Thread Kapil Malik
Regarding: Can we create such an array and then parallelize it?

Parallelizing an array of RDDs, i.e. RDD[RDD[X]], is not possible:
RDD is not serializable.

From: Deep Pradhan [mailto:pradhandeep1...@gmail.com]
Sent: 04 December 2014 15:39
To: user@spark.apache.org
Subject: Determination of number of RDDs

Hi,
I have a graph and I want to create RDDs equal in number to the nodes in the 
graph. How can I do that?
If I have 10 nodes then I want to create 10 RDDs. Is that possible in GraphX?
Like in the C language we have arrays of pointers. Do we have arrays of RDDs in Spark?
Can we create such an array and then parallelize it?

Thank You


Re: Determination of number of RDDs

2014-12-04 Thread Ankur Dave
At 2014-12-04 02:08:45 -0800, Deep Pradhan  wrote:
> I have a graph and I want to create RDDs equal in number to the nodes in
> the graph. How can I do that?
> If I have 10 nodes then I want to create 10 rdds. Is that possible in
> GraphX?

This is possible: you can collect the elements to the driver, then create an 
RDD for each element.

If you have so many elements that collecting them to the driver is infeasible, 
there's probably an alternative solution that doesn't involve creating one RDD 
per element.

Ankur
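
A minimal sketch of this collect-then-parallelize approach (my own example;
only sensible when the vertex count is small):

// graph: org.apache.spark.graphx.Graph[VD, ED], assumed already built
val vertexIds = graph.vertices.map(_._1).collect()               // bring the IDs to the driver
val perVertexRdds = vertexIds.map(id => sc.parallelize(Seq(id))) // one RDD per vertex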




Determination of number of RDDs

2014-12-04 Thread Deep Pradhan
Hi,
I have a graph and I want to create RDDs equal in number to the nodes in
the graph. How can I do that?
If I have 10 nodes then I want to create 10 RDDs. Is that possible in
GraphX?
Like in the C language we have arrays of pointers. Do we have arrays of RDDs
in Spark?
Can we create such an array and then parallelize it?

Thank You


Re: Loading RDDs in a streaming fashion

2014-12-02 Thread Ashish Rangole
This is a common use case, and it is how the Hadoop APIs for reading data
work: they return an Iterator[YourRecord] instead of reading all the records
in at once.
On Dec 1, 2014 9:43 PM, "Andy Twigg"  wrote:

> You may be able to construct RDDs directly from an iterator - not sure
> - you may have to subclass your own.
>
> On 1 December 2014 at 18:40, Keith Simmons  wrote:
> > Yep, that's definitely possible.  It's one of the workarounds I was
> > considering.  I was just curious if there was a simpler (and perhaps more
> > efficient) approach.
> >
> > Keith
> >
> > On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg  wrote:
> >>
> >> Could you modify your function so that it streams through the files
> record
> >> by record and outputs them to hdfs, then read them all in as RDDs and
> take
> >> the union? That would only use bounded memory.
> >>
> >> On 1 December 2014 at 17:19, Keith Simmons  wrote:
> >>>
> >>> Actually, I'm working with a binary format.  The api allows reading
> out a
> >>> single record at a time, but I'm not sure how to get those records into
> >>> spark (without reading everything into memory from a single file at
> once).
> >>>
> >>>
> >>>
> >>> On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg 
> wrote:
> >>>>>
> >>>>> file => tranform file into a bunch of records
> >>>>
> >>>>
> >>>> What does this function do exactly? Does it load the file locally?
> >>>> Spark supports RDDs exceeding global RAM (cf the terasort example),
> but
> >>>> if your example just loads each file locally, then this may cause
> problems.
> >>>> Instead, you should load each file into an rdd with
> context.textFile(),
> >>>> flatmap that and union these rdds.
> >>>>
> >>>> also see
> >>>>
> >>>>
> http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files
> >>>>
> >>>>
> >>>> On 1 December 2014 at 16:50, Keith Simmons  wrote:
> >>>>>
> >>>>> This is a long shot, but...
> >>>>>
> >>>>> I'm trying to load a bunch of files spread out over hdfs into an RDD,
> >>>>> and in most cases it works well, but for a few very large files, I
> exceed
> >>>>> available memory.  My current workflow basically works like this:
> >>>>>
> >>>>> context.parallelize(fileNames).flatMap { file =>
> >>>>>   tranform file into a bunch of records
> >>>>> }
> >>>>>
> >>>>> I'm wondering if there are any APIs to somehow "flush" the records
> of a
> >>>>> big dataset so I don't have to load them all into memory at once.  I
> know
> >>>>> this doesn't exist, but conceptually:
> >>>>>
> >>>>> context.parallelize(fileNames).streamMap { (file, stream) =>
> >>>>>  for every 10K records write records to stream and flush
> >>>>> }
> >>>>>
> >>>>> Keith
> >>>>
> >>>>
> >>>
> >>
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
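
A minimal sketch of the iterator approach (my own illustration; Record and
openRecordReader are hypothetical stand-ins for the poster's binary-format
API, and fileNames is the same collection as in the thread):

case class Record(bytes: Array[Byte])                        // hypothetical record type
def openRecordReader(path: String): Iterator[Record] = ???   // hypothetical: wraps the binary API

// flatMap accepts any TraversableOnce, so returning the iterator itself keeps
// the read lazy: each task pulls one record at a time instead of loading a
// whole file into memory.
val records = sc.parallelize(fileNames).flatMap(file => openRecordReader(file))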


Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Andy Twigg
You may be able to construct RDDs directly from an iterator - not sure -
you may have to subclass your own RDD.

On 1 December 2014 at 18:40, Keith Simmons  wrote:
> Yep, that's definitely possible.  It's one of the workarounds I was
> considering.  I was just curious if there was a simpler (and perhaps more
> efficient) approach.
>
> Keith
>
> On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg  wrote:
>>
>> Could you modify your function so that it streams through the files record
>> by record and outputs them to hdfs, then read them all in as RDDs and take
>> the union? That would only use bounded memory.
>>
>> On 1 December 2014 at 17:19, Keith Simmons  wrote:
>>>
>>> Actually, I'm working with a binary format.  The api allows reading out a
>>> single record at a time, but I'm not sure how to get those records into
>>> spark (without reading everything into memory from a single file at once).
>>>
>>>
>>>
>>> On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg  wrote:
>>>>>
>>>>> file => tranform file into a bunch of records
>>>>
>>>>
>>>> What does this function do exactly? Does it load the file locally?
>>>> Spark supports RDDs exceeding global RAM (cf the terasort example), but
>>>> if your example just loads each file locally, then this may cause problems.
>>>> Instead, you should load each file into an rdd with context.textFile(),
>>>> flatmap that and union these rdds.
>>>>
>>>> also see
>>>>
>>>> http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files
>>>>
>>>>
>>>> On 1 December 2014 at 16:50, Keith Simmons  wrote:
>>>>>
>>>>> This is a long shot, but...
>>>>>
>>>>> I'm trying to load a bunch of files spread out over hdfs into an RDD,
>>>>> and in most cases it works well, but for a few very large files, I exceed
>>>>> available memory.  My current workflow basically works like this:
>>>>>
>>>>> context.parallelize(fileNames).flatMap { file =>
>>>>>   tranform file into a bunch of records
>>>>> }
>>>>>
>>>>> I'm wondering if there are any APIs to somehow "flush" the records of a
>>>>> big dataset so I don't have to load them all into memory at once.  I know
>>>>> this doesn't exist, but conceptually:
>>>>>
>>>>> context.parallelize(fileNames).streamMap { (file, stream) =>
>>>>>  for every 10K records write records to stream and flush
>>>>> }
>>>>>
>>>>> Keith
>>>>
>>>>
>>>
>>
>




Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Keith Simmons
Yep, that's definitely possible.  It's one of the workarounds I was
considering.  I was just curious if there was a simpler (and perhaps more
efficient) approach.

Keith

On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg  wrote:

> Could you modify your function so that it streams through the files record
> by record and outputs them to hdfs, then read them all in as RDDs and take
> the union? That would only use bounded memory.
>
> On 1 December 2014 at 17:19, Keith Simmons  wrote:
>
>> Actually, I'm working with a binary format.  The api allows reading out a
>> single record at a time, but I'm not sure how to get those records into
>> spark (without reading everything into memory from a single file at once).
>>
>>
>>
>> On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg  wrote:
>>
>>> file => tranform file into a bunch of records
>>>
>>>
>>> What does this function do exactly? Does it load the file locally?
>>> Spark supports RDDs exceeding global RAM (cf the terasort example), but
>>> if your example just loads each file locally, then this may cause problems.
>>> Instead, you should load each file into an rdd with context.textFile(),
>>> flatmap that and union these rdds.
>>>
>>> also see
>>>
>>> http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files
>>>
>>>
>>> On 1 December 2014 at 16:50, Keith Simmons  wrote:
>>>
>>>> This is a long shot, but...
>>>>
>>>> I'm trying to load a bunch of files spread out over hdfs into an RDD,
>>>> and in most cases it works well, but for a few very large files, I exceed
>>>> available memory.  My current workflow basically works like this:
>>>>
>>>> context.parallelize(fileNames).flatMap { file =>
>>>>   tranform file into a bunch of records
>>>> }
>>>>
>>>> I'm wondering if there are any APIs to somehow "flush" the records of a
>>>> big dataset so I don't have to load them all into memory at once.  I know
>>>> this doesn't exist, but conceptually:
>>>>
>>>> context.parallelize(fileNames).streamMap { (file, stream) =>
>>>>  for every 10K records write records to stream and flush
>>>> }
>>>>
>>>> Keith
>>>>
>>>
>>>
>>
>


Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Andy Twigg
Could you modify your function so that it streams through the files record
by record and outputs them to hdfs, then read them all in as RDDs and take
the union? That would only use bounded memory.

On 1 December 2014 at 17:19, Keith Simmons  wrote:

> Actually, I'm working with a binary format.  The api allows reading out a
> single record at a time, but I'm not sure how to get those records into
> spark (without reading everything into memory from a single file at once).
>
>
>
> On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg  wrote:
>
>> file => tranform file into a bunch of records
>>
>>
>> What does this function do exactly? Does it load the file locally?
>> Spark supports RDDs exceeding global RAM (cf the terasort example), but
>> if your example just loads each file locally, then this may cause problems.
>> Instead, you should load each file into an rdd with context.textFile(),
>> flatmap that and union these rdds.
>>
>> also see
>>
>> http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files
>>
>>
>> On 1 December 2014 at 16:50, Keith Simmons  wrote:
>>
>>> This is a long shot, but...
>>>
>>> I'm trying to load a bunch of files spread out over hdfs into an RDD,
>>> and in most cases it works well, but for a few very large files, I exceed
>>> available memory.  My current workflow basically works like this:
>>>
>>> context.parallelize(fileNames).flatMap { file =>
>>>   tranform file into a bunch of records
>>> }
>>>
>>> I'm wondering if there are any APIs to somehow "flush" the records of a
>>> big dataset so I don't have to load them all into memory at once.  I know
>>> this doesn't exist, but conceptually:
>>>
>>> context.parallelize(fileNames).streamMap { (file, stream) =>
>>>  for every 10K records write records to stream and flush
>>> }
>>>
>>> Keith
>>>
>>
>>
>


Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Keith Simmons
Actually, I'm working with a binary format.  The API allows reading out a
single record at a time, but I'm not sure how to get those records into
Spark (without reading everything into memory from a single file at once).



On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg  wrote:

> file => tranform file into a bunch of records
>
>
> What does this function do exactly? Does it load the file locally?
> Spark supports RDDs exceeding global RAM (cf the terasort example), but if
> your example just loads each file locally, then this may cause problems.
> Instead, you should load each file into an rdd with context.textFile(),
> flatmap that and union these rdds.
>
> also see
>
> http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files
>
>
> On 1 December 2014 at 16:50, Keith Simmons  wrote:
>
>> This is a long shot, but...
>>
>> I'm trying to load a bunch of files spread out over hdfs into an RDD, and
>> in most cases it works well, but for a few very large files, I exceed
>> available memory.  My current workflow basically works like this:
>>
>> context.parallelize(fileNames).flatMap { file =>
>>   tranform file into a bunch of records
>> }
>>
>> I'm wondering if there are any APIs to somehow "flush" the records of a
>> big dataset so I don't have to load them all into memory at once.  I know
>> this doesn't exist, but conceptually:
>>
>> context.parallelize(fileNames).streamMap { (file, stream) =>
>>  for every 10K records write records to stream and flush
>> }
>>
>> Keith
>>
>
>


Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Andy Twigg
>
> file => tranform file into a bunch of records


What does this function do exactly? Does it load the file locally?
Spark supports RDDs exceeding global RAM (cf the terasort example), but if
your example just loads each file locally, then this may cause problems.
Instead, you should load each file into an RDD with context.textFile(),
flatMap that, and union these RDDs.

also see
http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files


On 1 December 2014 at 16:50, Keith Simmons  wrote:

> This is a long shot, but...
>
> I'm trying to load a bunch of files spread out over hdfs into an RDD, and
> in most cases it works well, but for a few very large files, I exceed
> available memory.  My current workflow basically works like this:
>
> context.parallelize(fileNames).flatMap { file =>
>   tranform file into a bunch of records
> }
>
> I'm wondering if there are any APIs to somehow "flush" the records of a
> big dataset so I don't have to load them all into memory at once.  I know
> this doesn't exist, but conceptually:
>
> context.parallelize(fileNames).streamMap { (file, stream) =>
>  for every 10K records write records to stream and flush
> }
>
> Keith
>


Loading RDDs in a streaming fashion

2014-12-01 Thread Keith Simmons
This is a long shot, but...

I'm trying to load a bunch of files spread out over hdfs into an RDD, and
in most cases it works well, but for a few very large files, I exceed
available memory.  My current workflow basically works like this:

context.parallelize(fileNames).flatMap { file =>
  transform file into a bunch of records
}

I'm wondering if there are any APIs to somehow "flush" the records of a big
dataset so I don't have to load them all into memory at once.  I know this
doesn't exist, but conceptually:

context.parallelize(fileNames).streamMap { (file, stream) =>
 for every 10K records write records to stream and flush
}

Keith


Re: RDDs join problem: incorrect result

2014-11-30 Thread Harihar Nahak
What do you mean by incorrect? Could you please share some examples from
both the input RDDs and the resultant RDD? Also, if you get any exception,
paste that too; it helps to debug where the issue is.

On 27 November 2014 at 17:07, liuboya [via Apache Spark User List] <
ml-node+s1001560n19928...@n3.nabble.com> wrote:

> Hi,
>I ran into a problem when doing two RDDs join operation. For example,
> RDDa: RDD[(String,String)] and RDDb:RDD[(String,Int)]. Then, the result
> RDDc:[String,(String,Int)] = RDDa.join(RDDb). But I find the results in
> RDDc are  incorrect compared with RDDb. What's wrong in join?
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-join-problem-incorrect-result-tp19928.html
>



-- 
Regards,
Harihar Nahak
BigData Developer
Wynyard
Email:hna...@wynyardgroup.com | Extn: 8019




-----
--Harihar
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-join-problem-incorrect-result-tp19928p20056.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RDDs join problem: incorrect result

2014-11-26 Thread liuboya
Hi,
   I ran into a problem when doing a join operation on two RDDs. For example,
RDDa: RDD[(String, String)] and RDDb: RDD[(String, Int)]. Then the result is
RDDc: RDD[(String, (String, Int))] = RDDa.join(RDDb). But I find the results
in RDDc are incorrect compared with RDDb. What's wrong with the join?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-join-problem-incorrect-result-tp19928.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to join two RDDs with mutually exclusive keys

2014-11-20 Thread Harihar Nahak
Thanks Daniel , 

Applied join from PairRDDFunctions:

val countByUsername = file1.join(file2).map {
  case (id, (username, count)) => (id, username, count)
}





-
--Harihar
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19431.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to join two RDDs with mutually exclusive keys

2014-11-20 Thread Daniel Siegmann
Harihar, your question is the opposite of what was asked. In the future,
please start a new thread for new questions.

You want to do a join in your case. The join function does an inner join,
which I think is what you want because you stated your IDs are common in
both RDDs. For other cases you can look at leftOuterJoin, rightOuterJoin,
and cogroup (alias groupWith). These are all on PairRDDFunctions (in Scala)
[1]

Alternatively, if one of your RDDs is small, you could collect it and
broadcast the collection, using it in functions on the other RDD. But I
don't think this will apply in your case because the number of records will
be equal in each RDD. [2]

[1]
http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs
[2]
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

On Thu, Nov 20, 2014 at 4:53 PM, Harihar Nahak 
wrote:

> I've similar type of issue, want to join two different type of RDD in one
> RDD
>
> file1.txt content (ID, counts)
> val x : RDD[Long, Int] = sc.textFile("file1.txt").map( line =>
> line.split(",")).map(row => (row(0).toLong, row(1).toInt)
> [(4407 ,40),
> (2064, 38),
> (7815 ,10),
> (5736,17),
> (8031,3)]
>
> Second RDD from : file2.txt contains (ID, name)
> val y: RDD[(Long, String)]{where ID is common in both the RDDs}
> [(4407 ,Jhon),
> (2064, Maria),
> (7815 ,Casto),
> (5736,Ram),
> (8031,XYZ)]
>
> and I'm expecting result should be like this : [(ID, Name, Count)]
> [(4407 ,Jhon, 40),
> (2064, Maria, 38),
> (7815 ,Casto, 10),
> (5736,Ram, 17),
> (8031,XYZ, 3)]
>
>
> Any help will really appreciate. Thanks
>
>
>
>
> On 21 November 2014 09:18, dsiegmann [via Apache Spark User List] <[hidden
> email] <http://user/SendEmail.jtp?type=node&node=19423&i=0>> wrote:
>
>> You want to use RDD.union (or SparkContext.union for many RDDs). These
>> don't join on a key. Union doesn't really do anything itself, so it is low
>> overhead. Note that the combined RDD will have all the partitions of the
>> original RDDs, so you may want to coalesce after the union.
>>
>> val x = sc.parallelize(Seq( (1, 3), (2, 4) ))
>> val y = sc.parallelize(Seq( (3, 5), (4, 7) ))
>> val z = x.union(y)
>>
>> z.collect
>> res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7))
>>
>>
>> On Thu, Nov 20, 2014 at 3:06 PM, Blind Faith <[hidden email]
>> <http://user/SendEmail.jtp?type=node&node=19419&i=0>> wrote:
>>
>>> Say I have two RDDs with the following values
>>>
>>> x = [(1, 3), (2, 4)]
>>>
>>> and
>>>
>>> y = [(3, 5), (4, 7)]
>>>
>>> and I want to have
>>>
>>> z = [(1, 3), (2, 4), (3, 5), (4, 7)]
>>>
>>> How can I achieve this. I know you can use outerJoin followed by map to
>>> achieve this, but is there a more direct way for this.
>>>
>>
>>
>>
>> --
>> Daniel Siegmann, Software Developer
>> Velos
>> Accelerating Machine Learning
>>
>> 54 W 40th St, New York, NY 10018
>> E: [hidden email] <http://user/SendEmail.jtp?type=node&node=19419&i=1>
>> W: www.velos.io
>>
>>
>> --
>>  If you reply to this email, your message will be added to the
>> discussion below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19419.html
>>
>
>
>
> --
> Regards,
> Harihar Nahak
> BigData Developer
> Wynyard
> [hidden email] <http://user/SendEmail.jtp?type=node&node=19423&i=2> |
> Extn: 8019
>  --Harihar
>
> --
> View this message in context: Re: How to join two RDDs with mutually
> exclusive keys
> <http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19423.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io
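
A minimal sketch of the broadcast alternative (my own example, reusing
Harihar's x and y from above; only viable when y is small):

val smallMap = sc.broadcast(y.collect().toMap)   // ship the small side to every executor
val joined = x.flatMap { case (id, count) =>
  smallMap.value.get(id).map(name => (id, name, count))   // inner-join semantics, no shuffle
}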


Re: How to join two RDDs with mutually exclusive keys

2014-11-20 Thread Daniel Haviv
There is probably a better way to do it but I would register both as temp 
tables and then join them via SQL.

BR,
Daniel

> On 20 Nov 2014, at 23:53, Harihar Nahak  wrote:
> 
> I've similar type of issue, want to join two different type of RDD in one RDD 
>  
> 
> file1.txt content (ID, counts) 
> val x : RDD[Long, Int] = sc.textFile("file1.txt").map( line => 
> line.split(",")).map(row => (row(0).toLong, row(1).toInt)
> [(4407 ,40), 
> (2064, 38),
> (7815 ,10),
> (5736,17), 
> (8031,3)]
> 
> Second RDD from : file2.txt contains (ID, name)
> val y: RDD[(Long, String)]{where ID is common in both the RDDs}
> [(4407 ,Jhon), 
> (2064, Maria),
> (7815 ,Casto),
> (5736,Ram), 
> (8031,XYZ)]
> 
> and I'm expecting result should be like this : [(ID, Name, Count)]
> [(4407 ,Jhon, 40), 
> (2064, Maria, 38),
> (7815 ,Casto, 10),
> (5736,Ram, 17), 
> (8031,XYZ, 3)]
> 
> 
> Any help will really appreciate. Thanks 
> 
> 
>  
> 
>> On 21 November 2014 09:18, dsiegmann [via Apache Spark User List] <[hidden 
>> email]> wrote:
>> You want to use RDD.union (or SparkContext.union for many RDDs). These don't 
>> join on a key. Union doesn't really do anything itself, so it is low 
>> overhead. Note that the combined RDD will have all the partitions of the 
>> original RDDs, so you may want to coalesce after the union.
>> 
>> val x = sc.parallelize(Seq( (1, 3), (2, 4) ))
>> val y = sc.parallelize(Seq( (3, 5), (4, 7) ))
>> val z = x.union(y)
>> 
>> z.collect
>> res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7))
>> 
>> 
>>> On Thu, Nov 20, 2014 at 3:06 PM, Blind Faith <[hidden email]> wrote:
>>> Say I have two RDDs with the following values
>>> 
>>> x = [(1, 3), (2, 4)]
>>> and
>>> 
>>> y = [(3, 5), (4, 7)]
>>> and I want to have
>>> 
>>> z = [(1, 3), (2, 4), (3, 5), (4, 7)]
>>> How can I achieve this. I know you can use outerJoin followed by map to 
>>> achieve this, but is there a more direct way for this.
>>> 
>> 
>> 
>> 
>> -- 
>> Daniel Siegmann, Software Developer
>> Velos
>> Accelerating Machine Learning
>> 
>> 54 W 40th St, New York, NY 10018
>> E: [hidden email] W: www.velos.io
>> 
>> 
>> If you reply to this email, your message will be added to the discussion 
>> below:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19419.html
> 
> 
> 
> -- 
> Regards,
> Harihar Nahak
> BigData Developer  
> Wynyard 
> [hidden email] | Extn: 8019
> --Harihar
> 
> View this message in context: Re: How to join two RDDs with mutually 
> exclusive keys
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to join two RDDs with mutually exclusive keys

2014-11-20 Thread Harihar Nahak
I've a similar type of issue: I want to join two different types of RDDs into
one RDD.

file1.txt content (ID, counts)
val x: RDD[(Long, Int)] = sc.textFile("file1.txt").map(line =>
line.split(",")).map(row => (row(0).toLong, row(1).toInt))
[(4407 ,40),
(2064, 38),
(7815 ,10),
(5736,17),
(8031,3)]

Second RDD from : file2.txt contains (ID, name)
val y: RDD[(Long, String)]  // where ID is common in both RDDs
[(4407 ,Jhon),
(2064, Maria),
(7815 ,Casto),
(5736,Ram),
(8031,XYZ)]

and I'm expecting the result to look like this: [(ID, Name, Count)]
[(4407 ,Jhon, 40),
(2064, Maria, 38),
(7815 ,Casto, 10),
(5736,Ram, 17),
(8031,XYZ, 3)]


Any help will be really appreciated. Thanks




On 21 November 2014 09:18, dsiegmann [via Apache Spark User List] <
ml-node+s1001560n19419...@n3.nabble.com> wrote:

> You want to use RDD.union (or SparkContext.union for many RDDs). These
> don't join on a key. Union doesn't really do anything itself, so it is low
> overhead. Note that the combined RDD will have all the partitions of the
> original RDDs, so you may want to coalesce after the union.
>
> val x = sc.parallelize(Seq( (1, 3), (2, 4) ))
> val y = sc.parallelize(Seq( (3, 5), (4, 7) ))
> val z = x.union(y)
>
> z.collect
> res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7))
>
>
> On Thu, Nov 20, 2014 at 3:06 PM, Blind Faith <[hidden email]
> <http://user/SendEmail.jtp?type=node&node=19419&i=0>> wrote:
>
>> Say I have two RDDs with the following values
>>
>> x = [(1, 3), (2, 4)]
>>
>> and
>>
>> y = [(3, 5), (4, 7)]
>>
>> and I want to have
>>
>> z = [(1, 3), (2, 4), (3, 5), (4, 7)]
>>
>> How can I achieve this. I know you can use outerJoin followed by map to
>> achieve this, but is there a more direct way for this.
>>
>
>
>
> --
> Daniel Siegmann, Software Developer
> Velos
> Accelerating Machine Learning
>
> 54 W 40th St, New York, NY 10018
> E: [hidden email] <http://user/SendEmail.jtp?type=node&node=19419&i=1> W:
> www.velos.io
>
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19419.html
>



-- 
Regards,
Harihar Nahak
BigData Developer
Wynyard
Email:hna...@wynyardgroup.com | Extn: 8019




-
--Harihar
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19423.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to join two RDDs with mutually exclusive keys

2014-11-20 Thread Daniel Siegmann
You want to use RDD.union (or SparkContext.union for many RDDs). These
don't join on a key. Union doesn't really do anything itself, so it is low
overhead. Note that the combined RDD will have all the partitions of the
original RDDs, so you may want to coalesce after the union.

val x = sc.parallelize(Seq( (1, 3), (2, 4) ))
val y = sc.parallelize(Seq( (3, 5), (4, 7) ))
val z = x.union(y)

z.collect
res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7))


On Thu, Nov 20, 2014 at 3:06 PM, Blind Faith 
wrote:

> Say I have two RDDs with the following values
>
> x = [(1, 3), (2, 4)]
>
> and
>
> y = [(3, 5), (4, 7)]
>
> and I want to have
>
> z = [(1, 3), (2, 4), (3, 5), (4, 7)]
>
> How can I achieve this. I know you can use outerJoin followed by map to
> achieve this, but is there a more direct way for this.
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


How to join two RDDs with mutually exclusive keys

2014-11-20 Thread Blind Faith
Say I have two RDDs with the following values

x = [(1, 3), (2, 4)]

and

y = [(3, 5), (4, 7)]

and I want to have

z = [(1, 3), (2, 4), (3, 5), (4, 7)]

How can I achieve this. I know you can use outerJoin followed by map to
achieve this, but is there a more direct way for this.


Re: Transform RDD.groupBY result to multiple RDDs

2014-11-19 Thread Sean Owen
What's your use case? You would not generally want to make so many small
RDDs.
On Nov 20, 2014 6:19 AM, "Dai, Kevin"  wrote:

>  Hi, all
>
>
>
> Suppose I have an RDD of (K, V) tuples and I do groupBy on it with the key K.
>
>
>
> My question is how to make each groupBy result, which is (K, Iterable[V]), an
> RDD.
>
>
>
> BTW, can we transform it into a DStream in which each groupBy result is an
> RDD?
>
>
>
> Best Regards,
>
> Kevin.
>


Transform RDD.groupBY result to multiple RDDs

2014-11-19 Thread Dai, Kevin
Hi, all

Suppose I have an RDD of (K, V) tuples and I do groupBy on it with the key K.

My question is how to make each groupBy result, which is (K, Iterable[V]), an RDD.

BTW, can we transform it into a DStream in which each groupBy result is an RDD?

Best Regards,
Kevin.
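
For what it's worth, a minimal sketch of what splitting by key looks like (my
own example; as Sean notes above, only viable for a small number of keys):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val keys = pairs.map(_._1).distinct().collect()
// One filter pass per key; each value in the map is a genuine RDD[Int].
val perKey = keys.map(k => k -> pairs.filter(_._1 == k).map(_._2)).toMap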


Re: Declaring multiple RDDs and efficiency concerns

2014-11-14 Thread Sean Owen
This code executes on the driver, and an "RDD" here is really just a
handle on all the distributed data out there. It's a local bookkeeping
object. So, manipulation of these objects themselves in the local
driver code has virtually no performance impact. These two versions
would be about identical*.

* maybe someone can point out a case where not maintaining the
reference lets something get cleaned up earlier, but I'm not aware of
this sort of effect

On Fri, Nov 14, 2014 at 4:31 PM, Simone Franzini  wrote:
> Let's say I have to apply a complex sequence of operations to a certain RDD.
> In order to make code more modular/readable, I would typically have
> something like this:
>
> object myObject {
>   def main(args: Array[String]) {
> val rdd1 = function1(myRdd)
> val rdd2 = function2(rdd1)
> val rdd3 = function3(rdd2)
>   }
>
>   def function1(rdd: RDD) : RDD = { doSomething }
>   def function2(rdd: RDD) : RDD = { doSomethingElse }
>   def function3(rdd: RDD) : RDD = { doSomethingElseYet }
> }
>
> So I am explicitly declaring vals for the intermediate steps. Does this end
> up using more storage than if I just chained all of the operations and
> declared only one val instead?
> If yes, is there a better way to chain together the operations?
> Ideally I would like to do something like:
>
> val rdd = function1.function2.function3
>
> Is there a way I can write the signature of my functions to accomplish this?
> Is this also an efficiency issue or just a stylistic one?
>
> Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini



