Hi Mich,

Thanks for your suggestions.

1) It currently runs on one server with plenty of resources assigned, but I will keep it in mind to replace monotonically_increasing_id() with uuid() once we scale up.
2) I have replaced the null values in origin with the string {prefix}-{mnt_by}-{organisation}:
replacement_string = psf.concat_ws("-", psf.col("prefix"), psf.col("mnt_by"), psf.col("descr"))
df = df.withColumn("origin", psf.coalesce(psf.col("origin"), replacement_string))

I have verified that my other columns have no null values.

3) This is the logic I use to generate IDs:

mnt_by_id = df.select(MNT_BY).distinct().withColumn(MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
prefix_id = df.select(PREFIX).distinct().withColumn(PREFIX_ID, psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
origin_id = df.select(ORIGIN).distinct().withColumn(ORIGIN_ID, psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
organisation_id = df.select(DESCR).distinct().withColumn(ORGANISATION_ID, psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))
df = df.join(mnt_by_id, on=MNT_BY, how="left").join(prefix_id, on=PREFIX, how="left").join(origin_id, on=ORIGIN, how="left").join(organisation_id, on=DESCR, how="left")

I create the IDs from the distinct values in the columns "mnt_by", "prefix", "origin" and "descr", the same columns I join on.

4) This is my current resource allocation; I run it on a server of my university. It has 112 cores and 1.48T RAM. I can request more resources, but in my eyes this should be plenty. If you think more resources would help, I will ask for them.

spark_conf = SparkConf().setAppName(f"pyspark-{APP_NAME}-{int(time())}").set(
    "spark.submit.deployMode", "client"
).set("spark.sql.parquet.binaryAsString", "true"
).set("spark.driver.bindAddress", "localhost"
).set("spark.driver.host", "127.0.0.1"
# ).set("spark.driver.port", "0"
).set("spark.ui.port", "4041"
).set("spark.executor.instances", "1"
).set("spark.executor.cores", "50"
).set("spark.executor.memory", "128G"
).set("spark.executor.memoryOverhead", "32G"
).set("spark.driver.cores", "16"
).set("spark.driver.memory", "64G"
)

I don't think b) applies as it's a single machine.
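[Editor's note] For the eventual switch away from monotonically_increasing_id(), here is a minimal plain-Python sketch of a uuid-based mapping (in Spark the equivalent would be swapping psf.monotonically_increasing_id() for psf.expr("uuid()"); the helper name below is illustrative, not from the thread):

```python
import uuid

def assign_uuid_ids(values, prefix):
    """Map each distinct value to a prefixed UUID.

    Unlike monotonically_increasing_id(), uuid4 does not depend on the
    partition layout, so the mapping cannot collide across executors or
    change when the DataFrame is recomputed with a different partitioning.
    """
    return {v: f"{prefix}{uuid.uuid4()}" for v in set(values)}

# Three input rows, two distinct origins -> two distinct "o_"-prefixed IDs
origin_ids = assign_uuid_ids(["1133", "1133", "286"], "o_")
```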
Kind regards,
Jelle

________________________________
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Wednesday, April 24, 2024 6:12 PM
To: Nijland, J.G.W. (Jelle, Student M-CS) <j.g.w.nijl...@student.utwente.nl>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: [spark-graphframes]: Generating incorrect edges

OK, let us have a look at these:

1) You are using monotonically_increasing_id(), which is not collision-resistant in distributed environments like Spark. Multiple hosts can generate the same ID. I suggest switching to UUIDs (e.g., uuid.uuid4()) for guaranteed uniqueness.
2) Missing values in the Origin column lead to null IDs, potentially causing problems downstream. You can handle missing values appropriately, say: a) filter out rows with missing origins, or b) impute missing values with a strategy that preserves relationships (if applicable).
3) With the join code, you mentioned left-joining on the same column used for ID creation; not very clear!
4) Edge issue: it appears to me the issue occurs with larger datasets (>100K records). Possible causes could be: a) resource constraints: as data size increases, PySpark might struggle with joins or computations if resources are limited (memory, CPU); b) data skew: an uneven distribution of values in certain columns could lead to imbalanced processing across machines. Check the Spark UI (port 4040), in particular the Stages and Executors tabs.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed.
It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Wed, 24 Apr 2024 at 16:44, Nijland, J.G.W. (Jelle, Student M-CS) <j.g.w.nijl...@student.utwente.nl> wrote:

Hi Mich,

Thanks for your reply,

1) ID generation is done using monotonically_increasing_id() <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.monotonically_increasing_id.html>; this is then prefixed with "p_", "m_", "o_" or "org_" depending on the type of the value it identifies.
2) There are some missing values in the Origin column; these will result in a null ID.
3) The join code is present in [1]; I join "left" on the same column I create the ID on.
4) I don't think the issue is in ID or edge generation. If I limit my input dataframe and union it with my Utwente data row, I can verify those edges are created correctly up to 100K records. Once I go past that amount of records, the results become inconsistent and incorrect.

Kind regards,
Jelle Nijland

________________________________
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Wednesday, April 24, 2024 4:40 PM
To: Nijland, J.G.W. (Jelle, Student M-CS) <j.g.w.nijl...@student.utwente.nl>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: [spark-graphframes]: Generating incorrect edges

OK, a few observations:

1) ID Generation Method: How are you generating unique IDs (UUIDs, sequential numbers, etc.)?
2) Data Inconsistencies: Have you checked for missing values impacting ID generation?
3) Join Verification: If relevant, can you share the code for joining data points during ID creation? Are joins matching columns correctly?
4) Specific Edge Issues: Can you share examples of vertex IDs with incorrect connections? Is this related to ID generation or edge creation logic?

HTH

Mich Talebzadeh

On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) <j.g.w.nijl...@student.utwente.nl> wrote:

tags: pyspark, spark-graphframes

Hello,

I am running PySpark in a Podman container and I have issues with incorrect edges when I build my graph. I start with loading a source dataframe from a Parquet directory on my server. The source dataframe has the following columns:

+-------+-----+-------------+------+------+-------------+------+---------------+
|created|descr|last_modified|mnt_by|origin|start_address|prefix|external_origin|
+-------+-----+-------------+------+------+-------------+------+---------------+

I aim to build a graph connecting prefix, mnt_by, origin and descr, with edges storing the created and last_modified values. I start with generating IDs for the prefix, mnt_by, origin and descr using monotonically_increasing_id() [1]. These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are unique IDs across the dataframe.
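[Editor's note] For context on the IDs that appear later in this thread: monotonically_increasing_id() packs the partition index into the upper 31 bits of a 64-bit integer and a per-partition row counter into the lower 33 bits, so the value depends entirely on how the DataFrame happens to be partitioned when it is evaluated. A plain-Python sketch of that layout (the helper name is mine, not a Spark API):

```python
def miid_like(partition_index: int, row_in_partition: int) -> int:
    """Mimic the bit layout of Spark's monotonically_increasing_id():
    upper 31 bits = partition index, lower 33 bits = row number within
    that partition. The ID therefore changes whenever the partitioning
    of the DataFrame changes, e.g. on recomputation.
    """
    assert 0 <= partition_index < 2**31 and 0 <= row_in_partition < 2**33
    return (partition_index << 33) | row_in_partition

# IDs seen in the outputs below, such as p_60129612354, decompose this
# way: 60129612354 = (7 << 33) + 70210, i.e. row 70210 of partition 7.
print(miid_like(7, 70210))  # 60129612354
```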
Then I construct the vertices dataframe by collecting the ID, value and whether they are external for each vertex [2]. These vertices are then unioned together. Following the vertices, I construct the edges dataframe by selecting the IDs that I want to be the src and the dst and unioning those together [3]. These edges store created and last_modified. Now I am ready to construct the graph.

Here is where I run into my issue. When verifying my graph, I looked at a couple of vertices to see if they have the correct edges. I looked at the Utwente prefix, origin, descr and mnt_by and found that it generates incorrect edges: I saw edges going out to vertices that are not associated with the Utwente values at all. The methods to find the vertices and edges, and their output, can be found in [4]. We can already observe inconsistencies by viewing the prefix -> maintainer and origin -> prefix edges [5]. Depending on which column I filter on, the results are inconsistent. To make matters worse, some edges contain IDs that are not connected to the original values in the source dataframe at all.

What I have tried to resolve my issue:
* Write a checker that verifies the edges created against the source dataframe [6]. The aim of this checker was to determine where the inconsistency comes from, to locate the bug and resolve it. I ran this checker on limited graphs from n=10 up to n=1000000 (1M). This felt close enough, as there are only ~6.5M records in my source dataframe. This ran correctly; near 1M it experienced significant slowdown, and on the full dataframe it errors/times out. I blamed this on the large joins it performs against the source dataframe.
* I found a GitHub issue of someone with significantly larger graphs having similar issues. One suggestion there blamed indexing using strings rather than ints or longs. I rewrote my system to use ints for IDs, but I ran into the same issue.
The amount of incorrect edges was the same, and the incorrect vertices they linked to were the same too.
* I re-ordered my source dataframe to see what the impact was. This resulted in considerably more incorrect edges according to the checker in [4]. If helpful, I can post the output of this checker as well.

Can you give me any pointers on what I can try, or what I can do to clarify my situation better?

Thanks in advance for your time.

Kind regards,
Jelle Nijland

[1]
import pyspark.sql.functions as psf

# ID labels
PREFIX_ID = "prefix_id"
MAINTAINER_ID = "mnt_by_id"
ORIGIN_ID = "origin_id"
ORGANISATION_ID = "organisation_id"
# Source dataframe column names
MNT_BY = "mnt_by"
PREFIX = "prefix"
ORIGIN = "origin"
DESCR = "descr"
EXTERNAL_O = "external_origin"


def generate_ids(df: DataFrame) -> DataFrame:
    """
    Generates a unique ID for each distinct maintainer, prefix, origin and organisation

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate IDs for
    """
    mnt_by_id = df.select(MNT_BY).distinct().withColumn(MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
    prefix_id = df.select(PREFIX).distinct().withColumn(PREFIX_ID, psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
    origin_id = df.select(ORIGIN).distinct().withColumn(ORIGIN_ID, psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
    organisation_id = df.select(DESCR).distinct().withColumn(ORGANISATION_ID, psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))
    df = df.join(mnt_by_id, on=MNT_BY, how="left").join(prefix_id, on=PREFIX, how="left").join(origin_id, on=ORIGIN, how="left").join(organisation_id, on=DESCR, how="left")
    return df


def create_vertices(df: DataFrame) -> DataFrame:
    """
    Creates vertices from a DataFrame with IDs

    Vertices have the format: ID (str) | VALUE (str) | EXTERNAL (bool)
    ID follows the format [p_|o_|m_|org_][0-9]+

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate vertices for
    """
    prefixes = df.select(PREFIX_ID, PREFIX, psf.lit(False))
    maintainers = df.select(MAINTAINER_ID, MNT_BY, psf.lit(False))
    origins = df.select(ORIGIN_ID, ORIGIN, EXTERNAL_O)
    organisations = df.select(ORGANISATION_ID, DESCR, psf.lit(False))
    result_df = prefixes.union(maintainers).union(origins).union(organisations)
    result_df = result_df.dropDuplicates()
    result_df = result_df.withColumnRenamed("false", EXTERNAL)
    result_df = result_df.withColumnRenamed(PREFIX_ID, ID)
    result_df = result_df.withColumnRenamed(PREFIX, VALUE)
    return result_df

[3]
def create_edges(df: DataFrame) -> DataFrame:
    """
    Creates edges from a DataFrame with IDs

    Edges have the format: SRC (str) | DST (str) | Created (str) | Last_modified (str)

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate edges for
    """
    p_to_mnt = df.select(PREFIX_ID, MAINTAINER_ID, CREATED, LAST_MODIFIED)
    m_to_o = df.select(MAINTAINER_ID, ORIGIN_ID, CREATED, LAST_MODIFIED)
    o_to_org = df.select(ORIGIN_ID, ORGANISATION_ID, CREATED, LAST_MODIFIED)
    o_to_p = df.select(ORIGIN_ID, PREFIX_ID, CREATED, LAST_MODIFIED)
    edges = p_to_mnt.union(m_to_o).union(o_to_org).union(o_to_p)
    # result_df = edges
    result_df = edges.dropDuplicates()
    result_df = result_df.withColumnRenamed(PREFIX_ID, SRC)
    result_df = result_df.withColumnRenamed(MAINTAINER_ID, DST)
    return result_df

[4]
# Demonstrating the bug with the edges, using UT's prefix/mnt/origin/org as example
# How-to-use: get the IDs and plug them into bug_show_related_edges()
def bug_gather_ids(g: GraphFrame):
    vertex = "130.89.0.0/16"
    filtered_v = g.vertices.filter(psf.col(VALUE) == vertex)
    filtered_v.show(truncate=False)
    mnt = "SN-LIR-MNT RIPE-NCC-LEGACY-MNT"
    filtered_m = g.vertices.filter(psf.col(VALUE) == mnt)
    filtered_m.show(truncate=False)
    origin = "1133"
    filtered_o = g.vertices.filter(psf.col(VALUE) == origin)
    filtered_o.show(truncate=False)
    org = "Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede"
    filtered_org = g.vertices.filter(psf.col(VALUE) == org)
    filtered_org.show(truncate=False)


def bug_show_related_edges(g: GraphFrame, p_id: str, m_id: str, o_id: str, org_id: str):
    con1_m = psf.col(DST) == m_id
    con2_m = psf.col(SRC) == m_id
    edg1_m = g.edges.filter(con1_m)
    edg1_m.show(truncate=False)
    edg2_m = g.edges.filter(con2_m)
    edg2_m.show(truncate=False)
    con1_p = psf.col(DST) == p_id
    con2_p = psf.col(SRC) == p_id
    edg1_p = g.edges.filter(con1_p)
    edg1_p.show(truncate=False)
    edg2_p = g.edges.filter(con2_p)
    edg2_p.show(truncate=False)
    con1_o = psf.col(DST) == o_id
    con2_o = psf.col(SRC) == o_id
    edg1_o = g.edges.filter(con1_o)
    edg1_o.show(truncate=False)
    edg2_o = g.edges.filter(con2_o)
    edg2_o.show(truncate=False)
    con1_org = psf.col(DST) == org_id
    con2_org = psf.col(SRC) == org_id
    edg1_org = g.edges.filter(con1_org)
    edg1_org.show(truncate=False)
    edg2_org = g.edges.filter(con2_org)
    edg2_org.show(truncate=False)

# prefix 'p_60129612354' corresponds with 130.89.0.0/16
# maintainer 'm_2897' corresponds with SN-LIR-MNT RIPE-NCC-LEGACY-MNT
# origin 'o_5130' corresponds with 1133
# organisation 'org_197568516576' corresponds with Drienerlolaan 5 P.O.
# Box 217 NL - 7500 AE Enschede

Output of bug_show_related_edges(g, "p_60129612354", "m_2897", "o_5130", "org_197568516576"):

# prefix -> maintainer edges (filtered on dst = maintainer)
+--------------+------+----------+-------------+
|src           |dst   |created   |last_modified|
+--------------+------+----------+-------------+
|p_197568533425|m_2897|0         |0            |
|p_94489347499 |m_2897|0         |0            |
|p_25769898645 |m_2897|1020678697|1020678697   |
|p_128849058299|m_2897|0         |0            |
|p_68719514870 |m_2897|0         |0            |
|p_146028965124|m_2897|1020267786|1020267786   |
|p_60129579570 |m_2897|0         |0            |
+--------------+------+----------+-------------+

# maintainer -> origin edges (filtered on src = maintainer)
+------+------------+----------+-------------+
|src   |dst         |created   |last_modified|
+------+------------+----------+-------------+
|m_2897|o_8589936949|0         |0            |
|m_2897|o_5130      |0         |0            |
|m_2897|o_8589936949|1020267786|1020267786   |
|m_2897|o_8589936949|1020678697|1020678697   |
+------+------------+----------+-------------+

# origin -> prefix edges (filtered on dst = prefix)
+-----+-------------+----------+-------------+
|src  |dst          |created   |last_modified|
+-----+-------------+----------+-------------+
|o_380|p_60129612354|1220513130|1220513130   |
+-----+-------------+----------+-------------+

# prefix -> maintainer edges (filtered on src = prefix)
+-------------+-----+----------+-------------+
|src          |dst  |created   |last_modified|
+-------------+-----+----------+-------------+
|p_60129612354|m_533|1220513130|1220513130   |
+-------------+-----+----------+-------------+

# maintainer -> origin edges (filtered on dst = origin)
+------+------+-------+-------------+
|src   |dst   |created|last_modified|
+------+------+-------+-------------+
|m_2897|o_5130|0      |0            |
+------+------+-------+-------------+

# origin -> prefix edges (filtered on src = origin)
+------+----------------+-------+-------------+
|src   |dst             |created|last_modified|
+------+----------------+-------+-------------+
|o_5130|p_94489347499   |0      |0            |
|o_5130|org_283467856326|0      |0            |
+------+----------------+-------+-------------+

# origin -> organisation edges (filtered on dst = organisation)
+------------+----------------+----------+-------------+
|src         |dst             |created   |last_modified|
+------------+----------------+----------+-------------+
|o_8589936415|org_197568516576|1320919317|1320919317   |
|o_8589936415|org_197568516576|1285464368|1285464368   |
+------------+----------------+----------+-------------+

# organisation outgoing edges (filtered on src = organisation)
# NB: this is intended, there are no outgoing edges from organisations
+---+---+-------+-------------+
|src|dst|created|last_modified|
+---+---+-------+-------------+
+---+---+-------+-------------+

[6]
# Determines which rows are missing from edges
# Determines which rows in edges are not in src_df (superfluous)
def checker_df(edges: DataFrame, src_df: DataFrame, src: str, dst: str, src_id: str, dst_id: str) -> tuple:
    con1 = psf.col(SRC).startswith(src)
    con2 = psf.col(DST).startswith(dst)
    filtered_edges = edges.filter(con1 & con2)
    if filtered_edges.count() == 0:
        print(f"[Warning] No edges found after filtering! [checker_df] was checking {src_id} to {dst_id}")
    if DEBUG:
        print("[Checker_df] Printing filtered edges:")
        filtered_edges.show(truncate=False)
        print(f"Records: {filtered_edges.count()}")
    # Shows if there are rows which are not represented in the edge df
    missing_edges = src_df.join(filtered_edges, ((filtered_edges[SRC] == src_df[src_id]) & (filtered_edges[DST] == src_df[dst_id])), "left_anti")
    missing_edges = missing_edges.dropDuplicates()
    if DEBUG:
        print("[Checker_df] Printing missing edges:")
        missing_edges.show(truncate=False)
        print(f"Records: {missing_edges.count()}")
    # Shows if there are edges which are not represented in the src df
    superfluous_edges = filtered_edges.join(src_df, ((filtered_edges[SRC] == src_df[src_id]) & (filtered_edges[DST] == src_df[dst_id])), "left")
    superfluous_edges2 = superfluous_edges.filter(superfluous_edges[src_id].isNull() | superfluous_edges[dst_id].isNull())
    if DEBUG:
        print("[Checker_df] Printing superfluous edges:")
        superfluous_edges2.show(truncate=False)
        print(f"Records: {superfluous_edges2.count()}")
    return missing_edges, superfluous_edges2
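[Editor's note] The missing/superfluous comparison in checker_df can be sketched in plain Python with set differences, which is handy for unit-testing the checker logic on a toy graph before running the Spark version (the function and variable names here are illustrative, not from the code above):

```python
def check_edges(source_pairs, edge_pairs):
    """Compare (src_id, dst_id) pairs derived from the source rows
    against the generated edge list.

    missing     = pairs in the source that never became an edge
    superfluous = edges matching no source pair (the symptom in this
                  thread: edges pointing at unrelated vertices)
    """
    expected = set(source_pairs)
    actual = set(edge_pairs)
    return expected - actual, actual - expected

# Toy example: one prefix->maintainer pair in the source, but the edge
# list contains a second, unrelated edge.
source = [("p_1", "m_1")]
edges = [("p_1", "m_1"), ("p_2", "m_1")]
missing, superfluous = check_edges(source, edges)
# missing == set(), superfluous == {("p_2", "m_1")}
```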