tags: pyspark,spark-graphframes

Hello,

I am running pyspark in a podman container and I have issues with incorrect 
edges when I build my graph.
I start with loading a source dataframe from a parquet directory on my server. 
The source dataframe has the following columns:
+-------+-----+-------------+------+------+-------------+------+---------------+
|created|descr|last_modified|mnt_by|origin|start_address|prefix|external_origin|
+-------+-----+-------------+------+------+-------------+------+---------------+

I aim to build a graph connecting prefix, mnt_by, origin and descr with edges 
storing the created and last_modified values.
I start with generating IDs for the prefix, mnt_by, origin and descr using 
monotonically_increasing_id() [1]
These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are 
unique IDs across the dataframe.
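
As background on the ID values: the Spark documentation states that the current implementation of monotonically_increasing_id() puts the partition index in the upper 31 bits and the record number within each partition in the lower 33 bits. That is why some of the IDs in the output in [4] are so large. Below is a minimal sketch in plain Python (no Spark needed) that decodes a few IDs taken from that output, assuming this bit layout; the helper name decode_id is hypothetical.

```python
# Decode IDs produced by monotonically_increasing_id(), assuming the layout
# documented by Spark: partition index in the upper 31 bits, record number
# within the partition in the lower 33 bits.
RECORD_BITS = 33


def decode_id(prefixed_id: str) -> tuple[str, int, int]:
    """Split e.g. 'p_60129612354' into (label, partition index, record number)."""
    label, raw = prefixed_id.rsplit("_", 1)
    value = int(raw)
    partition = value >> RECORD_BITS
    record = value & ((1 << RECORD_BITS) - 1)
    return label, partition, record


# IDs copied from the output in [4]
for example in ["p_60129612354", "m_2897", "o_8589936949", "org_197568516576"]:
    print(example, decode_id(example))
```

Under this assumption, each ID depends on which partition a row is in and on the row's position within that partition.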

Then I construct the vertices dataframe by collecting the ID, value and whether 
they are external for each vertex. [2]
These vertices are then unioned together.
Following the vertices, I construct the edges dataframe by selecting the IDs 
that I want to be the src and the dst and union those together. [3]
These edges store the created and last_modified.
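
To make the intended result concrete, here is a small sketch in plain Python (not the Spark code itself) of the vertices and edges I expect a single source row to produce. The IDs and values are the ones from the example in [4]; the created, last_modified and external_origin values are hypothetical placeholders.

```python
# One source row after ID generation. IDs and values come from the example
# in [4]; created, last_modified and external_origin are hypothetical.
row = {
    "prefix_id": "p_60129612354",
    "prefix": "130.89.0.0/16",
    "mnt_by_id": "m_2897",
    "mnt_by": "SN-LIR-MNT RIPE-NCC-LEGACY-MNT",
    "origin_id": "o_5130",
    "origin": "1133",
    "organisation_id": "org_197568516576",
    "descr": "Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede",
    "external_origin": False,
    "created": 0,
    "last_modified": 0,
}

# Vertices: (id, value, external). Only origins carry the external flag.
vertices = [
    (row["prefix_id"], row["prefix"], False),
    (row["mnt_by_id"], row["mnt_by"], False),
    (row["origin_id"], row["origin"], row["external_origin"]),
    (row["organisation_id"], row["descr"], False),
]

# Edges: (src, dst, created, last_modified) in the four directions of [3].
directions = [
    ("prefix_id", "mnt_by_id"),
    ("mnt_by_id", "origin_id"),
    ("origin_id", "organisation_id"),
    ("origin_id", "prefix_id"),
]
edges = [
    (row[src], row[dst], row["created"], row["last_modified"])
    for src, dst in directions
]

for edge in edges:
    print(edge)
```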

Now I am ready to construct the graph. Here is where I run into my issue.

When verifying my graph, I looked at a couple of vertices to see if they have 
the correct edges.
I looked at the Utwente prefix, origin, descr and mnt_by and found that it 
generates incorrect edges.

I saw edges going out to vertices that are not associated with the utwente 
values at all.
The methods to find the vertices and edges, together with their output, can be 
found in [4].
We can already observe inconsistencies by viewing the prefix -> maintainer and 
origin -> prefix edges. [5]
Depending on which column I filter on, the results are inconsistent.
To make matters worse, some edges contain IDs that are not connected to the 
original values in the source dataframe at all.

What I have tried to resolve my issue:

  * I wrote a checker that verifies the created edges against the source
    dataframe. [6]
    The aim of this checker was to determine where the inconsistency comes
    from, so that I could locate the bug and resolve it.
    I ran this checker on limited graphs from n=10 up to n=1000000 (1m).
    This felt close enough, as there are only ~6.5m records in my source
    dataframe.
    These runs were correct. Near 1m the checker slowed down significantly,
    and on the full dataframe it errors or times out.
    I blamed this on the large joins that it performs on the source dataframe.
  * I found a GitHub issue from someone who has similar issues with
    significantly larger graphs.
    One suggestion there blamed indexing with strings rather than ints or
    longs.
    I rewrote my system to use int for IDs, but I ran into the same issue.
    The number of incorrect edges was the same, and the incorrect vertices
    they link to were the same too.
  * I re-ordered my source dataframe to see what the impact was.
    This results in considerably more incorrect edges using the checker in [4].
    If helpful, I can post the output of this checker as well.
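
For clarity, the two comparisons the checker in [6] performs can be illustrated on a tiny in-memory example with plain Python sets instead of Spark joins. All (src, dst) pairs below are hypothetical and only show the intended semantics.

```python
# Expected (src, dst) pairs derived from the source rows, and the pairs
# actually present in the edges. All values are hypothetical.
expected = {("p_1", "m_1"), ("p_2", "m_1"), ("p_3", "m_2")}
actual = {("p_1", "m_1"), ("p_2", "m_2"), ("p_3", "m_2")}

# Missing edges: present in the source but not in the edges.
# This mirrors the "left_anti" join in [6].
missing = expected - actual

# Superfluous edges: present in the edges but without a matching source row.
# This mirrors the "left" join followed by the isNull filter in [6].
superfluous = actual - expected

print("missing:", sorted(missing))
print("superfluous:", sorted(superfluous))
```

A correct graph would give two empty sets here.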

Can you give me any pointers in what I can try or what I can do to clarify my 
situation better?
Thanks in advance for your time.

Kind regards,
Jelle Nijland




[1]
import pyspark.sql.functions as psf
from pyspark.sql import DataFrame

# ID labels
PREFIX_ID = "prefix_id"
MAINTAINER_ID = "mnt_by_id"
ORIGIN_ID = "origin_id"
ORGANISATION_ID = "organisation_id"

# Source dataframe column names
MNT_BY = "mnt_by"
PREFIX = "prefix"
ORIGIN = "origin"
DESCR = "descr"
EXTERNAL_O = "external_origin"


def generate_ids(df: DataFrame) -> DataFrame:
    """
    Generates a unique ID for each distinct maintainer, prefix, origin and
    organisation

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate IDs for
    """
    mnt_by_id = df.select(MNT_BY).distinct().withColumn(
        MAINTAINER_ID,
        psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
    prefix_id = df.select(PREFIX).distinct().withColumn(
        PREFIX_ID,
        psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
    origin_id = df.select(ORIGIN).distinct().withColumn(
        ORIGIN_ID,
        psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
    organisation_id = df.select(DESCR).distinct().withColumn(
        ORGANISATION_ID,
        psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))

    # Attach each generated ID back to the source rows
    df = (df.join(mnt_by_id, on=MNT_BY, how="left")
            .join(prefix_id, on=PREFIX, how="left")
            .join(origin_id, on=ORIGIN, how="left")
            .join(organisation_id, on=DESCR, how="left"))
    return df

[2]
def create_vertices(df: DataFrame) -> DataFrame:
    """
    Creates vertices from a DataFrame with IDs
    Vertices have the format:
    ID (str) | VALUE (str) | EXTERNAL (bool)

    ID follows the format [p_|o_|m_|org_][0-9]+

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate vertices for
    """
    prefixes = df.select(PREFIX_ID, PREFIX, psf.lit(False))
    maintainers = df.select(MAINTAINER_ID, MNT_BY, psf.lit(False))
    origins = df.select(ORIGIN_ID, ORIGIN, EXTERNAL_O)
    organisations = df.select(ORGANISATION_ID, DESCR, psf.lit(False))

    result_df = prefixes.union(maintainers).union(origins).union(organisations)
    result_df = result_df.dropDuplicates()
    result_df = result_df.withColumnRenamed("false", EXTERNAL)
    result_df = result_df.withColumnRenamed(PREFIX_ID, ID)
    result_df = result_df.withColumnRenamed(PREFIX, VALUE)
    return result_df

[3]
def create_edges(df: DataFrame) -> DataFrame:
    """
    Creates edges from DataFrame with IDs
    Edges have the format:
    SRC (str) | DST (str) | Created (str) | Last_modified (str)

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate edges for
    """
    p_to_mnt = df.select(PREFIX_ID, MAINTAINER_ID, CREATED, LAST_MODIFIED)
    m_to_o = df.select(MAINTAINER_ID, ORIGIN_ID, CREATED, LAST_MODIFIED)
    o_to_org = df.select(ORIGIN_ID, ORGANISATION_ID, CREATED, LAST_MODIFIED)
    o_to_p = df.select(ORIGIN_ID, PREFIX_ID, CREATED, LAST_MODIFIED)

    edges = p_to_mnt.union(m_to_o).union(o_to_org).union(o_to_p)
    # result_df = edges
    result_df = edges.dropDuplicates()
    result_df = result_df.withColumnRenamed(PREFIX_ID, SRC)
    result_df = result_df.withColumnRenamed(MAINTAINER_ID, DST)
    return result_df

[4]
from graphframes import GraphFrame

# Demonstrating the bug with the edges, using UT's prefix/mnt/origin/org as example
# How-to-use: get the IDs and plug them in bug_show_related_edges()
def bug_gather_ids(g: GraphFrame):
    vertex = "130.89.0.0/16"
    filtered_v = g.vertices.filter(psf.col(VALUE)==vertex)
    filtered_v.show(truncate=False)

    mnt = "SN-LIR-MNT RIPE-NCC-LEGACY-MNT"
    filtered_m = g.vertices.filter(psf.col(VALUE)==mnt)
    filtered_m.show(truncate=False)

    origin = "1133"
    filtered_o = g.vertices.filter(psf.col(VALUE)==origin)
    filtered_o.show(truncate=False)

    org = "Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede"
    filtered_org = g.vertices.filter(psf.col(VALUE)==org)
    filtered_org.show(truncate=False)

def bug_show_related_edges(g: GraphFrame, p_id: str, m_id: str, o_id: str,
                           org_id: str):
    con1_m = psf.col(DST)==m_id
    con2_m = psf.col(SRC)==m_id
    edg1_m = g.edges.filter(con1_m)
    edg1_m.show(truncate=False)
    edg2_m = g.edges.filter(con2_m)
    edg2_m.show(truncate=False)

    con1_p = psf.col(DST)==p_id
    con2_p = psf.col(SRC)==p_id
    edg1_p = g.edges.filter(con1_p)
    edg1_p.show(truncate=False)
    edg2_p = g.edges.filter(con2_p)
    edg2_p.show(truncate=False)

    con1_o = psf.col(DST)==o_id
    con2_o = psf.col(SRC)==o_id
    edg1_o = g.edges.filter(con1_o)
    edg1_o.show(truncate=False)
    edg2_o = g.edges.filter(con2_o)
    edg2_o.show(truncate=False)

    con1_org = psf.col(DST)==org_id
    con2_org = psf.col(SRC)==org_id
    edg1_org = g.edges.filter(con1_org)
    edg1_org.show(truncate=False)
    edg2_org = g.edges.filter(con2_org)
    edg2_org.show(truncate=False)

# prefix 'p_60129612354' corresponds with 130.89.0.0/16
# maintainer 'm_2897' corresponds with SN-LIR-MNT RIPE-NCC-LEGACY-MNT
# origin 'o_5130' corresponds with 1133
# organisation 'org_197568516576' corresponds with Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede

Output of bug_show_related_edges(g, "p_60129612354", "m_2897", "o_5130", "org_197568516576"):
# prefix -> maintainer edges (filtered on dst = maintainer)
+--------------+------+----------+-------------+
|src           |dst   |created   |last_modified|
+--------------+------+----------+-------------+
|p_197568533425|m_2897|0         |0            |
|p_94489347499 |m_2897|0         |0            |
|p_25769898645 |m_2897|1020678697|1020678697   |
|p_128849058299|m_2897|0         |0            |
|p_68719514870 |m_2897|0         |0            |
|p_146028965124|m_2897|1020267786|1020267786   |
|p_60129579570 |m_2897|0         |0            |
+--------------+------+----------+-------------+

# maintainer -> origin edges (filtered on src = maintainer)
+------+------------+----------+-------------+
|src   |dst         |created   |last_modified|
+------+------------+----------+-------------+
|m_2897|o_8589936949|0         |0            |
|m_2897|o_5130      |0         |0            |
|m_2897|o_8589936949|1020267786|1020267786   |
|m_2897|o_8589936949|1020678697|1020678697   |
+------+------------+----------+-------------+

# origin -> prefix edges (filtered on dst = prefix)
+-----+-------------+----------+-------------+
|src  |dst          |created   |last_modified|
+-----+-------------+----------+-------------+
|o_380|p_60129612354|1220513130|1220513130   |
+-----+-------------+----------+-------------+

# prefix -> maintainer edges (filtered on src = prefix)
+-------------+-----+----------+-------------+
|src          |dst  |created   |last_modified|
+-------------+-----+----------+-------------+
|p_60129612354|m_533|1220513130|1220513130   |
+-------------+-----+----------+-------------+

# maintainer -> origin edges (filtered on dst = origin)
+------+------+-------+-------------+
|src   |dst   |created|last_modified|
+------+------+-------+-------------+
|m_2897|o_5130|0      |0            |
+------+------+-------+-------------+

# origin -> prefix and origin -> organisation edges (filtered on src = origin)
+------+----------------+-------+-------------+
|src   |dst             |created|last_modified|
+------+----------------+-------+-------------+
|o_5130|p_94489347499   |0      |0            |
|o_5130|org_283467856326|0      |0            |
+------+----------------+-------+-------------+

# origin -> organisation edges (filtered on dst = organisation)
+------------+----------------+----------+-------------+
|src         |dst             |created   |last_modified|
+------------+----------------+----------+-------------+
|o_8589936415|org_197568516576|1320919317|1320919317   |
|o_8589936415|org_197568516576|1285464368|1285464368   |
+------------+----------------+----------+-------------+

# origin -> organisation edges (filtered on src = organisation)
# NB: this is intended, there are no outgoing edges from organisations
+---+---+-------+-------------+
|src|dst|created|last_modified|
+---+---+-------+-------------+
+---+---+-------+-------------+

[6]
# Determines which rows are missing from edges
# Determines which rows in edges are not in src_df (superfluous)
def checker_df(edges: DataFrame, src_df: DataFrame, src: str, dst: str,
               src_id: str, dst_id: str) -> tuple[DataFrame, DataFrame]:
    # Keep only the edges whose src and dst IDs start with the given prefixes
    con1 = psf.col(SRC).startswith(src)
    con2 = psf.col(DST).startswith(dst)
    filtered_edges = edges.filter(con1 & con2)
    if filtered_edges.count() == 0:
        print(f"[Warning] No edges found after filtering! [checker_df] was "
              f"checking {src_id} to {dst_id}")
    if DEBUG:
        print("[Checker_df] Printing Filtered edges:")
        filtered_edges.show(truncate=False)
        print(f"Records: {filtered_edges.count()}")
    # Shows if there are rows which are not represented in the edge df
    missing_edges = src_df.join(filtered_edges,
            ((filtered_edges[SRC] == src_df[src_id]) &
             (filtered_edges[DST] == src_df[dst_id])),
             "left_anti")
    missing_edges = missing_edges.dropDuplicates()
    if DEBUG:
        print("[Checker_df] Printing missing edges:")
        missing_edges.show(truncate=False)
        print(f"Records: {missing_edges.count()}")
    #  Shows if there are edges which are not represented in the src df
    superfluous_edges = filtered_edges.join(src_df,
            ((filtered_edges[SRC] == src_df[src_id]) &
             (filtered_edges[DST] == src_df[dst_id])),
             "left")
    superfluous_edges2 = superfluous_edges.filter(
        superfluous_edges[src_id].isNull() | superfluous_edges[dst_id].isNull())
    if DEBUG:
        print("[Checker_df] Printing superfluous edges:")
        superfluous_edges2.show(truncate=False)
        print(f"Records: {superfluous_edges2.count()}")
    return missing_edges, superfluous_edges2
