Reynold,

The source file you are directing me to is a little too terse for me to
understand what exactly is going on. Let me tell you what I'm trying to do
and what problems I'm encountering, so that you might be able to better
direct my investigation of the SparkSQL codebase.

I am computing the join of three tables that share the same three-field
primary key and each carry several other data fields. My first attempt at
computing this join was in SQL, with a query much like this slightly
simplified one:

        SELECT
          a.key1 key1, a.key2 key2, a.key3 key3,
          a.data1 adata1, a.data2 adata2, ...,
          b.data1 bdata1, b.data2 bdata2, ...,
          c.data1 cdata1, c.data2 cdata2, ...
        FROM a, b, c
        WHERE
          a.key1 = b.key1 AND a.key2 = b.key2 AND a.key3 = b.key3 AND
          b.key1 = c.key1 AND b.key2 = c.key2 AND b.key3 = c.key3

This code yielded a SparkSQL job containing 40,000 stages, which failed
after filling up all available disk space on the worker nodes.
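
One variation I have not tried yet is spelling the joins out explicitly, in
case it is the comma-separated FROM list plus WHERE conditions that trips up
the planner. Roughly this, assuming a, b and c are registered as tables in my
sqlContext, as in the query above:

    // Untried sketch: the same three-way equi-join, but with the equality
    // conditions attached to explicit JOIN ... ON clauses instead of the
    // WHERE clause.
    val joined = sqlContext.sql("""
      SELECT
        a.key1 key1, a.key2 key2, a.key3 key3,
        a.data1 adata1, b.data1 bdata1, c.data1 cdata1
      FROM a
      JOIN b ON a.key1 = b.key1 AND a.key2 = b.key2 AND a.key3 = b.key3
      JOIN c ON b.key1 = c.key1 AND b.key2 = c.key2 AND b.key3 = c.key3""")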

I then wrote this join as a plain mapreduce join. The code looks roughly
like this:
    // Key each row by the shared primary key and tag it with its source table.
    val a_ = a.map(row => (key(row), ("a", row)))
    val b_ = b.map(row => (key(row), ("b", row)))
    val c_ = c.map(row => (key(row), ("c", row)))
    // Union the three tagged RDDs and group by key to bring matching rows together.
    val join = new UnionRDD(sc, List(a_, b_, c_)).groupByKey
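
(The part I elided above, which turns each group into actual joined records,
looks roughly like this; it keeps only keys that occur in all three tables,
and the names are just illustrative:)

    // Assemble joined records from each group of tagged rows, dropping keys
    // that are missing from any of the three tables (inner-join semantics).
    val joinedRows = join.flatMap { case (k, tagged) =>
      val as = tagged.collect { case ("a", row) => row }
      val bs = tagged.collect { case ("b", row) => row }
      val cs = tagged.collect { case ("c", row) => row }
      for (ra <- as; rb <- bs; rc <- cs) yield (k, ra, rb, rc)
    }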

This implementation yields approximately 1600 stages and completes in a few
minutes on a 256-core cluster. The huge difference in scale between the two
jobs makes me think that SparkSQL is implementing my join as a cartesian
product. Here is the query plan; I'm not sure I'm reading it correctly, but it
does seem to imply that the filter conditions are not being pushed far enough
down:

 'Project [...]
 'Filter (((((('a.key1 = 'b.key1)) && ('a.key2 = b.key2)) && ...)
  'Join Inner, None
   'Join Inner, None
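
For what it's worth, the plan above seems to be the one from before analysis
and optimization; I'm planning to also dump the optimized and physical plans
along these lines, assuming I'm reaching for the right accessors:

    // "query" here stands for the SQL text of the three-way join shown earlier.
    val q = sqlContext.sql(query)
    // Inspect what the optimizer and planner actually produce, to see whether
    // the equality predicates end up inside the join or above a cartesian product.
    println(q.queryExecution.optimizedPlan)
    println(q.queryExecution.executedPlan)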

Could it be that SparkSQL is unable to push the join conditions from the
WHERE clause down into the join itself?

Alex

On Thu, Jan 15, 2015 at 10:36 AM, Reynold Xin <r...@databricks.com> wrote:

> It's a bunch of strategies defined here:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
>
> In most common use cases (e.g. inner equi join), filters are pushed below
> the join or into the join. Doing a cartesian product followed by a filter
> is too expensive.
>
>
> On Thu, Jan 15, 2015 at 7:39 AM, Alessandro Baretta <alexbare...@gmail.com
> > wrote:
>
>> Hello,
>>
>> Where can I find docs about how joins are implemented in SparkSQL? In
>> particular, I'd like to know whether they are implemented according to
>> their relational algebra definition as filters on top of a cartesian
>> product.
>>
>> Thanks,
>>
>> Alex
>>
>
>
