Hi all,

I implemented a recursive UDF that tries to find a document number in a long 
list of predecessor documents. This can be a multi-level hierarchy:
C is the successor of B, which is the successor of A (but many more levels are possible).

As input to that UDF I prepare a dict that contains the complete document flow, 
reduced to the fields required to follow the path back to the originating 
document.
The dict is broadcast and then used by the UDF. This approach is very slow, 
and now - as the data grows - it kills my executors regularly, so that RDDs get 
lost and tasks fail. Sometimes the workers (docker containers) also become 
unresponsive and are killed.
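
To make the structure concrete: the broadcast dict maps (clnt, doc_num, doc_item) 
to (predecessor_head, predecessor_item). The numbers below are made up, just to 
illustrate the C -> B -> A chain:

    # Hypothetical sample of the broadcast lookup dict
    document_flow_dic = {
        ("100", "C", "10"): ("B", "10"),  # C was created from B
        ("100", "B", "10"): ("A", "10"),  # B was created from A
        # "A" has no entry of its own, so it is treated as the root document
    }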

Here is the coding of the methods:

1.: Prepare and define the UDF, broadcast dict.

    # Define function for recursive lookup of root document
    def __gen_caseid_udf_sales_document_flow(self):
        global bc_document_flow, udf_sales_document_flow

        # Prepare docflow for broadcasting by only selecting required fields
        df_vbfa_subset = self.spark.table("FLOWTABLE").select(
            "clnt", "predecessor_head", "predecessor_item", "doc_num", "doc_item")

        # Prepare dictionary for broadcast
        document_flow_dic = {}
        for clnt, predecessor_head, predecessor_item, doc_num, doc_item in \
                df_vbfa_subset.rdd.collect():
            document_flow_dic[(clnt, doc_num, doc_item)] = (predecessor_head,
                                                            predecessor_item)

        # Broadcast dictionary to workers
        bc_document_flow = self.spark.sparkContext.broadcast(document_flow_dic)

        # Register new user defined function UDF
        udf_sales_document_flow = func.udf(gen_caseid_udf_sale_root_lookup)


2.: The recursive function used in the UDF
# Find root document
def gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr, posnr):
    global bc_document_flow

    if not clnt or not docnr or not posnr:
        return None, None

    key = clnt, docnr, posnr

    if key in lt:
        docnr_tmp, item_tmp = lt[key]
        if docnr_tmp == docnr and item_tmp == posnr:
            return docnr, posnr
        else:
            return gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr_tmp, item_tmp)
    else:
        return docnr, posnr
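
For illustration, with a tiny hand-built dict like the one sketched above, the 
lookup walks the chain back until no further predecessor entry is found:

    # Illustrative only: chain C -> B -> A, document numbers are made up
    sample_lt = {
        ("100", "C", "10"): ("B", "10"),
        ("100", "B", "10"): ("A", "10"),
    }
    print(gen_caseid_udf_sale_get_root_doc(sample_lt, "100", "C", "10"))
    # -> ('A', '10'), because "A" has no predecessor entry in the dict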

3: The UDF
# Define udf function to look up root document
def gen_caseid_udf_sale_root_lookup(clnt, doc_num, posnr):
    global bc_document_flow  # Name of the broadcast variable

    lt = bc_document_flow.value
    h, p = gen_caseid_udf_sale_get_root_doc(lt, clnt, doc_num, posnr)
    return str(clnt) + str(h) + str(p)

4. Usage of the UDF on a DF that might contain several tens of thousands of rows:

# Lookup root document from document flow
documents = documents.withColumn("root_doc",
                                 udf_sales_document_flow(func.col('clnt'),
                                                         func.col('document_number'),
                                                         func.col('item_number')))
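
With the sample chain from above, a row for document "C", item "10" on client 
"100" would end up with root_doc = "100A10" (clnt + root document + root item 
concatenated as strings).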

Do you have any hints on my coding, or are there ideas on how to implement a 
recursive select without implementing a potentially unoptimizable UDF?
I came across 
https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL which 
might be an option - does Spark support this kind of construct?

Thanks and all the best,
Meikel
