Re: Joining an RDD to a DataFrame

2016-05-13 Thread Xinh Huynh
Hi Cyril,

In the case where there are no documents, it looks like there is a typo in
"addresses" (check the number of "d"s):

| scala> df.select(explode(df("addresses.id")).as("aid"), df("id"))   <== addresses
| org.apache.spark.sql.AnalysisException: Cannot resolve column name "id" among (adresses);   <== adresses

As for your question about joining on a nested array column, I don't know
if it is possible. Is it supported in normal SQL? Exploding seems like the
right way to go, because it produces a single join key per row, whereas the
array could contain several join keys at once.
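
For example, a minimal, untested sketch of that approach (assuming the column
really is spelled "addresses", and adding a distinct in case a document
matches on more than one address):

scala> val exploded = df.select(explode(df("addresses.id")).as("aid"), df("id"))
scala> exploded.join(df_input, $"aid" === df_input("id")).select(df("id")).distinct.collect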

Xinh

On Thu, May 12, 2016 at 7:32 PM, Cyril Scetbon wrote:

> Nobody has the answer?
>
> Another thing I've seen is that if I have no documents at all:
>
> scala> df.select(explode(df("addresses.id")).as("aid")).collect
> res27: Array[org.apache.spark.sql.Row] = Array()
>
> Then
>
> scala> df.select(explode(df("addresses.id")).as("aid"), df("id"))
> org.apache.spark.sql.AnalysisException: Cannot resolve column name "id"
> among (adresses);
> at
> org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
> at
> org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
>
> Is there a better way to query nested objects and to join a DF containing
> nested objects with another regular DataFrame (yes, that's my current
> case)?
>
> On May 9, 2016, at 00:42, Cyril Scetbon wrote:
>
> Hi Ashish,
>
> The issue is not related to converting an RDD to a DF. I did it. I was just
> asking if I should do it differently.
>
> The issue is about the exception raised when using array_contains with a
> sql.Column instead of a literal value.
>
> I found another way to do it using explode, as follows:
>
> df.select(explode(df("addresses.id")).as("aid"), df("id")).join(df_input,
> $"aid" === df_input("id")).select(df("id"))
>
> However, I'm wondering whether it does roughly the same thing, or whether
> the query is different and worse in terms of performance.
>
> If someone could comment on it and maybe give me some advice.
>
> Thank you.
>
> On May 8, 2016, at 22:12, Ashish Dubey wrote:
>
> Is there any reason you don't want to convert this? I don't think a join
> between an RDD and a DF is supported.
>
> On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon wrote:
>
>> Hi,
>>
>> I have an RDD built during a Spark Streaming job and I'd like to join it
>> to a DataFrame (E/S input) to enrich it.
>> It seems that I can't join the RDD and the DF without first converting the
>> RDD to a DF (tell me if I'm wrong). Here are the schemas of both DFs:
>>
>> scala> df
>> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses:
>> array<struct<id:string,...>>, id: string]
>>
>> scala> df_input
>> res33: org.apache.spark.sql.DataFrame = [id: string]
>>
>> scala> df_input.collect
>> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2],
>> [idaddress12])
>>
>> I can get the ids I want if I know the value to look for in addresses.id
>> using:
>>
>> scala> df.filter(array_contains(df("addresses.id"),
>> "idaddress2")).select("id").collect
>> res35: Array[org.apache.spark.sql.Row] = Array([], [YY])
>>
>> However, when I try to join df_input and df using the previous filter as
>> the join condition, I get an exception:
>>
>> scala> df.join(df_input, array_contains(df("adresses.id"),
>> df_input("id")))
>> java.lang.RuntimeException: Unsupported literal type class
>> org.apache.spark.sql.Column id
>> at
>> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
>> at
>> org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
>> ...
>>
>> It seems that array_contains only supports literal (static) arguments and
>> does not accept a sql.Column in place of a value.
>>
>> What's the best way to achieve what I want to do (also in terms of
>> performance)?
>>
>> Thanks
>>
>>
>
>
>


Re: Joining an RDD to a DataFrame

2016-05-12 Thread Cyril Scetbon
Nobody has the answer?

Another thing I've seen is that if I have no documents at all:

scala> df.select(explode(df("addresses.id")).as("aid")).collect
res27: Array[org.apache.spark.sql.Row] = Array()

Then

scala> df.select(explode(df("addresses.id")).as("aid"), df("id"))
org.apache.spark.sql.AnalysisException: Cannot resolve column name "id" among 
(adresses);
at 
org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
at 
org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)

Is there a better way to query nested objects and to join a DF containing
nested objects with another regular DataFrame (yes, that's my current case)?
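
As a side note, a quick way to check how the nested column was actually
inferred (and to catch the "adresses" vs "addresses" spelling shown in the
error above) is to print the schema; the second line below is only a
hypothetical fail-fast check and throws an AnalysisException if the name is
wrong:

scala> df.printSchema()
scala> df.select("addresses.id").printSchema()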

> On May 9, 2016, at 00:42, Cyril Scetbon wrote:
> 
> Hi Ashish,
> 
> The issue is not related to converting an RDD to a DF. I did it. I was just
> asking if I should do it differently.
> 
> The issue is about the exception raised when using array_contains with a
> sql.Column instead of a literal value.
> 
> I found another way to do it using explode, as follows:
> 
> df.select(explode(df("addresses.id")).as("aid"), df("id")).join(df_input, 
> $"aid" === df_input("id")).select(df("id"))
> 
> However, I'm wondering whether it does roughly the same thing, or whether
> the query is different and worse in terms of performance.
> 
> If someone could comment on it and maybe give me some advice.
> 
> Thank you.
> 
>> On May 8, 2016, at 22:12, Ashish Dubey wrote:
>> 
>> Is there any reason you don't want to convert this? I don't think a join
>> between an RDD and a DF is supported.
>> 
>> On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon wrote:
>> Hi,
>> 
>> I have an RDD built during a Spark Streaming job and I'd like to join it to a
>> DataFrame (E/S input) to enrich it.
>> It seems that I can't join the RDD and the DF without first converting the
>> RDD to a DF (tell me if I'm wrong). Here are the schemas of both DFs:
>> 
>> scala> df
>> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses: 
>> array<struct<id:string,...>>, id: string]
>> 
>> scala> df_input
>> res33: org.apache.spark.sql.DataFrame = [id: string]
>> 
>> scala> df_input.collect
>> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])
>> 
>> I can get the ids I want if I know the value to look for in addresses.id
>> using:
>> 
>> scala> df.filter(array_contains(df("addresses.id"), 
>> "idaddress2")).select("id").collect
>> res35: Array[org.apache.spark.sql.Row] = Array([], [YY])
>> 
>> However, when I try to join df_input and df using the previous filter as
>> the join condition, I get an exception:
>> 
>> scala> df.join(df_input, array_contains(df("adresses.id"), df_input("id")))
>> java.lang.RuntimeException: Unsupported literal type class 
>> org.apache.spark.sql.Column id
>> at 
>> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
>> at 
>> org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
>> ...
>> 
>> It seems that array_contains only supports literal (static) arguments and
>> does not accept a sql.Column in place of a value.
>> 
>> What's the best way to achieve what I want to do (also in terms of
>> performance)?
>> 
>> Thanks
>> 
>> 
> 



Re: Joining an RDD to a DataFrame

2016-05-08 Thread Cyril Scetbon
Hi Ashish,

The issue is not related to converting an RDD to a DF. I did it. I was just
asking if I should do it differently.

The issue is about the exception raised when using array_contains with a
sql.Column instead of a literal value.

I found another way to do it using explode, as follows:

df.select(explode(df("addresses.id")).as("aid"), df("id")).join(df_input, 
$"aid" === df_input("id")).select(df("id"))

However, I'm wondering whether it does roughly the same thing, or whether the
query is different and worse in terms of performance.
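
(One untested way to check: build the explode-and-join query and look at the
plans Spark produces for it with explain(true), which prints the logical and
physical plans:)

scala> val joined = df.select(explode(df("addresses.id")).as("aid"), df("id")).join(df_input, $"aid" === df_input("id")).select(df("id"))
scala> joined.explain(true)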

If someone could comment on it and maybe give me some advice.

Thank you.

> On May 8, 2016, at 22:12, Ashish Dubey wrote:
> 
> Is there any reason you don't want to convert this? I don't think a join
> between an RDD and a DF is supported.
> 
> On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon wrote:
> Hi,
> 
> I have an RDD built during a Spark Streaming job and I'd like to join it to a
> DataFrame (E/S input) to enrich it.
> It seems that I can't join the RDD and the DF without first converting the
> RDD to a DF (tell me if I'm wrong). Here are the schemas of both DFs:
> 
> scala> df
> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses: 
> array<struct<id:string,...>>, id: string]
> 
> scala> df_input
> res33: org.apache.spark.sql.DataFrame = [id: string]
> 
> scala> df_input.collect
> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])
> 
> I can get the ids I want if I know the value to look for in addresses.id
> using:
> 
> scala> df.filter(array_contains(df("addresses.id"), 
> "idaddress2")).select("id").collect
> res35: Array[org.apache.spark.sql.Row] = Array([], [YY])
> 
> However, when I try to join df_input and df using the previous filter as
> the join condition, I get an exception:
> 
> scala> df.join(df_input, array_contains(df("adresses.id"), df_input("id")))
> java.lang.RuntimeException: Unsupported literal type class 
> org.apache.spark.sql.Column id
> at 
> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
> at 
> org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
> ...
> 
> It seems that array_contains only supports literal (static) arguments and
> does not accept a sql.Column in place of a value.
> 
> What's the best way to achieve what I want to do (also in terms of
> performance)?
> 
> Thanks
> 
> 



Re: Joining an RDD to a DataFrame

2016-05-08 Thread Ashish Dubey
Is there any reason you don't want to convert this? I don't think a join
between an RDD and a DF is supported.
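
(For reference, a minimal, untested sketch of such a conversion, assuming the
streaming job produces an RDD[String] of ids, here called myRdd, and a Spark
1.6 sqlContext as in the rest of the thread:)

scala> import sqlContext.implicits._
scala> val df_input = myRdd.map(Tuple1(_)).toDF("id")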

On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon wrote:

> Hi,
>
> I have an RDD built during a Spark Streaming job and I'd like to join it to
> a DataFrame (E/S input) to enrich it.
> It seems that I can't join the RDD and the DF without first converting the
> RDD to a DF (tell me if I'm wrong). Here are the schemas of both DFs:
>
> scala> df
> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses:
> array<struct<id:string,...>>, id: string]
>
> scala> df_input
> res33: org.apache.spark.sql.DataFrame = [id: string]
>
> scala> df_input.collect
> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])
>
> I can get the ids I want if I know the value to look for in addresses.id
> using:
>
> scala> df.filter(array_contains(df("addresses.id"),
> "idaddress2")).select("id").collect
> res35: Array[org.apache.spark.sql.Row] = Array([], [YY])
>
> However, when I try to join df_input and df using the previous filter as
> the join condition, I get an exception:
>
> scala> df.join(df_input, array_contains(df("adresses.id"),
> df_input("id")))
> java.lang.RuntimeException: Unsupported literal type class
> org.apache.spark.sql.Column id
> at
> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
> at
> org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
> ...
>
> It seems that array_contains only supports literal (static) arguments and
> does not accept a sql.Column in place of a value.
>
> What's the best way to achieve what I want to do (also in terms of
> performance)?
>
> Thanks
>
>


Joining an RDD to a DataFrame

2016-05-08 Thread Cyril Scetbon
Hi,

I have an RDD built during a Spark Streaming job and I'd like to join it to a
DataFrame (E/S input) to enrich it.
It seems that I can't join the RDD and the DF without first converting the RDD
to a DF (tell me if I'm wrong). Here are the schemas of both DFs:

scala> df
res32: org.apache.spark.sql.DataFrame = [f1: string, addresses: 
array<struct<id:string,...>>, id: string]

scala> df_input
res33: org.apache.spark.sql.DataFrame = [id: string]

scala> df_input.collect
res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])

I can get the ids I want if I know the value to look for in addresses.id using:

scala> df.filter(array_contains(df("addresses.id"), 
"idaddress2")).select("id").collect
res35: Array[org.apache.spark.sql.Row] = Array([], [YY])

However, when I try to join df_input and df using the previous filter as the
join condition, I get an exception:

scala> df.join(df_input, array_contains(df("adresses.id"), df_input("id")))
java.lang.RuntimeException: Unsupported literal type class 
org.apache.spark.sql.Column id
at 
org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
at org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
...

It seems that array_contains only supports literal (static) arguments and does
not accept a sql.Column in place of a value.
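
(A possible workaround, untested here: the SQL form of array_contains accepts
arbitrary expressions, so building the join condition with expr() instead of
the Scala functions.array_contains may avoid the Literal wrapping. The
input_id rename below is only to avoid the ambiguity between the two id
columns and is a made-up name:)

scala> import org.apache.spark.sql.functions.expr
scala> val df_in = df_input.withColumnRenamed("id", "input_id")
scala> df.join(df_in, expr("array_contains(addresses.id, input_id)"))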

What's the best way to achieve what I want to do (also in terms of
performance)?

Thanks