OK let us have a look at these

1) You are using monotonically_increasing_id(), which is not
collision-resistant in distributed environments like Spark. Multiple hosts
   can generate the same ID. I suggest switching to UUIDs (e.g.,
uuid.uuid4()) for guaranteed uniqueness.

2) Missing values in the Origin column lead to null IDs, potentially
causing problems downstream. You can handle missing values appropriately,
say
   a) Filter out rows with missing origins or b) impute missing values with
a strategy that preserves relationships (if applicable).

3) With join code, you mentioned left joining on the same column used for
ID creation, not very clear!

4) Edge Issue, it appears to me the issue seems to occur with larger
datasets (>100K records). Possible causes could be
   a) Resource Constraints as data size increases, PySpark might struggle
with joins or computations if resources are limited (memory, CPU).
   b) Data Skew: Uneven distribution of values in certain columns could
lead to imbalanced processing across machines.  Check Spark UI (4040) on
staging and execution tabs

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Wed, 24 Apr 2024 at 16:44, Nijland, J.G.W. (Jelle, Student M-CS) <
j.g.w.nijl...@student.utwente.nl> wrote:

> Hi Mich,
>
> Thanks for your reply,
> 1) ID generation is done using monotonically_increasing_id()
> <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.monotonically_increasing_id.html>
>  this
> is then prefixed with "p_", "m_", "o_" or "org_" depending on the type of
> the value it identifies.
> 2) There are some missing values in the Origin column, these will result
> in a Null ID
> 3) The join code is present in [1], I join "left" on the same column
> I create the ID on
> 4) I dont think the issue is in ID or edge generation, if i limit my input
> dataframe and union it with my Utwente data row, I can verify those edges
> are created correctly up to 100K records.
> Once I go past that amount of records the results become inconsistent and
> incorrect.
>
> Kind regards,
> Jelle Nijland
>
>
> ------------------------------
> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
> *Sent:* Wednesday, April 24, 2024 4:40 PM
> *To:* Nijland, J.G.W. (Jelle, Student M-CS) <
> j.g.w.nijl...@student.utwente.nl>
> *Cc:* user@spark.apache.org <user@spark.apache.org>
> *Subject:* Re: [spark-graphframes]: Generating incorrect edges
>
> OK few observations
>
> 1) ID Generation Method: How are you generating unique IDs (UUIDs,
> sequential numbers, etc.)?
> 2) Data Inconsistencies: Have you checked for missing values impacting ID
> generation?
> 3) Join Verification: If relevant, can you share the code for joining data
> points during ID creation? Are joins matching columns correctly?
> 4) Specific Edge Issues: Can you share examples of vertex IDs with
> incorrect connections? Is this related to ID generation or edge creation
> logic?
>
> HTH
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI, FinCrime
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) <
> j.g.w.nijl...@student.utwente.nl> wrote:
>
> tags: pyspark,spark-graphframes
>
> Hello,
>
> I am running pyspark in a podman container and I have issues with
> incorrect edges when I build my graph.
> I start with loading a source dataframe from a parquet directory on my
> server. The source dataframe has the following columns:
>
> +---------+-------+-----------------+---------+------+-----------------+------+-------------------+
> |created |descr |last_modified|mnt_by |origin|start_address|prefix
> |external_origin|
>
> +---------+-------+-----------------+---------+------+-----------------+------+-------------------+
>
> I aim to build a graph connecting prefix, mnt_by, origin and descr with
> edges storing the created and last_modified values.
> I start with generating IDs for the prefix, mnt_by, origin and descr using
> monotonically_increasing_id() [1]
> These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are
> unique IDs across the dataframe.
>
> Then I construct the vertices dataframe by collecting the ID, value and
> whether they are external for each vertex. [2]
> These vertices are then unioned together.
> Following the vertices, I construct the edges dataframe by selecting the
> IDs that I want to be the src and the dst and union those together. [3]
> These edges store the created and last_modified.
>
> Now I am ready to construct the graph. Here is where I run into my issue.
>
> When verifying my graph, I looked at a couple of vertices to see if they
> have the correct edges.
> I looked at the Utwente prefix, origin, descr and mnt_by and found that it
> generates incorrect edges.
>
> I saw edges going out to vertices that are not associated with the utwente
> values at all.
> The methods to find the vertices, edges and the output can be found in [4]
> We can already observe inconsistencies by viewing the prefix->maintainer
> and origin -> prefix edges. [5]
> Depending on what column I filter on the results are inconsistent.
> To make matters worse some edges contain IDs that are not connected to the
> original values in the source dataframe at all.
>
> What I have tried to resolve my issue:
>
>    - Write a checker that verifies edges created against the source
>    dataframe. [6]
>    The aim of this checker was to determine where the inconsistency comes
>    fro, to locate the bug and resolve it.
>    I ran this checker a limited graphs from n=10 upwards to n=1000000 (or
>    1m).
>    This felt close enough as there are only ~6.5m records in my source
>    dataframe.
>    This ran correctly, near the 1m it did experience significant slowdown
>    at the full dataframe it errors/times out.
>    I blamed this on the large joins that it performs on the source
>    dataframe.
>    - I found a github issue of someone with significantly larger graphs
>    have similar issues.
>    One suggestion there blamed indexing using strings rather than ints or
>    longs.
>    I rewrote my system to use int for IDs but I ran into the same issue.
>    The amount of incorrect edges was the same, the link to which
>    incorrects vertices it links to was the same too.
>    - I re-ordered my source dataframe to see what the impact was.
>    This results in considerably more incorrect edges using the checker in
>    [4]
>    If helpful I can post the output of this checker as well.
>
>
> Can you give me any pointers in what I can try or what I can do to clarify
> my situation better?
> Thanks in advance for your time.
>
> Kind regards,
> Jelle Nijland
>
>
>
>
> [1]
> import pyspark.sql.functions as psf
>
> # ID labels
> PREFIX_ID = "prefix_id"
> MAINTAINER_ID = "mnt_by_id"
> ORIGIN_ID = "origin_id"
> ORGANISATION_ID = "organisation_id"
>
> # Source dataframe column names
> MNT_BY = "mnt_by"
> PREFIX = "prefix"
> ORIGIN = "origin"
> DESCR = "descr"
> EXTERNAL_O = "external_origin"
>
>
> def generate_ids(df: DataFrame) -> DataFrame:
>     """
>     Generates a unique ID for each distinct maintainer, prefix, origin and
> organisation
>
>     Parameters
>     ----------
>     df : DataFrame
>         DataFrame to generate IDs for
>     """
>     mnt_by_id = df.select(MNT_BY).distinct().withColumn(MAINTAINER_ID,
> psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
>     prefix_id = df.select(PREFIX).distinct().withColumn(PREFIX_ID,
> psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
>     origin_id = df.select(ORIGIN).distinct().withColumn(ORIGIN_ID,
> psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
>     organisation_id =
> df.select(DESCR).distinct().withColumn(ORGANISATION_ID,
> psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))
>
>     df = df.join(mnt_by_id, on=MNT_BY, how="left").join(prefix_id,
> on=PREFIX, how="left").join(origin_id, on=ORIGIN,
> how="left").join(organisation_id, on=DESCR, how="left")
>     return df
>
> def create_vertices(df: DataFrame) -> DataFrame:
>     """
>     Creates vertices from a DataFrame with IDs
>     Vertices have the format:
>     ID (str) | VALUE (str) | EXTERNAL (bool)
>
>     ID follows the format [p_|o_|m_|org_][0-9]+
>
>     Parameters
>     ----------
>     df : DataFrame
>         DataFrame to generate vertices for
>     """
>     prefixes = df.select(PREFIX_ID, PREFIX, psf.lit(False))
>     maintainers = df.select(MAINTAINER_ID, MNT_BY, psf.lit(False))
>     origins = df.select(ORIGIN_ID, ORIGIN, EXTERNAL_O)
>     organisations = df.select(ORGANISATION_ID, DESCR, psf.lit(False))
>
>     result_df =
> prefixes.union(maintainers).union(origins).union(organisations)
>     result_df = result_df.dropDuplicates()
>     result_df = result_df.withColumnRenamed("false", EXTERNAL)
>     result_df = result_df.withColumnRenamed(PREFIX_ID, ID)
>     result_df = result_df.withColumnRenamed(PREFIX, VALUE)
>     return result_df
>
> [3]
> def create_edges(df: DataFrame) -> DataFrame:
>     """
>     Creates edges from DataFrame with IDs
>     Edges have the format:
>     SRC (str) | DST (str) | Created (str) | Last_modified (str)
>
>     Parameters
>     ----------
>     df : DataFrame
>         DataFrame to generate edges for
>     """
>     p_to_mnt = df.select(PREFIX_ID, MAINTAINER_ID, CREATED, LAST_MODIFIED)
>     m_to_o = df.select(MAINTAINER_ID, ORIGIN_ID, CREATED, LAST_MODIFIED)
>     o_to_org = df.select(ORIGIN_ID, ORGANISATION_ID, CREATED,
> LAST_MODIFIED)
>     o_to_p = df.select(ORIGIN_ID, PREFIX_ID, CREATED, LAST_MODIFIED)
>
>     edges = p_to_mnt.union(m_to_o).union(o_to_org).union(o_to_p)
>     # result_df = edges
>     result_df = edges.dropDuplicates()
>     result_df = result_df.withColumnRenamed(PREFIX_ID, SRC)
>     result_df = result_df.withColumnRenamed(MAINTAINER_ID, DST)
>     return result_df
>
> [4]
> # # Demonstrating bug with the edges, using UT's prefix/mnt/origin/org as
> example
> # # How-to-use: get the IDs and plug them in bug_show_related_edges()
> def bug_gather_ids(g: GraphFrame):
>     vertex = "130.89.0.0/16"
>     filtered_v = g.vertices.filter(psf.col(VALUE)==vertex)
>     filtered_v.show(truncate=False)
>
>     mnt = "SN-LIR-MNT RIPE-NCC-LEGACY-MNT"
>     filtered_m = g.vertices.filter(psf.col(VALUE)==mnt)
>     filtered_m.show(truncate=False)
>
>     origin = "1133"
>     filtered_o = g.vertices.filter(psf.col(VALUE)==origin)
>     filtered_o.show(truncate=False)
>
>     org = "Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede"
>     filtered_org = g.vertices.filter(psf.col(VALUE)==org)
>     filtered_org.show(truncate=False)
>
> def bug_show_related_edges(g: GraphFrame, p_id : str, m_id : str, o_id :
> str, org_id : str):
>     con1_m = psf.col(DST)==m_id
>     con2_m = psf.col(SRC)==m_id
>     edg1_m = g.edges.filter(con1_m)
>     edg1_m.show(truncate=False)
>     edg2_m = g.edges.filter(con2_m)
>     edg2_m.show(truncate=False)
>
>     con1_p = psf.col(DST)==p_id
>     con2_p = psf.col(SRC)==p_id
>     edg1_p = g.edges.filter(con1_p)
>     edg1_p.show(truncate=False)
>     edg2_p = g.edges.filter(con2_p)
>     edg2_p.show(truncate=False)
>
>     con1_o = psf.col(DST)==o_id
>     con2_o = psf.col(SRC)==o_id
>     edg1_o = g.edges.filter(con1_o)
>     edg1_o.show(truncate=False)
>     edg2_o = g.edges.filter(con2_o)
>     edg2_o.show(truncate=False)
>
>     con1_org = psf.col(DST)==org_id
>     con2_org = psf.col(SRC)==org_id
>     edg1_org = g.edges.filter(con1_org)
>     edg1_org.show(truncate=False)
>     edg2_org = g.edges.filter(con2_org)
>     edg2_org.show(truncate=False)
>
> # prefix 'p_60129612354' corresponds with 130.89.0.0/16
> # maintainer 'm_2897' corresponds with SN-LIR-MNT RIPE-NCC-LEGACY-MNT
> # origin 'o_5130' corresponds with1133
> # organisation 'org_197568516576' corresponds with Drienerlolaan 5 P.O.
> Box 217 NL - 7500 AE Enschede
> Output of bug_show_related_edges(g, "p_60129612354", "m_2897", "o_5130",
> "org_197568516576")
> # prefix -> maintainer edges (filtered on dst = maintainer)
> +---------------------+----------+----------------+-----------------+
> | src                             | dst           | created
> |last_modified|
> +---------------------+----------+----------------+-----------------+
> |p_197568533425 |m_2897 |0                         |0
>                 |
> |p_94489347499    |m_2897 |0                         |0
>                 |
> |p_25769898645    |m_2897 |1020678697 |1020678697   |
> |p_128849058299 |m_2897 |0                         |0
>                 |
> |p_68719514870    |m_2897 |0                         |0
>                 |
> |p_146028965124 |m_2897 |1020267786  |1020267786   |
> |p_60129579570    |m_2897 |0                          |0
>                |
> +---------------------+----------+-----------------+----------------+
>
> # maintainer to origin edges(filtered on src = maintainer)
> +---------+------------------+----------------+----------------+
> |src          | dst                        |created
> |last_modified|
> +---------+------------------+----------------+----------------+
> |m_2897|o_8589936949 |0                        |0
> |
> |m_2897|o_5130                 |0                        |0
>                |
> |m_2897|o_8589936949 |1020267786|1020267786   |
> |m_2897|o_8589936949 |1020678697|1020678697   |
> +---------+------------------+----------------+----------------+
>
> # origin to prefix edges (filtered in dst = prefix)
> +-------+-------------------+---------------+-----------------+
> |src       |dst                          |created          |last_modified |
> +-------+-------------------+---------------+-----------------+
> |o_380|p_60129612354|1220513130|1220513130   |
> +-------+-------------------+---------------+-----------------+
>
> # prefix to maintainer edges(filtered on src = prefix)
> +-------------------+--------+---------------+----------------+
> |src                           | dst       |created
> |last_modified|
> +-------------------+--------+---------------+----------------+
> |p_60129612354|m_533|1220513130|1220513130   |
> +-------------------+--------+---------------+----------------+
>
> # maintainer to origin edges(filtered on dst = origin)
> +---------+---------+----------+-----------------+
> |src          |dst          |created  |last_modified|
> +---------+---------+----------+-----------------+
> |m_2897|o_5130|0                 |0                           |
> +---------+---------+----------+-----------------+
>
> # origin to prefix(filtered on src = prefix)
> +--------+------------------------+---------+----------------+
> |src         |dst                                  |created|last_modified|
> +--------+------------------------+---------+----------------+
> |o_5130|p_94489347499       |0               |0                        |
> |o_5130|org_283467856326|0               |0                        |
> +--------+------------------------+---------+----------------+
>
> # origin to organisation(filtered on dst = organisation)
>
> +------------------+-----------------------+---------------+----------------+
> |src                         |dst                                 |created
>           |last_modified|
>
> +------------------+-----------------------+---------------+----------------+
> |o_8589936415|org_197568516576|1320919317|1320919317   |
> |o_8589936415|org_197568516576|1285464368|1285464368   |
>
> +------------------+-----------------------+---------------+----------------+
>
> # origin to organisation(filtered on src= organisation)
> # NB: this is intended, there are no outgoing edges from organisations
> +---+---+-------+-------------+
> |src|dst|created|last_modified|
> +---+---+-------+-------------+
> +---+---+-------+-------------+
>
> [6]
> # Determines which rows are missing from edges
> # Determines which rows in edges are not in src_df (superfluous)
> def checker_df(edges: DataFrame, src_df: DataFrame, src: str, dst: str,
> src_id: str, dst_id: str) -> DataFrame:
>     con1 = psf.col(SRC).startswith(src)
>     con2 = psf.col(DST).startswith(dst)
>     filtered_edges = edges.filter(con1 & con2)
>     if filtered_edges.count() == 0:
>         print(f"[Warning] No edges found after filtering! [checker_df] was
> checking {src_id} to {dst_id}")
>     if DEBUG:
>         print("[Checker_df] Printing Filtered edges:")
>         filtered_edges.show(truncate=False)
>         print(f"Records: {filtered_edges.count()}")
>     # Shows if there are rows which are not represented in the edge df
>     missing_edges = src_df.join(filtered_edges,
>             ((filtered_edges[SRC] == src_df[src_id]) &
>              (filtered_edges[DST] == src_df[dst_id])),
>              "left_anti")
>     missing_edges = missing_edges.dropDuplicates()
>     if DEBUG:
>         print("[Checker_df] Printing missing edges:")
>         missing_edges.show(truncate=False)
>         print(f"Records: {missing_edges.count()}")
>     #  Shows if there are edges which are not represented in the src df
>     superfluous_edges = filtered_edges.join(src_df,
>             ((filtered_edges[SRC] == src_df[src_id]) &
>              (filtered_edges[DST] == src_df[dst_id])),
>              "left")
>     superfluous_edges2 =
> superfluous_edges.filter(superfluous_edges[src_id].isNull() |
> superfluous_edges[dst_id].isNull())
>
>     if DEBUG:
>         print("[Checker_df] Printing superfluous edges:")
>         superfluous_edges2.show(truncate=False)
>         print(f"Records: {superfluous_edges2.count()}")
>     return missing_edges, superfluous_edges2
>
>

Reply via email to