Hi Mich,

Thanks for your suggestions.
1) It currently runs on one server with plenty of resources assigned, but I 
will keep in mind to replace monotonically_increasing_id() with uuid() once 
we scale up.
2) I have replaced the null values in origin with a string 
{prefix}-{mnt_by}-{organisation}

replacement_string = psf.concat_ws(
    "-", psf.col("prefix"), psf.col("mnt_by"), psf.col("descr")
)
df = df.withColumn("origin", psf.coalesce(psf.col("origin"), replacement_string))

I have verified my other columns have no Null values.
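
For illustration, the fallback rule in 2) can be written out in plain Python. This is only a sketch of the intended behaviour, not the Spark code itself: the helper name origin_or_fallback and the sample values are made up, and it assumes prefix, mnt_by and descr are never null (as verified above).

```python
# Sketch: plain-Python equivalent of the origin fallback in 2).
# The helper name and the sample row are hypothetical illustration data.
from typing import Optional


def origin_or_fallback(origin: Optional[str], prefix: str, mnt_by: str, descr: str) -> str:
    """Keep origin when present, otherwise build '{prefix}-{mnt_by}-{descr}'."""
    if origin is not None:
        return origin
    return "-".join([prefix, mnt_by, descr])


print(origin_or_fallback(None, "192.0.2.0/24", "EXAMPLE-MNT", "Example Org"))
print(origin_or_fallback("1133", "192.0.2.0/24", "EXAMPLE-MNT", "Example Org"))
```

Because the fallback combines three columns, two rows that lack an origin only share a replacement value when all three of those columns match.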

3) This is the logic I use to generate IDs:

mnt_by_id = df.select(MNT_BY).distinct().withColumn(
    MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
prefix_id = df.select(PREFIX).distinct().withColumn(
    PREFIX_ID, psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
origin_id = df.select(ORIGIN).distinct().withColumn(
    ORIGIN_ID, psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
organisation_id = df.select(DESCR).distinct().withColumn(
    ORGANISATION_ID, psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))

df = (
    df.join(mnt_by_id, on=MNT_BY, how="left")
    .join(prefix_id, on=PREFIX, how="left")
    .join(origin_id, on=ORIGIN, how="left")
    .join(organisation_id, on=DESCR, how="left")
)

I create the IDs from the distinct values in the columns "mnt_by", "prefix", 
"origin" and "descr", and I join on those same columns.

4) This is my current resource allocation. I run it on my university's server, 
which has 112 cores and 1.48T RAM. I can request more resources, but in my eyes 
this should be plenty.
If you think more resources would help, I will ask for them.

spark_conf = SparkConf().setAppName(f"pyspark-{APP_NAME}-{int(time())}").set(
    "spark.submit.deployMode", "client"
).set("spark.sql.parquet.binaryAsString", "true"
).set("spark.driver.bindAddress", "localhost"
).set("spark.driver.host", "127.0.0.1"
# ).set("spark.driver.port", "0"
).set("spark.ui.port", "4041"
).set("spark.executor.instances", "1"
).set("spark.executor.cores", "50"
).set("spark.executor.memory", "128G"
).set("spark.executor.memoryOverhead", "32G"
).set("spark.driver.cores", "16"
).set("spark.driver.memory", "64G"
)

I don't think b) applies, as it's a single machine.
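
As a rough check of the "plenty of resources" claim, the figures from the SparkConf above can be tallied against the machine totals. This is simple arithmetic under two assumptions: that the settings take effect as written, and that 1.48T is read as roughly 1480 GB.

```python
# Sketch: tally what the SparkConf above requests against the machine's capacity.
# Machine totals come from the message; 1.48T is approximated as 1480 GB (an assumption).
MACHINE_CORES = 112
MACHINE_MEM_GB = 1480

# spark.executor.cores + spark.driver.cores
requested_cores = 50 + 16
# spark.executor.memory + spark.executor.memoryOverhead + spark.driver.memory
requested_mem_gb = 128 + 32 + 64

core_share = requested_cores / MACHINE_CORES
mem_share = requested_mem_gb / MACHINE_MEM_GB
print(f"cores:  {requested_cores}/{MACHINE_CORES} ({core_share:.0%})")
print(f"memory: {requested_mem_gb}G/{MACHINE_MEM_GB}G ({mem_share:.0%})")
```

Under these assumptions the job requests well under the machine's capacity, so there is headroom to raise the limits before asking for more hardware.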

Kind regards,
Jelle

________________________________
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Wednesday, April 24, 2024 6:12 PM
To: Nijland, J.G.W. (Jelle, Student M-CS) <j.g.w.nijl...@student.utwente.nl>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: [spark-graphframes]: Generating incorrect edges

OK let us have a look at these

1) You are using monotonically_increasing_id(), which is not
   collision-resistant in distributed environments like Spark. Multiple hosts
   can generate the same ID. I suggest switching to UUIDs (e.g., uuid.uuid4())
   for guaranteed uniqueness.

2) Missing values in the Origin column lead to null IDs, potentially causing
   problems downstream. You can handle missing values appropriately, say
   a) filter out rows with missing origins, or
   b) impute missing values with a strategy that preserves relationships
      (if applicable).

3) With the join code, you mentioned left joining on the same column used for
   ID creation; this is not very clear!

4) Edge issue: it appears to me that the issue occurs with larger datasets
   (>100K records). Possible causes could be
   a) Resource constraints: as data size increases, PySpark might struggle
      with joins or computations if resources are limited (memory, CPU).
   b) Data skew: uneven distribution of values in certain columns could lead
      to imbalanced processing across machines. Check the Stages and
      Executors tabs in the Spark UI (4040).

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


 
view my LinkedIn profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed. It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions" (Werner 
Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Wed, 24 Apr 2024 at 16:44, Nijland, J.G.W. (Jelle, Student M-CS) 
<j.g.w.nijl...@student.utwente.nl> wrote:
Hi Mich,

Thanks for your reply,
1) ID generation is done using monotonically_increasing_id() 
<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.monotonically_increasing_id.html>; 
the result is then prefixed with "p_", "m_", "o_" or "org_" depending on the 
type of the value it identifies.
2) There are some missing values in the Origin column, these will result in a 
Null ID
3) The join code is present in [1], I join "left" on the same column I create 
the ID on
4) I don't think the issue is in ID or edge generation: if I limit my input 
dataframe and union it with my Utwente data row, I can verify that those edges 
are created correctly up to 100K records.
Once I go past that number of records, the results become inconsistent and 
incorrect.

Kind regards,
Jelle Nijland


________________________________
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Wednesday, April 24, 2024 4:40 PM
To: Nijland, J.G.W. (Jelle, Student M-CS) <j.g.w.nijl...@student.utwente.nl>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: [spark-graphframes]: Generating incorrect edges

OK few observations

1) ID Generation Method: How are you generating unique IDs (UUIDs, sequential 
numbers, etc.)?
2) Data Inconsistencies: Have you checked for missing values impacting ID 
generation?
3) Join Verification: If relevant, can you share the code for joining data 
points during ID creation? Are joins matching columns correctly?
4) Specific Edge Issues: Can you share examples of vertex IDs with incorrect 
connections? Is this related to ID generation or edge creation logic?

HTH
Mich Talebzadeh,


On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) 
<j.g.w.nijl...@student.utwente.nl> wrote:
tags: pyspark,spark-graphframes

Hello,

I am running pyspark in a podman container and I have issues with incorrect 
edges when I build my graph.
I start with loading a source dataframe from a parquet directory on my server. 
The source dataframe has the following columns:
+-------+-----+-------------+------+------+-------------+------+---------------+
|created|descr|last_modified|mnt_by|origin|start_address|prefix|external_origin|
+-------+-----+-------------+------+------+-------------+------+---------------+

I aim to build a graph connecting prefix, mnt_by, origin and descr with edges 
storing the created and last_modified values.
I start with generating IDs for the prefix, mnt_by, origin and descr using 
monotonically_increasing_id() [1]
These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are 
unique IDs across the dataframe.
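
As background on what these IDs look like: the Spark documentation describes the value of monotonically_increasing_id() as the partition ID in the upper 31 bits and the record number within that partition in the lower 33 bits. A small plain-Python sketch, assuming that layout, splits an ID into its parts. The helper name decode_mono_id is hypothetical; the example IDs are the ones that appear in the output in [4].

```python
# Sketch: split an ID produced by monotonically_increasing_id() into its parts.
# Assumes the documented layout: upper 31 bits = partition ID,
# lower 33 bits = record number within that partition.
RECORD_BITS = 33


def decode_mono_id(prefixed_id: str) -> tuple[str, int, int]:
    """Return (prefix, partition_id, record_number) for an ID such as 'p_60129612354'."""
    prefix, _, number = prefixed_id.rpartition("_")
    value = int(number)
    partition_id = value >> RECORD_BITS
    record_number = value & ((1 << RECORD_BITS) - 1)
    return prefix, partition_id, record_number


for example in ("p_60129612354", "m_2897", "o_5130", "org_197568516576"):
    print(example, decode_mono_id(example))
```

Read this way, an ID encodes where a value happened to sit (which partition, which position) at the moment the ID column was computed, rather than anything about the value itself.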

Then I construct the vertices dataframe by collecting the ID, value and whether 
they are external for each vertex. [2]
These vertices are then unioned together.
Following the vertices, I construct the edges dataframe by selecting the IDs 
that I want to be the src and the dst and union those together. [3]
These edges store the created and last_modified.

Now I am ready to construct the graph. Here is where I run into my issue.

When verifying my graph, I looked at a couple of vertices to see if they have 
the correct edges.
I looked at the Utwente prefix, origin, descr and mnt_by and found that it 
generates incorrect edges.

I saw edges going out to vertices that are not associated with the utwente 
values at all.
The methods to find the vertices, edges and the output can be found in [4]
We can already observe inconsistencies by viewing the prefix->maintainer and 
origin -> prefix edges. [5]
Depending on what column I filter on the results are inconsistent.
To make matters worse some edges contain IDs that are not connected to the 
original values in the source dataframe at all.

What I have tried to resolve my issue:

  * I wrote a checker that verifies the created edges against the source
    dataframe. [6]
    The aim of this checker was to determine where the inconsistency comes
    from, so that I could locate the bug and resolve it.
    I ran this checker on limited graphs from n=10 up to n=1000000 (1m).
    This felt close enough, as there are only ~6.5m records in my source
    dataframe.
    This ran correctly. Near 1m it experienced a significant slowdown, and on
    the full dataframe it errors/times out.
    I blamed this on the large joins that it performs on the source dataframe.
  * I found a GitHub issue from someone with significantly larger graphs who
    had similar issues.
    One suggestion there blamed indexing using strings rather than ints or
    longs.
    I rewrote my system to use int for IDs, but I ran into the same issue.
    The number of incorrect edges was the same, and the incorrect vertices
    they link to were the same too.
  * I re-ordered my source dataframe to see what the impact was.
    This results in considerably more incorrect edges using the checker in [4].
    If helpful, I can post the output of this checker as well.
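
The re-ordering experiment in the last bullet would be consistent with IDs that depend on row placement. The toy model below is plain Python, not Spark: the hypothetical helper assign_ids mimics the layout documented for monotonically_increasing_id() (partition index in the upper bits, position in the lower 33 bits), and the values "A", "B", "C" are made up. It only shows that the same set of values can receive different IDs when it is laid out differently; whether that is what happens in this pipeline is an assumption that would still need to be verified.

```python
# Toy model (not Spark): IDs built as (partition index << 33) + position in partition,
# mirroring the layout documented for monotonically_increasing_id().

def assign_ids(partitions: list[list[str]], prefix: str) -> dict[str, str]:
    """Map each value to an ID derived from where it sits in the partitions."""
    ids = {}
    for partition_index, rows in enumerate(partitions):
        for position, value in enumerate(rows):
            ids[value] = f"{prefix}{(partition_index << 33) + position}"
    return ids


# Two evaluations of the same distinct values, laid out differently.
first_run = assign_ids([["A", "B"], ["C"]], "m_")
second_run = assign_ids([["C", "A"], ["B"]], "m_")

for value in sorted(first_run):
    print(value, first_run[value], second_run[value])
```

If an ID lookup table were evaluated more than once with different layouts, vertices and edges built from separate evaluations could disagree in exactly this way.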

Can you give me any pointers on what I can try, or on what I can do to clarify 
my situation better?
Thanks in advance for your time.

Kind regards,
Jelle Nijland




[1]
import pyspark.sql.functions as psf
from pyspark.sql import DataFrame
from graphframes import GraphFrame

# ID labels
PREFIX_ID = "prefix_id"
MAINTAINER_ID = "mnt_by_id"
ORIGIN_ID = "origin_id"
ORGANISATION_ID = "organisation_id"

# Source dataframe column names
MNT_BY = "mnt_by"
PREFIX = "prefix"
ORIGIN = "origin"
DESCR = "descr"
EXTERNAL_O = "external_origin"


def generate_ids(df: DataFrame) -> DataFrame:
    """
    Generates a unique ID for each distinct maintainer, prefix, origin and
    organisation

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate IDs for
    """
    mnt_by_id = df.select(MNT_BY).distinct().withColumn(
        MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
    prefix_id = df.select(PREFIX).distinct().withColumn(
        PREFIX_ID, psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
    origin_id = df.select(ORIGIN).distinct().withColumn(
        ORIGIN_ID, psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
    organisation_id = df.select(DESCR).distinct().withColumn(
        ORGANISATION_ID, psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))

    df = (
        df.join(mnt_by_id, on=MNT_BY, how="left")
        .join(prefix_id, on=PREFIX, how="left")
        .join(origin_id, on=ORIGIN, how="left")
        .join(organisation_id, on=DESCR, how="left")
    )
    return df

[2]
def create_vertices(df: DataFrame) -> DataFrame:
    """
    Creates vertices from a DataFrame with IDs
    Vertices have the format:
    ID (str) | VALUE (str) | EXTERNAL (bool)

    ID follows the format [p_|o_|m_|org_][0-9]+

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate vertices for
    """
    prefixes = df.select(PREFIX_ID, PREFIX, psf.lit(False))
    maintainers = df.select(MAINTAINER_ID, MNT_BY, psf.lit(False))
    origins = df.select(ORIGIN_ID, ORIGIN, EXTERNAL_O)
    organisations = df.select(ORGANISATION_ID, DESCR, psf.lit(False))

    result_df = prefixes.union(maintainers).union(origins).union(organisations)
    result_df = result_df.dropDuplicates()
    result_df = result_df.withColumnRenamed("false", EXTERNAL)
    result_df = result_df.withColumnRenamed(PREFIX_ID, ID)
    result_df = result_df.withColumnRenamed(PREFIX, VALUE)
    return result_df

[3]
def create_edges(df: DataFrame) -> DataFrame:
    """
    Creates edges from DataFrame with IDs
    Edges have the format:
    SRC (str) | DST (str) | Created (str) | Last_modified (str)

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate edges for
    """
    p_to_mnt = df.select(PREFIX_ID, MAINTAINER_ID, CREATED, LAST_MODIFIED)
    m_to_o = df.select(MAINTAINER_ID, ORIGIN_ID, CREATED, LAST_MODIFIED)
    o_to_org = df.select(ORIGIN_ID, ORGANISATION_ID, CREATED, LAST_MODIFIED)
    o_to_p = df.select(ORIGIN_ID, PREFIX_ID, CREATED, LAST_MODIFIED)

    edges = p_to_mnt.union(m_to_o).union(o_to_org).union(o_to_p)
    # result_df = edges
    result_df = edges.dropDuplicates()
    result_df = result_df.withColumnRenamed(PREFIX_ID, SRC)
    result_df = result_df.withColumnRenamed(MAINTAINER_ID, DST)
    return result_df

[4]
# Demonstrating bug with the edges, using UT's prefix/mnt/origin/org as example
# How-to-use: get the IDs and plug them in bug_show_related_edges()
def bug_gather_ids(g: GraphFrame):
    vertex = "130.89.0.0/16"
    filtered_v = g.vertices.filter(psf.col(VALUE)==vertex)
    filtered_v.show(truncate=False)

    mnt = "SN-LIR-MNT RIPE-NCC-LEGACY-MNT"
    filtered_m = g.vertices.filter(psf.col(VALUE)==mnt)
    filtered_m.show(truncate=False)

    origin = "1133"
    filtered_o = g.vertices.filter(psf.col(VALUE)==origin)
    filtered_o.show(truncate=False)

    org = "Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede"
    filtered_org = g.vertices.filter(psf.col(VALUE)==org)
    filtered_org.show(truncate=False)

def bug_show_related_edges(g: GraphFrame, p_id: str, m_id: str, o_id: str,
                           org_id: str):
    con1_m = psf.col(DST)==m_id
    con2_m = psf.col(SRC)==m_id
    edg1_m = g.edges.filter(con1_m)
    edg1_m.show(truncate=False)
    edg2_m = g.edges.filter(con2_m)
    edg2_m.show(truncate=False)

    con1_p = psf.col(DST)==p_id
    con2_p = psf.col(SRC)==p_id
    edg1_p = g.edges.filter(con1_p)
    edg1_p.show(truncate=False)
    edg2_p = g.edges.filter(con2_p)
    edg2_p.show(truncate=False)

    con1_o = psf.col(DST)==o_id
    con2_o = psf.col(SRC)==o_id
    edg1_o = g.edges.filter(con1_o)
    edg1_o.show(truncate=False)
    edg2_o = g.edges.filter(con2_o)
    edg2_o.show(truncate=False)

    con1_org = psf.col(DST)==org_id
    con2_org = psf.col(SRC)==org_id
    edg1_org = g.edges.filter(con1_org)
    edg1_org.show(truncate=False)
    edg2_org = g.edges.filter(con2_org)
    edg2_org.show(truncate=False)

# prefix 'p_60129612354' corresponds with 130.89.0.0/16
# maintainer 'm_2897' corresponds with SN-LIR-MNT RIPE-NCC-LEGACY-MNT
# origin 'o_5130' corresponds with 1133
# organisation 'org_197568516576' corresponds with Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede

Output of bug_show_related_edges(g, "p_60129612354", "m_2897", "o_5130", "org_197568516576"):
# prefix -> maintainer edges (filtered on dst = maintainer)
+--------------+------+----------+-------------+
|src           |dst   |created   |last_modified|
+--------------+------+----------+-------------+
|p_197568533425|m_2897|0         |0            |
|p_94489347499 |m_2897|0         |0            |
|p_25769898645 |m_2897|1020678697|1020678697   |
|p_128849058299|m_2897|0         |0            |
|p_68719514870 |m_2897|0         |0            |
|p_146028965124|m_2897|1020267786|1020267786   |
|p_60129579570 |m_2897|0         |0            |
+--------------+------+----------+-------------+

# maintainer to origin edges (filtered on src = maintainer)
+------+------------+----------+-------------+
|src   |dst         |created   |last_modified|
+------+------------+----------+-------------+
|m_2897|o_8589936949|0         |0            |
|m_2897|o_5130      |0         |0            |
|m_2897|o_8589936949|1020267786|1020267786   |
|m_2897|o_8589936949|1020678697|1020678697   |
+------+------------+----------+-------------+

# origin to prefix edges (filtered on dst = prefix)
+-----+-------------+----------+-------------+
|src  |dst          |created   |last_modified|
+-----+-------------+----------+-------------+
|o_380|p_60129612354|1220513130|1220513130   |
+-----+-------------+----------+-------------+

# prefix to maintainer edges (filtered on src = prefix)
+-------------+-----+----------+-------------+
|src          |dst  |created   |last_modified|
+-------------+-----+----------+-------------+
|p_60129612354|m_533|1220513130|1220513130   |
+-------------+-----+----------+-------------+

# maintainer to origin edges (filtered on dst = origin)
+------+------+-------+-------------+
|src   |dst   |created|last_modified|
+------+------+-------+-------------+
|m_2897|o_5130|0      |0            |
+------+------+-------+-------------+

# origin to prefix (filtered on src = origin)
+------+----------------+-------+-------------+
|src   |dst             |created|last_modified|
+------+----------------+-------+-------------+
|o_5130|p_94489347499   |0      |0            |
|o_5130|org_283467856326|0      |0            |
+------+----------------+-------+-------------+

# origin to organisation (filtered on dst = organisation)
+------------+----------------+----------+-------------+
|src         |dst             |created   |last_modified|
+------------+----------------+----------+-------------+
|o_8589936415|org_197568516576|1320919317|1320919317   |
|o_8589936415|org_197568516576|1285464368|1285464368   |
+------------+----------------+----------+-------------+

# origin to organisation(filtered on src= organisation)
# NB: this is intended, there are no outgoing edges from organisations
+---+---+-------+-------------+
|src|dst|created|last_modified|
+---+---+-------+-------------+
+---+---+-------+-------------+

[6]
# Determines which rows are missing from edges
# Determines which rows in edges are not in src_df (superfluous)
def checker_df(edges: DataFrame, src_df: DataFrame, src: str, dst: str,
               src_id: str, dst_id: str) -> tuple[DataFrame, DataFrame]:
    con1 = psf.col(SRC).startswith(src)
    con2 = psf.col(DST).startswith(dst)
    filtered_edges = edges.filter(con1 & con2)
    if filtered_edges.count() == 0:
        print(f"[Warning] No edges found after filtering! [checker_df] was "
              f"checking {src_id} to {dst_id}")
    if DEBUG:
        print("[Checker_df] Printing Filtered edges:")
        filtered_edges.show(truncate=False)
        print(f"Records: {filtered_edges.count()}")
    # Shows if there are rows which are not represented in the edge df
    missing_edges = src_df.join(filtered_edges,
            ((filtered_edges[SRC] == src_df[src_id]) &
             (filtered_edges[DST] == src_df[dst_id])),
             "left_anti")
    missing_edges = missing_edges.dropDuplicates()
    if DEBUG:
        print("[Checker_df] Printing missing edges:")
        missing_edges.show(truncate=False)
        print(f"Records: {missing_edges.count()}")
    #  Shows if there are edges which are not represented in the src df
    superfluous_edges = filtered_edges.join(src_df,
            ((filtered_edges[SRC] == src_df[src_id]) &
             (filtered_edges[DST] == src_df[dst_id])),
             "left")
    superfluous_edges2 = superfluous_edges.filter(
        superfluous_edges[src_id].isNull() | superfluous_edges[dst_id].isNull())
    if DEBUG:
        print("[Checker_df] Printing superfluous edges:")
        superfluous_edges2.show(truncate=False)
        print(f"Records: {superfluous_edges2.count()}")
    return missing_edges, superfluous_edges2
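
Conceptually, the two joins in checker_df amount to two set differences over (src_id, dst_id) pairs. A plain-Python sketch of that idea follows; the helper name check_pairs and all pairs are made up for illustration and are not real data from the graph.

```python
# Sketch: the checker's logic expressed with sets of (src_id, dst_id) pairs.
# All pairs below are made-up illustration data.

def check_pairs(edge_pairs: set[tuple[str, str]],
                source_pairs: set[tuple[str, str]]) -> tuple[set, set]:
    """Return (missing, superfluous) pairs."""
    # In the source but absent from the edges (the left_anti join).
    missing = source_pairs - edge_pairs
    # In the edges but absent from the source (the left join + isNull filter).
    superfluous = edge_pairs - source_pairs
    return missing, superfluous


source_pairs = {("p_1", "m_1"), ("p_2", "m_1")}
edge_pairs = {("p_1", "m_1"), ("p_3", "m_1")}

missing, superfluous = check_pairs(edge_pairs, source_pairs)
print("missing:", missing)
print("superfluous:", superfluous)
```

A correct graph would give two empty sets; any pair in either set points at an ID that differs between the source dataframe and the edges.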
