Re: [spark-graphframes]: Generating incorrect edges

2024-05-11 Thread Nijland, J.G.W. (Jelle, Student M-CS)
Hi all,

The issue is solved.
I conducted a lot more testing and built checkers to verify at which size it goes wrong.
When checking for specific edges, I could construct successful graphs up to 261k records.
When verifying all edges created, it breaks somewhere between 200k and 250k records.
I didn't bother finding the specific error threshold, as runs take up to 7 
minutes per slice.

Together with my supervisor, I went over all the underlying assumptions of my code.
We located the problem in the generate_ids() function.
I selected all distinct values to give them an ID and subsequently joined those results back to the main DataFrame.
I replaced this by generating a unique ID for each value occurrence, hashing the value with 'withColumn' rather than joining IDs back.
This resolved my issues and turned out to be a significant performance boost as well.

My fixed generate_ids() code (imports added here for completeness; the prefix constants are a reconstruction of the "m_"/"p_"/"o_"/"org_" scheme used earlier in this thread):

from pyspark.sql import DataFrame
import pyspark.sql.functions as psf

# ID column labels, as defined in my first message [1]
PREFIX_ID = "prefix_id"
MAINTAINER_ID = "mnt_by_id"
ORIGIN_ID = "origin_id"
ORGANISATION_ID = "organisation_id"

# ID prefixes (assumed values, following the earlier "m_"/"p_"/"o_"/"org_" scheme)
PREFIX_M, PREFIX_P, PREFIX_O, PREFIX_ORG = "m_", "p_", "o_", "org_"


def generate_ids(df: DataFrame) -> DataFrame:
    """
    Generates a unique ID for each distinct maintainer, prefix, origin and
    organisation by hashing the value itself

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate IDs for
    """
    df = df.withColumn(MAINTAINER_ID, psf.concat(psf.lit(PREFIX_M), psf.sha2(df.mnt_by, 256)))
    df = df.withColumn(PREFIX_ID, psf.concat(psf.lit(PREFIX_P), psf.sha2(df.prefix, 256)))
    df = df.withColumn(ORIGIN_ID, psf.concat(psf.lit(PREFIX_O), psf.sha2(df.origin, 256)))
    df = df.withColumn(ORGANISATION_ID, psf.concat(psf.lit(PREFIX_ORG), psf.sha2(df.descr, 256)))
    return df
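
The key property is that sha2 is deterministic: the same value always hashes to the same ID, even if Spark recomputes the DataFrame, whereas monotonically_increasing_id() is non-deterministic and can assign different IDs on re-evaluation. A minimal usage sketch follows; the SparkSession setup and the sample values are assumptions for illustration, not part of the original code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Toy input with the four columns generate_ids() expects; the sample
# values are invented for illustration.
sample = spark.createDataFrame(
    [("RIPE-NCC-MNT", "192.0.2.0/24", "AS64500", "Example Org")],
    ["mnt_by", "prefix", "origin", "descr"],
)
with_ids = generate_ids(sample)
with_ids.select("mnt_by", MAINTAINER_ID).show(truncate=False)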

Hope this email finds someone running into a similar issue in the future.

Kind regards,
Jelle




From: Mich Talebzadeh 
Sent: Wednesday, May 1, 2024 11:56 AM
To: Stephen Coy 
Cc: Nijland, J.G.W. (Jelle, Student M-CS) ; 
user@spark.apache.org 
Subject: Re: [spark-graphframes]: Generating incorrect edges

Hi Steve,

Thanks for your statement. I tend to use uuid myself to avoid collisions. This
built-in function generates random IDs that are highly likely to be unique
across systems. My concerns are on edge, so to speak. If the Spark application
runs for a very long time or encounters restarts, the
monotonically_increasing_id() sequence might restart from the beginning. This
could again cause duplicate IDs if other Spark applications are running
concurrently or if data is processed across multiple runs of the same
application.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


 
   view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh


On Wed, 1 May 2024 at 01:22, Stephen Coy <s...@infomedia.com.au> wrote:
Hi Mich,

I was just reading random questions on the user list when I noticed that you 
said:

On 25 Apr 2024, at 2:12 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

1) You are using monotonically_increasing_id(), which is not 
collision-resistant in distributed environments like Spark. Multiple hosts
   can generate the same ID. I suggest switching to UUIDs (e.g., uuid.uuid4()) 
for guaranteed uniqueness.


It’s my understanding that the *Spark* `monotonically_increasing_id()` function 
exists for the exact purpose of generating a collision-resistant unique id 
across nodes on different hosts.
We use it extensively for this purpose and have never encountered an issue.

Are we wrong or are you thinking of a different (not Spark) function?

Cheers,

Steve C






Re: [spark-graphframes]: Generating incorrect edges

2024-05-01 Thread Mich Talebzadeh
Hi Steve,

Thanks for your statement. I tend to use uuid myself to avoid collisions. This
built-in function generates random IDs that are highly likely to be unique
across systems. My concerns are on edge, so to speak. If the Spark application
runs for a very long time or encounters restarts, the
monotonically_increasing_id() sequence might restart from the beginning. This
could again cause duplicate IDs if other Spark applications are running
concurrently or if data is processed across multiple runs of the same
application.
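
A minimal sketch of this UUID approach in PySpark (uuid() is the Spark SQL built-in available since Spark 3.0; the cache() is an addition to guard against re-evaluation, since uuid() is non-deterministic):

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf

spark = SparkSession.builder.getOrCreate()
df = spark.range(5)  # toy DataFrame standing in for the real data

# uuid() is non-deterministic: persist (or write out) the result before
# joining on it, otherwise a recomputation may assign fresh IDs.
df_ids = df.withColumn("id", psf.expr("uuid()")).cache()
df_ids.show(truncate=False)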

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh





On Wed, 1 May 2024 at 01:22, Stephen Coy  wrote:

> Hi Mich,
>
> I was just reading random questions on the user list when I noticed that
> you said:
>
> On 25 Apr 2024, at 2:12 AM, Mich Talebzadeh 
> wrote:
>
> 1) You are using monotonically_increasing_id(), which is not
> collision-resistant in distributed environments like Spark. Multiple hosts
>can generate the same ID. I suggest switching to UUIDs (e.g.,
> uuid.uuid4()) for guaranteed uniqueness.
>
>
> It’s my understanding that the *Spark* `monotonically_increasing_id()`
> function exists for the exact purpose of generating a collision-resistant
> unique id across nodes on different hosts.
> We use it extensively for this purpose and have never encountered an issue.
>
> Are we wrong or are you thinking of a different (not Spark) function?
>
> Cheers,
>
> Steve C
>
>
>
>


Re: [spark-graphframes]: Generating incorrect edges

2024-04-30 Thread Stephen Coy
Hi Mich,

I was just reading random questions on the user list when I noticed that you 
said:

On 25 Apr 2024, at 2:12 AM, Mich Talebzadeh  wrote:

1) You are using monotonically_increasing_id(), which is not 
collision-resistant in distributed environments like Spark. Multiple hosts
   can generate the same ID. I suggest switching to UUIDs (e.g., uuid.uuid4()) 
for guaranteed uniqueness.


It’s my understanding that the *Spark* `monotonically_increasing_id()` function 
exists for the exact purpose of generating a collision-resistant unique id 
across nodes on different hosts.
We use it extensively for this purpose and have never encountered an issue.

Are we wrong or are you thinking of a different (not Spark) function?

Cheers,

Steve C




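
For reference on this question, the documented behavior can be seen directly. A small sketch (assuming Spark 3.x, where monotonically_increasing_id() puts the partition ID in the upper 31 bits and the per-partition record number in the lower 33 bits):

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf

spark = SparkSession.builder.getOrCreate()

# range(0, 4) split over 2 partitions; the ID packs the partition ID into
# the upper 31 bits and the record number into the lower 33 bits.
df = spark.range(0, 4, numPartitions=2).withColumn(
    "mid", psf.monotonically_increasing_id()
)
df.show()
# IDs in the second partition start at 8589934592, i.e. 1 << 33, so they
# are unique within one DataFrame computation, but they are not stable
# across recomputations or across separate DataFrames.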


Re: [spark-graphframes]: Generating incorrect edges

2024-04-25 Thread Nijland, J.G.W. (Jelle, Student M-CS)
Hi Mich,

Thanks for your suggestions.
1) It currently runs on one server with plenty of resources assigned. But I 
will keep it in mind to replace monotonically_increasing_id() with uuid() once 
we scale up.
2) I have replaced the null values in origin with the string {prefix}-{mnt_by}-{organisation}:

replacement_string = psf.concat_ws("-", psf.col("prefix"), psf.col("mnt_by"), psf.col("descr"))
df = df.withColumn("origin", psf.coalesce(psf.col("origin"), replacement_string))

I have verified that my other columns have no null values.

3) This is the logic of how I generate IDs:

mnt_by_id = df.select(MNT_BY).distinct().withColumn(MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
prefix_id = df.select(PREFIX).distinct().withColumn(PREFIX_ID, psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
origin_id = df.select(ORIGIN).distinct().withColumn(ORIGIN_ID, psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
organisation_id = df.select(DESCR).distinct().withColumn(ORGANISATION_ID, psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))

df = df.join(mnt_by_id, on=MNT_BY, how="left").join(prefix_id, on=PREFIX, how="left").join(origin_id, on=ORIGIN, how="left").join(organisation_id, on=DESCR, how="left")

I create the IDs using the distinct values in the columns "mnt_by", "prefix", "origin" and "descr", the same columns I join on.

4) This is my current resource allocation; I run it on my university's server.
It has 112 cores and 1.48T of RAM; I can request more resources, but in my eyes this should be plenty.
If you think more resources would help, I will ask for them.

spark_conf = SparkConf().setAppName(f"pyspark-{APP_NAME}-{int(time())}").set(
"spark.submit.deployMode", "client"
).set("spark.sql.parquet.binaryAsString", "true"
).set("spark.driver.bindAddress", "localhost"
).set("spark.driver.host", "127.0.0.1"
# ).set("spark.driver.port", "0"
).set("spark.ui.port", "4041"
).set("spark.executor.instances", "1"
).set("spark.executor.cores", "50"
).set("spark.executor.memory", "128G"
).set("spark.executor.memoryOverhead", "32G"
).set("spark.driver.cores", "16"
).set("spark.driver.memory", "64G"
)
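
(For completeness, a sketch of how this conf would then be applied; the builder call is an assumption, it is not shown in the thread:)

from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()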

I don't think b) applies, as it's a single machine.

Kind regards,
Jelle


From: Mich Talebzadeh 
Sent: Wednesday, April 24, 2024 6:12 PM
To: Nijland, J.G.W. (Jelle, Student M-CS) 
Cc: user@spark.apache.org 
Subject: Re: [spark-graphframes]: Generating incorrect edges

OK let us have a look at these

1) You are using monotonically_increasing_id(), which is not 
collision-resistant in distributed environments like Spark. Multiple hosts
   can generate the same ID. I suggest switching to UUIDs (e.g., uuid.uuid4()) 
for guaranteed uniqueness.

2) Missing values in the Origin column lead to null IDs, potentially causing 
problems downstream. You can handle missing values appropriately, say
   a) Filter out rows with missing origins or b) impute missing values with a 
strategy that preserves relationships (if applicable).

3) With the join code, you mentioned left joining on the same column used for ID creation; that is not very clear!

4) Edge Issue: it appears the issue occurs with larger datasets (>100K records). Possible causes could be
   a) Resource constraints: as data size increases, PySpark might struggle with joins or computations if resources are limited (memory, CPU).
   b) Data skew: uneven distribution of values in certain columns could lead to imbalanced processing across machines. Check the Spark UI (port 4040), in particular the Stages and SQL (execution) tabs.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


 
   view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh


On Wed, 24 Apr 2024 at 16:44, Nijland, J.G.W. (Jelle, Student M-CS) <j.g.w.nijl...@student.utwente.nl> wrote:
Hi Mich,

Thanks for your reply,
1) ID generation is done using 
monotonically_increasing_id()<https://spark.apache.org/

Re: [spark-graphframes]: Generating incorrect edges

2024-04-24 Thread Mich Talebzadeh
OK let us have a look at these

1) You are using monotonically_increasing_id(), which is not
collision-resistant in distributed environments like Spark. Multiple hosts
   can generate the same ID. I suggest switching to UUIDs (e.g.,
uuid.uuid4()) for guaranteed uniqueness.

2) Missing values in the Origin column lead to null IDs, potentially
causing problems downstream. You can handle missing values appropriately,
say
   a) Filter out rows with missing origins or b) impute missing values with
a strategy that preserves relationships (if applicable).

3) With the join code, you mentioned left joining on the same column used for ID creation; that is not very clear!

4) Edge Issue: it appears the issue occurs with larger datasets (>100K records). Possible causes could be
   a) Resource constraints: as data size increases, PySpark might struggle with joins or computations if resources are limited (memory, CPU).
   b) Data skew: uneven distribution of values in certain columns could lead to imbalanced processing across machines. Check the Spark UI (port 4040), in particular the Stages and SQL (execution) tabs.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh





On Wed, 24 Apr 2024 at 16:44, Nijland, J.G.W. (Jelle, Student M-CS) <
j.g.w.nijl...@student.utwente.nl> wrote:

> Hi Mich,
>
> Thanks for your reply,
> 1) ID generation is done using monotonically_increasing_id()
> <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.monotonically_increasing_id.html>
>  this
> is then prefixed with "p_", "m_", "o_" or "org_" depending on the type of
> the value it identifies.
> 2) There are some missing values in the Origin column, these will result
> in a Null ID
> 3) The join code is present in [1], I join "left" on the same column
> I create the ID on
> 4) I don't think the issue is in ID or edge generation; if I limit my input
> dataframe and union it with my Utwente data row, I can verify those edges
> are created correctly up to 100K records.
> Once I go past that number of records, the results become inconsistent and
> incorrect.
>
> Kind regards,
> Jelle Nijland
>
>
> --
> *From:* Mich Talebzadeh 
> *Sent:* Wednesday, April 24, 2024 4:40 PM
> *To:* Nijland, J.G.W. (Jelle, Student M-CS) <
> j.g.w.nijl...@student.utwente.nl>
> *Cc:* user@spark.apache.org 
> *Subject:* Re: [spark-graphframes]: Generating incorrect edges
>
> OK few observations
>
> 1) ID Generation Method: How are you generating unique IDs (UUIDs,
> sequential numbers, etc.)?
> 2) Data Inconsistencies: Have you checked for missing values impacting ID
> generation?
> 3) Join Verification: If relevant, can you share the code for joining data
> points during ID creation? Are joins matching columns correctly?
> 4) Specific Edge Issues: Can you share examples of vertex IDs with
> incorrect connections? Is this related to ID generation or edge creation
> logic?
>
> HTH
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI, FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
>
>
> On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) <
> j.g.w.nijl...@student.utwente.nl> wrote:
>
> tags: pyspark,spark-graphframes
>
> Hello,
>
> I am running pyspark in a podman container and I have issues with
> incorrect edges when I build my graph.
> I start with loading a source dataframe from a parquet directory on my
> server. The source dataframe has the following columns:
>
> +-------+-----+-------------+------+------+-------------+------+---------------+
> |created|descr|last_modified|mnt_by|origin|start_address|prefix|external_origin|
> +-------+-----+-------------+------+------+-------------+------+---------------+

Re: [spark-graphframes]: Generating incorrect edges

2024-04-24 Thread Nijland, J.G.W. (Jelle, Student M-CS)
Hi Mich,

Thanks for your reply,
1) ID generation is done using monotonically_increasing_id() <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.monotonically_increasing_id.html>; this is then prefixed with "p_", "m_", "o_" or "org_" depending on the type of the value it identifies.
2) There are some missing values in the Origin column; these will result in a null ID.
3) The join code is present in [1]; I join "left" on the same column I create the ID on.
4) I don't think the issue is in ID or edge generation; if I limit my input dataframe and union it with my Utwente data row, I can verify those edges are created correctly up to 100K records.
Once I go past that number of records, the results become inconsistent and incorrect.

Kind regards,
Jelle Nijland



From: Mich Talebzadeh 
Sent: Wednesday, April 24, 2024 4:40 PM
To: Nijland, J.G.W. (Jelle, Student M-CS) 
Cc: user@spark.apache.org 
Subject: Re: [spark-graphframes]: Generating incorrect edges

OK few observations

1) ID Generation Method: How are you generating unique IDs (UUIDs, sequential 
numbers, etc.)?
2) Data Inconsistencies: Have you checked for missing values impacting ID 
generation?
3) Join Verification: If relevant, can you share the code for joining data 
points during ID creation? Are joins matching columns correctly?
4) Specific Edge Issues: Can you share examples of vertex IDs with incorrect 
connections? Is this related to ID generation or edge creation logic?

HTH
Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI, FinCrime
London
United Kingdom


 
   view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh


On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) <j.g.w.nijl...@student.utwente.nl> wrote:
tags: pyspark,spark-graphframes

Hello,

I am running pyspark in a podman container and I have issues with incorrect 
edges when I build my graph.
I start with loading a source dataframe from a parquet directory on my server. 
The source dataframe has the following columns:
+-------+-----+-------------+------+------+-------------+------+---------------+
|created|descr|last_modified|mnt_by|origin|start_address|prefix|external_origin|
+-------+-----+-------------+------+------+-------------+------+---------------+

I aim to build a graph connecting prefix, mnt_by, origin and descr with edges 
storing the created and last_modified values.
I start with generating IDs for the prefix, mnt_by, origin and descr using 
monotonically_increasing_id() [1]
These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are 
unique IDs across the dataframe.

Then I construct the vertices dataframe by collecting the ID, value and whether 
they are external for each vertex. [2]
These vertices are then unioned together.
Following the vertices, I construct the edges dataframe by selecting the IDs 
that I want to be the src and the dst and union those together. [3]
These edges store the created and last_modified.

Now I am ready to construct the graph. Here is where I run into my issue.

When verifying my graph, I looked at a couple of vertices to see if they have 
the correct edges.
I looked at the Utwente prefix, origin, descr and mnt_by and found that it 
generates incorrect edges.

I saw edges going out to vertices that are not associated with the Utwente values at all.
The methods to find the vertices and edges, and the output, can be found in [4].
We can already observe inconsistencies by viewing the prefix -> maintainer and origin -> prefix edges. [5]
Depending on which column I filter on, the results are inconsistent.
To make matters worse, some edges contain IDs that are not connected to the original values in the source dataframe at all.

What I have tried to resolve my issue:

  * Write a checker that verifies the edges created against the source dataframe. [6]
    The aim of this checker was to determine where the inconsistency comes from, to locate the bug and resolve it.
    I ran this checker on limited graphs from n=10 upwards to n=1m.
    This felt close enough, as there are only ~6.5m records in my source dataframe.
    This ran correctly; near the 1m mark it experienced significant slowdown, and at the full dataframe it errors/times out.

Re: [spark-graphframes]: Generating incorrect edges

2024-04-24 Thread Mich Talebzadeh
OK few observations

1) ID Generation Method: How are you generating unique IDs (UUIDs,
sequential numbers, etc.)?
2) Data Inconsistencies: Have you checked for missing values impacting ID
generation?
3) Join Verification: If relevant, can you share the code for joining data
points during ID creation? Are joins matching columns correctly?
4) Specific Edge Issues: Can you share examples of vertex IDs with
incorrect connections? Is this related to ID generation or edge creation
logic?

HTH
Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI, FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh





On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) <
j.g.w.nijl...@student.utwente.nl> wrote:

> tags: pyspark,spark-graphframes
>
> Hello,
>
> I am running pyspark in a podman container and I have issues with
> incorrect edges when I build my graph.
> I start with loading a source dataframe from a parquet directory on my
> server. The source dataframe has the following columns:
>
> +-------+-----+-------------+------+------+-------------+------+---------------+
> |created|descr|last_modified|mnt_by|origin|start_address|prefix|external_origin|
> +-------+-----+-------------+------+------+-------------+------+---------------+
>
> I aim to build a graph connecting prefix, mnt_by, origin and descr with
> edges storing the created and last_modified values.
> I start with generating IDs for the prefix, mnt_by, origin and descr using
> monotonically_increasing_id() [1]
> These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are
> unique IDs across the dataframe.
>
> Then I construct the vertices dataframe by collecting the ID, value and
> whether they are external for each vertex. [2]
> These vertices are then unioned together.
> Following the vertices, I construct the edges dataframe by selecting the
> IDs that I want to be the src and the dst and union those together. [3]
> These edges store the created and last_modified.
>
> Now I am ready to construct the graph. Here is where I run into my issue.
>
> When verifying my graph, I looked at a couple of vertices to see if they
> have the correct edges.
> I looked at the Utwente prefix, origin, descr and mnt_by and found that it
> generates incorrect edges.
>
> I saw edges going out to vertices that are not associated with the Utwente
> values at all.
> The methods to find the vertices and edges, and the output, can be found in [4].
> We can already observe inconsistencies by viewing the prefix -> maintainer
> and origin -> prefix edges. [5]
> Depending on which column I filter on, the results are inconsistent.
> To make matters worse, some edges contain IDs that are not connected to the
> original values in the source dataframe at all.
>
> What I have tried to resolve my issue:
>
>    - Write a checker that verifies the edges created against the source
>    dataframe. [6]
>    The aim of this checker was to determine where the inconsistency comes
>    from, to locate the bug and resolve it.
>    I ran this checker on limited graphs from n=10 upwards to n=1m.
>    This felt close enough, as there are only ~6.5m records in my source
>    dataframe.
>    This ran correctly; near the 1m mark it experienced significant slowdown,
>    and at the full dataframe it errors/times out.
>    I blamed this on the large joins that it performs on the source
>    dataframe.
>    - I found a GitHub issue of someone with significantly larger graphs
>    having similar issues.
>    One suggestion there blamed indexing using strings rather than ints or
>    longs.
>    I rewrote my system to use ints for IDs but ran into the same issue.
>    The number of incorrect edges was the same, and the incorrect vertices
>    they linked to were the same too.
>    - I re-ordered my source dataframe to see what the impact was.
>    This results in considerably more incorrect edges using the checker in
>    [4].
>    If helpful, I can post the output of this checker as well.
>
>
> Can you give me any pointers on what I can try or what I can do to clarify
> my situation better?
> Thanks in advance for your time.
>
> Kind regards,
> Jelle Nijland
>
>
>
>
> [1]
> import pyspark.sql.functions as psf
>
> # ID labels
> PREFIX_ID = "prefix_id"
> MAINTAINER_ID = "mnt_by_id"
> ORIGIN_ID = "origin_id"
> ORGANISATION_ID = "organisation_id"
>
> # Source dataframe column names
> MNT_BY = "mnt_by"
> PREFIX = "prefix"
> ORIGIN = "origin"
> DESCR = "descr"
> 

[spark-graphframes]: Generating incorrect edges

2024-04-24 Thread Nijland, J.G.W. (Jelle, Student M-CS)
tags: pyspark,spark-graphframes

Hello,

I am running pyspark in a podman container and I have issues with incorrect 
edges when I build my graph.
I start with loading a source dataframe from a parquet directory on my server. 
The source dataframe has the following columns:
+-------+-----+-------------+------+------+-------------+------+---------------+
|created|descr|last_modified|mnt_by|origin|start_address|prefix|external_origin|
+-------+-----+-------------+------+------+-------------+------+---------------+

I aim to build a graph connecting prefix, mnt_by, origin and descr with edges 
storing the created and last_modified values.
I start with generating IDs for the prefix, mnt_by, origin and descr using 
monotonically_increasing_id() [1]
These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are 
unique IDs across the dataframe.

Then I construct the vertices dataframe by collecting the ID, value and whether 
they are external for each vertex. [2]
These vertices are then unioned together.
Following the vertices, I construct the edges dataframe by selecting the IDs 
that I want to be the src and the dst and union those together. [3]
These edges store the created and last_modified.

Now I am ready to construct the graph. Here is where I run into my issue.

When verifying my graph, I looked at a couple of vertices to see if they have 
the correct edges.
I looked at the Utwente prefix, origin, descr and mnt_by and found that it 
generates incorrect edges.

I saw edges going out to vertices that are not associated with the Utwente values at all.
The methods to find the vertices and edges, and the output, can be found in [4].
We can already observe inconsistencies by viewing the prefix -> maintainer and origin -> prefix edges. [5]
Depending on which column I filter on, the results are inconsistent.
To make matters worse, some edges contain IDs that are not connected to the original values in the source dataframe at all.

What I have tried to resolve my issue:

  * Write a checker that verifies the edges created against the source dataframe. [6]
    The aim of this checker was to determine where the inconsistency comes from, to locate the bug and resolve it.
    I ran this checker on limited graphs from n=10 upwards to n=1m.
    This felt close enough, as there are only ~6.5m records in my source dataframe.
    This ran correctly; near the 1m mark it experienced significant slowdown, and at the full dataframe it errors/times out.
    I blamed this on the large joins that it performs on the source dataframe.
  * I found a GitHub issue of someone with significantly larger graphs having similar issues.
    One suggestion there blamed indexing using strings rather than ints or longs.
    I rewrote my system to use ints for IDs but ran into the same issue.
    The number of incorrect edges was the same, and the incorrect vertices they linked to were the same too.
  * I re-ordered my source dataframe to see what the impact was.
    This results in considerably more incorrect edges using the checker in [4].
    If helpful, I can post the output of this checker as well.

Can you give me any pointers on what I can try or what I can do to clarify my situation better?
Thanks in advance for your time.

Kind regards,
Jelle Nijland




[1]
from pyspark.sql import DataFrame
import pyspark.sql.functions as psf

# ID labels
PREFIX_ID = "prefix_id"
MAINTAINER_ID = "mnt_by_id"
ORIGIN_ID = "origin_id"
ORGANISATION_ID = "organisation_id"

# Source dataframe column names
MNT_BY = "mnt_by"
PREFIX = "prefix"
ORIGIN = "origin"
DESCR = "descr"
EXTERNAL_O = "external_origin"


def generate_ids(df: DataFrame) -> DataFrame:
    """
    Generates a unique ID for each distinct maintainer, prefix, origin and
    organisation

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate IDs for
    """
    mnt_by_id = df.select(MNT_BY).distinct().withColumn(MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
    prefix_id = df.select(PREFIX).distinct().withColumn(PREFIX_ID, psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
    origin_id = df.select(ORIGIN).distinct().withColumn(ORIGIN_ID, psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
    organisation_id = df.select(DESCR).distinct().withColumn(ORGANISATION_ID, psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))

    df = df.join(mnt_by_id, on=MNT_BY, how="left").join(prefix_id, on=PREFIX, how="left").join(origin_id, on=ORIGIN, how="left").join(organisation_id, on=DESCR, how="left")
    return df

def create_vertices(df: DataFrame) -> DataFrame:
    """
    Creates vertices from a DataFrame with IDs
    Vertices have the format:
    ID (str) | VALUE (str) | EXTERNAL (bool)

    ID follows the format [p_|o_|m_|org_][0-9]+

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate vertices for
    """
    prefixes = df.select(PREFIX_ID, PREFIX, psf.lit(False))
    maintainers = df.select(MAINTAINER_ID,