Nobody has the answer? Another thing I've seen is that if I have no documents at all:
scala> df.select(explode(df("addresses.id")).as("aid")).collect
res27: Array[org.apache.spark.sql.Row] = Array()

Then:

scala> df.select(explode(df("addresses.id")).as("aid"), df("id"))
org.apache.spark.sql.AnalysisException: Cannot resolve column name "id" among (adresses);
        at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
        at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)

Is there a better way to query nested objects, and to join a DF containing nested objects with another regular DataFrame? (Yes, that's my current case.)

> On May 9, 2016, at 00:42, Cyril Scetbon <cyril.scet...@free.fr> wrote:
>
> Hi Ashish,
>
> The issue is not related to converting an RDD to a DF. I did that. I was just asking if I should do it differently.
>
> The issue is the exception raised when using array_contains with a sql.Column instead of a literal value.
>
> I found another way to do it, using explode, as follows:
>
> df.select(explode(df("addresses.id")).as("aid"), df("id")).join(df_input, $"aid" === df_input("id")).select(df("id"))
>
> However, I'm wondering whether it does roughly the same thing, or whether the query is different and worse in terms of performance.
>
> I'd appreciate it if someone could comment on it and give me some advice.
>
> Thank you.
>
>> On May 8, 2016, at 22:12, Ashish Dubey <ashish....@gmail.com> wrote:
>>
>> Is there any reason you don't want to convert it? I don't think a join between an RDD and a DF is supported.
>>
>> On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon <cyril.scet...@free.fr> wrote:
>> Hi,
>>
>> I have an RDD built during a Spark Streaming job and I'd like to join it to a DataFrame (E/S input) to enrich it.
>> It seems that I can't join the RDD and the DF without first converting the RDD to a DF (tell me if I'm wrong). Here are the schemas of both DFs:
>>
>> scala> df
>> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses: array<struct<sf1:int,sf2:string,sf3:string,id:string>>, id: string]
>>
>> scala> df_input
>> res33: org.apache.spark.sql.DataFrame = [id: string]
>>
>> scala> df_input.collect
>> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])
>>
>> I can get the ids I want if I know the value to look for in addresses.id, using:
>>
>> scala> df.filter(array_contains(df("addresses.id"), "idaddress2")).select("id").collect
>> res35: Array[org.apache.spark.sql.Row] = Array([XXXX], [YY])
>>
>> However, when I try to join df_input and df, using the previous filter as the join condition, I get an exception:
>>
>> scala> df.join(df_input, array_contains(df("adresses.id"), df_input("id")))
>> java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.Column id
>>         at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
>>         at org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
>>         ...
>>
>> It seems that array_contains only supports static arguments and does not resolve a sql.Column to its value.
>>
>> What's the best way to achieve what I want? (Also speaking in terms of performance.)
>>
>> Thanks
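For reference, here is a minimal, self-contained sketch of the explode + join approach discussed above. The case classes and sample data (Address, Doc, the idaddress* values) are assumptions made up to mirror the schemas shown in the thread, and it assumes a Spark shell where the SQL implicits (.toDF, $) are already in scope:

import org.apache.spark.sql.functions.explode

// Made-up nested schema mirroring df: [f1, addresses: array<struct<...,id:string>>, id]
case class Address(sf1: Int, sf2: String, sf3: String, id: String)
case class Doc(f1: String, addresses: Seq[Address], id: String)

val df = Seq(
  Doc("a", Seq(Address(1, "x", "y", "idaddress2")), "XXXX"),
  Doc("b", Seq(Address(2, "x", "y", "idaddress12")), "YY")
).toDF()

// Tuple1 so the implicit Product-based toDF applies to a one-column frame
val df_input = Seq(Tuple1("idaddress2"), Tuple1("idaddress12")).toDF("id")

// Explode the nested array of address ids (one output row per element),
// join on the exploded value, then keep only the parent document id.
val joined = df
  .select(explode(df("addresses.id")).as("aid"), df("id"))
  .join(df_input, $"aid" === df_input("id"))
  .select(df("id"))

joined.show()

Note that explode produces one row per array element, so a document with several matching address ids will appear several times; add .distinct if each document id should appear once. Also, unlike array_contains with a literal, this plan is an actual join, so its cost depends on the join strategy Spark chooses (broadcast vs. shuffle) rather than on a simple filter.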