Re: [Spark SQL] Unexpected Behaviour

2016-03-29 Thread Jerry Lam
Hi guys,

Another point: if this is unsupported, shouldn't it throw an exception
instead of giving a wrong answer? I mean, if
d1.join(d2, "id").select(d2("label")) should not work at all, the proper
behaviour is to throw an analysis exception. Right now it returns a wrong
answer instead.

As I said, this is just the tip of the iceberg. I have run into cases worse
than this. For example, you might think renaming fields would work, but in
some cases it still returns wrong results.
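
For reference, here is a minimal sketch (using the same base from the example
below and the alias approach Sunitha suggested; the names a1, a2 and j are just
for illustration) of how aliasing the derived dataframes before the join lets
the two label columns be resolved by qualifier instead of by the ambiguous
name:

val a1 = base.where($"label" < 60).as("d1")
val a2 = base.where($"label" === 60).as("d2")
val j = a1.join(a2, $"d1.id" === $"d2.id")
j.select($"d1.label").show()   // expected: 40
j.select($"d2.label").show()   // expected: 60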

Best Regards,

Jerry

On Tue, Mar 29, 2016 at 7:38 AM, Jerry Lam  wrote:

> Hi Divya,
>
> This is not a self-join. d1 and d2 contain totally different rows. They
> are derived from the same table. The transformations applied to generate d1
> and d2 should be enough to disambiguate the labels in question.
>
>
> Best Regards,
>
> Jerry
>
>
> On Tue, Mar 29, 2016 at 2:43 AM, Divya Gehlot 
> wrote:
>
>>
>> def join(right: DataFrame, joinExprs: Column, joinType: String):
>> DataFrame = {
>> // Note that in this function, we introduce a hack in the case of
>> self-join to automatically
>> // resolve ambiguous join conditions into ones that might make sense
>> [SPARK-6231].
>> // Consider this case: df.join(df, df("key") === df("key"))
>> // Since df("key") === df("key") is a trivially true condition, this
>> actually becomes a
>> // cartesian join. However, most likely users expect to perform a self
>> join using "key".
>> // With that assumption, this hack turns the trivially true condition
>> into equality on join
>> // keys that are resolved to both sides.
>> // Trigger analysis so in the case of self-join, the analyzer will clone
>> the plan.
>> // After the cloning, left and right side will have distinct expression
>> ids.
>>
>> On 29 March 2016 at 14:33, Jerry Lam  wrote:
>>
>>> Hi guys,
>>>
>>> I have another example to illustrate the issue. I think the problem is
>>> pretty nasty.
>>>
>>> val base = sc.parallelize(( 0 to 49).zip( 0 to 49) ++ (30 to 79).zip(50
>>> to 99)).toDF("id", "label")
>>> val d1 = base.where($"label" < 60)
>>> val d2 = base.where($"label" === 60)
>>> d1.join(d2, "id").show
>>> +---+-+-+
>>> | id|label|label|
>>> +---+-+-+
>>> | 40|   40|   60|
>>> +---+-+-+
>>>
>>> d1.join(d2, "id").select(d1("label")).show
>>> +-+
>>> |label|
>>> +-+
>>> |   40|
>>> +-+
>>> (expected answer: 40, right!)
>>>
>>> d1.join(d2, "id").select(d2("label")).show
>>> +-+
>>> |label|
>>> +-+
>>> |   40|
>>> +-+
>>> (expected answer: 60, wrong!)
>>>
>>> d1.join(d2, "id").select(d2("label")).explain
>>> == Physical Plan ==
>>> TungstenProject [label#15]
>>>  SortMergeJoin [id#14], [id#30]
>>>   TungstenSort [id#14 ASC], false, 0
>>>TungstenExchange hashpartitioning(id#14)
>>> TungstenProject [_1#12 AS id#14,_2#13 AS label#15]
>>>  Filter (_2#13 < 60)
>>>   Scan PhysicalRDD[_1#12,_2#13]
>>>   TungstenSort [id#30 ASC], false, 0
>>>TungstenExchange hashpartitioning(id#30)
>>> TungstenProject [_1#12 AS id#30]
>>>  Filter (_2#13 = 60)
>>>   Scan PhysicalRDD[_1#12,_2#13]
>>>
>>> Again, this is just the tip of the iceberg. I spent hours tracking down
>>> this weird behaviour.
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Tue, Mar 29, 2016 at 2:01 AM, Jerry Lam  wrote:
>>>
 Hi Sunitha,

 Thank you for the Jira reference. It looks like this is the bug I'm
 hitting. Most of the bugs related to this seem to be associated with
 dataframes derived from the same dataframe (base in this case). In SQL, this
 is a self-join, and dropping d2.label should not affect d1.label. There are
 other bugs I found over the past three days that are associated with this
 type of join. In one case, if I don't drop the duplicate column BEFORE the
 join, Spark prefers the columns from the d2 dataframe. I will see if I can
 replicate it in a small program like the one above.

 Best Regards,

 Jerry


 On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati <
 skambha...@gmail.com> wrote:

> Hi Jerry,
>
> I think you are running into an issue similar to SPARK-14040
> https://issues.apache.org/jira/browse/SPARK-14040
>
> One way to resolve it is to use an alias.
>
> Here is an example that I tried on trunk and I do not see any
> exceptions.
>
> val d1=base.where($"label" === 0).as("d1")
> val d2=base.where($"label" === 1).as("d2")
>
> d1.join(d2, $"d1.id" === $"d2.id", 
> "left_outer").drop($"d2.label").select($"d1.label")
>
>
> Hope this helps some.
>
> Best regards,
> Sunitha.
>
> On Mar 28, 2016, at 2:34 PM, Jerry Lam  wrote:
>
> Hi spark users and developers,
>
 I'm using Spark 1.5.1 (I have no choice because this is what we used).
 I ran into some very unexpected behaviour when I did some join operations
 lately. I cannot post my actual code here, and the following code is not for
 practical use, but it should demonstrate the issue.

Re: [Spark SQL] Unexpected Behaviour

2016-03-29 Thread Jerry Lam
Hi Divya,

This is not a self-join. d1 and d2 contain totally different rows. They are
derived from the same table. The transformations applied to generate d1 and
d2 should be enough to disambiguate the labels in question.
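
For what it's worth, here is a minimal sketch of the case that the Spark source
comment quoted below actually targets (using a hypothetical dataframe df with a
"key" column): when both sides are literally the same dataframe, the trivially
true condition df("key") === df("key") gets rewritten into an equi-join on
"key" resolved to both sides [SPARK-6231]. That is not the shape of the d1/d2
join above.

val df = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("key", "value")
// Trivially true condition on a genuine self-join; per the quoted comment,
// Spark turns this into an equi-join on "key" rather than a cartesian join.
df.join(df, df("key") === df("key")).show()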


Best Regards,

Jerry


On Tue, Mar 29, 2016 at 2:43 AM, Divya Gehlot 
wrote:

>
> def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
> = {
> // Note that in this function, we introduce a hack in the case of
> self-join to automatically
> // resolve ambiguous join conditions into ones that might make sense
> [SPARK-6231].
> // Consider this case: df.join(df, df("key") === df("key"))
> // Since df("key") === df("key") is a trivially true condition, this
> actually becomes a
> // cartesian join. However, most likely users expect to perform a self
> join using "key".
> // With that assumption, this hack turns the trivially true condition into
> equality on join
> // keys that are resolved to both sides.
> // Trigger analysis so in the case of self-join, the analyzer will clone
> the plan.
> // After the cloning, left and right side will have distinct expression
> ids.
>
> On 29 March 2016 at 14:33, Jerry Lam  wrote:
>
>> Hi guys,
>>
>> I have another example to illustrate the issue. I think the problem is
>> pretty nasty.
>>
>> val base = sc.parallelize(( 0 to 49).zip( 0 to 49) ++ (30 to 79).zip(50
>> to 99)).toDF("id", "label")
>> val d1 = base.where($"label" < 60)
>> val d2 = base.where($"label" === 60)
>> d1.join(d2, "id").show
>> +---+-+-+
>> | id|label|label|
>> +---+-+-+
>> | 40|   40|   60|
>> +---+-+-+
>>
>> d1.join(d2, "id").select(d1("label")).show
>> +-+
>> |label|
>> +-+
>> |   40|
>> +-+
>> (expected answer: 40, right!)
>>
>> d1.join(d2, "id").select(d2("label")).show
>> +-+
>> |label|
>> +-+
>> |   40|
>> +-+
>> (expected answer: 60, wrong!)
>>
>> d1.join(d2, "id").select(d2("label")).explain
>> == Physical Plan ==
>> TungstenProject [label#15]
>>  SortMergeJoin [id#14], [id#30]
>>   TungstenSort [id#14 ASC], false, 0
>>TungstenExchange hashpartitioning(id#14)
>> TungstenProject [_1#12 AS id#14,_2#13 AS label#15]
>>  Filter (_2#13 < 60)
>>   Scan PhysicalRDD[_1#12,_2#13]
>>   TungstenSort [id#30 ASC], false, 0
>>TungstenExchange hashpartitioning(id#30)
>> TungstenProject [_1#12 AS id#30]
>>  Filter (_2#13 = 60)
>>   Scan PhysicalRDD[_1#12,_2#13]
>>
>> Again, this is just the tip of the iceberg. I spent hours tracking down
>> this weird behaviour.
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Tue, Mar 29, 2016 at 2:01 AM, Jerry Lam  wrote:
>>
>>> Hi Sunitha,
>>>
>>> Thank you for the Jira reference. It looks like this is the bug I'm
>>> hitting. Most of the bugs related to this seem to be associated with
>>> dataframes derived from the same dataframe (base in this case). In SQL, this
>>> is a self-join, and dropping d2.label should not affect d1.label. There are
>>> other bugs I found over the past three days that are associated with this
>>> type of join. In one case, if I don't drop the duplicate column BEFORE the
>>> join, Spark prefers the columns from the d2 dataframe. I will see if I can
>>> replicate it in a small program like the one above.
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati <
>>> skambha...@gmail.com> wrote:
>>>
 Hi Jerry,

 I think you are running into an issue similar to SPARK-14040
 https://issues.apache.org/jira/browse/SPARK-14040

 One way to resolve it is to use an alias.

 Here is an example that I tried on trunk and I do not see any
 exceptions.

 val d1=base.where($"label" === 0).as("d1")
 val d2=base.where($"label" === 1).as("d2")

 d1.join(d2, $"d1.id" === $"d2.id", 
 "left_outer").drop($"d2.label").select($"d1.label")


 Hope this helps some.

 Best regards,
 Sunitha.

 On Mar 28, 2016, at 2:34 PM, Jerry Lam  wrote:

 Hi spark users and developers,

 I'm using Spark 1.5.1 (I have no choice because this is what we used).
 I ran into some very unexpected behaviour when I did some join operations
 lately. I cannot post my actual code here, and the following code is not for
 practical use, but it should demonstrate the issue.

 val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
 99).map((_,1))).toDF("id", "label")
 val d1=base.where($"label" === 0)
 val d2=base.where($"label" === 1)
 d1.join(d2, d1("id") === d2("id"),
 "left_outer").drop(d2("label")).select(d1("label"))


 The above code throws an exception saying the column label is not found.
 Is there a reason for throwing an exception here, when d1("label") is not
 the column that was dropped?

 Best Regards,

 Jerry



>>>
>>
>


Re: [Spark SQL] Unexpected Behaviour

2016-03-28 Thread Jerry Lam
Hi guys,

I have another example to illustrate the issue. I think the problem is
pretty nasty.

val base = sc.parallelize(( 0 to 49).zip( 0 to 49) ++ (30 to 79).zip(50 to
99)).toDF("id", "label")
val d1 = base.where($"label" < 60)
val d2 = base.where($"label" === 60)
d1.join(d2, "id").show
+---+-+-+
| id|label|label|
+---+-+-+
| 40|   40|   60|
+---+-+-+

d1.join(d2, "id").select(d1("label")).show
+-+
|label|
+-+
|   40|
+-+
(expected answer: 40, right!)

d1.join(d2, "id").select(d2("label")).show
+-+
|label|
+-+
|   40|
+-+
(expected answer: 60, wrong!)

d1.join(d2, "id").select(d2("label")).explain
== Physical Plan ==
TungstenProject [label#15]
 SortMergeJoin [id#14], [id#30]
  TungstenSort [id#14 ASC], false, 0
   TungstenExchange hashpartitioning(id#14)
TungstenProject [_1#12 AS id#14,_2#13 AS label#15]
 Filter (_2#13 < 60)
  Scan PhysicalRDD[_1#12,_2#13]
  TungstenSort [id#30 ASC], false, 0
   TungstenExchange hashpartitioning(id#30)
TungstenProject [_1#12 AS id#30]
 Filter (_2#13 = 60)
  Scan PhysicalRDD[_1#12,_2#13]

Again, this is just the tip of the iceberg. I spent hours tracking down
this weird behaviour.
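
One more cross-check that may be useful (a sketch, assuming the same base, d1
and d2 defined above and a spark-shell sqlContext; the temp table names t1 and
t2 are just for illustration): going through the SQL interface with explicit
table aliases makes the two label columns unambiguous.

d1.registerTempTable("t1")
d2.registerTempTable("t2")
sqlContext.sql("SELECT t2.label FROM t1 JOIN t2 ON t1.id = t2.id").show()
// expected for this data: a single row with label = 60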

Best Regards,

Jerry


On Tue, Mar 29, 2016 at 2:01 AM, Jerry Lam  wrote:

> Hi Sunitha,
>
> Thank you for the Jira reference. It looks like this is the bug I'm
> hitting. Most of the bugs related to this seem to be associated with
> dataframes derived from the same dataframe (base in this case). In SQL, this
> is a self-join, and dropping d2.label should not affect d1.label. There are
> other bugs I found over the past three days that are associated with this
> type of join. In one case, if I don't drop the duplicate column BEFORE the
> join, Spark prefers the columns from the d2 dataframe. I will see if I can
> replicate it in a small program like the one above.
>
> Best Regards,
>
> Jerry
>
>
> On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati  > wrote:
>
>> Hi Jerry,
>>
>> I think you are running into an issue similar to SPARK-14040
>> https://issues.apache.org/jira/browse/SPARK-14040
>>
>> One way to resolve it is to use an alias.
>>
>> Here is an example that I tried on trunk and I do not see any exceptions.
>>
>>
>> val d1=base.where($"label" === 0).as("d1")
>> val d2=base.where($"label" === 1).as("d2")
>>
>> d1.join(d2, $"d1.id" === $"d2.id", 
>> "left_outer").drop($"d2.label").select($"d1.label")
>>
>>
>> Hope this helps some.
>>
>> Best regards,
>> Sunitha.
>>
>> On Mar 28, 2016, at 2:34 PM, Jerry Lam  wrote:
>>
>> Hi spark users and developers,
>>
>> I'm using Spark 1.5.1 (I have no choice because this is what we used). I
>> ran into some very unexpected behaviour when I did some join operations
>> lately. I cannot post my actual code here, and the following code is not for
>> practical use, but it should demonstrate the issue.
>>
>> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
>> 99).map((_,1))).toDF("id", "label")
>> val d1=base.where($"label" === 0)
>> val d2=base.where($"label" === 1)
>> d1.join(d2, d1("id") === d2("id"),
>> "left_outer").drop(d2("label")).select(d1("label"))
>>
>>
>> The above code throws an exception saying the column label is not found.
>> Is there a reason for throwing an exception here, when d1("label") is not
>> the column that was dropped?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>
>


Re: [Spark SQL] Unexpected Behaviour

2016-03-28 Thread Jerry Lam
Hi Sunitha,

Thank you for the Jira reference. It looks like this is the bug I'm
hitting. Most of the bugs related to this seem to be associated with
dataframes derived from the same dataframe (base in this case). In SQL, this
is a self-join, and dropping d2.label should not affect d1.label. There are
other bugs I found over the past three days that are associated with this
type of join. In one case, if I don't drop the duplicate column BEFORE the
join, Spark prefers the columns from the d2 dataframe. I will see if I can
replicate it in a small program like the one above.

Best Regards,

Jerry


On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati 
wrote:

> Hi Jerry,
>
> I think you are running into an issue similar to SPARK-14040
> https://issues.apache.org/jira/browse/SPARK-14040
>
> One way to resolve it is to use an alias.
>
> Here is an example that I tried on trunk and I do not see any exceptions.
>
> val d1=base.where($"label" === 0).as("d1")
> val d2=base.where($"label" === 1).as("d2")
>
> d1.join(d2, $"d1.id" === $"d2.id", 
> "left_outer").drop($"d2.label").select($"d1.label")
>
>
> Hope this helps some.
>
> Best regards,
> Sunitha.
>
> On Mar 28, 2016, at 2:34 PM, Jerry Lam  wrote:
>
> Hi spark users and developers,
>
> I'm using Spark 1.5.1 (I have no choice because this is what we used). I
> ran into some very unexpected behaviour when I did some join operations
> lately. I cannot post my actual code here, and the following code is not for
> practical use, but it should demonstrate the issue.
>
> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
> 99).map((_,1))).toDF("id", "label")
> val d1=base.where($"label" === 0)
> val d2=base.where($"label" === 1)
> d1.join(d2, d1("id") === d2("id"),
> "left_outer").drop(d2("label")).select(d1("label"))
>
>
> The above code throws an exception saying the column label is not found.
> Is there a reason for throwing an exception here, when d1("label") is not
> the column that was dropped?
>
> Best Regards,
>
> Jerry
>
>
>


Re: [Spark SQL] Unexpected Behaviour

2016-03-28 Thread Sunitha Kambhampati
Hi Jerry, 

I think you are running into an issue similar to SPARK-14040
https://issues.apache.org/jira/browse/SPARK-14040

One way to resolve it is to use an alias.

Here is an example that I tried on trunk and I do not see any exceptions.  

val d1=base.where($"label" === 0).as("d1")
val d2=base.where($"label" === 1).as("d2")

d1.join(d2, $"d1.id" === $"d2.id", 
"left_outer").drop($"d2.label").select($"d1.label")

Hope this helps some. 

Best regards,
Sunitha.

> On Mar 28, 2016, at 2:34 PM, Jerry Lam  wrote:
> 
> Hi spark users and developers,
> 
> I'm using Spark 1.5.1 (I have no choice because this is what we used). I ran
> into some very unexpected behaviour when I did some join operations lately. I
> cannot post my actual code here, and the following code is not for practical
> use, but it should demonstrate the issue.
> 
> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to 
> 99).map((_,1))).toDF("id", "label")
> val d1=base.where($"label" === 0)
> val d2=base.where($"label" === 1)
> d1.join(d2, d1("id") === d2("id"), 
> "left_outer").drop(d2("label")).select(d1("label"))
> 
> 
> The above code throws an exception saying the column label is not found.
> Is there a reason for throwing an exception here, when d1("label") is not the
> column that was dropped?
> 
> Best Regards,
> 
> Jerry 



Re: [Spark SQL] Unexpected Behaviour

2016-03-28 Thread Alexander Krasnukhin
You drop the label column and later try to select it. It won't find it, indeed.
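
For context, a small sketch of the intent in Jerry's snippet (same base, d1 and
d2 as quoted below; the name joined is just for illustration): the joined frame
carries two label columns, and drop(d2("label")) is meant to remove only d2's
copy, so d1("label") should in principle still be selectable afterwards.

val joined = d1.join(d2, d1("id") === d2("id"), "left_outer")
joined.columns                     // Array(id, label, id, label)
joined.drop(d2("label")).columns   // intended result: Array(id, label, id)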

--
Alexander
aka Six-Hat-Thinker

> On 28 Mar 2016, at 23:34, Jerry Lam  wrote:
> 
> Hi spark users and developers,
> 
> I'm using Spark 1.5.1 (I have no choice because this is what we used). I ran
> into some very unexpected behaviour when I did some join operations lately. I
> cannot post my actual code here, and the following code is not for practical
> use, but it should demonstrate the issue.
> 
> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to 
> 99).map((_,1))).toDF("id", "label")
> val d1=base.where($"label" === 0)
> val d2=base.where($"label" === 1)
> d1.join(d2, d1("id") === d2("id"), 
> "left_outer").drop(d2("label")).select(d1("label"))
> 
> 
> The above code throws an exception saying the column label is not found.
> Is there a reason for throwing an exception here, when d1("label") is not the
> column that was dropped?
> 
> Best Regards,
> 
> Jerry 




Re: [Spark SQL] Unexpected Behaviour

2016-03-28 Thread Mich Talebzadeh
Hi Jerry

What do you expect the outcome to be?

This is Spark 1.6.1

I see this without dropping d2!


scala> d1.join(d2, d1("id") === d2("id"),
"left_outer").select(d1("label")).collect
res15: Array[org.apache.spark.sql.Row] = Array([0], [0], [0], [0], [0],
[0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0],
[0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0],
[0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0])
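
The check that mirrors Jerry's failing line would keep the drop in place (a
sketch only; I am not showing its output here):

d1.join(d2, d1("id") === d2("id"),
"left_outer").drop(d2("label")).select(d1("label")).collect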



Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com



On 28 March 2016 at 22:34, Jerry Lam  wrote:

> Hi spark users and developers,
>
> I'm using Spark 1.5.1 (I have no choice because this is what we used). I
> ran into some very unexpected behaviour when I did some join operations
> lately. I cannot post my actual code here, and the following code is not for
> practical use, but it should demonstrate the issue.
>
> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
> 99).map((_,1))).toDF("id", "label")
> val d1=base.where($"label" === 0)
> val d2=base.where($"label" === 1)
> d1.join(d2, d1("id") === d2("id"),
> "left_outer").drop(d2("label")).select(d1("label"))
>
>
> The above code throws an exception saying the column label is not found.
> Is there a reason for throwing an exception here, when d1("label") is not
> the column that was dropped?
>
> Best Regards,
>
> Jerry
>


[Spark SQL] Unexpected Behaviour

2016-03-28 Thread Jerry Lam
Hi spark users and developers,

I'm using Spark 1.5.1 (I have no choice because this is what we used). I
ran into some very unexpected behaviour when I did some join operations
lately. I cannot post my actual code here, and the following code is not for
practical use, but it should demonstrate the issue.

val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
99).map((_,1))).toDF("id", "label")
val d1=base.where($"label" === 0)
val d2=base.where($"label" === 1)
d1.join(d2, d1("id") === d2("id"),
"left_outer").drop(d2("label")).select(d1("label"))


The above code throws an exception saying the column label is not found.
Is there a reason for throwing an exception here, when d1("label") is not the
column that was dropped?

Best Regards,

Jerry