[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-11-24 Thread josephlijia
Github user josephlijia commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-159197234
  
We have implemented a faster way by using zipPartition. But the final 
results are packaged in RDD. When data volumes are huge, it is much faster than 
it is now. Could you please tell me how can I apply for contributing this into 
IndexeddRDD? Thank you very much. I am expecting your answer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-11-22 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-158851260
  
@josephlijia this feature has moved into a Spark package. If you want to 
file an issue report it's best to do it here:

https://github.com/amplab/spark-indexedrdd


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-11-21 Thread josephlijia
Github user josephlijia commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-158647538
  
When we looked up one certain key-value by IndexedRDD, we found that it was 
even slower than ordinary RDD. We use 100, keys in our experiment. When we 
tested it by IndexedRDDPartition, it was faster than ordinary RDD. I am 
expecting your answer. Thanks you.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-10-15 Thread tispratik
Github user tispratik commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-148489954
  
This is very interesting. Thanks for working on it. Hopefully it will be 
out soon.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-08-27 Thread zerosign
Github user zerosign commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-135373104
  
Hi Ankur, 

Any update on this pull request ? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-07-14 Thread swethakasireddi
Github user swethakasireddi commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-121435467
  
Hi Ankur,

Is this available in Spark 1.4.0 ? Also, can this be used in Spark 
Streaming for lookups/updates/deletes based on key instead of having to iterate 
all the values?




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-06-26 Thread josephlijia
Github user josephlijia commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-115716387
  
When I want to update one value by one key using IndexedRDD, it only 
re-creates one LeafNode. It is the cost of updating. Is it right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-06-26 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-115831041
  
@josephlijia For the old version of IndexedRDD (version 0.1), an update 
recreates one LeafNode, plus all InternalNodes up to the root.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-06-24 Thread josephlijia
Github user josephlijia commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-114842646
  
I met a question when I did some testings based on IndexedRDD. I compared 
original RDD with IndexedRDD when looking up, updating, joining and deleting. 
However, I found that the time consumption of IndexedRDD is almost the same 
with the original RDD. I took a screenshot below. Could you please tell the 
reason for this strange result? I am expecting your answer. Thanks a lot.








At 2015-06-10 14:46:54, Ankur Dave notificati...@github.com wrote:


It does send all keys to all partitions, because ksByPartition is 
referenced in the closure passed to context.runJob and so is shipped in full to 
all partitions. On the other hand, multiput creates an RDD out of the keys and 
then repartitions that RDD, causing Spark to handle the data movement and 
avoiding this behavior.

—
Reply to this email directly or view it on GitHub.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-06-24 Thread adamnovak
Github user adamnovak commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-114941993
  
I'm not sure this is the appropriate place to ask. Maybe make a new issue
on the IndexedRDD repo?

On Wed, Jun 24, 2015 at 4:52 AM, josephlijia notificati...@github.com
wrote:

 I met a question when I did some testings based on IndexedRDD. I compared
 original RDD with IndexedRDD when looking up, updating, joining and
 deleting. However, I found that the time consumption of IndexedRDD is
 almost the same with the original RDD. I took a screenshot below. Could 
you
 please tell the reason for this strange result? I am expecting your 
answer.
 Thanks a lot.









 At 2015-06-10 14:46:54, Ankur Dave notificati...@github.com wrote:


 It does send all keys to all partitions, because ksByPartition is
 referenced in the closure passed to context.runJob and so is shipped in
 full to all partitions. On the other hand, multiput creates an RDD out of
 the keys and then repartitions that RDD, causing Spark to handle the data
 movement and avoiding this behavior.

 —
 Reply to this email directly or view it on GitHub.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/1297#issuecomment-114842646.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-06-10 Thread josephlijia
Github user josephlijia commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-110605232
  
Well, I found that getting is slower than putting by using IndexedRDD. But 
getting should be faster than putting, is it right? I am expecting your reply. 
Thanks a lot.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-06-10 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-110610103
  
@josephlijia Right, getting should generally be faster than putting. 
However, for large batches of keys, multiget might be slower than multiput 
because it currently broadcasts all keys to all partitions. How are you doing 
the measurement?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-06-10 Thread josephlijia
Github user josephlijia commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-110615752
  
Look at the code below:
def multiget(ks: Array[Id]): Map[Id, V] = {
val ksByPartition = ks.groupBy(k = 
self.partitioner.get.getPartition(k))
   val results: Array[Array[(Id, V)]] = 
self.context.runJob(self.partitionsRDD,
  (context: TaskContext, partIter: Iterator[P[V]]) = {
if (partIter.hasNext  
ksByPartition.contains(context.partitionId)) {
  val part = partIter.next()
  val ksForPartition = ksByPartition.get(context.partitionId).get
  part.multiget(ksForPartition).toArray
} else {
  Array.empty
}
  }, partitions, allowLocal = true)}
It partitions the keys according to self partitioner. That is to say, each 
partition only gets its corresponding keys. It does't send all keys to all 
partitions, is it? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-06-10 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-110616280
  
It does send all keys to all partitions, because `ksByPartition` is 
referenced in the closure passed to `context.runJob` and so is shipped in full 
to all partitions. On the other hand, `multiput` creates an RDD out of the keys 
and then repartitions that RDD, causing Spark to handle the data movement and 
avoiding this behavior.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-03-13 Thread jason-dai
Github user jason-dai commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-79776523
  
@jegonzal I wonder if you can share more details on your stack overflow 
issue. We were considering a general fix (e.g., as I outlined in 
https://issues.apache.org/jira/browse/SPARK-4672), but there were not specific 
requests for this at that time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-01-11 Thread jegonzal
Github user jegonzal commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-69504481
  
We should really address this stack overflow issue. Is there a JIRA we can 
promote?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-01-11 Thread octavian-ganea
Github user octavian-ganea commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-69505333
  
Writing the RDD to disk from time to time is not a solution for me. Also 
the second idea it's not good if I am doing random put and get ops. A common 
usecase is an SGD algorithm in which a large set of params need to be updated 
(in parallel) . Is there any mutable version of this IndexedRDD that I could 
use with Spark, a version that would have an interface similar to HashMap from 
Java and that doesn't copy the entire RDD after each insert ? Many thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-01-11 Thread ash211
Github user ash211 commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-69521957
  
@jegonzal https://issues.apache.org/jira/browse/SPARK-4672 is relevant for 
specifically GraphX encountering the stack overflow and has extensive 
discussion, but I don't think there's a Jira for the general issue outside of 
GraphX.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-01-11 Thread jegonzal
Github user jegonzal commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-69531109
  
Hmm, we really need to elevate this to a full issue.  I have run into the
stack overflow in MLlib (ALS) as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-01-10 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-69475120
  
@octavian-ganea IndexedRDD creates a new lineage entry for each operation. 
This enables fault tolerance but, as with other iterative Spark programs, 
causes stack overflows when the lineage chain gets too long. There are two ways 
to mitigate this:

1. 
[Checkpoint](http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing)
 the lineage periodically (every 50-100 iterations) using 
`sc.setCheckpointDir(...)` and `indexed.checkpoint()`. This will write each 
checkpointed RDD to disk in full. There isn't yet support for incremental 
checkpoints that only write the changes since the last checkpoint.

2. Reduce the lineage length by batching operations using multiput or join. 
This will also improve per-operation performance by amortizing the cost of the 
lineage entry across all batched elements.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2015-01-09 Thread octavian-ganea
Github user octavian-ganea commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-69365816
  
Thanks for the nice work! 
I am trying to use this IndexedRDD as a distributed hash map and I would 
like to be able to insert and update many entries (tens of millions). However, 
for the following code I get rapidly a StackOverflow on a 8 nodes cluster, each 
node having 120GB of RAM: 

val rdd = sc.parallelize((1 to 10).map(x = (x.toLong, 1)))
var indexed = IndexedRDD(rdd).cache
for (i - 1 until 1000) {
  indexed = indexed.put(i, 0)
  if (i % 1000 == 0) {
println(i =  + i +  val =  + indexed.get(i))
  }
}

I tried also doing an update like this: indexed = indexed.put(i, 
0).cache(), or even keep a second indexed2 and do an update like:

indexed2 = indexed.put(i, 0).cache();
indexed.unpersist();
indexed = indexed2;

For both methods I get stackoverflow after less than 1000 iterations. 

Can you please help me with this issue ?

Many thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-12-23 Thread ankurdave
Github user ankurdave closed the pull request at:

https://github.com/apache/spark/pull/1297


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-12-23 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-68017172
  
IndexedRDD is now part of Spark Packages, so I'm closing this PR and have 
moved it to a separate repository: https://github.com/amplab/spark-indexedrdd. 

The package has changed from org.apache.spark.rdd.IndexedRDD to 
edu.berkeley.cs.amplab.spark.IndexedRDD to reflect the move.

You can add it as a Maven dependency as described in the README.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-12-23 Thread nchammas
Github user nchammas commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-68019141
  
@ankurdave Does this mean IndexedRDD will not become part of Spark Core, or 
is that still potentially happening in the near future?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-12-23 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-68019859
  
@nchammas I don't think that's going to happen in the near future since the 
interface and implementation are relatively unstable, but it could still happen 
eventually.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-11-21 Thread adamnovak
Github user adamnovak commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-6365
  
Can it be in Spark 1.3? This sort of functionality would really help us get 
a Spark-based implementation of the stuff that 
@ga4gh/global-alliance-committers is doing to provide an API to enormous 
genomics data sets.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-11-12 Thread bobbych
Github user bobbych commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-62686554
  
Firstly Thanks for the work! 

i have one question?, does it support getPersistentRDDs ?  use case is 
reusing cached rdd, something along line of spark job server 
```
scala sc.getPersistentRDDs.values.head
res1: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[2] at apply at 
console:15

scala val getrdd =  
sc.getPersistentRDDs.values.head.asInstanceOf[org.apache.spark.rdd.IndexedRDD[Int]]
java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot 
be cast to org.apache.spark.rdd.IndexedRDD
at $iwC$$iwC$$iwC$$iwC.init(console:13)
at $iwC$$iwC$$iwC.init(console:18)
at $iwC$$iwC.init(console:20)
at $iwC.init(console:22)
at init(console:24)
at .init(console:28)
at .clinit(console)
at .init(console:7)
at .clinit(console)
at $print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at 
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
at 
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:353)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-11-12 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-62687768
  
@bobbych IndexedRDD handles persistence by caching its partitionsRDD, which 
is the MapPartitionsRDD that you're getting back from sc.getPersistentRDDs. As 
far as I know this is the only way to get Spark to store the partitions rather 
than the individual key-value pairs, but it does prevent getPersistentRDDs from 
working. Hopefully it should be easy to store the IndexedRDD in a separate 
shared variable.

@pwais Thanks! :) This won't be in 1.2, but it may yet make it into a later 
release.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-11-12 Thread pwais
Github user pwais commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-62687253
  
Curious, will this ship in 1.2 ?  (Also just want to ❤ for such a lovely 
PR)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-10-04 Thread MLnick
Github user MLnick commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-57905819
  
This looks really interesting. Is there a blocker for supporting generic 
keys (or at least say `String`), or is that a performance issue?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-10-04 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-57917807
  
@MLnick It's a slight performance issue, since we currently use 
PrimitiveKeyOpenHashMap which optimizes for primitive keys by avoiding null 
tracking, but I think the performance loss is worth it and I'm working on 
adding this ([SPARK-3668](https://issues.apache.org/jira/browse/SPARK-3668)).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-10-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-57693369
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21218/consoleFull)
 for   PR 1297 at commit 
[`02a9bde`](https://github.com/apache/spark/commit/02a9bde62102facfd5f9146039a2d68cdc0f180b).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-10-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-57703581
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21218/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-10-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-57703566
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21218/consoleFull)
 for   PR 1297 at commit 
[`02a9bde`](https://github.com/apache/spark/commit/02a9bde62102facfd5f9146039a2d68cdc0f180b).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class IndexedRDD[@specialized(Long, Int, Double) V: ClassTag]`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-56926924
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20845/consoleFull)
 for   PR 1297 at commit 
[`eac1282`](https://github.com/apache/spark/commit/eac12827c58a05f69371ff92d82f3c97e1bcb8b2).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-56932588
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20845/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-56932585
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20845/consoleFull)
 for   PR 1297 at commit 
[`eac1282`](https://github.com/apache/spark/commit/eac12827c58a05f69371ff92d82f3c97e1bcb8b2).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class IndexedRDD[@specialized(Long, Int, Double) V: ClassTag]`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-23 Thread markncooper
Github user markncooper commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-56558807
  
Is it correct to assume that persist() is necessary otherwise the index 
will get recreated each time it's used?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-23 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-56559155
  
@markncooper Yes, the IndexedRDD operations are implemented purely in terms 
of Spark transformations, so they will get recomputed each time the result is 
used unless you explicitly persist it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-23 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/1297#discussion_r17945448
  
--- Diff: 
core/src/main/scala/org/apache/spark/rdd/IndexedRDDPartitionLike.scala ---
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.immutable.LongMap
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark.Logging
+import org.apache.spark.util.collection.BitSet
+import org.apache.spark.util.collection.ImmutableBitSet
+import org.apache.spark.util.collection.ImmutableLongOpenHashSet
+import org.apache.spark.util.collection.ImmutableVector
+import org.apache.spark.util.collection.OpenHashSet
+import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
+
+import IndexedRDD.Id
+import IndexedRDDPartition.Index
+
+/**
+ * Contains members that are shared among all variants of 
IndexedRDDPartition (e.g.,
+ * IndexedRDDPartition, ShippableVertexPartition).
+ *
+ * @tparam V the type of the values stored in the IndexedRDDPartition
+ * @tparam Self the type of the implementing container. This allows 
transformation methods on any
+ * implementing container to yield a result of the same type.
+ */
+private[spark] trait IndexedRDDPartitionLike[
+@specialized(Long, Int, Double) V,
+Self[X] : IndexedRDDPartitionLike[X, Self]]
+  extends Serializable with Logging {
+
+  /** A generator for ClassTags of the value type V. */
+  implicit def vTag: ClassTag[V]
+
+  /** Accessor for the IndexedRDDPartition variant that is mixing in this 
trait. */
+  def self: Self[V]
+
+  def index: Index
+  def values: ImmutableVector[V]
+  def mask: ImmutableBitSet
+
+  def withIndex(index: Index): Self[V]
+  def withValues[V2: ClassTag](values: ImmutableVector[V2]): Self[V2]
+  def withMask(mask: ImmutableBitSet): Self[V]
+
+  val capacity: Int = index.capacity
+
+  def size: Int = mask.cardinality()
+
+  /** Return the value for the given key. */
+  def apply(k: Id): V = values(index.getPos(k))
+
+  def isDefined(k: Id): Boolean = {
+val pos = index.getPos(k)
+pos = 0  mask.get(pos)
+  }
+
+  def iterator: Iterator[(Id, V)] =
+mask.iterator.map(ind = (index.getValue(ind), values(ind)))
+
+  /**
+   * Gets the values corresponding to the specified keys, if any.
+   */
+  def multiget(ks: Array[Id]): LongMap[V] = {
+var result = LongMap.empty[V]
+var i = 0
+while (i  ks.length) {
+  val k = ks(i)
+  if (self.isDefined(k)) {
+result = result.updated(k, self(k))
+  }
+  i += 1
+}
+result
+  }
+
+  /**
+   * Updates the keys in `kvs` to their corresponding values, running 
`merge` on old and new values
+   * if necessary. Returns a new IndexedRDDPartition that reflects the 
modification.
+   */
+  def multiput(kvs: Seq[(Id, V)], merge: (Id, V, V) = V): Self[V] = {
--- End diff --

Bug: we don't use `merge` in the else branch


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-23 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/1297#discussion_r17945543
  
--- Diff: 
core/src/main/scala/org/apache/spark/rdd/IndexedRDDPartitionLike.scala ---
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.immutable.LongMap
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark.Logging
+import org.apache.spark.util.collection.BitSet
+import org.apache.spark.util.collection.ImmutableBitSet
+import org.apache.spark.util.collection.ImmutableLongOpenHashSet
+import org.apache.spark.util.collection.ImmutableVector
+import org.apache.spark.util.collection.OpenHashSet
+import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
+
+import IndexedRDD.Id
+import IndexedRDDPartition.Index
+
+/**
+ * Contains members that are shared among all variants of 
IndexedRDDPartition (e.g.,
+ * IndexedRDDPartition, ShippableVertexPartition).
+ *
+ * @tparam V the type of the values stored in the IndexedRDDPartition
+ * @tparam Self the type of the implementing container. This allows 
transformation methods on any
+ * implementing container to yield a result of the same type.
+ */
+private[spark] trait IndexedRDDPartitionLike[
+@specialized(Long, Int, Double) V,
+Self[X] : IndexedRDDPartitionLike[X, Self]]
+  extends Serializable with Logging {
+
+  /** A generator for ClassTags of the value type V. */
+  implicit def vTag: ClassTag[V]
+
+  /** Accessor for the IndexedRDDPartition variant that is mixing in this 
trait. */
+  def self: Self[V]
+
+  def index: Index
+  def values: ImmutableVector[V]
+  def mask: ImmutableBitSet
+
+  def withIndex(index: Index): Self[V]
+  def withValues[V2: ClassTag](values: ImmutableVector[V2]): Self[V2]
+  def withMask(mask: ImmutableBitSet): Self[V]
+
+  val capacity: Int = index.capacity
+
+  def size: Int = mask.cardinality()
+
+  /** Return the value for the given key. */
+  def apply(k: Id): V = values(index.getPos(k))
+
+  def isDefined(k: Id): Boolean = {
+val pos = index.getPos(k)
+pos = 0  mask.get(pos)
+  }
+
+  def iterator: Iterator[(Id, V)] =
+mask.iterator.map(ind = (index.getValue(ind), values(ind)))
+
+  /**
+   * Gets the values corresponding to the specified keys, if any.
+   */
+  def multiget(ks: Array[Id]): LongMap[V] = {
+var result = LongMap.empty[V]
+var i = 0
+while (i  ks.length) {
+  val k = ks(i)
+  if (self.isDefined(k)) {
+result = result.updated(k, self(k))
+  }
+  i += 1
+}
+result
+  }
+
+  /**
+   * Updates the keys in `kvs` to their corresponding values, running 
`merge` on old and new values
+   * if necessary. Returns a new IndexedRDDPartition that reflects the 
modification.
+   */
+  def multiput(kvs: Seq[(Id, V)], merge: (Id, V, V) = V): Self[V] = {
+if (kvs.forall(kv = self.isDefined(kv._1))) {
+  // Pure updates can be implemented by modifying only the values
+  join(kvs.iterator)(merge)
+} else {
+  var newIndex = self.index
+  var newValues = self.values
+  var newMask = self.mask
+
+  var preMoveValues: ImmutableVector[V] = null
+  var preMoveMask: ImmutableBitSet = null
+  def grow(newSize: Int) {
+preMoveValues = newValues
+preMoveMask = newMask
+
+newValues = ImmutableVector.fill(newSize)(null.asInstanceOf[V])
+newMask = new ImmutableBitSet(newSize)
+  }
+  def move(oldPos: Int, newPos: Int) {
+newValues = newValues.updated(newPos, preMoveValues(oldPos))
+if (preMoveMask.get(oldPos)) newMask = newMask.set(newPos)
+  }
+
+  for (kv - kvs) {
+val id = kv._1
+

[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-56605365
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20734/consoleFull)
 for   PR 1297 at commit 
[`1c864cd`](https://github.com/apache/spark/commit/1c864cd29c0034736172750265b2df58699d10bb).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-56610747
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20734/consoleFull)
 for   PR 1297 at commit 
[`1c864cd`](https://github.com/apache/spark/commit/1c864cd29c0034736172750265b2df58699d10bb).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class IndexedRDD[@specialized(Long, Int, Double) V: ClassTag]`
  * `logInfo(Interrupting user class to stop.)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-56610754
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20734/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/1297#discussion_r17790219
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/IndexedRDDLike.scala ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.immutable.LongMap
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+
+import IndexedRDD.Id
+
+/**
+ * Contains members that are shared among all variants of IndexedRDD 
(e.g., IndexedRDD,
+ * VertexRDD).
+ *
+ * @tparam V the type of the values stored in the IndexedRDD
+ * @tparam P the type of the partitions making up the IndexedRDD
+ * @tparam Self the type of the implementing container. This allows 
transformation methods on any
+ * implementing container to yield a result of the same type.
+ */
+private[spark] trait IndexedRDDLike[
+@specialized(Long, Int, Double) V,
+P[X] : IndexedRDDPartitionLike[X, P],
+Self[X] : IndexedRDDLike[X, P, Self]]
+  extends RDD[(Id, V)] {
+
+  /** A generator for ClassTags of the value type V. */
+  protected implicit def vTag: ClassTag[V]
+
+  /** A generator for ClassTags of the partition type P. */
+  protected implicit def pTag[V2]: ClassTag[P[V2]]
+
+  /** Accessor for the IndexedRDD variant that is mixing in this trait. */
+  protected def self: Self[V]
+
+  /** The underlying representation of the IndexedRDD as an RDD of 
partitions. */
+  def partitionsRDD: RDD[P[V]]
+  require(partitionsRDD.partitioner.isDefined)
+
+  def withPartitionsRDD[V2: ClassTag](partitionsRDD: RDD[P[V2]]): Self[V2]
+
+  override val partitioner = partitionsRDD.partitioner
+
+  override protected def getPartitions: Array[Partition] = 
partitionsRDD.partitions
+
+  override protected def getPreferredLocations(s: Partition): Seq[String] =
+partitionsRDD.preferredLocations(s)
+
+  override def persist(newLevel: StorageLevel): this.type = {
+partitionsRDD.persist(newLevel)
+this
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+partitionsRDD.unpersist(blocking)
+this
+  }
+
+  override def count(): Long = {
+partitionsRDD.map(_.size).reduce(_ + _)
+  }
+
+  /** Provides the `RDD[(Id, V)]` equivalent output. */
+  override def compute(part: Partition, context: TaskContext): 
Iterator[(Id, V)] = {
+firstParent[P[V]].iterator(part, context).next.iterator
+  }
+
+  /** Gets the value corresponding to the specified key, if any. */
+  def get(k: Id): Option[V] = multiget(Array(k)).get(k)
+
+  /** Gets the values corresponding to the specified keys, if any. */
+  def multiget(ks: Array[Id]): Map[Id, V] = {
+val ksByPartition = ks.groupBy(k = 
self.partitioner.get.getPartition(k))
+val partitions = ksByPartition.keys.toSeq
+def unionMaps(maps: TraversableOnce[LongMap[V]]): LongMap[V] = {
+  maps.foldLeft(LongMap.empty[V]) {
+(accum, map) = accum.unionWith(map, (id, a, b) = a)
+  }
+}
+// TODO: avoid sending all keys to all partitions by creating and 
zipping an RDD of keys
--- End diff --

would this be another use of the `bulkMultiget` I suggested in jira?





[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/1297#discussion_r17791303
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ImmutableLongOpenHashSet.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect._
+import com.google.common.hash.Hashing
+
+/**
+ * A fast, immutable hash set optimized for insertions and lookups (but 
not deletions) of `Long`
+ * elements. Because it exposes the position of a key in the underlying 
array, this is useful as a
+ * building block for higher level data structures such as a hash map (for 
example,
+ * IndexedRDDPartition).
+ *
+ * It uses quadratic probing with a power-of-2 hash table size, which is 
guaranteed to explore all
+ * spaces for each key (see 
http://en.wikipedia.org/wiki/Quadratic_probing).
+ */
+private[spark] class ImmutableLongOpenHashSet(
+    /** Underlying array of elements used as a hash table. */
+    val data: ImmutableVector[Long],
+    /** Whether or not there is an element at the corresponding position in `data`. */
+    val bitset: ImmutableBitSet,
+    /**
+     * Position of a focused element. This is useful when returning a modified set along with a
+     * pointer to the location of modification.
+     */
+    val focus: Int,
+    /** Load threshold at which to grow the underlying vectors. */
+    loadFactor: Double
+  ) extends Serializable {
+
+  require(loadFactor < 1.0, "Load factor must be less than 1.0")
+  require(loadFactor > 0.0, "Load factor must be greater than 0.0")
+  require(capacity == nextPowerOf2(capacity), "data capacity must be a power of 2")
+
+  import OpenHashSet.{INVALID_POS, NONEXISTENCE_MASK, POSITION_MASK, Hasher, LongHasher}
+
+  private val hasher: Hasher[Long] = new LongHasher
+
+  private def mask = capacity - 1
+  private def growThreshold = (loadFactor * capacity).toInt
+
+  def withFocus(focus: Int): ImmutableLongOpenHashSet =
+    new ImmutableLongOpenHashSet(data, bitset, focus, loadFactor)
+
+  /** The number of elements in the set. */
+  def size: Int = bitset.cardinality
+
+  /** The capacity of the set (i.e. size of the underlying vector). */
+  def capacity: Int = data.size
+
+  /** Return true if this set contains the specified element. */
+  def contains(k: Long): Boolean = getPos(k) != INVALID_POS
+
+  /**
+   * Nondestructively add an element to the set, returning a new set. If 
the set is over capacity
+   * after the insertion, grows the set and rehashes all elements.
+   */
+  def add(k: Long): ImmutableLongOpenHashSet = {
+    addWithoutResize(k).rehashIfNeeded(ImmutableLongOpenHashSet.grow, ImmutableLongOpenHashSet.move)
+  }
+
+  /**
+   * Add an element to the set. This one differs from add in that it 
doesn't trigger rehashing.
+   * The caller is responsible for calling rehashIfNeeded.
+   *
+   * Use (retval.focus & POSITION_MASK) to get the actual position, and
+   * (retval.focus & NONEXISTENCE_MASK) == 0 for prior existence.
+   */
+  def addWithoutResize(k: Long): ImmutableLongOpenHashSet = {
+    var pos = hashcode(hasher.hash(k)) & mask
+    var i = 1
+    var result: ImmutableLongOpenHashSet = null
+    while (result == null) {
+      if (!bitset.get(pos)) {
+        // This is a new key.
+        result = new ImmutableLongOpenHashSet(
+          data.updated(pos, k), bitset.set(pos), pos | NONEXISTENCE_MASK, loadFactor)
+      } else if (data(pos) == k) {
+        // Found an existing key.
+        result = this.withFocus(pos)
+      } else {
+        val delta = i
+        pos = (pos + delta) & mask
+        i += 1
+      }
+    }
+    result
+  }
+
+  /**
+   * Rehash the set if it is overloaded.
+   * @param allocateFunc Callback invoked when we 

[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-19 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-56199798
  
This looks great!  My comments are minor.

I know it's early to be discussing example docs, but I just wanted to 
mention that I can see caching being an area of confusion. E.g., you wouldn't 
want to serialize & cache each update to an IndexedRDD, as each cached copy 
would be a full copy and would not get the benefits of the ImmutableVectors.
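
A rough illustration of that pitfall (the `IndexedRDD(...)` construction and 
the point-update `put` below are assumed APIs, and StorageLevel comes from 
org.apache.spark.storage):

    // Anti-pattern: a serialized storage level materializes a full, independent
    // copy of every version, so no structural sharing survives serialization.
    val v0 = IndexedRDD(sc.parallelize(0L until 1000000L).map(k => (k, 0)))
    val v1 = v0.put(42L, 1).persist(StorageLevel.MEMORY_ONLY_SER)
    val v2 = v1.put(43L, 2).persist(StorageLevel.MEMORY_ONLY_SER)

    // Deserialized caching keeps each version as an object graph on the heap,
    // so successive versions share all unmodified tree nodes.
    val w1 = v0.put(42L, 1).cache()
    val w2 = w1.put(43L, 2).cache()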





[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-19 Thread markncooper
Github user markncooper commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-56236278
  
For what it's worth (we are early on in our Spark usage), we've kicked the 
tires on this IndexedRDD and we love it.  Thanks Ankur. We'll report back 
with a more thorough analysis soon.





[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-03 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-54382233
  
What's the status of this PR?  Are we blocking on design review or 
Spark/GraphX roadmap discussions?





[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-03 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-54383337
  
We've had a design review; the summary was that this design is good, though 
we will eventually want to support alternative update mechanisms such as 
log-structured updates. I think we are just blocking on someone to review the 
implementation.





[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-14 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/1297#discussion_r14908709
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/IndexedRDDLike.scala ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.immutable.LongMap
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+
+import IndexedRDD.Id
+
+/**
+ * Contains members that are shared among all variants of IndexedRDD 
(e.g., IndexedRDD,
+ * VertexRDD).
+ *
+ * @tparam V the type of the values stored in the IndexedRDD
+ * @tparam P the type of the partitions making up the IndexedRDD
+ * @tparam Self the type of the implementing container. This allows 
transformation methods on any
+ * implementing container to yield a result of the same type.
+ */
+private[spark] trait IndexedRDDLike[
+    @specialized(Long, Int, Double) V,
+    P[X] <: IndexedRDDPartitionLike[X, P],
+    Self[X] <: IndexedRDDLike[X, P, Self]]
+  extends RDD[(Id, V)] {
+
+  /** A generator for ClassTags of the value type V. */
+  protected implicit def vTag: ClassTag[V]
+
+  /** A generator for ClassTags of the partition type P. */
+  protected implicit def pTag[V2]: ClassTag[P[V2]]
+
+  /** Accessor for the IndexedRDD variant that is mixing in this trait. */
+  protected def self: Self[V]
+
+  /** The underlying representation of the IndexedRDD as an RDD of 
partitions. */
+  def partitionsRDD: RDD[P[V]]
+  require(partitionsRDD.partitioner.isDefined)
+
+  def withPartitionsRDD[V2: ClassTag](partitionsRDD: RDD[P[V2]]): Self[V2]
+
+  override val partitioner = partitionsRDD.partitioner
+
+  override protected def getPartitions: Array[Partition] = 
partitionsRDD.partitions
+
+  override protected def getPreferredLocations(s: Partition): Seq[String] =
+partitionsRDD.preferredLocations(s)
+
+  override def persist(newLevel: StorageLevel): this.type = {
+    partitionsRDD.persist(newLevel)
+    this
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+    partitionsRDD.unpersist(blocking)
+    this
+  }
+
+  override def count(): Long = {
+    partitionsRDD.map(_.size).reduce(_ + _)
+  }
+
+  /** Provides the `RDD[(Id, V)]` equivalent output. */
+  override def compute(part: Partition, context: TaskContext): 
Iterator[(Id, V)] = {
+    firstParent[P[V]].iterator(part, context).next.iterator
+  }
+
+  /** Gets the value corresponding to the specified key, if any. */
+  def get(k: Id): Option[V] = multiget(Array(k)).get(k)
--- End diff --

We should also override `PairRDDFunctions.lookup` to use this fast 
implementation.
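
Since `lookup` comes in through the implicit `PairRDDFunctions` conversion 
rather than `RDD` itself, a same-named method defined directly on the trait 
would shadow it rather than literally override it. A minimal sketch of that 
idea:

    /** Indexed point lookup, taking precedence over PairRDDFunctions.lookup's
      * full scan when called on an IndexedRDD. */
    def lookup(key: Id): Seq[V] = multiget(Array(key)).get(key).toSeq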




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-14 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48988645
  
QA tests have started for PR 1297. This patch merges cleanly. View progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16655/consoleFull




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-10 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1297#discussion_r14799543
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.language.higherKinds
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+
+import IndexedRDD.Id
+
+/**
+ * :: Experimental ::
+ * An RDD of key-value `(Id, V)` pairs that enforces key uniqueness and 
pre-indexes the entries for
+ * efficient joins and point lookups/updates. Two IndexedRDDs with the 
same index can be joined
+ * efficiently. All operations except [[reindex]] preserve the index. To 
construct an `IndexedRDD`,
+ * use the [[org.apache.spark.rdd.IndexedRDD$ IndexedRDD object]].
+ *
+ * @tparam V the value associated with each entry in the set.
+ */
+class IndexedRDD[@specialized(Long, Int, Double) V: ClassTag]
--- End diff --

Could you add the @experimental annotation here too?
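
For reference, a minimal sketch of where the annotation would go, assuming 
org.apache.spark.annotation.Experimental (the stand-in class name and elided 
body below are illustrative only):

    import scala.reflect.ClassTag
    import org.apache.spark.annotation.Experimental

    // Illustrative stand-in; the real annotation would sit on IndexedRDD itself.
    @Experimental
    class AnnotatedIndexedRDD[@specialized(Long, Int, Double) V: ClassTag]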




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48417786
  
Merged build started. 




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48417780
  
 Merged build triggered. 




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48418341
  
 Merged build triggered. 




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48418352
  
Merged build started. 




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48418901
  
 Merged build triggered. 




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48418910
  
Merged build started. 




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48419500
  
Merged build finished. 




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48420477
  
Merged build finished. All automated tests passed.




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48420478
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16435/




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48421014
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16436/




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48421013
  
Merged build finished. All automated tests passed.




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48144983
  
Merged build finished. All automated tests passed.




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48144986
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16361/




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-06 Thread concretevitamin
Github user concretevitamin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1297#discussion_r14578164
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ImmutableVector.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+
+/**
+ * An immutable vector that supports efficient point updates. Similarly to
+ * scala.collection.immutable.Vector, it is implemented using a 32-ary 
tree with 32-element arrays
+ * at the leaves. Unlike Scala's Vector, it is specialized on the value 
type, making it much more
+ * memory-efficient for primitive values.
+ */
+private[spark] class ImmutableVector[@specialized(Long, Int) A](val size: Int, root: VectorNode[A])
+  extends Serializable {
+
+  def iterator: Iterator[A] = new VectorIterator[A](root)
+  def apply(index: Int): A = root(index)
+  def updated(index: Int, elem: A): ImmutableVector[A] =
+    new ImmutableVector(size, root.updated(index, elem))
+}
+
+private[spark] object ImmutableVector {
+  def empty[A: ClassTag]: ImmutableVector[A] = new ImmutableVector(0, emptyNode)
+
+  def fromArray[A: ClassTag](array: Array[A]): ImmutableVector[A] = {
+    fromArray(array, 0, array.length)
+  }
+
+  def fromArray[A: ClassTag](array: Array[A], start: Int, end: Int): ImmutableVector[A] = {
+    new ImmutableVector(end - start, nodeFromArray(array, start, end))
+  }
+
+  def fill[A: ClassTag](n: Int)(a: A): ImmutableVector[A] = {
+    // TODO: Implement this without allocating an extra array
+    fromArray(Array.fill(n)(a), 0, n)
+  }
+
+  /** Returns the root of a 32-ary tree representing the specified interval into the array. */
+  private def nodeFromArray[A: ClassTag](array: Array[A], start: Int, end: Int): VectorNode[A] = {
+    val length = end - start
+    if (length == 0) {
+      emptyNode
+    } else {
+      val depth = depthOf(length)
+      if (depth == 0) {
+        new LeafNode(array.slice(start, end))
+      } else {
+        val shift = 5 * depth
+        val numChildren = ((length - 1) >> shift) + 1
+        val children = new Array[VectorNode[A]](numChildren)
+        var i = 0
+        while (i < numChildren) {
+          val childStart = start + (i << shift)
+          var childEnd = start + ((i + 1) << shift)
+          if (end < childEnd) {
+            childEnd = end
+          }
+          children(i) = nodeFromArray(array, childStart, childEnd)
+          i += 1
+        }
+        new InternalNode(children, depth)
+      }
+    }
+  }
+
+  private def emptyNode[A: ClassTag] = new LeafNode(Array.empty)
+
+  /** Returns the required tree depth for an ImmutableVector of the given size. */
+  private def depthOf(size: Int): Int = {
+    var depth = 0
+    var sizeLeft = (size - 1) >> 5
+    while (sizeLeft > 0) {
+      sizeLeft >>= 5
+      depth += 1
+    }
+    depth
+  }
+}
+
+/** Trait representing nodes in the vector tree. */
+private sealed trait VectorNode[@specialized(Long, Int) A] extends Serializable {
+  def apply(index: Int): A
+  def updated(index: Int, elem: A): VectorNode[A]
+  def numChildren: Int
+}
+
+/** An internal node in the vector tree (one containing other nodes rather than vector elements). */
+private class InternalNode[@specialized(Long, Int) A: ClassTag](
+    children: Array[VectorNode[A]],
+    val depth: Int)
+  extends VectorNode[A] {
+
+  require(children.length > 0, "InternalNode must have children")
+  require(children.length <= 32, "nodes cannot have more than 32 children (got ${children.length})")
+  require(depth >= 1, "InternalNode must have depth >= 1 (got $depth)")
--- End diff --

minor nit: add the `s` string-interpolator prefix so `$depth` actually gets 
interpolated; same for the `${children.length}` line above
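
(Aside on the quoted sketch: the 5-bit shifts encode the 32-way branching, so 
depthOf(size) is the smallest depth d with size <= 32^(d+1). A quick standalone 
check of the quoted formula:)

    // Standalone check of the depth formula quoted above (>> 5 divides by 32).
    def depthOf(size: Int): Int = {
      var depth = 0
      var sizeLeft = (size - 1) >> 5
      while (sizeLeft > 0) { sizeLeft >>= 5; depth += 1 }
      depth
    }
    assert(depthOf(32) == 0)    // a single leaf holds up to 32 elements
    assert(depthOf(1024) == 1)  // one internal level spans 32 * 32 elements
    assert(depthOf(1025) == 2)  // beyond 32^2, a second internal level is needed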



[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-06 Thread concretevitamin
Github user concretevitamin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1297#discussion_r14578170
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ImmutableVector.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+
+/**
+ * An immutable vector that supports efficient point updates. Similarly to
+ * scala.collection.immutable.Vector, it is implemented using a 32-ary 
tree with 32-element arrays
+ * at the leaves. Unlike Scala's Vector, it is specialized on the value 
type, making it much more
+ * memory-efficient for primitive values.
+ */
+private[spark] class ImmutableVector[@specialized(Long, Int) A](val size: Int, root: VectorNode[A])
+  extends Serializable {
+
+  def iterator: Iterator[A] = new VectorIterator[A](root)
+  def apply(index: Int): A = root(index)
+  def updated(index: Int, elem: A): ImmutableVector[A] =
+    new ImmutableVector(size, root.updated(index, elem))
+}
+
+private[spark] object ImmutableVector {
+  def empty[A: ClassTag]: ImmutableVector[A] = new ImmutableVector(0, emptyNode)
+
+  def fromArray[A: ClassTag](array: Array[A]): ImmutableVector[A] = {
+    fromArray(array, 0, array.length)
+  }
+
+  def fromArray[A: ClassTag](array: Array[A], start: Int, end: Int): ImmutableVector[A] = {
+    new ImmutableVector(end - start, nodeFromArray(array, start, end))
+  }
+
+  def fill[A: ClassTag](n: Int)(a: A): ImmutableVector[A] = {
+    // TODO: Implement this without allocating an extra array
+    fromArray(Array.fill(n)(a), 0, n)
+  }
+
+  /** Returns the root of a 32-ary tree representing the specified interval into the array. */
+  private def nodeFromArray[A: ClassTag](array: Array[A], start: Int, end: Int): VectorNode[A] = {
+    val length = end - start
+    if (length == 0) {
+      emptyNode
+    } else {
+      val depth = depthOf(length)
+      if (depth == 0) {
+        new LeafNode(array.slice(start, end))
+      } else {
+        val shift = 5 * depth
+        val numChildren = ((length - 1) >> shift) + 1
+        val children = new Array[VectorNode[A]](numChildren)
+        var i = 0
+        while (i < numChildren) {
+          val childStart = start + (i << shift)
+          var childEnd = start + ((i + 1) << shift)
+          if (end < childEnd) {
+            childEnd = end
+          }
+          children(i) = nodeFromArray(array, childStart, childEnd)
+          i += 1
+        }
+        new InternalNode(children, depth)
+      }
+    }
+  }
+
+  private def emptyNode[A: ClassTag] = new LeafNode(Array.empty)
+
+  /** Returns the required tree depth for an ImmutableVector of the given size. */
+  private def depthOf(size: Int): Int = {
+    var depth = 0
+    var sizeLeft = (size - 1) >> 5
+    while (sizeLeft > 0) {
+      sizeLeft >>= 5
+      depth += 1
+    }
+    depth
+  }
+}
+
+/** Trait representing nodes in the vector tree. */
+private sealed trait VectorNode[@specialized(Long, Int) A] extends Serializable {
+  def apply(index: Int): A
+  def updated(index: Int, elem: A): VectorNode[A]
+  def numChildren: Int
+}
+
+/** An internal node in the vector tree (one containing other nodes rather than vector elements). */
+private class InternalNode[@specialized(Long, Int) A: ClassTag](
+    children: Array[VectorNode[A]],
+    val depth: Int)
+  extends VectorNode[A] {
+
+  require(children.length > 0, "InternalNode must have children")
+  require(children.length <= 32, "nodes cannot have more than 32 children (got ${children.length})")
+  require(depth >= 1, "InternalNode must have depth >= 1 (got $depth)")
+
+  def childAt(index: Int): VectorNode[A] = children(index)
+
+  override def apply(index: Int): A = {

[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-06 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48138509
  
@concretevitamin Thanks for the comments. I also found a way to simplify 
the design by unifying `IndexedRDD(Partition)Like` and 
`IndexedRDD(Partition)Ops` as you suggested, so I'll update the PR with that 
soon.
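
(Roughly, that unification collapses the two traits at each level into one, so 
accessors and transformations live together. A simplified, hypothetical sketch; 
names and members here are illustrative, not the actual PR code:)

    import scala.language.higherKinds

    trait ExampleLike[V, Self[X] <: ExampleLike[X, Self]] {
      protected def self: Self[V]
      def values: Vector[V]                              // accessor (formerly *Like)
      def withValues[V2](values: Vector[V2]): Self[V2]   // constructor hook
      def mapValues[V2](f: V => V2): Self[V2] =          // transformation (formerly *Ops)
        withValues(self.values.map(f))
    }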




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48140661
  
Merged build started. 




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48006656
  
Merged build finished. All automated tests passed.




[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-07-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-48006657
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16334/

