Hi,

I am seeing different shuffle write sizes when using a SchemaRDD versus a
normal RDD for the same aggregation. I'm doing the following:

case class DomainObj(a: String, b: String, c: String, d: String)

val logs: RDD[String] = sc.textFile(...)
val filtered: RDD[String] = logs.filter(...)
val myDomainObjects: RDD[DomainObj] = filtered.flatMap(...)

------------------------------------------------------------
1. Operations on RDD:
------------------------------------------------------------
val results = myDomainObjects
    .filter(obj => obj.a == "SomeValue" || obj.a == "SomeOtherValue")
    .mapPartitions(objs => objs.map(obj => (obj, 1)))
    .reduceByKey(_ + _, 200)
    .collect()
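
For completeness, I believe the above is equivalent to the simpler
formulation below (a sketch on my side), since reduceByKey already does
map-side combining, so the mapPartitions wrapper shouldn't change the
shuffle:

val resultsAlt = myDomainObjects
    .filter(obj => obj.a == "SomeValue" || obj.a == "SomeOtherValue")
    .map(obj => (obj, 1))        // key on the full case class
    .reduceByKey(_ + _, 200)     // sum counts into 200 reduce partitions
    .collect()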

------------------------------------------------------------
2. Operations on SchemaRDD:
------------------------------------------------------------
// Needs the implicit RDD -> SchemaRDD conversion from Spark 1.x:
import sqlContext.createSchemaRDD

myDomainObjects.registerTempTable("myDomainObjects")

val results = sqlContext.sql("""
    SELECT
        a, b, c, d, COUNT(*) total
    FROM
        myDomainObjects
    WHERE
        a IN ('SomeValue', 'SomeOtherValue')
    GROUP BY
        a, b, c, d
""").collect()

In the first case (RDD), the query returns in 2 minutes 30 seconds, with an
input size of 28.4 GB, a shuffle write of 525.3 MB, and a shuffle read of
472.5 MB.

In the second case (SchemaRDD), the query returns in 2 minutes 9 seconds,
with an input size of 28.4 GB, a shuffle write of 258.9 MB, and a shuffle
read of 233.0 MB.

Since the shuffle size in the second case is roughly half that of the
first, I'd like to understand why.
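
One hypothesis I'd like to rule out is the serialization format of the
shuffled objects. A sketch of the test I have in mind (assuming
spark.serializer also applies to shuffled data; the app name here is just a
placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
    .setAppName("shuffle-size-test")   // hypothetical app name
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
// Then rerun case 1 and compare the shuffle write size against 525.3 MB.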

Thanks,
Grega
