Is there any reason you don't want to convert it? I don't think a join
between an RDD and a DataFrame is supported.
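
For what it's worth, here is a minimal sketch of the conversion route, not a
tested answer for your cluster. It assumes Spark 2.0 or later (on 1.6 the same
calls would go through sqlContext instead of SparkSession), an RDD[String] of
address ids, and made-up sample rows. The case classes, the column names
input_id and address_id, and the sample values are hypothetical; only the
schema fields come from your mail. It shows two ways around the
"Unsupported literal type" error: writing the condition as a SQL expression
with expr, or exploding the array and doing a plain equi-join.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, expr}

// Hypothetical case classes mirroring the schema of df shown in the thread.
case class Address(sf1: Int, sf2: String, sf3: String, id: String)
case class Record(f1: String, addresses: Seq[Address], id: String)

// Reuses the shell's session if one exists, otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("enrich-sketch").getOrCreate()
import spark.implicits._

// Made-up sample data standing in for the E/S DataFrame.
val df = Seq(
  Record("a", Seq(Address(1, "x", "y", "idaddress2")), "XXXX"),
  Record("b", Seq(Address(2, "x", "y", "idaddress3")), "ZZ")
).toDF()

// Stand-in for the streaming RDD, converted to a one-column DataFrame.
// The column is named input_id (hypothetical) to avoid clashing with df("id").
val rdd = spark.sparkContext.parallelize(Seq("idaddress2", "idaddress12"))
val dfInput = rdd.toDF("input_id")

// Option 1: express the condition in SQL, where array_contains takes a column.
val viaExpr = df.join(dfInput, expr("array_contains(addresses.id, input_id)"))

// Option 2: one row per address id, then an ordinary equi-join.
val viaExplode = df
  .select($"id", explode($"addresses.id").as("address_id"))
  .join(dfInput, $"address_id" === $"input_id")

viaExpr.select("id").show()
viaExplode.select("id").show()
```

On performance: the array_contains condition is a non-equi join, so Spark
will typically plan it as a nested-loop or cartesian join, while the exploded
version is an equi-join that can use a hash or sort-merge join. The exploded
version can return the same record more than once if several of its addresses
match, so a distinct may be needed afterwards.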

On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon <cyril.scet...@free.fr>
wrote:

> Hi,
>
> I have an RDD built during a Spark Streaming job and I'd like to join it to
> a DataFrame (E/S input) to enrich it.
> It seems that I can't join the RDD and the DF without first converting the
> RDD to a DF (tell me if I'm wrong). Here are the schemas of both DFs:
>
> scala> df
> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses:
> array<struct<sf1:int,sf2:string,sf3:string,id:string>>, id: string]
>
> scala> df_input
> res33: org.apache.spark.sql.DataFrame = [id: string]
>
> scala> df_input.collect
> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])
>
> I can get the ids I want if I know the value to look for in addresses.id,
> using:
>
> scala> df.filter(array_contains(df("addresses.id"),
> "idaddress2")).select("id").collect
> res35: Array[org.apache.spark.sql.Row] = Array([XXXX], [YY])
>
> However, when I try to join df_input and df using the previous filter
> as the join condition, I get an exception:
>
> scala> df.join(df_input, array_contains(df("addresses.id"),
> df_input("id")))
> java.lang.RuntimeException: Unsupported literal type class
> org.apache.spark.sql.Column id
>         at
> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
>         at
> org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
>         ...
>
> It seems that array_contains only supports literal arguments and does not
> accept a sql.Column as the value to look for.
>
> What's the best way to achieve what I want to do? (Also in terms of
> performance.)
>
> Thanks
