Hi all, I implemented a recursive UDF that tries to find a document number in a long list of predecessor documents. This can be a multi-level hierarchy: C is the successor of B, which is the successor of A (but many more levels are possible).
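To make the hierarchy concrete, here is a minimal sketch in plain Python (no Spark involved). The map `flow`, the function name `find_root` and all document numbers are made up for illustration and are not my production code. It walks the predecessor chain with a loop instead of recursion and stops when a key repeats; treating a repeat as the root is an assumption about how self-references and cycles should be handled.

```python
# Toy predecessor map: (clnt, doc_num, doc_item) -> (predecessor_head, predecessor_item).
# All document numbers below are invented for illustration.
flow = {
    ("100", "C", "10"): ("B", "10"),  # C is the successor of B
    ("100", "B", "10"): ("A", "10"),  # B is the successor of A
}


def find_root(flow, clnt, doc_num, doc_item):
    """Follow predecessor links until a document without a predecessor is reached."""
    seen = set()
    key = (clnt, doc_num, doc_item)
    # Stop at a document with no entry, or when a key repeats (self-reference or cycle).
    while key in flow and key not in seen:
        seen.add(key)
        pred_num, pred_item = flow[key]
        key = (clnt, pred_num, pred_item)
    return key[1], key[2]


print(find_root(flow, "100", "C", "10"))  # ('A', '10')
```

The loop avoids Python's recursion limit on very deep chains, and the `seen` set guards against a cycle in the data keeping the lookup running forever.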
As input to that UDF I prepare a dict that contains the complete document flow, reduced to the fields required to follow the path back to the originating document. The dict is broadcast and then used by the UDF. This approach is very slow, and now that the data is growing it regularly kills my executors, so that RDDs get lost and tasks fail. Sometimes the workers (Docker containers) also become unresponsive and get killed.

Here is the code of the methods:

1. Prepare and define the UDF, broadcast the dict

    from pyspark.sql import functions as func  # assumed import behind the "func" alias

    # Define function for recursive lookup of root document
    def __gen_caseid_udf_sales_document_flow(self):
        global bc_document_flow, udf_sales_document_flow

        # Prepare docflow for broadcasting by only selecting required fields
        df_subset = self.spark.table("FLOWTABLE").select("clnt", "predecessor_head", "predecessor_item", "doc_num", "doc_item")

        # Prepare dictionary for broadcast
        document_flow_dic = {}
        for clnt, predecessor_head, predecessor_item, doc_num, doc_item in df_subset.rdd.collect():
            document_flow_dic[(clnt, doc_num, doc_item)] = predecessor_head, predecessor_item

        # Broadcast dictionary to workers
        bc_document_flow = self.spark.sparkContext.broadcast(document_flow_dic)

        # Register new user defined function UDF
        udf_sales_document_flow = func.udf(gen_caseid_udf_sale_root_lookup)

2. The recursive function used in the UDF

    # Find root document
    def gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr, posnr):
        global bc_document_flow
        if not clnt or not docnr or not posnr:
            return None, None
        key = clnt, docnr, posnr
        if key in lt:
            docnr_tmp, item_tmp = lt[key]
            if docnr_tmp == docnr and item_tmp == posnr:
                # Document points to itself, so it is the root
                return docnr, posnr
            else:
                return gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr_tmp, item_tmp)
        else:
            # No predecessor recorded, so this is the root
            return docnr, posnr

3. The UDF

    # Define udf function to look up root document
    def gen_caseid_udf_sale_root_lookup(clnt, doc_num, posnr):
        global bc_document_flow  # Name of the broadcast variable
        lt = bc_document_flow.value
        h, p = gen_caseid_udf_sale_get_root_doc(lt, clnt, doc_num, posnr)
        return str(clnt) + str(h) + str(p)

4. Usage of the UDF on a DF that might contain several tens of thousands of rows

    # Lookup root document from document flow
    documents = documents.withColumn("root_doc", udf_sales_document_flow(func.col('clnt'), func.col('document_number'), func.col('item_number')))

Do you have any hints on my code, or any ideas on how to implement a recursive select without implementing a potentially unoptimizable UDF? I came across https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL, which might be an option. Does Spark support this kind of construct?

Thanks and all the best,
Meikel