Hi,

I have an RDD built during a Spark Streaming job and I'd like to join it to a DataFrame (E/S input) to enrich it. It seems that I can't join the RDD and the DataFrame without first converting the RDD to a DataFrame (tell me if I'm wrong). Here are the schemas of both DataFrames:
scala> df
res32: org.apache.spark.sql.DataFrame = [f1: string, addresses: array<struct<sf1:int,sf2:string,sf3:string,id:string>>, id: string]

scala> df_input
res33: org.apache.spark.sql.DataFrame = [id: string]

scala> df_input.collect
res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])

I can get the ids I want if I already know the value to look for in addresses.id, using:

scala> df.filter(array_contains(df("addresses.id"), "idaddress2")).select("id").collect
res35: Array[org.apache.spark.sql.Row] = Array([XXXX], [YY])

However, when I try to join df_input and df, using the previous filter as the join condition, I get an exception:

scala> df.join(df_input, array_contains(df("addresses.id"), df_input("id")))
java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.Column id
  at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
  at org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
  ...

It seems that array_contains only accepts a literal (static) second argument and cannot substitute a sql.Column with its value. What's the best way to achieve this, also in terms of performance?

Thanks
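
P.S. For reference, here is roughly how I turn the streaming RDD into df_input before attempting the join. The case class and method names below are illustrative, not my exact code; I'm assuming the stream carries plain address-id strings:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.dstream.DStream

    // Assumed record shape: one address id per streamed element.
    case class InputRecord(id: String)

    def process(stream: DStream[String]): Unit = {
      stream.foreachRDD { rdd =>
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        // Convert the RDD to a DataFrame so it can be joined with the E/S DataFrame.
        val df_input = rdd.map(InputRecord(_)).toDF()
        // ... join with the enrichment DataFrame here
      }
    }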
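One workaround I'm considering, in case array_contains really cannot take a Column: explode the addresses array so each element becomes its own row, then join on a plain equality condition. A sketch, assuming the schemas above:

    import org.apache.spark.sql.functions.explode

    // Flatten addresses: one row per (record id, address struct).
    val exploded = df.select(df("id"), explode(df("addresses")).as("address"))

    // Equality join on the nested field, then deduplicate record ids,
    // since a record may match several input ids.
    val enriched = exploded
      .join(df_input, exploded("address.id") === df_input("id"))
      .select(exploded("id"))
      .distinct()

Does that sound reasonable performance-wise, or is there a better way?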