The “Any” is required by the code the RDD is being passed to, which is the 
Elasticsearch Spark index-writing code. The values are actually RDD[(String, 
Map[String, String])].

No shuffle that I know of. The RDDs are created from the output of Mahout 
SimilarityAnalysis.cooccurrence and are turned into RDD[(String, Map[String, 
String])]. Since all the actual values are simple types, there is no serialization 
beyond standard Java/Scala, so no custom serializers or use of Kryo.

I understand that the driver can’t know; I was suggesting that isEmpty could be 
backed by a boolean RDD member variable calculated for every RDD at creation 
time in Spark. This is really the only way to solve it generally, since sometimes 
you get an RDD from a lib, so wrapping it as I suggested is not practical; it 
would have to be in Spark. BTW the same approach could be used for count: an 
accumulator per RDD, returned as a pre-calculated RDD state value.

Are you suggesting that both take(1) and isEmpty are unusable for some reason 
in my case? I can pass this information around if I have to; I just thought the 
worst case was O(n), where n is the number of partitions, and therefore always 
trivial to calculate.

This would be 0 time on the timeline if I explicitly kept track of the RDD size 
with accumulators (just a slightly slower .map), so is this my best path?
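
To make that concrete, here is a minimal sketch of the accumulator approach 
(illustrative only: mapWithCount and countAcc are made-up names; note that the 
count is only valid after an action has materialized the RDD, and accumulators 
can over-count if a stage is retried):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag

    // Count elements as a side effect of a map that is happening anyway.
    // Caveats: the accumulator is only populated once an action has
    // materialized `mapped`, and Spark may over-count on stage retries.
    def mapWithCount[T, U: ClassTag](sc: SparkContext, rdd: RDD[T])(f: T => U): (RDD[U], () => Long) = {
      val countAcc = sc.accumulator(0L, "element count") // Spark 1.x accumulator API
      val mapped = rdd.map { t =>
        countAcc += 1L // negligible per-element overhead
        f(t)
      }
      (mapped, () => countAcc.value) // read on the driver only, after an action
    }

With something like this, the emptiness check becomes a driver-side comparison 
of the accumulator value against 0 rather than a distributed job.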


On Dec 9, 2015, at 10:06 AM, Sean Owen <so...@cloudera.com> wrote:

Yes, but what is the code that generates the RDD? Is it the result of a shuffle 
or something? That could make checking for any element expensive, since computing 
the RDD at all is expensive. Look at the stages in these long-running jobs.

How could isEmpty not be distributed? The driver can't know whether the RDD's 
partitions are empty without evaluating at least one of them a little bit 
(unless there are 0 partitions). Caching the size doesn't help unless, well, 
you already know the size because the RDD was fully computed. And it might get 
weird anyway, since RDDs are only as deterministic as their source: counting 
lines of a text file will return a different number if the text file is 
appended to.

The only thing that sticks out is the time to serialize one value back to the 
driver. I don't know what your "Any" is there, but could it be big or hard to 
serialize?

Really there's a little gotcha in this implementation: you can only check 
isEmpty on an RDD of serializable objects! That's a pretty safe assumption; 
you won't get far with an RDD of something unserializable, but it's not 
impossible for it to come up.
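
For reference, the implementation being discussed is roughly this one-liner 
from RDD.scala (Spark 1.5-era, simplified):

    // An RDD with no partitions is trivially empty; otherwise take(1) must
    // compute just enough of one partition to produce a single element, and
    // that element is serialized back to the driver, which is where the
    // serializability requirement comes from.
    def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0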

The serialization could be avoided by mapping everything to "1" or something 
and take-ing *that*. Returning a 1 to the driver is trivial. Or maybe adapt 
some version of the implementation of take() to be an optimized, smarter 
isEmpty(). Neither seemed worth the overhead at the time, but this could be a 
case against that, if it turns out somehow to be serialization time.
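
A minimal sketch of that workaround (illustrative only, not part of Spark's API; 
it still computes the lineage far enough to produce one element, so it only 
helps if serializing the real elements is the bottleneck):

    import org.apache.spark.rdd.RDD

    // Map each element to a tiny constant before take(1), so only an Int
    // ever travels back to the driver instead of a potentially large value.
    def cheapIsEmpty[T](rdd: RDD[T]): Boolean =
      rdd.partitions.length == 0 || rdd.map(_ => 1).take(1).isEmpty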


On Wed, Dec 9, 2015 at 5:55 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
Err, compiled for Spark 1.3.1, running on 1.5.1, if that makes any difference. 
The Spark dependency is “provided”, so it should be using the 1.5.1 code afaik.

The code is as you see below for isEmpty, so I’m not sure what else it could be 
measuring, since it’s the only Spark thing on the line. I can regen the 
timeline, but here is the .take(1) timeline. It is an order of magnitude faster 
(from my recollection), but even the take(1) still seems incredibly slow for an 
empty test. I was surprised that isEmpty is a distributed calc. When run from 
the driver, this value could already have been calculated as a byproduct of 
creating the RDD, no?

I could use an accumulator to count members as the RDD is created and get a 
negligible .isEmpty calc time, right? The RDD creation might be slightly slower 
due to using an accumulator.



[attached image: PredictionIO Training: org.template.RecommendationEngine - 
Spark Jobs timeline, 2015-12-09 09-35-33.png]


On Dec 9, 2015, at 9:29 AM, Sean Owen <so...@cloudera.com> wrote:

Are you sure it's isEmpty and not an upstream stage? isEmpty is
definitely the action here. It doesn't make sense that take(1) is so
much faster, since it's the "same thing".

On Wed, Dec 9, 2015 at 5:11 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
> Sure, I thought this might be a known issue.
> 
> I have a 122M dataset, which is the trust and rating data from Epinions. The 
> data is split into two RDDs, and there is an item-properties RDD. The code is 
> just trying to remove any empty RDD from the list.
> 
> val esRDDs: List[RDD[(String, Map[String, Any])]] =
>   (correlators ::: properties).filterNot(c => c.isEmpty())
> 
> On my 16G MBP with 4g per executor and 4 executors, the isEmpty takes over a 
> hundred minutes (going from memory; I can supply the timeline given a few 
> hours to recalculate it).
> 
> Running a different version of the code that does a .count for debugging and a 
> .take(1) instead of the .isEmpty, the .count of one Epinions RDD takes 8 minutes 
> and the .take(1) takes 3 minutes.
> 
> Other users have seen a total runtime of 700 minutes on a 13G dataset, with the 
> execution time mostly spent in isEmpty.
> 
> 
> On Dec 9, 2015, at 8:50 AM, Sean Owen <so...@cloudera.com> wrote:
> 
> It should at best collect 1 item to the driver. This means evaluating
> at least 1 element of 1 partition. I can imagine pathological cases
> where that's slow, but do you have any more info? How slow is slow,
> and what exactly is slow?
> 
> On Wed, Dec 9, 2015 at 4:41 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
>> I’m getting *huge* execution times on a moderate-sized dataset during the
>> RDD.isEmpty. Everything in the calculation is fast except the RDD.isEmpty
>> calculation. I’m using Spark 1.5.1, and from researching it I would expect this
>> calculation to be linearly proportional to the number of partitions in the
>> worst case, which should be a trivial amount of time, but it is taking many
>> minutes to hours to complete this single phase.
>> 
>> I know there has been a small amount of discussion about using this, so I would
>> love to hear what the current thinking on the subject is. Is there a better
>> way to find out whether an RDD has data? Can someone explain why this is happening?
>> 
>> reference PR
>> https://github.com/apache/spark/pull/4534