Hi Michael, Mich, and Jacek,

Thank you for the good suggestions. I found several ways of getting rid of the skew, including the approaches you suggested (filtering, broadcasting, joining, and unioning) as well as salting my 0-value IDs.
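The split-and-union approach (join the id == 0 rows and the id != 0 rows separately, then union the two results) can be sanity-checked on a toy model. The sketch below is plain Python, not Spark. `full_outer_join`, `split_join` and the data are hypothetical and operate on lists of `(id, value)` pairs. It shows why the rewrite is safe for an equi-join on `id`: a row with id 0 can only ever match another row with id 0, so the two partial joins together produce exactly the rows of the original full outer join. The id == 0 slice of df2 is tiny, which is what makes it the natural broadcast candidate. Whether Spark actually broadcasts it depends on the join type and on its size threshold or an explicit broadcast hint. One difference from a literal Spark translation: a filter such as `id != 0` drops rows whose id is null, so those would need separate handling in an outer join. The toy keeps them in the second slice.

```python
from collections import Counter, defaultdict


def full_outer_join(left, right):
    """Toy full outer equi-join on (key, value) pairs; None keys never match (SQL semantics)."""
    index = defaultdict(list)
    for j, (k, _) in enumerate(right):
        if k is not None:
            index[k].append(j)
    matched_right = set()
    out = []
    for k, v in left:
        hits = index.get(k, []) if k is not None else []
        if not hits:
            out.append((v, None))  # unmatched left row
        for j in hits:
            matched_right.add(j)
            out.append((v, right[j][1]))
    # Right rows that matched nothing come out with a null left side.
    out.extend((None, w) for j, (_, w) in enumerate(right) if j not in matched_right)
    return out


def split_join(left, right, hot_key=0):
    """Join the hot-key slices and the remaining slices separately, then union the results."""
    def is_hot(row):
        return row[0] == hot_key

    hot = full_outer_join([r for r in left if is_hot(r)], [r for r in right if is_hot(r)])
    # "Not hot" keeps None keys here; in Spark, a filter on id != 0 would drop null ids.
    rest = full_outer_join([r for r in left if not is_hot(r)], [r for r in right if not is_hot(r)])
    return hot + rest


# Hypothetical data: df1 is skewed on id 0, df2 has unique ids.
df1 = [(0, f"a{i}") for i in range(1000)] + [(i, f"b{i}") for i in range(1, 50)] + [(None, "n1")]
df2 = [(i, f"c{i}") for i in range(0, 100, 2)] + [(None, "n2")]

hot_df2 = [r for r in df2 if r[0] == 0]
print("rows in the id == 0 slice of df2:", len(hot_df2))  # the side a broadcast would ship
print("same result:", Counter(split_join(df1, df2)) == Counter(full_outer_join(df1, df2)))
```

The toy compares results as multisets (`Counter`) because a union does not preserve row order, just as in Spark.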
Thank you for the help!

On Sun, Aug 14, 2016 at 11:33 AM, Michael Armbrust <mich...@databricks.com> wrote:
> You can force a broadcast, but with tables that large it's probably not a
> good idea. However, filtering and then broadcasting one of the joins is
> likely to get you the benefits of broadcasting (no shuffle of the larger
> table, which would colocate all the skewed tuples on a single overloaded
> executor) without attempting to broadcast something that's too large.
>
> On Sun, Aug 14, 2016 at 11:02 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi Michael,
>>
>> As I understand broadcast joins, Jestin could also use the broadcast
>> function on a dataset to have it broadcast. Jestin could force the
>> broadcast instead of relying on the trick and hoping it kicks off a
>> broadcast. Correct?
>>
>> Regards,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sun, Aug 14, 2016 at 9:51 AM, Michael Armbrust
>> <mich...@databricks.com> wrote:
>> > Have you tried doing the join in two parts (id == 0 and id != 0) and
>> > then doing a union of the results? It is possible that with this
>> > technique, the join which only contains skewed data would be filtered
>> > enough to allow broadcasting of one side.
>> >
>> > On Sat, Aug 13, 2016 at 11:15 PM, Jestin Ma <jestinwith.a...@gmail.com>
>> > wrote:
>> >>
>> >> Hi, I'm currently trying to perform an outer join on a column, id,
>> >> between two DataFrames/Datasets: one is ~150 GB, the other ~50 GB.
>> >>
>> >> df1.id is skewed in that there are many 0's, the rest being unique IDs.
>> >>
>> >> df2.id is not skewed. If I filter df1.id != 0, then the join works well.
>> >> If I don't, then the join does not complete for a very, very long time.
>> >>
>> >> I have diagnosed this problem as being caused by the hash partitioning
>> >> on IDs, which results in one partition containing many values due to
>> >> data skew. One executor ends up reading most of the shuffle data and
>> >> writing all of the shuffle data, as shown below.
>> >>
>> >> [screenshot]
>> >>
>> >> Shown above is the task in question, assigned to one executor.
>> >>
>> >> [screenshot]
>> >>
>> >> This screenshot comes from one of the executors, showing a single thread
>> >> spilling sort data, since the executor cannot hold 90%+ of the ~200 GB
>> >> result in memory.
>> >>
>> >> Moreover, looking at the event timeline, I find that the executor on
>> >> that task spends about 20% of its time reading shuffle data, 70% on
>> >> computation, and 10% writing output data.
>> >>
>> >> I have tried the following:
>> >>
>> >> "Salting" the 0-value keys by monotonically_increasing_id().mod(N)
>> >> - This doesn't seem to have an effect, since now I have
>> >>   hundreds/thousands of keys with tens of thousands of occurrences.
>> >> - Should I increase N? Is there a way to just do random.mod(N) instead
>> >>   of monotonically_increasing_id()?
>> >>
>> >> Repartitioning according to a column I know contains unique values
>> >> - This is overridden by Spark's sort-based shuffle manager, which
>> >>   hash-repartitions on the skewed column.
>> >> - Is it possible to change this? Or will the join column need to be
>> >>   hashed and partitioned on for joins to work?
>> >>
>> >> Broadcasting does not work for my large tables.
>> >>
>> >> Increasing/decreasing spark.sql.shuffle.partitions does not remedy the
>> >> skewed data problem, as 0-product values are still being hashed to the
>> >> same partition.
>> >>
>> >> ----------------------------------
>> >>
>> >> What I am currently considering is doing the join at the RDD level, but
>> >> is there any level of control which can solve my skewed data problem?
>> >> Other than that, see the bolded question.
>> >>
>> >> I would appreciate any suggestions/tips/experience with this. Thank you!
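The salting questions in the original message (does raising the partition count help, and can a random salt replace monotonically_increasing_id().mod(N)?) can be illustrated with a plain-Python toy. This is not Spark. The helper names, the md5-based `partition_of` and the data are all hypothetical.

The toy models hash partitioning as "hash the key, then take it modulo the partition count", and it does three things:

- It draws a random salt for each 0-valued row.
- It replicates the other side's 0-valued rows once per salt value, so every salted row still finds its match.
- It prints the largest partition with and without salting, and checks that the salted left outer join returns the same rows as the plain one.

Two caveats apply. First, the replication step is what keeps the join correct. With a full outer join, any replica that no salted row happens to match would come out as a spurious unmatched row, so as written this fits inner or left outer joins with the skewed table on the left. Second, in Spark the salt would presumably come from something like `floor(rand() * N)`, but that translation is an assumption, not code from the thread.

```python
import hashlib
import random
from collections import Counter, defaultdict

NUM_SALTS = 32           # hypothetical N: how many ways the hot key is split
NUM_PARTITIONS = 200     # stand-in for spark.sql.shuffle.partitions
rng = random.Random(42)  # seeded random salt, so the toy is reproducible


def partition_of(key, num_partitions):
    """Hash the key, then take it modulo the partition count.

    md5 is used only because it is stable across runs; Spark SQL uses Murmur3.
    """
    digest = hashlib.md5(repr(key).encode()).digest()
    return int.from_bytes(digest[:8], "little") % num_partitions


def salted_key(key, salt):
    # Null keys stay null so they still never match anything.
    return None if key is None else (key, salt)


def salt_skewed_side(rows, hot_key):
    """Give every hot-key row a random salt in [0, NUM_SALTS); other rows get salt 0."""
    return [(salted_key(k, rng.randrange(NUM_SALTS) if k == hot_key else 0), v)
            for k, v in rows]


def replicate_other_side(rows, hot_key):
    """Copy every hot-key row once per salt value; other rows get salt 0."""
    return [(salted_key(k, s), w)
            for k, w in rows
            for s in (range(NUM_SALTS) if k == hot_key else [0])]


def left_outer_join(left, right):
    """Toy left outer equi-join on (key, value) pairs; None keys never match."""
    index = defaultdict(list)
    for k, w in right:
        if k is not None:
            index[k].append(w)
    out = []
    for k, v in left:
        matches = index.get(k, []) if k is not None else []
        if matches:
            out.extend((v, w) for w in matches)
        else:
            out.append((v, None))
    return out


# Hypothetical data: df1 is skewed on id 0, df2 has unique ids.
df1 = [(0, f"a{i}") for i in range(5000)] + [(i, f"b{i}") for i in range(1, 200)] + [(None, "n1")]
df2 = [(i, f"c{i}") for i in range(0, 300, 3)] + [(None, "n2")]

# Without salting, more partitions do not help: every id-0 row hashes to one partition.
for n in (200, 400, 2000):
    loads = Counter(partition_of(k, n) for k, _ in df1)
    print(f"unsalted, {n} partitions: largest partition = {max(loads.values())}")

salted_df1 = salt_skewed_side(df1, hot_key=0)
salted_df2 = replicate_other_side(df2, hot_key=0)

plain_max = max(Counter(partition_of(k, NUM_PARTITIONS) for k, _ in df1).values())
salted_max = max(Counter(partition_of(k, NUM_PARTITIONS) for k, _ in salted_df1).values())
print("largest partition, unsalted:", plain_max)
print("largest partition, salted:  ", salted_max)

# The salted join must return exactly the rows of the plain join.
plain_result = Counter(left_outer_join(df1, df2))
salted_result = Counter(left_outer_join(salted_df1, salted_df2))
print("same result:", plain_result == salted_result)
```

In this model, raising N spreads the hot key over more partitions. The cost is that the other side's hot rows are copied N times.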