I am new to Spark (and Scala) and I need to figure out the correct approach to add a column to a dataframe whose value is calculated recursively, based on the value of another column, against data within the same dataframe.
Given a simple hierarchical adjacency schema for my source dataframe:

    scala> orgHierarchy.printSchema
    root
     |-- id: string (nullable = true)
     |-- parent_key: string (nullable = true)

I'd like to add a "root" column calculated for each row: if parent_key is not null, traverse the hierarchy upward and take the id of the root row, i.e. the one whose parent_key is null. The data does not all share a common root and the hierarchies have varying numbers of levels, so a fixed number of iterations is not possible.

Ideally this would be as simple as a call to an equivalent of Oracle SQL's CONNECT_BY_ROOT function with the CONNECT BY clause (http://www.oradev.com/connect_by.jsp), like this:

    sqlContext.sql("""
      SELECT id, parent_key,
             CONNECT_BY_ROOT id AS root_key
      FROM orgHierarchy
      CONNECT BY parent_key = PRIOR id
    """)

Can a Spark SQL UDF access data outside of the current row, using something like this?

    def findRoot(key: String, df: DataFrame): String = {
      // Look up the parent of the given row; first() pulls the single matching row to the driver.
      val parent_key = df.filter(s"id = '$key'").select("parent_key").first().getString(0)
      if (Option(parent_key).getOrElse("").isEmpty) key
      else findRoot(parent_key, df)
    }

    sqlContext.sql("""
      SELECT id, parent_key,
             IF(parent_key IS NOT NULL, findRoot(parent_key, orgHierarchy), id) AS root_key
      FROM orgHierarchy
    """)

Or, can I load the dataframe into GraphX as an EdgeRDD, map each row's id and parent_key column values to vertex properties, find the shortest path from each vertex to its "root" using collectNeighborIds, update each vertex in the traversal with the found key, and somehow get that back out to a dataframe?

Or, do I just run a map over the dataframe, apply something like the findRoot function to each row, and manually create a new dataframe as the result?
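For concreteness, here is the traversal logic the findRoot idea needs, sketched as plain Scala against a hypothetical in-memory id -> parent_key map (the names and sample rows are made up, and collecting a 6M-row table to the driver like this may not be practical; it only illustrates the recursion terminating at a null parent):

```scala
// Hypothetical in-memory version of the hierarchy: id -> parent_key,
// with None marking a root row (sample ids are invented).
val parentOf: Map[String, Option[String]] = Map(
  "org1" -> None,            // a root
  "org2" -> Some("org1"),
  "org3" -> Some("org2"),    // three levels deep
  "org4" -> None             // a second root: no common root in the data
)

// Walk up the hierarchy until a row whose parent_key is null is reached.
@scala.annotation.tailrec
def findRoot(key: String): String = parentOf(key) match {
  case None         => key
  case Some(parent) => findRoot(parent)
}
```

With this sample data, findRoot("org3") yields "org1" and findRoot("org4") yields "org4" (a childless row is its own root).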
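One possible way around per-row recursive lookups, since the hierarchies can be deep, is iterative pointer jumping: repeatedly replace every row's root pointer with its pointer's pointer, so the number of passes grows with the log of the deepest hierarchy rather than its depth. In Spark each pass would correspond to one self-join of the dataframe with itself; below is only a plain-Scala sketch of the idea over a hypothetical in-memory map (names and data are invented), not a Spark implementation:

```scala
// Pointer-jumping sketch: resolve every id's root in O(log depth) passes.
def resolveRoots(parentOf: Map[String, Option[String]]): Map[String, String] = {
  // Start with each id pointing at its parent, or at itself if it is a root.
  var ptr: Map[String, String] = parentOf.map { case (id, p) => id -> p.getOrElse(id) }
  var changed = true
  while (changed) {
    // Pointer doubling: each id now points where its ancestor pointed.
    val next = ptr.map { case (id, anc) => id -> ptr(anc) }
    changed = next != ptr
    ptr = next
  }
  ptr // every id now maps directly to its root
}
```

For example, resolveRoots(Map("a" -> None, "b" -> Some("a"), "c" -> Some("b"), "d" -> None)) maps a, b and c to "a" and d to "d".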
I also need to join the resulting dataframe from either approach against another dataframe based on the id of the row. Depending on the criteria used to filter that other dataframe, this may ultimately be run for all rows or just from one starting row, so I'm worried about either approach being opaque to the Catalyst optimizer.

The current dataset I'm working with has around 6M rows, with some hierarchies having no children and some having as many as 99 levels. I am trying to use the root of the hierarchy to provide a common grouping for the data owned by these organizations, to understand data-visibility holes in our current Solr sharding strategy.

Thanks,
Eric

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Calculate-Hierarchical-root-as-new-column-tp25037.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.