Hi Everyone, Thanks a lot for your answers. They helped me a lot in clearing up the concept :)
Best,
Sid

On Mon, Aug 1, 2022 at 12:17 AM Vinod KC <vinod.kc...@gmail.com> wrote:

> Hi Sid,
> This example code, with output, should add some more clarity:
>
>> spark-shell --conf spark.sql.shuffle.partitions=3 --conf spark.sql.autoBroadcastJoinThreshold=-1
>>
>> scala> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.DataFrame
>>
>> scala> import org.apache.spark.sql.functions.{array, concat, explode, floor, lit, rand}
>> import org.apache.spark.sql.functions.{array, concat, explode, floor, lit, rand}
>>
>> scala> import spark.implicits._
>> import spark.implicits._
>>
>> scala> val df1 = Seq(
>>      |   ("x", "bc"),
>>      |   ("x", "ce"),
>>      |   ("x", "ab"),
>>      |   ("x", "ef"),
>>      |   ("x", "gh"),
>>      |   ("y", "hk"),
>>      |   ("z", "jk")
>>      | ).toDF("t1c1","t1c2")
>> df1: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string]
>>
>> scala> df1.show(10,false)
>> +----+----+
>> |t1c1|t1c2|
>> +----+----+
>> |x   |bc  |
>> |x   |ce  |
>> |x   |ab  |
>> |x   |ef  |
>> |x   |gh  |
>> |y   |hk  |
>> |z   |jk  |
>> +----+----+
>>
>> scala> val df2 = Seq(
>>      |   ("x", "gkl"),
>>      |   ("y", "nmb"),
>>      |   ("z", "qwe")
>>      | ).toDF("t2c1","t2c2")
>> df2: org.apache.spark.sql.DataFrame = [t2c1: string, t2c2: string]
>>
>> scala> df2.show(10,false)
>> +----+----+
>> |t2c1|t2c2|
>> +----+----+
>> |x   |gkl |
>> |y   |nmb |
>> |z   |qwe |
>> +----+----+
>>
>> scala> def applySalt(leftTable: DataFrame, leftCol: String, rightTable: DataFrame) = {
>>      |   var df1 = leftTable
>>      |     .withColumn(leftCol, concat(
>>      |       leftTable.col(leftCol), lit("_"), lit(floor(rand(123456) * 10))))
>>      |   var df2 = rightTable
>>      |     .withColumn("explodedCol",
>>      |       explode(
>>      |         array((0 to 10).map(lit(_)): _*)
>>      |       ))
>>      |   (df1, df2)
>>      | }
>> applySalt: (leftTable: org.apache.spark.sql.DataFrame, leftCol: String, rightTable: org.apache.spark.sql.DataFrame)(org.apache.spark.sql.DataFrame, org.apache.spark.sql.DataFrame)
>>
>> scala> val (df3, df4) = applySalt(df1, "t1c1", df2)
>> df3: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string]
>> df4: org.apache.spark.sql.DataFrame = [t2c1: string, t2c2: string ... 1 more field]
>>
>> scala> df3.show(100, false)
>> +----+----+
>> |t1c1|t1c2|
>> +----+----+
>> |x_4 |bc  |
>> |x_8 |ce  |
>> |x_3 |ab  |
>> |x_0 |ef  |
>> |x_6 |gh  |
>> |y_9 |hk  |
>> |z_7 |jk  |
>> +----+----+
>>
>> scala> df4.show(100, false)
>> +----+----+-----------+
>> |t2c1|t2c2|explodedCol|
>> +----+----+-----------+
>> |x   |gkl |0          |
>> |x   |gkl |1          |
>> |x   |gkl |2          |
>> |x   |gkl |3          |
>> |x   |gkl |4          |
>> |x   |gkl |5          |
>> |x   |gkl |6          |
>> |x   |gkl |7          |
>> |x   |gkl |8          |
>> |x   |gkl |9          |
>> |x   |gkl |10         |
>> |y   |nmb |0          |
>> |y   |nmb |1          |
>> |y   |nmb |2          |
>> |y   |nmb |3          |
>> |y   |nmb |4          |
>> |y   |nmb |5          |
>> |y   |nmb |6          |
>> |y   |nmb |7          |
>> |y   |nmb |8          |
>> |y   |nmb |9          |
>> |y   |nmb |10         |
>> |z   |qwe |0          |
>> |z   |qwe |1          |
>> |z   |qwe |2          |
>> |z   |qwe |3          |
>> |z   |qwe |4          |
>> |z   |qwe |5          |
>> |z   |qwe |6          |
>> |z   |qwe |7          |
>> |z   |qwe |8          |
>> |z   |qwe |9          |
>> |z   |qwe |10         |
>> +----+----+-----------+
>>
>> scala> // join after eliminating data skewness
>>
>> scala> val df5 = df3.join(df4, df3.col("t1c1") <=> concat(df4.col("t2c1"), lit("_"), df4.col("explodedCol")))
>> df5: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string ... 3 more fields]
>>
>> scala> df5.show(100,false)
>> +----+----+----+----+-----------+
>> |t1c1|t1c2|t2c1|t2c2|explodedCol|
>> +----+----+----+----+-----------+
>> |x_0 |ef  |x   |gkl |0          |
>> |y_9 |hk  |y   |nmb |9          |
>> |x_3 |ab  |x   |gkl |3          |
>> |x_4 |bc  |x   |gkl |4          |
>> |x_6 |gh  |x   |gkl |6          |
>> |z_7 |jk  |z   |qwe |7          |
>> |x_8 |ce  |x   |gkl |8          |
>> +----+----+----+----+-----------+
>>
>> scala> df5.queryExecution.sparkPlan
>> res14: org.apache.spark.sql.execution.SparkPlan =
>> SortMergeJoin [coalesce(t1c1#32, ), isnull(t1c1#32)], [coalesce(concat(t2c1#21, _, cast(explodedCol#36 as string)), ), isnull(concat(t2c1#21, _, cast(explodedCol#36 as string)))], Inner
>> :- LocalTableScan [t1c1#32, t1c2#6]
>> +- Generate explode([0,1,2,3,4,5,6,7,8,9,10]), [t2c1#21, t2c2#22], false, [explodedCol#36]
>>    +- LocalTableScan [t2c1#21, t2c2#22]
>>
>> scala> df5.drop("t1c1", "explodedCol").show
>> +----+----+----+
>> |t1c2|t2c1|t2c2|
>> +----+----+----+
>> |  ef|   x| gkl|
>> |  hk|   y| nmb|
>> |  ab|   x| gkl|
>> |  bc|   x| gkl|
>> |  gh|   x| gkl|
>> |  jk|   z| qwe|
>> |  ce|   x| gkl|
>> +----+----+----+
>
> Regards
> Vinod
>
> On Sun, Jul 31, 2022 at 1:59 AM ayan guha <guha.a...@gmail.com> wrote:
>
>> One option is to create a separate column in table A with salting. Use it as the
>> partition key, and use the original column for joining.
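>>
>> A rough sketch of that idea (the DataFrames and column names below are made up
>> for illustration; the salted column only drives the partitioning, while the join
>> itself still uses the original key):
>>
>> import org.apache.spark.sql.functions.{col, concat, floor, lit, rand}
>> import spark.implicits._
>>
>> val dfA = Seq(("x", "bc"), ("x", "ce"), ("x", "ab"), ("y", "hk")).toDF("join_col", "a_val") // skewed on "x"
>> val dfB = Seq(("x", "gkl"), ("y", "nmb")).toDF("join_col", "b_val")
>>
>> // Separate salted column: the original key plus a random suffix in 0..9
>> val dfASalted = dfA
>>   .withColumn("salted_key", concat(col("join_col"), lit("_"), floor(rand(42) * 10)))
>>   .repartition(col("salted_key"))   // spread the hot key across partitions
>>
>> // The join condition still uses the original column, so no matches are lost
>> val joined = dfASalted.join(dfB, Seq("join_col")).drop("salted_key")
>> joined.show(false)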
>>
>> Ayan
>>
>> On Sun, 31 Jul 2022 at 6:45 pm, Jacob Lynn <abebopare...@gmail.com> wrote:
>>
>>> The key is this line from Amit's email (emphasis added):
>>>
>>> > Change the join_col to *all possible values* of the salt.
>>>
>>> The two tables are treated asymmetrically:
>>>
>>> 1. The skewed table gets random salts appended to the join key.
>>> 2. The other table gets all possible salts appended to the join key (e.g. using a
>>>    range array literal + explode).
>>>
>>> This guarantees that every row in the skewed table will match a row in the other
>>> table. This StackOverflow answer <https://stackoverflow.com/a/57951114/1892435>
>>> gives an example.
>>>
>>> On Sun, 31 Jul 2022 at 10:41, Amit Joshi <mailtojoshia...@gmail.com> wrote:
>>>
>>>> Hi Sid,
>>>>
>>>> I am not sure I understood your question, but the keys cannot be different after
>>>> salting in both tables; that is what I have shown in the explanation. You salt
>>>> Table A and then explode Table B to create all possible values.
>>>>
>>>> In your case, I do not understand why Table B has x_8/x_9. It should have all
>>>> possible values of the salt you used.
>>>>
>>>> I hope you understand.
>>>>
>>>> Thanks
>>>>
>>>> On Sun, Jul 31, 2022 at 10:02 AM Sid <flinkbyhe...@gmail.com> wrote:
>>>>
>>>>> Hi Amit,
>>>>>
>>>>> Thanks for your reply. However, your answer doesn't seem different from what I
>>>>> have explained.
>>>>>
>>>>> My question is: if the keys are different after salting, as in my example, then an
>>>>> inner join would produce no results, because even though the keys are now spread
>>>>> across different partitions, they no longer match (x_1/x_2 != x_8/x_9).
>>>>>
>>>>> How do you ensure that the results are matched?
>>>>>
>>>>> Best,
>>>>> Sid
>>>>>
>>>>> On Sun, Jul 31, 2022 at 1:34 AM Amit Joshi <mailtojoshia...@gmail.com> wrote:
>>>>>
>>>>>> Hi Sid,
>>>>>>
>>>>>> Salting is normally a technique of adding random characters to existing values.
>>>>>> In big data we can use salting to deal with skewness.
>>>>>>
>>>>>> Salting in a join can be used as follows:
>>>>>>
>>>>>> *Table A* - Col1, join_col, where join_col values are {x1, x2, x3}:
>>>>>> x1
>>>>>> x1
>>>>>> x1
>>>>>> x2
>>>>>> x2
>>>>>> x3
>>>>>>
>>>>>> *Table B* - join_col, Col3, where join_col values are {x1, x2}:
>>>>>> x1
>>>>>> x2
>>>>>>
>>>>>> *Problem:* Let's say that for Table A the data is skewed on x1.
>>>>>> Now salting goes like this, with *salt value = 2*.
>>>>>>
>>>>>> For *Table A*, create a new column by salting the join column:
>>>>>> *New_Join_Col*
>>>>>> x1_1
>>>>>> x1_2
>>>>>> x1_1
>>>>>> x2_1
>>>>>> x2_2
>>>>>> x3_1
>>>>>>
>>>>>> For *Table B*, change the join_col to all possible values of the salt:
>>>>>> join_col
>>>>>> x1_1
>>>>>> x1_2
>>>>>> x2_1
>>>>>> x2_2
>>>>>>
>>>>>> And then join them like:
>>>>>> table1.join(table2, tableA.new_join_col == tableB.join_col)
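>>>>>>
>>>>>> A small runnable sketch of the same idea (the rows and column names below are
>>>>>> made up to mirror the tables above, using salt suffixes 0 and 1 instead of 1 and 2):
>>>>>>
>>>>>> import org.apache.spark.sql.functions.{array, col, concat, explode, floor, lit, rand}
>>>>>> import spark.implicits._
>>>>>>
>>>>>> val numSalts = 2
>>>>>>
>>>>>> // Table A is skewed on x1: each row gets a *random* salt suffix
>>>>>> val tableA = Seq(("a1", "x1"), ("a2", "x1"), ("a3", "x1"), ("a4", "x2"), ("a5", "x2"), ("a6", "x3"))
>>>>>>   .toDF("col1", "join_col")
>>>>>>   .withColumn("new_join_col", concat(col("join_col"), lit("_"), floor(rand(7) * numSalts)))
>>>>>>
>>>>>> // Table B is replicated once per possible salt value, so every salted key in A finds a match
>>>>>> val tableB = Seq(("x1", "b1"), ("x2", "b2"))
>>>>>>   .toDF("join_col", "col3")
>>>>>>   .withColumn("salt", explode(array((0 until numSalts).map(lit(_)): _*)))
>>>>>>   .withColumn("join_col_salted", concat(col("join_col"), lit("_"), col("salt")))
>>>>>>
>>>>>> val joined = tableA.join(tableB, tableA("new_join_col") === tableB("join_col_salted"))
>>>>>> joined.show(false)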
>>>>>>
>>>>>> Let me know if you have any questions.
>>>>>>
>>>>>> Regards
>>>>>> Amit Joshi
>>>>>>
>>>>>> On Sat, Jul 30, 2022 at 7:16 PM Sid <flinkbyhe...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> I was trying to understand the salting technique for a column where there is a
>>>>>>> huge load on a single partition because of the same keys.
>>>>>>>
>>>>>>> I referred to one YouTube video and came away with the following understanding:
>>>>>>>
>>>>>>> Using the salting technique, we can change the joining column values by
>>>>>>> appending some random number in a specified range.
>>>>>>>
>>>>>>> So, suppose I have these values in a partition of two different tables:
>>>>>>>
>>>>>>> Table A:
>>>>>>> Partition 1:
>>>>>>> x
>>>>>>> .
>>>>>>> .
>>>>>>> .
>>>>>>> x
>>>>>>>
>>>>>>> Table B:
>>>>>>> Partition 1:
>>>>>>> x
>>>>>>> .
>>>>>>> .
>>>>>>> .
>>>>>>> x
>>>>>>>
>>>>>>> After salting it would be something like the below:
>>>>>>>
>>>>>>> Table A:
>>>>>>> Partition 1:
>>>>>>> x_1
>>>>>>>
>>>>>>> Partition 2:
>>>>>>> x_2
>>>>>>>
>>>>>>> Table B:
>>>>>>> Partition 1:
>>>>>>> x_3
>>>>>>>
>>>>>>> Partition 2:
>>>>>>> x_8
>>>>>>>
>>>>>>> Now, when I inner join these two tables after salting (in order to avoid the data
>>>>>>> skewness problem), I won't get a match, since the keys are different after
>>>>>>> applying the salting technique.
>>>>>>>
>>>>>>> So how does this resolve the data skewness issue, or is there a gap in my
>>>>>>> understanding?
>>>>>>>
>>>>>>> Could anyone help me in layman's terms?
>>>>>>>
>>>>>>> TIA,
>>>>>>> Sid
>>
>> --
>> Best Regards,
>> Ayan Guha