Hi Divya,

This is not a self-join. d1 and d2 contain completely different rows, even
though they are derived from the same table. The transformations that are
applied to generate d1 and d2 should be enough to disambiguate the labels
in question.
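
One possible workaround in the meantime is to rename the clashing column on
each side before the join, so that the final select goes by name and no
longer relies on d1("label") or d2("label"). A minimal sketch, assuming the
Spark 1.5-style SQLContext API and a local master; the object name and the
d1_label / d2_label column names are made up for illustration, and this has
not been checked against every Spark version:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical object name, used only for this sketch.
object RenameBeforeJoin {
  def main(args: Array[String]): Unit = {
    // Assumption: run locally; adjust the master for a real cluster.
    val conf = new SparkConf().setAppName("rename-before-join").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Same data as the example quoted below.
    val base = sc.parallelize((0 to 49).zip(0 to 49) ++ (30 to 79).zip(50 to 99))
      .toDF("id", "label")

    // Give each side its own column name (illustrative names) before joining.
    val d1 = base.where($"label" < 60).withColumnRenamed("label", "d1_label")
    val d2 = base.where($"label" === 60).withColumnRenamed("label", "d2_label")

    // Select by name, so neither column can be mistaken for the other.
    d1.join(d2, "id").select("d1_label", "d2_label").show()

    sc.stop()
  }
}
```

The idea is simply that the two columns no longer share a name, so the
select does not depend on which side a DataFrame-bound column reference
resolves to.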


Best Regards,

Jerry


On Tue, Mar 29, 2016 at 2:43 AM, Divya Gehlot <divya.htco...@gmail.com>
wrote:

>
> def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
>   // Note that in this function, we introduce a hack in the case of self-join to automatically
>   // resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
>   // Consider this case: df.join(df, df("key") === df("key"))
>   // Since df("key") === df("key") is a trivially true condition, this actually becomes a
>   // cartesian join. However, most likely users expect to perform a self join using "key".
>   // With that assumption, this hack turns the trivially true condition into equality on join
>   // keys that are resolved to both sides.
>   // Trigger analysis so in the case of self-join, the analyzer will clone the plan.
>   // After the cloning, left and right side will have distinct expression ids.
>
> On 29 March 2016 at 14:33, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I have another example to illustrate the issue. I think the problem is
>> pretty nasty.
>>
>> val base = sc.parallelize((0 to 49).zip(0 to 49) ++ (30 to 79).zip(50 to 99)).toDF("id", "label")
>> val d1 = base.where($"label" < 60)
>> val d2 = base.where($"label" === 60)
>> d1.join(d2, "id").show
>> +---+-----+-----+
>> | id|label|label|
>> +---+-----+-----+
>> | 40|   40|   60|
>> +---+-----+-----+
>>
>> d1.join(d2, "id").select(d1("label")).show
>> +-----+
>> |label|
>> +-----+
>> |   40|
>> +-----+
>> (expected answer: 40, right!)
>>
>> d1.join(d2, "id").select(d2("label")).show
>> +-----+
>> |label|
>> +-----+
>> |   40|
>> +-----+
>> (expected answer: 60, wrong!)
>>
>> d1.join(d2, "id").select(d2("label")).explain
>> == Physical Plan ==
>> TungstenProject [label#15]
>>  SortMergeJoin [id#14], [id#30]
>>   TungstenSort [id#14 ASC], false, 0
>>    TungstenExchange hashpartitioning(id#14)
>>     TungstenProject [_1#12 AS id#14,_2#13 AS label#15]
>>      Filter (_2#13 < 60)
>>       Scan PhysicalRDD[_1#12,_2#13]
>>   TungstenSort [id#30 ASC], false, 0
>>    TungstenExchange hashpartitioning(id#30)
>>     TungstenProject [_1#12 AS id#30]
>>      Filter (_2#13 = 60)
>>       Scan PhysicalRDD[_1#12,_2#13]
>>
>> Again, this is just the tip of the iceberg. I spent hours tracking down
>> this weird behaviour.
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Tue, Mar 29, 2016 at 2:01 AM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi Sunitha,
>>>
>>> Thank you for the Jira reference. It looks like this is the bug I'm
>>> hitting. Most of the related bugs seem to involve dataframes derived
>>> from a single dataframe (base in this case). In SQL, this is a self-join,
>>> and dropping d2.label should not affect d1.label. I have found other bugs
>>> over the past three days that are associated with this type of join. In
>>> one case, if I don't drop the duplicate column BEFORE the join, Spark
>>> prefers the columns from the d2 dataframe. I will see if I can reproduce
>>> it in a small program like the one above.
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati <
>>> skambha...@gmail.com> wrote:
>>>
>>>> Hi Jerry,
>>>>
>>>> I think you are running into an issue similar to SPARK-14040
>>>> https://issues.apache.org/jira/browse/SPARK-14040
>>>>
>>>> One way to resolve it is to use alias.
>>>>
>>>> Here is an example that I tried on trunk and I do not see any
>>>> exceptions.
>>>>
>>>> val d1 = base.where($"label" === 0).as("d1")
>>>> val d2 = base.where($"label" === 1).as("d2")
>>>>
>>>> d1.join(d2, $"d1.id" === $"d2.id", 
>>>> "left_outer").drop($"d2.label").select($"d1.label")
>>>>
>>>>
>>>> Hope this helps some.
>>>>
>>>> Best regards,
>>>> Sunitha.
>>>>
>>>> On Mar 28, 2016, at 2:34 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>>
>>>> Hi spark users and developers,
>>>>
>>>> I'm using Spark 1.5.1 (I have no choice; this is the version we run).
>>>> I recently ran into some very unexpected behaviour when doing join
>>>> operations. I cannot post my actual code here, and the following code is
>>>> contrived rather than practical, but it should demonstrate the issue.
>>>>
>>>> val base = sc.parallelize((0 to 49).map(i => (i, 0)) ++ (50 to 99).map((_, 1))).toDF("id", "label")
>>>> val d1=base.where($"label" === 0)
>>>> val d2=base.where($"label" === 1)
>>>> d1.join(d2, d1("id") === d2("id"),
>>>> "left_outer").drop(d2("label")).select(d1("label"))
>>>>
>>>>
>>>> The above code throws an exception saying the column label is not
>>>> found. Is there a reason for throwing an exception when d1("label") has
>>>> not been dropped?
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>>
>>>>
>>>
>>
>
