Hi Ashish,

The issue is not related to converting an RDD to a DF; I have already done 
that. I was just asking whether I should do it differently.

The issue is the exception thrown when array_contains is given a sql.Column 
instead of a literal value.

I found another way to do it using explode, as follows:

df.select(explode(df("addresses.id")).as("aid"), df("id")).join(df_input, 
$"aid" === df_input("id")).select(df("id"))

However, I'm wondering whether it does almost the same thing, or whether the 
query is different and worse in terms of performance.
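
To make the comparison concrete, here is a minimal sketch that builds both 
variants side by side and prints their physical plans. It is written under 
several assumptions that are not from the thread: it uses the SparkSession 
API (Spark 2.x or later), the sample rows and the Address/Record case classes 
are made up to mirror the schemas quoted below, and the id column of df_input 
is renamed to input_id (a hypothetical name) to avoid ambiguity with df's id. 
The second variant passes array_contains as a SQL expression string through 
expr, so the second argument is parsed as a column reference rather than 
wrapped as a literal.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, expr}

// Hypothetical case classes mirroring the schemas shown in the thread.
case class Address(sf1: Int, sf2: String, sf3: String, id: String)
case class Record(f1: String, addresses: Seq[Address], id: String)

object JoinVariants {
  // Variant 1: explode the array of address ids, then equi-join on the id.
  def viaExplode(df: DataFrame, dfInput: DataFrame): DataFrame =
    df.select(explode(col("addresses.id")).as("aid"), col("id"))
      .join(dfInput, col("aid") === col("input_id"))
      .select("id")

  // Variant 2: array_contains written as a SQL expression, so that
  // input_id is resolved as a column instead of being treated as a literal.
  def viaContains(df: DataFrame, dfInput: DataFrame): DataFrame =
    df.join(dfInput, expr("array_contains(addresses.id, input_id)"))
      .select("id")

  // Made-up sample data; only the shape follows the thread.
  def sampleData(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    val df = Seq(
      Record("a", Seq(Address(1, "x", "y", "idaddress2")), "XXXX"),
      Record("b", Seq(Address(2, "x", "y", "idaddress3")), "ZZ")
    ).toDF()
    val dfInput = Seq("idaddress2", "idaddress12").toDF("input_id")
    (df, dfInput)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("join-variants")
      .getOrCreate()
    val (df, dfInput) = sampleData(spark)

    // Print the physical plan of each variant for comparison.
    viaExplode(df, dfInput).explain()
    viaContains(df, dfInput).explain()

    spark.stop()
  }
}
```

As far as I understand, the explode variant gives the planner an equality 
condition, which it can usually turn into a hash or sort-merge join, whereas 
a join on array_contains is not an equi-join and generally falls back to a 
nested-loop style join. The two are also not strictly equivalent: the explode 
variant emits one row per matching address, so a row of df with several 
matching addresses appears several times unless distinct is applied.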

If someone could comment on it and maybe give me some advice, I would 
appreciate it.

Thank you.

> On May 8, 2016, at 22:12, Ashish Dubey <ashish....@gmail.com> wrote:
> 
> Is there any reason you don't want to convert this? I don't think a join 
> between an RDD and a DF is supported.
> 
> On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon <cyril.scet...@free.fr 
> <mailto:cyril.scet...@free.fr>> wrote:
> Hi,
> 
> I have an RDD built during a Spark Streaming job and I'd like to join it to a 
> DataFrame (E/S input) to enrich it.
> It seems that I can't join the RDD and the DF without first converting the 
> RDD to a DF (tell me if I'm wrong). Here are the schemas of both DataFrames:
> 
> scala> df
> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses: 
> array<struct<sf1:int,sf2:string,sf3:string,id:string>>, id: string]
> 
> scala> df_input
> res33: org.apache.spark.sql.DataFrame = [id: string]
> 
> scala> df_input.collect
> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])
> 
> I can get the ids I want if I know the value to look for in addresses.id, 
> using:
> 
> scala> df.filter(array_contains(df("addresses.id"), 
> "idaddress2")).select("id").collect
> res35: Array[org.apache.spark.sql.Row] = Array([XXXX], [YY])
> 
> However, when I try to join df_input and df using the previous filter as the 
> join condition, I get an exception:
> 
> scala> df.join(df_input, array_contains(df("addresses.id"), df_input("id")))
> java.lang.RuntimeException: Unsupported literal type class 
> org.apache.spark.sql.Column id
>         at 
> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
>         at 
> org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
>         ...
> 
> It seems that array_contains only supports literal arguments and does not 
> resolve a sql.Column to its value.
> 
> What's the best way to achieve what I want to do? (Also in terms of 
> performance.)
> 
> Thanks
