I've been trying to understand the performance of Datasets (and filters) in Spark 2.0.
I have a Dataset that I read from a parquet file and cached in memory (deserialized). It is spread across 8 partitions and consumes a total of 826MB of memory on my cluster. I verified in the Spark UI that the Dataset is 100% cached in memory. I'm using a single AWS c3.2xlarge (8 cores) as my one worker. There are 108,587,678 records in my cached Dataset (om).

I run the following command against this cached Dataset, and it takes 13.56s:

    om.filter(textAnnotation => textAnnotation.annotType == "ce:para").count

This returns a count of 1,039,993. When I look at the explain() output for this query, I see the following:

    == Physical Plan ==
    *Filter <function1>.apply
    +- InMemoryTableScan [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], [<function1>.apply]
       +- InMemoryRelation [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
          +- Exchange hashpartitioning(docId#394, 8)
             ...

I was a bit perplexed that this takes so long, as I had read that Spark could filter 1B rows per second on a single CPU. Granted, my rows are likely more complex, but I expected scanning 100M rows that were already cached in memory to take well under 13 seconds. So I modified the above query to the following:

    om.filter("annotType == 'ce:para'").count

The query now completes in just over 1s (a huge improvement).
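For anyone who wants to reproduce the comparison above without the original data, here is a minimal, self-contained sketch, assuming Scala and Spark 2.x or later on the classpath. The `TextAnnotation` case class is hypothetical: its field names are copied from the plan, but its field types (Long for the columns shown with an `L` suffix, String for the rest) are guesses. The data is synthetic, with about 1% of rows having annotType "ce:para", roughly matching 1,039,993 out of 108,587,678 in the post. `FilterComparison`, `runComparison`, and `timed` are names invented for this example. It adds a third variant, a `Column` expression, which builds the same Catalyst predicate as the SQL string, and it prints rows scanned per second so the result can be compared with the "1B rows a second" figure.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case class: field names come from the physical plan in the post;
// fields shown with an "L" suffix there (e.g. startOffset#397L) are assumed Long,
// all other fields are assumed String.
case class TextAnnotation(
    docId: String, annotSet: String, annotType: String,
    startOffset: Long, endOffset: Long, annotId: Long, parentId: Long,
    orig: String, lemma: String, pos: String, xmlId: String)

object FilterComparison {

  // Runs `body`, prints wall-clock time and rows scanned per second, returns the result.
  private def timed[T](label: String, rows: Long)(body: => T): T = {
    val start = System.nanoTime()
    val result = body
    val seconds = (System.nanoTime() - start) / 1e9
    println(f"$label%-14s ${seconds}%.2f s, ${rows / seconds}%.0f rows/s, result = $result")
    result
  }

  // Builds synthetic data (1% "ce:para"), caches it, and counts matches with
  // three filter styles. Returns the three counts in order.
  def runComparison(spark: SparkSession, numRows: Long): Seq[Long] = {
    import spark.implicits._

    val om: Dataset[TextAnnotation] = spark.range(numRows).map { boxed =>
      val i = boxed.longValue
      val annotType = if (i % 100 == 0) "ce:para" else "ce:other"
      TextAnnotation(s"doc${i % 1000}", "om", annotType, i, i + 1, i, i - 1,
        "orig", "lemma", "pos", s"x$i")
    }.cache()
    om.count() // materialize the cache before timing anything

    // 1. Typed lambda (the slow query): every cached row is deserialized into a
    //    TextAnnotation object before the Scala function runs, and Catalyst
    //    cannot inspect what the function does.
    val typed = om.filter(textAnnotation => textAnnotation.annotType == "ce:para")
    // 2. SQL expression string (the fast query): parsed into a Catalyst predicate.
    val sqlString = om.filter("annotType == 'ce:para'")
    // 3. Column expression: the same Catalyst predicate, written in the Scala DSL.
    val column = om.filter($"annotType" === "ce:para")

    Seq("typed lambda" -> typed, "SQL string" -> sqlString, "Column" -> column).map {
      case (label, ds) =>
        ds.explain() // compare the *Filter line and the InMemoryTableScan predicates
        timed(label, numRows)(ds.count())
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("filter-comparison")
      .master("local[*]")
      .getOrCreate()
    try {
      val numRows = if (args.nonEmpty) args(0).toLong else 1000000L
      runComparison(spark, numRows)
    } finally {
      spark.stop()
    }
  }
}
```

Pass a larger row count as the first argument to make the timing gap easier to see. The absolute numbers will not match the cluster in the post, and the exact text printed for the lambda node (`<function1>.apply` here) varies with the Spark and Scala versions.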
When I run explain() on the string-based query, I see the following:

    == Physical Plan ==
    *Filter (isnotnull(annotType#396) && (annotType#396 = ce:para))
    +- InMemoryTableScan [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], [isnotnull(annotType#396), (annotType#396 = ce:para)]
       +- InMemoryRelation [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
          +- Exchange hashpartitioning(docId#394, 8)

This is very similar to the first plan, with the notable exception of

    *Filter (isnotnull(annotType#396) && (annotType#396 = ce:para))

instead of

    *Filter <function1>.apply

I'm guessing the improvement comes from the fact that a TextAnnotation object must be created for every row in the first query but not in the second, although this is not obvious from the explain plans. Is that correct? Or is there some other reason the second approach is significantly faster? I would really like to get a solid understanding of why the second query is so much faster.

I would also like to clarify whether InMemoryTableScan and InMemoryRelation are part of the whole-stage code generation. I'm thinking they aren't, since they aren't prefixed with a "*". If not, is there something I could do to make this part of the plan participate in whole-stage code generation? My goal is to make the above operation as fast as possible. I could of course increase the number of partitions (and the size of my cluster), but I also want to clarify my understanding of whole-stage code generation.

Any thoughts or suggestions would be appreciated. Also, if anyone has found good resources that explain the DAG and whole-stage code generation in more detail, I would appreciate those as well.

Thanks.

Darin.