Hi,

I have a RDD built during a spark streaming job and I'd like to join it to a 
DataFrame (E/S input) to enrich it.
It seems that I can't join the RDD and the DF without converting first the RDD 
to a DF (Tell me if I'm wrong). Here are the schemas of both DF :

scala> df
res32: org.apache.spark.sql.DataFrame = [f1: string, addresses: 
array<struct<sf1:int,sf2:string,sf3:string,id:string>>, id: string]

scala> df_input
res33: org.apache.spark.sql.DataFrame = [id: string]

scala> df_input.collect
res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])

I can get ids I want if I know the value to look for in addresses.id using :

scala> df.filter(array_contains(df("addresses.id"), 
"idaddress2")).select("id").collect
res35: Array[org.apache.spark.sql.Row] = Array([XXXX], [YY])

However when I try to join df_input and df and to use the previous filter as 
the join condition I get an exception :

scala> df.join(df_input, array_contains(df("adresses.id"), df_input("id")))
java.lang.RuntimeException: Unsupported literal type class 
org.apache.spark.sql.Column id
        at 
org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
        at org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
        ...

It seems that array_contains only supports static arguments and does not 
replace a sql.Column by its value.

What's the best way to achieve what I want to do ? (Also speaking in term of 
performance)

Thanks
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to