Re: [GraphX]: Prevent recomputation of DAG

2024-03-18 Thread Mich Talebzadeh
Hi,

I must admit I don't know much about this Fruchterman-Reingold (call
it FR) visualization using GraphX and Kubernetes. But you are saying
the slowdown starts after the second iteration, and
caching/persisting the graph after each iteration does not help. FR
involves many computations between vertex pairs. In the MapReduce (or
shuffle) steps, data may be moved across the network, which hurts
performance for large graphs. The usual way to verify this is through
the Spark UI, in the Stages, SQL and Executors tabs, where you can see
the time taken for each step and the amount of shuffle read/write.
Also, repeatedly creating and destroying GraphX graphs in each
iteration may lead to garbage collection (GC) overhead. So you should
consider profiling your application to identify bottlenecks and
pinpoint which part of the code is causing the slowdown. As I
mentioned, Spark offers profiling tools such as the Spark UI and
third-party libraries for this purpose.
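
One more thing worth checking: persist()/cache() are lazy, so unless
an action actually materializes the new graph and you unpersist the
previous one, every iteration's lineage stays alive and the DAG keeps
growing. A minimal sketch of that pattern (the helper name is my own,
not tested against your code):

import org.apache.spark.graphx.Graph

// Hypothetical helper: materialize the new graph's cache and release the
// old one, so only the latest iteration stays in memory.
def swapCached[VD, ED](oldGraph: Graph[VD, ED],
                       newGraph: Graph[VD, ED]): Graph[VD, ED] = {
  newGraph.persist()
  newGraph.vertices.count() // persist is lazy; an action fills the cache now
  newGraph.edges.count()
  oldGraph.unpersist(blocking = false) // drop the previous iteration's data
  newGraph
}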

HTH


Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   View my LinkedIn profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice: "one test result is worth one-thousand
expert opinions" (Wernher von Braun).




[GraphX]: Prevent recomputation of DAG

2024-03-17 Thread Marek Berith

Dear community,
for my diploma thesis, we are implementing a distributed version of the
Fruchterman-Reingold visualization algorithm, using GraphX and Kubernetes. Our
solution is a backend that continuously computes new positions of vertices in a
graph and sends them via RabbitMQ to a consumer. Fruchterman-Reingold is an
iterative algorithm: in each iteration, repulsive and attractive forces
between vertices are computed, and then new positions of the vertices are
computed based on those forces. Graph vertices and edges are stored in a GraphX
graph structure. Forces between vertices are computed using MapReduce (between
each pair of vertices) and aggregateMessages (for vertices connected via
edges). After an iteration of the algorithm, the recomputed positions are
pulled from the RDD with collect, serialized, and sent to the RabbitMQ queue.
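
For illustration, the aggregateMessages step for the attractive forces looks
roughly like this (simplified: positions reduced to a plain case class instead
of our GeneralVertex, and k is the ideal edge length from the FR paper):

import org.apache.spark.graphx.{Graph, VertexRDD}

case class Pos(x: Double, y: Double)

// Simplified sketch: attractive force contributions along every edge.
def attractiveForces(g: Graph[Pos, Int], k: Double): VertexRDD[(Double, Double)] =
  g.aggregateMessages[(Double, Double)](
    ctx => {
      val dx = ctx.dstAttr.x - ctx.srcAttr.x
      val dy = ctx.dstAttr.y - ctx.srcAttr.y
      val dist = math.max(math.sqrt(dx * dx + dy * dy), 1e-9)
      val f = dist * dist / k // FR attractive force: d^2 / k
      ctx.sendToSrc((dx / dist * f, dy / dist * f))   // pull src toward dst
      ctx.sendToDst((-dx / dist * f, -dy / dist * f)) // pull dst toward src
    },
    (a, b) => (a._1 + b._1, a._2 + b._2) // sum contributions per vertex
  )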


Here comes the issue. The first two iterations of the algorithm are quick, but
from the third iteration on the algorithm becomes very slow, until it reaches
a point at which it cannot finish an iteration in real time. It seems that
caching of the graph may be the issue, because if we serialize the graph into
an array after each iteration and create a new graph from that array in the
next iteration, we get constant memory usage and each iteration takes the same
amount of time. We have already tried to cache/persist/checkpoint the graph
after each iteration, but it didn't help, so maybe we are doing something
wrong. We do not think that serializing the graph into an array should be the
solution for a library as mature as Apache Spark, and we are also not
confident about how this workaround would affect performance for large graphs
or in a parallel environment. We are attaching a short example of the code for
one iteration of the algorithm, along with example input and output.
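
One way to see the recomputation is to watch the lineage grow; for example
(a diagnostic sketch, where initialGraph and metaG stand in for our setup):

// If each iteration re-runs the previous ones, the lineage string of the
// vertex RDD keeps getting longer.
var g = initialGraph
for (i <- 1 to 10) {
  val (next, _) = iterate(sc, g, metaG)
  println(s"iteration $i: lineage lines = ${next.vertices.toDebugString.split("\n").length}")
  g = next
}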


We would appreciate if you could help us fix this issue or give us any 
meaningful ideas, as we had tried everything that came to mind.


We look forward to your reply.
Thank you, Marek Berith
  def iterate(
      sc: SparkContext,
      graph: graphx.Graph[GeneralVertex, EdgeProperty],
      metaGraph: graphx.Graph[GeneralVertex, EdgeProperty])
      : (graphx.Graph[GeneralVertex, EdgeProperty],
         graphx.Graph[GeneralVertex, EdgeProperty]) = {
    // Per-vertex displacements from the four force components.
    val attractiveDisplacement: VertexRDD[(VertexId, Vector)] =
      this.calculateAttractiveForces(graph)
    val repulsiveDisplacement: RDD[(VertexId, Vector)] =
      this.calculateRepulsiveForces(graph)
    val metaVertexDisplacement: RDD[(VertexId, Vector)] =
      this.calculateMetaVertexForces(graph, metaGraph.vertices)
    val metaEdgeDisplacement: RDD[(VertexId, Vector)] =
      this.calculateMetaEdgeForces(metaGraph)
    // Sum the components into one displacement per vertex.
    val displacements: RDD[(VertexId, Vector)] = this.combineDisplacements(
      attractiveDisplacement,
      repulsiveDisplacement,
      metaVertexDisplacement,
      metaEdgeDisplacement)
    // Apply the displacements and rebuild the graph with the same edges.
    val newVertices: RDD[(VertexId, GeneralVertex)] =
      this.displaceVertices(graph, displacements)
    val newGraph = graphx.Graph(newVertices, graph.edges)
    // persist or checkpoint or cache? or something else?
    newGraph.persist()
    metaGraph.persist()
    (newGraph, metaGraph)
  }
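
For reference, a sketch of what the tail of iterate might look like with
lineage truncation and cleanup (our assumptions: sc.setCheckpointDir was
called once at startup, an iteration counter is passed in, and the interval
of 10 is arbitrary):

    val newGraph = graphx.Graph(newVertices, graph.edges)
    newGraph.persist()
    if (iteration % 10 == 0) {
      newGraph.checkpoint() // cut the lineage so the DAG stops growing
    }
    newGraph.vertices.count() // persist/checkpoint are lazy; force them here
    newGraph.edges.count()
    graph.unpersist(blocking = false) // release the previous iteration's graph
    (newGraph, metaGraph)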

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org