tags: pyspark,spark-graphframes

Hello,
I am running PySpark in a Podman container, and I get incorrect edges when I build my graph. I start by loading a source DataFrame from a Parquet directory on my server. The source DataFrame has the following columns:

```
+-------+-----+-------------+------+------+-------------+------+---------------+
|created|descr|last_modified|mnt_by|origin|start_address|prefix|external_origin|
+-------+-----+-------------+------+------+-------------+------+---------------+
```

I aim to build a graph connecting prefix, mnt_by, origin and descr, with edges storing the created and last_modified values.

I start by generating IDs for prefix, mnt_by, origin and descr using monotonically_increasing_id() [1]. These IDs are prefixed with "m_", "p_", "o_" or "org_" to make them unique across the DataFrame. Then I construct the vertices DataFrame by collecting the ID, the value, and whether the vertex is external, for each vertex type [2]; these per-type frames are then unioned together. After the vertices, I construct the edges DataFrame by selecting the ID pairs I want as src and dst and unioning those together [3]; these edges store created and last_modified. Now I am ready to construct the graph.

Here is where I run into my issue. When verifying my graph, I looked at a couple of vertices to check whether they have the correct edges. I looked at the UTwente prefix, origin, descr and mnt_by, and found that incorrect edges are generated: there are edges going out to vertices that are not associated with the UTwente values at all. The methods to find the vertices and edges, and their output, can be found in [4]. The inconsistencies are already visible in the prefix -> maintainer and origin -> prefix edges. [5] Depending on which column I filter on, the results are inconsistent. To make matters worse, some edges contain IDs that are not connected to the original values in the source DataFrame at all.
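To make the invariant I expect concrete, here is a plain-Python sketch (no Spark, toy values rather than my real data) of the mapping I am trying to build: every distinct value gets exactly one prefixed ID, and every edge pairs only IDs that co-occur in a source row.

```python
# Plain-Python model of the intended ID/edge construction (toy data, not my real records).
rows = [
    {"prefix": "130.89.0.0/16", "mnt_by": "SN-LIR-MNT", "origin": "1133",  "descr": "UTwente"},
    {"prefix": "10.0.0.0/8",    "mnt_by": "OTHER-MNT",  "origin": "64512", "descr": "Other"},
]

def make_ids(rows, column, tag):
    # One ID per distinct value, analogous to distinct() + monotonically_increasing_id().
    return {value: f"{tag}{i}" for i, value in enumerate(sorted({r[column] for r in rows}))}

p_ids = make_ids(rows, "prefix", "p_")
m_ids = make_ids(rows, "mnt_by", "m_")
o_ids = make_ids(rows, "origin", "o_")
org_ids = make_ids(rows, "descr", "org_")

# Edges must only ever pair IDs that come from the same source row.
edges = []
for r in rows:
    edges.append((p_ids[r["prefix"]], m_ids[r["mnt_by"]]))   # prefix -> maintainer
    edges.append((m_ids[r["mnt_by"]], o_ids[r["origin"]]))   # maintainer -> origin
    edges.append((o_ids[r["origin"]], org_ids[r["descr"]]))  # origin -> organisation
    edges.append((o_ids[r["origin"]], p_ids[r["prefix"]]))   # origin -> prefix
```

This is exactly the property that breaks in my Spark version: edges appear whose endpoints never share a source row.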
What I have tried to resolve my issue:

* I wrote a checker that verifies the created edges against the source DataFrame [6]. The aim of this checker was to determine where the inconsistency comes from, so I could locate the bug and resolve it. I ran it on limited graphs from n=10 up to n=1000000 (1M), which felt close enough, as there are only ~6.5M records in my source DataFrame. The checker passed on these samples, but near 1M it experienced significant slowdown, and on the full DataFrame it errors/times out. I blame this on the large joins it performs against the source DataFrame.
* I found a GitHub issue from someone with significantly larger graphs who has similar problems. One suggestion there blamed indexing with strings rather than ints or longs. I rewrote my system to use int IDs, but I ran into the same issue: the number of incorrect edges was the same, and they linked to the same incorrect vertices.
* I re-ordered my source DataFrame to see what the impact was. This results in considerably more incorrect edges according to the checker in [6]. If helpful, I can post the output of this checker as well.

Can you give me any pointers on what I can try, or on how to clarify my situation further? Thanks in advance for your time.
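One workaround I am considering (but have not yet tried on the full data) is deriving each ID deterministically from the value itself, e.g. by hashing, so the ID no longer depends on partitioning, row order, or re-evaluation. A minimal plain-Python sketch, assuming SHA-1 collisions are negligible at my scale:

```python
import hashlib

def value_id(tag: str, value: str) -> str:
    # Deterministic ID: the same value always maps to the same ID,
    # independent of partitioning, row order, or re-evaluation.
    digest = hashlib.sha1(value.encode("utf-8")).hexdigest()[:16]
    return f"{tag}{digest}"

# The same value yields the same ID on every call.
a = value_id("p_", "130.89.0.0/16")
b = value_id("p_", "130.89.0.0/16")
```

In Spark this could presumably be expressed with psf.sha1/psf.sha2 on the value column instead of monotonically_increasing_id(), though I have not measured the performance impact.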
Kind regards,
Jelle Nijland

[1]

```python
import pyspark.sql.functions as psf

# ID labels
PREFIX_ID = "prefix_id"
MAINTAINER_ID = "mnt_by_id"
ORIGIN_ID = "origin_id"
ORGANISATION_ID = "organisation_id"

# Source dataframe column names
MNT_BY = "mnt_by"
PREFIX = "prefix"
ORIGIN = "origin"
DESCR = "descr"
EXTERNAL_O = "external_origin"


def generate_ids(df: DataFrame) -> DataFrame:
    """
    Generates a unique ID for each distinct maintainer, prefix, origin and organisation

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate IDs for
    """
    mnt_by_id = df.select(MNT_BY).distinct().withColumn(MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
    prefix_id = df.select(PREFIX).distinct().withColumn(PREFIX_ID, psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
    origin_id = df.select(ORIGIN).distinct().withColumn(ORIGIN_ID, psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
    organisation_id = df.select(DESCR).distinct().withColumn(ORGANISATION_ID, psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))
    df = (df.join(mnt_by_id, on=MNT_BY, how="left")
            .join(prefix_id, on=PREFIX, how="left")
            .join(origin_id, on=ORIGIN, how="left")
            .join(organisation_id, on=DESCR, how="left"))
    return df
```

[2]

```python
def create_vertices(df: DataFrame) -> DataFrame:
    """
    Creates vertices from a DataFrame with IDs

    Vertices have the format:
    ID (str) | VALUE (str) | EXTERNAL (bool)
    ID follows the format [p_|o_|m_|org_][0-9]+

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate vertices for
    """
    prefixes = df.select(PREFIX_ID, PREFIX, psf.lit(False))
    maintainers = df.select(MAINTAINER_ID, MNT_BY, psf.lit(False))
    origins = df.select(ORIGIN_ID, ORIGIN, EXTERNAL_O)
    organisations = df.select(ORGANISATION_ID, DESCR, psf.lit(False))
    result_df = prefixes.union(maintainers).union(origins).union(organisations)
    result_df = result_df.dropDuplicates()
    result_df = result_df.withColumnRenamed("false", EXTERNAL)
    result_df = result_df.withColumnRenamed(PREFIX_ID, ID)
    result_df = result_df.withColumnRenamed(PREFIX, VALUE)
    return result_df
```

[3]

```python
def create_edges(df: DataFrame) -> DataFrame:
    """
    Creates edges from a DataFrame with IDs

    Edges have the format:
    SRC (str) | DST (str) | CREATED (str) | LAST_MODIFIED (str)

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate edges for
    """
    p_to_mnt = df.select(PREFIX_ID, MAINTAINER_ID, CREATED, LAST_MODIFIED)
    m_to_o = df.select(MAINTAINER_ID, ORIGIN_ID, CREATED, LAST_MODIFIED)
    o_to_org = df.select(ORIGIN_ID, ORGANISATION_ID, CREATED, LAST_MODIFIED)
    o_to_p = df.select(ORIGIN_ID, PREFIX_ID, CREATED, LAST_MODIFIED)
    edges = p_to_mnt.union(m_to_o).union(o_to_org).union(o_to_p)
    result_df = edges.dropDuplicates()
    result_df = result_df.withColumnRenamed(PREFIX_ID, SRC)
    result_df = result_df.withColumnRenamed(MAINTAINER_ID, DST)
    return result_df
```

[4]

```python
# Demonstrating the bug with the edges, using UT's prefix/mnt/origin/org as example
# How-to-use: get the IDs and plug them into bug_show_related_edges()
def bug_gather_ids(g: GraphFrame):
    vertex = "130.89.0.0/16"
    filtered_v = g.vertices.filter(psf.col(VALUE) == vertex)
    filtered_v.show(truncate=False)
    mnt = "SN-LIR-MNT RIPE-NCC-LEGACY-MNT"
    filtered_m = g.vertices.filter(psf.col(VALUE) == mnt)
    filtered_m.show(truncate=False)
    origin = "1133"
    filtered_o = g.vertices.filter(psf.col(VALUE) == origin)
    filtered_o.show(truncate=False)
    org = "Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede"
    filtered_org = g.vertices.filter(psf.col(VALUE) == org)
    filtered_org.show(truncate=False)


def bug_show_related_edges(g: GraphFrame, p_id: str, m_id: str, o_id: str, org_id: str):
    con1_m = psf.col(DST) == m_id
    con2_m = psf.col(SRC) == m_id
    edg1_m = g.edges.filter(con1_m)
    edg1_m.show(truncate=False)
    edg2_m = g.edges.filter(con2_m)
    edg2_m.show(truncate=False)
    con1_p = psf.col(DST) == p_id
    con2_p = psf.col(SRC) == p_id
    edg1_p = g.edges.filter(con1_p)
    edg1_p.show(truncate=False)
    edg2_p = g.edges.filter(con2_p)
    edg2_p.show(truncate=False)
    con1_o = psf.col(DST) == o_id
    con2_o = psf.col(SRC) == o_id
    edg1_o = g.edges.filter(con1_o)
    edg1_o.show(truncate=False)
    edg2_o = g.edges.filter(con2_o)
    edg2_o.show(truncate=False)
    con1_org = psf.col(DST) == org_id
    con2_org = psf.col(SRC) == org_id
    edg1_org = g.edges.filter(con1_org)
    edg1_org.show(truncate=False)
    edg2_org = g.edges.filter(con2_org)
    edg2_org.show(truncate=False)


# prefix 'p_60129612354' corresponds with 130.89.0.0/16
# maintainer 'm_2897' corresponds with SN-LIR-MNT RIPE-NCC-LEGACY-MNT
# origin 'o_5130' corresponds with 1133
# organisation 'org_197568516576' corresponds with Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede
```

Output of bug_show_related_edges(g, "p_60129612354", "m_2897", "o_5130", "org_197568516576"):

```
# prefix -> maintainer edges (filtered on dst = maintainer)
+--------------+------+----------+-------------+
|src           |dst   |created   |last_modified|
+--------------+------+----------+-------------+
|p_197568533425|m_2897|0         |0            |
|p_94489347499 |m_2897|0         |0            |
|p_25769898645 |m_2897|1020678697|1020678697   |
|p_128849058299|m_2897|0         |0            |
|p_68719514870 |m_2897|0         |0            |
|p_146028965124|m_2897|1020267786|1020267786   |
|p_60129579570 |m_2897|0         |0            |
+--------------+------+----------+-------------+

# maintainer -> origin edges (filtered on src = maintainer)
+------+------------+----------+-------------+
|src   |dst         |created   |last_modified|
+------+------------+----------+-------------+
|m_2897|o_8589936949|0         |0            |
|m_2897|o_5130      |0         |0            |
|m_2897|o_8589936949|1020267786|1020267786   |
|m_2897|o_8589936949|1020678697|1020678697   |
+------+------------+----------+-------------+

# origin -> prefix edges (filtered on dst = prefix)
+-----+-------------+----------+-------------+
|src  |dst          |created   |last_modified|
+-----+-------------+----------+-------------+
|o_380|p_60129612354|1220513130|1220513130   |
+-----+-------------+----------+-------------+

# prefix -> maintainer edges (filtered on src = prefix)
+-------------+-----+----------+-------------+
|src          |dst  |created   |last_modified|
+-------------+-----+----------+-------------+
|p_60129612354|m_533|1220513130|1220513130   |
+-------------+-----+----------+-------------+

# maintainer -> origin edges (filtered on dst = origin)
+------+------+-------+-------------+
|src   |dst   |created|last_modified|
+------+------+-------+-------------+
|m_2897|o_5130|0      |0            |
+------+------+-------+-------------+

# origin -> prefix edges (filtered on src = origin)
+------+----------------+-------+-------------+
|src   |dst             |created|last_modified|
+------+----------------+-------+-------------+
|o_5130|p_94489347499   |0      |0            |
|o_5130|org_283467856326|0      |0            |
+------+----------------+-------+-------------+

# origin -> organisation edges (filtered on dst = organisation)
+------------+----------------+----------+-------------+
|src         |dst             |created   |last_modified|
+------------+----------------+----------+-------------+
|o_8589936415|org_197568516576|1320919317|1320919317   |
|o_8589936415|org_197568516576|1285464368|1285464368   |
+------------+----------------+----------+-------------+

# organisation edges (filtered on src = organisation)
# NB: this is intended, there are no outgoing edges from organisations
+---+---+-------+-------------+
|src|dst|created|last_modified|
+---+---+-------+-------------+
+---+---+-------+-------------+
```

[6]

```python
# Determines which rows are missing from edges
# Determines which rows in edges are not in src_df (superfluous)
def checker_df(edges: DataFrame, src_df: DataFrame, src: str, dst: str, src_id: str, dst_id: str):
    con1 = psf.col(SRC).startswith(src)
    con2 = psf.col(DST).startswith(dst)
    filtered_edges = edges.filter(con1 & con2)
    if filtered_edges.count() == 0:
        print(f"[Warning] No edges found after filtering! [checker_df] was checking {src_id} to {dst_id}")
    if DEBUG:
        print("[Checker_df] Printing filtered edges:")
        filtered_edges.show(truncate=False)
        print(f"Records: {filtered_edges.count()}")
    # Shows if there are rows which are not represented in the edge df
    missing_edges = src_df.join(
        filtered_edges,
        ((filtered_edges[SRC] == src_df[src_id]) & (filtered_edges[DST] == src_df[dst_id])),
        "left_anti",
    )
    missing_edges = missing_edges.dropDuplicates()
    if DEBUG:
        print("[Checker_df] Printing missing edges:")
        missing_edges.show(truncate=False)
        print(f"Records: {missing_edges.count()}")
    # Shows if there are edges which are not represented in the src df
    superfluous_edges = filtered_edges.join(
        src_df,
        ((filtered_edges[SRC] == src_df[src_id]) & (filtered_edges[DST] == src_df[dst_id])),
        "left",
    )
    superfluous_edges2 = superfluous_edges.filter(superfluous_edges[src_id].isNull() | superfluous_edges[dst_id].isNull())
    if DEBUG:
        print("[Checker_df] Printing superfluous edges:")
        superfluous_edges2.show(truncate=False)
        print(f"Records: {superfluous_edges2.count()}")
    return missing_edges, superfluous_edges2
```
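For clarity, the checks in [6] boil down to two set differences; here is a plain-Python model of what the checker computes (toy ID pairs, not the Spark implementation):

```python
# Each expected pair comes from a source row; each actual pair from the edges frame.
expected = {("p_1", "m_1"), ("p_2", "m_2")}  # (src_id, dst_id) pairs from src_df
actual = {("p_1", "m_1"), ("p_9", "m_2")}    # (src, dst) pairs from edges

# "missing" mirrors the left_anti join: source pairs with no matching edge.
missing = expected - actual
# "superfluous" mirrors the left join + isNull filter: edges with no source pair.
superfluous = actual - expected
```

On a correct graph both sets are empty; on mine, both are non-empty once n grows large enough.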