Hi Mich, do you mean using the skewed column as a join condition? I tried repartition(skewed column, unique column) but had no success, possibly because the join was still hash-partitioning on just the skewed column after I called repartition.
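A quick way to check that suspicion is explain(): the join's required distribution is hash partitioning on the join key alone, so a pre-join repartition on (id, uniq) gets thrown away. A minimal sketch below; df1, df2, and the column names are hypothetical stand-ins for the real tables, and the plan output is abbreviated.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.appName("skew-check").getOrCreate()
import spark.implicits._

// Tiny stand-ins for the real ~150 GB / ~50 GB tables.
val df1 = Seq((0L, 1L), (0L, 2L), (42L, 3L)).toDF("id", "uniq")
val df2 = Seq((0L, "a"), (42L, "b")).toDF("id", "payload")

// Repartitioning on (id, uniq) before the join...
val pre = df1.repartition(col("id"), col("uniq"))

// ...does not satisfy the join's required distribution: rows with the same
// id can land in different partitions when uniq differs, so the planner
// inserts a fresh Exchange on id and the pre-shuffle is wasted.
pre.join(df2, "id").explain()
// Expect to see (roughly):
//   SortMergeJoin [id], [id]
//     +- Exchange hashpartitioning(id, 200)   <-- re-shuffle on id alone

Only a partitioning on exactly the join key survives into the join; partitioning on a superset of it does not.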
On Sun, Aug 14, 2016 at 1:49 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Can you make the join more selective by using the skewed column ID +
> another column that has valid unique values ("Repartitioning according to
> a column I know contains unique values")?
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On 14 August 2016 at 07:17, Jestin Ma <jestinwith.a...@gmail.com> wrote:
>
>> Attached are the screenshots mentioned, apologies for that.
>>
>> On Sat, Aug 13, 2016 at 11:15 PM, Jestin Ma <jestinwith.a...@gmail.com> wrote:
>>
>>> Hi, I'm currently trying to perform an outer join between two
>>> DataFrames/Datasets on a column, id; one is ~150 GB, the other ~50 GB.
>>>
>>> df1.id is skewed: there are many 0's, the rest being unique IDs.
>>>
>>> df2.id is not skewed. If I filter df1.id != 0, the join works well. If
>>> I don't, the join does not complete for a very, very long time.
>>>
>>> I have diagnosed the problem as hash partitioning on the IDs: the data
>>> skew leaves one partition containing most of the values, so one executor
>>> ends up reading most of the shuffle data and writing all of the shuffle
>>> data, as shown in the attached screenshots.
>>>
>>> [Screenshot 1: the task in question, assigned to a single executor.]
>>>
>>> [Screenshot 2: one of the executors, showing a single thread spilling
>>> sort data since the executor cannot hold 90%+ of the ~200 GB result in
>>> memory.]
>>>
>>> Moreover, looking at the event timeline, I find that the executor on
>>> that task spends about 20% of its time reading shuffle data, 70% on
>>> computation, and 10% writing output data.
>>>
>>> I have tried the following:
>>>
>>> - "Salting" the 0-value keys with monotonically_increasing_id().mod(N).
>>>   This doesn't seem to have an effect, since now I have
>>>   hundreds/thousands of keys with tens of thousands of occurrences
>>>   each. Should I increase N? Is there a way to just do random.mod(N)
>>>   instead of monotonically_increasing_id()?
>>>
>>> - Repartitioning according to a column I know contains unique values.
>>>   This is overridden by Spark's sort-based shuffle manager, which hash
>>>   repartitions on the skewed column. Is it possible to change this? Or
>>>   will the join column need to be hashed and partitioned on for joins
>>>   to work?
>>>
>>> - Broadcasting, which does not work for my large tables.
>>>
>>> - Increasing/decreasing spark.sql.shuffle.partitions, which does not
>>>   remedy the skewed-data problem, as the 0-value keys are still hashed
>>>   to the same partition.
>>>
>>> ----------------------------------
>>>
>>> What I am considering currently is doing the join at the RDD level, but
>>> is there any level of control there that can solve my skewed-data
>>> problem? Other than that, see the bolded questions above.
>>>
>>> I would appreciate any suggestions/tips/experience with this. Thank you!
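On the random.mod(N) question quoted above: rand() from org.apache.spark.sql.functions gives a uniform per-row draw, which spreads a single hot key across buckets regardless of how the data is laid out across partitions. The part that's easy to miss is that the other side of the join must be replicated across all N salt values, or the salted keys stop matching. A minimal sketch, with hypothetical stand-in tables and column names (df1 skewed on id = 0, df2 not skewed):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("salted-join").getOrCreate()
import spark.implicits._

val N = 64  // salt buckets; size so one bucket's share of the hot key fits a task

// Tiny stand-ins for the real tables.
val df1 = Seq((0L, "x"), (0L, "y"), (7L, "z")).toDF("id", "left_val")  // many id = 0
val df2 = Seq((0L, "a"), (7L, "b")).toDF("id", "right_val")            // not skewed

// Random salt instead of monotonically_increasing_id().mod(N):
// each row of the skewed side draws a bucket uniformly in [0, N).
val df1Salted = df1.withColumn("salt", (rand() * N).cast("int"))

// Replicate the non-skewed side once per salt value so every bucket can match.
val df2Salted = df2.withColumn("salt", explode(array((0 until N).map(lit): _*)))

// Join on (id, salt); the hot key is now split across N partitions.
val joined = df1Salted
  .join(df2Salted, Seq("id", "salt"), "left_outer")
  .drop("salt")

One caveat: this is safe for an inner or left-outer join from the skewed side, but in a full outer join an unmatched df2 row can surface once per salt value and would need deduplicating afterwards. And if salting everything is too expensive, the same trick can be applied to the id = 0 rows only, unioned with a plain join of the rest.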