I am new to Spark (and Scala) and I need to figure out the correct approach
to adding a column to a dataframe whose value is calculated recursively from
the value of another column, by looking up other rows within the same
dataframe.

Given the simple hierarchical adjacency schema of my source dataframe:
scala> orgHierarchy.printSchema
root
 |-- id: string (nullable = true)
 |-- parent_key: string (nullable = true)

I'd like to add a "root" column calculated for each row: if parent_key is
not null, traverse the hierarchy upward until reaching the row whose
parent_key is null, and take that row's id. The data does not all share a
common root and the number of levels varies, so a fixed number of iterations
is not possible. Ideally this would be as simple as a call to an equivalent
of Oracle SQL's CONNECT_BY_ROOT function with the "CONNECT BY" clause
( http://www.oradev.com/connect_by.jsp ), like this:

sqlContext.sql("""
SELECT id, parent_key, CONNECT_BY_ROOT id AS root_key
FROM orgHierarchy
CONNECT BY parent_key = PRIOR id
""")

Can a Spark SQL UDF access data outside of the current row using something
like this:

def findRoot(key: String, df: DataFrame): String = {
  // Look up this id's parent_key; a null/empty parent means the row is a root
  val parentKey = df.filter(s"id = '$key'")
    .select("parent_key")
    .head()
    .getString(0)
  if (Option(parentKey).getOrElse("").isEmpty) key
  else findRoot(parentKey, df)
}

sqlContext.sql("""
SELECT id, parent_key,
       IF(parent_key IS NOT NULL, findRoot(parent_key, orgHierarchy), id) AS root_key
FROM orgHierarchy
""")

Or, can I use load the dataframe into GraphX as an EdgeRDD and map the
current Row id and parent_key column values as properties and then find the
shortest path for each vertex to the "root" using collectNeighborIds and
update each vertex in the traversal with the found key and somehow get that
back out to a dataframe? 
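
If I went the GraphX route, the best I can picture (again untested, using
the Pregel operator rather than shortest paths / collectNeighborIds, and
naively hashing the string ids to Long vertex ids while ignoring collisions)
would be something like:

import org.apache.spark.graphx._

// Edges point parent -> child so that root ids can be pushed down the tree.
val edges = orgHierarchy
  .filter("parent_key is not null")
  .rdd
  .map(r => Edge(r.getString(1).hashCode.toLong, r.getString(0).hashCode.toLong, ()))

// Vertex attribute: (original id, root id if already known, else null)
val vertices = orgHierarchy.rdd.map { r =>
  val id = r.getString(0)
  val root = if (r.isNullAt(1)) id else null
  (id.hashCode.toLong, (id, root))
}

// Default attribute covers parent ids that never appear as an id themselves.
val graph = Graph(vertices, edges, ("", null): (String, String))

// Roots push their id to children; a vertex adopts the first root id it receives.
val withRoots = graph.pregel[String](null)(
  (vid, attr, msg) => if (msg != null) (attr._1, msg) else attr,
  triplet =>
    if (triplet.srcAttr._2 != null && triplet.dstAttr._2 == null)
      Iterator((triplet.dstId, triplet.srcAttr._2))
    else Iterator.empty,
  (a, b) => a)

import sqlContext.implicits._
val rootDF = withRoots.vertices
  .map { case (_, (id, root)) => (id, root) }
  .toDF("id", "root_key")

That would at least land back in a dataframe for the later join, though I
assume the GraphX stages themselves would be opaque to Catalyst.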

Or, do I just run a map over the dataframe, apply something like the
findRoot function to each row, and manually create a new dataframe as the
result?
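
That is, something like this untested sketch, assuming the 6M
id -> parent_key pairs are small enough to collect and broadcast:

// Collect the adjacency pairs to the driver, broadcast them, and walk each
// row's chain up to its root inside a plain RDD map.
val parentById = sc.broadcast(
  orgHierarchy.rdd
    .map(r => (r.getString(0), r.getString(1)))
    .collectAsMap())

import sqlContext.implicits._

val withRoot = orgHierarchy.rdd.map { r =>
  val id = r.getString(0)
  // Follow parent_key upward until a row with no parent is reached.
  var current = id
  var parent = parentById.value.getOrElse(current, null)
  while (parent != null && parent.nonEmpty) {
    current = parent
    parent = parentById.value.getOrElse(current, null)
  }
  (id, r.getString(1), current)
}.toDF("id", "parent_key", "root_key")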

I will also need to join the resulting dataframe from whichever approach
against another dataframe on the row's id. Depending on the criteria used to
filter that other dataframe, this may ultimately run for all rows or only
from a single starting row, so I'm worried about either approach being
opaque to the Catalyst optimizer. The current dataset I'm working with has
around 6M rows; some hierarchies have no children and some have as many as
99 levels. I am trying to use the root of each hierarchy to provide a common
grouping for the data owned by these organizations, to understand data
visibility holes in our current Solr sharding strategy.
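
For context, that downstream step would just be an equi-join on id, e.g.
(reusing the withRoot name from the sketch above, with otherDF as a stand-in
for the other, already-filtered dataframe):

val combined = withRoot.join(otherDF, "id")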

Thanks,
Eric



