Hi Everyone,

Thanks a lot for your answers. They helped me a lot to clear up the concept :)

Best,
Sid

On Mon, Aug 1, 2022 at 12:17 AM Vinod KC <vinod.kc...@gmail.com> wrote:

> Hi Sid,
> This example code, with its output, should add some more clarity:
>
> spark-shell --conf spark.sql.shuffle.partitions=3 --conf
>> spark.sql.autoBroadcastJoinThreshold=-1
>>
>>
>> scala> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.DataFrame
>>
>> scala> import org.apache.spark.sql.functions.{array, concat, explode,
>> floor, lit, rand}
>> import org.apache.spark.sql.functions.{array, concat, explode, floor,
>> lit, rand}
>>
>>
>>
>> scala>  import spark.implicits._
>> import spark.implicits._
>>
>> scala>
>>
>> scala>  val df1 = Seq(
>>      |     ("x", "bc"),
>>      |     ("x", "ce"),
>>      |     ("x", "ab"),
>>      |     ("x", "ef"),
>>      |     ("x", "gh"),
>>      |     ("y", "hk"),
>>      |     ("z", "jk")
>>      |   ).toDF("t1c1","t1c2")
>> df1: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string]
>>
>> scala>  df1.show(10,false)
>> +----+----+
>> |t1c1|t1c2|
>> +----+----+
>> |x   |bc  |
>> |x   |ce  |
>> |x   |ab  |
>> |x   |ef  |
>> |x   |gh  |
>> |y   |hk  |
>> |z   |jk  |
>> +----+----+
>>
>>
>> scala> val df2 = Seq(
>>      |     ("x", "gkl"),
>>      |     ("y", "nmb"),
>>      |     ("z", "qwe")
>>      |   ).toDF("t2c1","t2c2")
>> df2: org.apache.spark.sql.DataFrame = [t2c1: string, t2c2: string]
>>
>> scala>    df2.show(10,false)
>> +----+----+
>> |t2c1|t2c2|
>> +----+----+
>> |x   |gkl |
>> |y   |nmb |
>> |z   |qwe |
>> +----+----+
>>
>>
>> scala>
>>      |   def applySalt(leftTable: DataFrame, leftCol: String, rightTable:
>> DataFrame) = {
>>      |
>>      |     var df1 = leftTable
>>      |       .withColumn(leftCol, concat(
>>      |         leftTable.col(leftCol), lit("_"), lit(floor(rand(123456) *
>> 10))))
>>      |     var df2 = rightTable
>>      |       .withColumn("explodedCol",
>>      |         explode(
>>      |           array((0 to 10).map(lit(_)): _ *)
>>      |         ))
>>      |
>>      |     (df1, df2)
>>      |   }
>> applySalt: (leftTable: org.apache.spark.sql.DataFrame, leftCol: String,
>> rightTable: org.apache.spark.sql.DataFrame)(org.apache.spark.sql.DataFrame,
>> org.apache.spark.sql.DataFrame)
>>
>> scala>   val (df3, df4) = applySalt(df1, "t1c1", df2)
>> df3: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string]
>> df4: org.apache.spark.sql.DataFrame = [t2c1: string, t2c2: string ... 1
>> more field]
>>
>> scala>
>>
>> scala>   df3.show(100, false)
>> +----+----+
>> |t1c1|t1c2|
>> +----+----+
>> |x_4 |bc  |
>> |x_8 |ce  |
>> |x_3 |ab  |
>> |x_0 |ef  |
>> |x_6 |gh  |
>> |y_9 |hk  |
>> |z_7 |jk  |
>> +----+----+
>>
>>
>> scala>   df4.show(100, false)
>> +----+----+-----------+
>> |t2c1|t2c2|explodedCol|
>> +----+----+-----------+
>> |x   |gkl |0          |
>> |x   |gkl |1          |
>> |x   |gkl |2          |
>> |x   |gkl |3          |
>> |x   |gkl |4          |
>> |x   |gkl |5          |
>> |x   |gkl |6          |
>> |x   |gkl |7          |
>> |x   |gkl |8          |
>> |x   |gkl |9          |
>> |x   |gkl |10         |
>> |y   |nmb |0          |
>> |y   |nmb |1          |
>> |y   |nmb |2          |
>> |y   |nmb |3          |
>> |y   |nmb |4          |
>> |y   |nmb |5          |
>> |y   |nmb |6          |
>> |y   |nmb |7          |
>> |y   |nmb |8          |
>> |y   |nmb |9          |
>> |y   |nmb |10         |
>> |z   |qwe |0          |
>> |z   |qwe |1          |
>> |z   |qwe |2          |
>> |z   |qwe |3          |
>> |z   |qwe |4          |
>> |z   |qwe |5          |
>> |z   |qwe |6          |
>> |z   |qwe |7          |
>> |z   |qwe |8          |
>> |z   |qwe |9          |
>> |z   |qwe |10         |
>> +----+----+-----------+
>>
>>
>> scala>   //join after eliminating data skewness
>>
>> scala>   val df5 = df3.join(df4, df3.col("t1c1")<=>
>> concat(df4.col("t2c1"),lit("_"),df4.col("explodedCol")))
>> df5: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string ... 3
>> more fields]
>>
>> scala> df5.show(100,false)
>> +----+----+----+----+-----------+
>> |t1c1|t1c2|t2c1|t2c2|explodedCol|
>> +----+----+----+----+-----------+
>> |x_0 |ef  |x   |gkl |0          |
>> |y_9 |hk  |y   |nmb |9          |
>> |x_3 |ab  |x   |gkl |3          |
>> |x_4 |bc  |x   |gkl |4          |
>> |x_6 |gh  |x   |gkl |6          |
>> |z_7 |jk  |z   |qwe |7          |
>> |x_8 |ce  |x   |gkl |8          |
>> +----+----+----+----+-----------+
>>
>> scala> df5.queryExecution.sparkPlan
>> res14: org.apache.spark.sql.execution.SparkPlan =
>> SortMergeJoin [coalesce(t1c1#32, ), isnull(t1c1#32)],
>> [coalesce(concat(t2c1#21, _, cast(explodedCol#36 as string)), ),
>> isnull(concat(t2c1#21, _, cast(explodedCol#36 as string)))], Inner
>> :- LocalTableScan [t1c1#32, t1c2#6]
>> +- Generate explode([0,1,2,3,4,5,6,7,8,9,10]), [t2c1#21, t2c2#22], false,
>> [explodedCol#36]
>>    +- LocalTableScan [t2c1#21, t2c2#22]
>>
>>
>> scala> df5.drop("t1c1", "explodedCol").show
>> +----+----+----+
>> |t1c2|t2c1|t2c2|
>> +----+----+----+
>> |  ef|   x| gkl|
>> |  hk|   y| nmb|
>> |  ab|   x| gkl|
>> |  bc|   x| gkl|
>> |  gh|   x| gkl|
>> |  jk|   z| qwe|
>> |  ce|   x| gkl|
>> +----+----+----+
>>
>
> Regards
> Vinod
>
> On Sun, Jul 31, 2022 at 1:59 AM ayan guha <guha.a...@gmail.com> wrote:
>
>> One option is create a separate column in table A with salting. Use it as
>> partition key. Use original column for joining.
>>
>> Ayan
>>
>> On Sun, 31 Jul 2022 at 6:45 pm, Jacob Lynn <abebopare...@gmail.com>
>> wrote:
>>
>>> The key is this line from Amit's email (emphasis added):
>>>
>>> > Change the join_col to *all possible values* of the salt.
>>>
>>> The two tables are treated asymmetrically:
>>>
>>> 1. The skewed table gets random salts appended to the join key.
>>> 2. The other table gets all possible salts appended to the join key
>>> (e.g. using a range array literal + explode).
>>>
>>> This guarantees that every row in the skewed table will match a row in
>>> the other table. This StackOverflow answer
>>> <https://stackoverflow.com/a/57951114/1892435> gives an example.
>>>
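>>> The asymmetry above can be sketched in plain Python, outside Spark (the
>>> key name and salt count below are illustrative, not from this thread): a
>>> randomly salted key on the skewed side always finds a partner on the
>>> exploded side, because the exploded side enumerates every salt value.

```python
# Illustrative sketch (plain Python, no Spark): why the exploded side
# must carry *all* salt values, not random ones.
import random

salts = 10   # number of salt buckets (assumed)
key = "x"    # a skewed join key (assumed)

# Skewed side: append one random salt to the key.
salted_key = f"{key}_{random.randrange(salts)}"

# Other side: one copy of the key per possible salt value.
exploded_keys = {f"{key}_{s}" for s in range(salts)}

# Whatever salt was drawn, the exploded side contains it.
print(salted_key in exploded_keys)  # prints: True
```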
>>> On Sun, 31 Jul 2022 at 10:41, Amit Joshi <
>>> mailtojoshia...@gmail.com> wrote:
>>>
>>>> Hi Sid,
>>>>
>>>> I am not sure I understood your question.
>>>> But the keys cannot end up different in the two tables after salting;
>>>> that is what I showed in the explanation.
>>>> You salt Table A and then explode Table B to create all possible values.
>>>>
>>>> In your case, I do not understand why Table B has x_8/x_9. It should
>>>> contain all the possible values that were used to create the salt.
>>>>
>>>> I hope you understand.
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>> On Sun, Jul 31, 2022 at 10:02 AM Sid <flinkbyhe...@gmail.com> wrote:
>>>>
>>>>> Hi Amit,
>>>>>
>>>>> Thanks for your reply. However, your answer doesn't seem different
>>>>> from what I have explained.
>>>>>
>>>>> My question is: if the keys are different after salting, as in my
>>>>> example, then an inner join would produce no results, because even
>>>>> though the keys are segregated into different partitions, they do not
>>>>> match: x_1/x_2 != x_8/x_9.
>>>>>
>>>>> How do you ensure that the results are matched?
>>>>>
>>>>> Best,
>>>>> Sid
>>>>>
>>>>> On Sun, Jul 31, 2022 at 1:34 AM Amit Joshi <mailtojoshia...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Sid,
>>>>>>
>>>>>> Salting is a technique for appending random characters to existing
>>>>>> values. In big data we can use salting to deal with skew.
>>>>>> Salting in a join can be used as follows:
>>>>>> * Table A-*
>>>>>> Col1, join_col , where join_col values are {x1, x2, x3}
>>>>>> x1
>>>>>> x1
>>>>>> x1
>>>>>> x2
>>>>>> x2
>>>>>> x3
>>>>>>
>>>>>> *Table B-*
>>>>>> join_col, Col3 , where join_col  value are {x1, x2}
>>>>>> x1
>>>>>> x2
>>>>>>
>>>>>> *Problem:* Let's say that for Table A, the data is skewed on x1.
>>>>>> Now salting goes like this.  *Salt value = 2*
>>>>>> For *Table A*, create a new column by salting the join column:
>>>>>> *New_Join_Col*
>>>>>> x1_1
>>>>>> x1_2
>>>>>> x1_1
>>>>>> x2_1
>>>>>> x2_2
>>>>>> x3_1
>>>>>>
>>>>>> For *Table B*,
>>>>>> change the join_col to all possible values of the salt:
>>>>>> join_col
>>>>>> x1_1
>>>>>> x1_2
>>>>>> x2_1
>>>>>> x2_2
>>>>>>
>>>>>> And then join it like
>>>>>> tableA.join(tableB, tableA.new_join_col == tableB.join_col)
>>>>>>
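>>>>>> Putting the steps above together as a small sketch in plain Python
>>>>>> (outside Spark; the Col3 payloads b1/b2 and the random seed are made
>>>>>> up), salting Table A and exploding Table B preserves exactly the
>>>>>> matches the unsalted inner join would produce:

```python
# Self-contained sketch of the salting steps above (no Spark involved).
import random

SALT_BUCKETS = 2                      # "Salt value = 2"
rng = random.Random(0)                # fixed seed for reproducibility (assumed)

# Table A join_col values, skewed on x1.
table_a = ["x1", "x1", "x1", "x2", "x2", "x3"]
# Table B join_col values, with hypothetical Col3 payloads.
table_b = [("x1", "b1"), ("x2", "b2")]

# Table A: append a random salt in [0, SALT_BUCKETS) to each key.
salted_a = [f"{k}_{rng.randrange(SALT_BUCKETS)}" for k in table_a]

# Table B: replicate each row once per possible salt value.
exploded_b = [(f"{k}_{s}", v) for k, v in table_b for s in range(SALT_BUCKETS)]

# Inner join on the salted keys.
joined = [(a, v) for a in salted_a for bk, v in exploded_b if a == bk]

# All three x1 rows and both x2 rows still match; x3 drops out, exactly
# as in the unsalted inner join.
print(len(joined))  # prints: 5
```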
>>>>>> Let me know if you have any questions.
>>>>>>
>>>>>> Regards
>>>>>> Amit Joshi
>>>>>>
>>>>>>
>>>>>> On Sat, Jul 30, 2022 at 7:16 PM Sid <flinkbyhe...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> I was trying to understand the Salting technique for the column
>>>>>>> where there would be a huge load on a single partition because of the 
>>>>>>> same
>>>>>>> keys.
>>>>>>>
>>>>>>> I referred to a YouTube video and came away with the below
>>>>>>> understanding:
>>>>>>>
>>>>>>> So, using the salting technique, we can change the join column
>>>>>>> values by appending a random number in a specified range.
>>>>>>>
>>>>>>> So, suppose I have these two values in a partition of two different
>>>>>>> tables:
>>>>>>>
>>>>>>> Table A:
>>>>>>> Partition1:
>>>>>>> x
>>>>>>> .
>>>>>>> .
>>>>>>> .
>>>>>>> x
>>>>>>>
>>>>>>> Table B:
>>>>>>> Partition1:
>>>>>>> x
>>>>>>> .
>>>>>>> .
>>>>>>> .
>>>>>>> x
>>>>>>>
>>>>>>> After Salting it would be something like the below:
>>>>>>>
>>>>>>> Table A:
>>>>>>> Partition1:
>>>>>>> x_1
>>>>>>>
>>>>>>> Partition 2:
>>>>>>> x_2
>>>>>>>
>>>>>>> Table B:
>>>>>>> Partition1:
>>>>>>> x_3
>>>>>>>
>>>>>>> Partition 2:
>>>>>>> x_8
>>>>>>>
>>>>>>> Now, when I inner join these two tables after salting in order to
>>>>>>> avoid data skewness problems, I won't get a match since the keys are
>>>>>>> different after applying salting techniques.
>>>>>>>
>>>>>>> So how does this resolve the data skewness issue, or is there a gap
>>>>>>> in my understanding?
>>>>>>>
>>>>>>> Could anyone help me in layman's terms?
>>>>>>>
>>>>>>> TIA,
>>>>>>> Sid
>>>>>>>
>>>>>> --
>> Best Regards,
>> Ayan Guha
>>
>
