Hello,

I recently noticed that Spark doesn't optimize a join when a limit is
applied on top of it.

Say we have:

payment.join(customer, Seq("customerId"), "left").limit(1).explain(true)
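
In case it helps, here is a minimal, self-contained version of what I'm running (in spark-shell). The column names are taken from the plan below; the sample rows and the app name are just illustrative, the real datasets are much larger:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("limit-join-repro")   // illustrative name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Illustrative sample data; the real datasets are much larger.
val payment = Seq(
  (1, 101, 2500),
  (2, 102, 1110),
  (3, 103, 500)
).toDF("paymentId", "customerId", "amount")

val customer = Seq(
  (101, "Jon"),
  (102, "Aron"),
  (103, "Sam")
).toDF("customerId", "name")

// The limit comes after the join; I expected it to keep Spark from
// shuffling and joining the full datasets.
payment.join(customer, Seq("customerId"), "left").limit(1).explain(true)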


Spark doesn't push the limit down; it still shuffles and sort-merge-joins
both sides in full:

> == Physical Plan ==
> CollectLimit 1
> +- *(5) Project [customerId#29, paymentId#28, amount#30, name#41]
>    +- SortMergeJoin [customerId#29], [customerId#40], LeftOuter
>       :- *(2) Sort [customerId#29 ASC NULLS FIRST], false, 0
>       :  +- Exchange hashpartitioning(customerId#29, 200)
>       :     +- *(1) Project [_1#24 AS paymentId#28, _2#25 AS customerId#29, _3#26 AS amount#30]
>       :        +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#24, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#25, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#26]
>       :           +- Scan[obj#23]
>       +- *(4) Sort [customerId#40 ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(customerId#40, 200)
>             +- *(3) Project [_1#37 AS customerId#40, _2#38 AS name#41]
>                +- *(3) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#38]
>                   +- Scan[obj#36]


Am I missing something here? Is there a way to avoid shuffling and joining
data that the limit will discard anyway?
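
The only workaround I have found so far is to apply the limit to the left side before the join (keeping the outer limit so the result is still a single row), but I'm not sure this is the intended approach:

// Limit the left side first so that only one payment row is shuffled and
// joined, instead of limiting after the full join has been computed.
payment.limit(1)
  .join(customer, Seq("customerId"), "left")
  .limit(1)
  .explain(true)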

Regards,
Akhil
