Re: RDD Partitions not distributed evenly to executors

2016-04-05 Thread Khaled Ammar
I have a similar experience.

Using 32 machines, I can see that the number of tasks (partitions) assigned to
executors (machines) is not even. Moreover, the distribution changes every
stage (iteration).

I wonder why Spark needs to move partitions around anyway; shouldn't the
scheduler reduce network (and other I/O) overhead by avoiding such
relocation?
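
One way to see this imbalance directly from the driver, rather than from the
UI or the executor logs, is a check along these lines (a hypothetical
spark-shell sketch, not the test code from this thread; the keyed RDD and its
contents are made up):

import org.apache.spark.{HashPartitioner, SparkEnv}

// Build a keyed RDD, hash-partition it by its integer key, and then count how
// many partitions each executor actually processed.
val numParts = 64
val blocks = sc.parallelize(0 until numParts)
  .map(i => (i, Array.fill(1000)(1.0)))
  .partitionBy(new HashPartitioner(numParts))

val partsPerExecutor = blocks
  .mapPartitions { iter =>
    iter.foreach(_ => ())                     // force the partition to be materialised
    Iterator((SparkEnv.get.executorId, 1))    // SparkEnv is a developer API
  }
  .reduceByKey(_ + _)
  .collect()

partsPerExecutor.foreach { case (exec, n) => println(s"executor $exec -> $n partitions") }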

Thanks,
-Khaled




On Mon, Apr 4, 2016 at 10:57 PM, Koert Kuipers  wrote:

> can you try:
> spark.shuffle.reduceLocality.enabled=false
>
> On Mon, Apr 4, 2016 at 8:17 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Dear all,
>>
>> Thank you for your responses.
>>
>> Michael Slavitch:
>> > Just to be sure: have spark-env.sh and spark-defaults.conf been
>> correctly propagated to all nodes?  Are they identical?
>> Yes; these files are stored on a shared memory directory accessible to
>> all nodes.
>>
>> Koert Kuipers:
>> > we ran into similar issues and it seems related to the new memory
>> > management. can you try:
>> > spark.memory.useLegacyMode = true
>> I reran the exact same code with a restarted cluster using this
>> modification, and did not observe any difference. The partitioning is
>> still imbalanced.
>>
>> Ted Yu:
>> > If the changes can be ported over to 1.6.1, do you mind reproducing the
>> issue there ?
>> Since the spark.memory.useLegacyMode setting did not impact my code
>> execution, I will have to change the Spark dependency back to earlier
>> versions to see if the issue persists and get back to you.
>>
>> Meanwhile, if anyone else has any other ideas or experience, please let
>> me know.
>>
>> Mike
>>
>> On 4/4/16, Koert Kuipers  wrote:
>> > we ran into similar issues and it seems related to the new memory
>> > management. can you try:
>> > spark.memory.useLegacyMode = true
>> >
>> > On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91m...@gmail.com> wrote:
>> >
>> >> [ CC'ing dev list since nearly identical questions have occurred in
>> >> user list recently w/o resolution;
>> >> c.f.:
>> >>
>> >>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
>> >>
>> >>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
>> >> ]
>> >>
>> >> Hello,
>> >>
>> >> In short, I'm reporting a problem concerning load imbalance of RDD
>> >> partitions across a standalone cluster. Though there are 16 cores
>> >> available per node, certain nodes will have >16 partitions, and some
>> >> will correspondingly have <16 (and even 0).
>> >>
>> >> In more detail: I am running some scalability/performance tests for
>> >> vector-type operations. The RDDs I'm considering are simple block
>> >> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
>> >> are generated with a fixed number of elements given by some multiple
>> >> of the available cores, and subsequently hash-partitioned by their
>> >> integer block index.
>> >>
>> >> I have verified that the hash partitioning key distribution, as well
>> >> as the keys themselves, are both correct; the problem is truly that
>> >> the partitions are *not* evenly distributed across the nodes.
>> >>
>> >> For instance, here is a representative output for some stages and
>> >> tasks in an iterative program. This is a very simple test with 2
>> >> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
>> >> example stages from the stderr log are stages 7 and 9:
>> >> 7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
>> >> 9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639
>> >>
>> >> When counting the location of the partitions on the compute nodes from
>> >> the stderr logs, however, you can clearly see the imbalance. Example
>> >> lines are:
>> >> 13627 task 0.0 in stage 7.0 (TID 196,
>> >> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
>> >> 13628 task 1.0 in stage 7.0 (TID 197,
>> >> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
>> >> 13629 task 2.0 in stage 7.0 (TID 198,
>> >> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
>> >>
>> >> Grep'ing the full set of above lines for each hostname, himrod-?,
>> >> shows the problem occurs in each stage. Below is the output, where the
>> >> number of partitions stored on each node is given alongside its
>> >> hostname as in (himrod-?,num_partitions):
>> >> Stage 7: (himrod-1,0) (himrod-2,64)
>> >> Stage 9: (himrod-1,16) (himrod-2,48)
>> >> Stage 12: (himrod-1,0) (himrod-2,64)
>> >> Stage 14: (himrod-1,16) (himrod-2,48)
>> >> The imbalance is also visible when the executor ID is used to count
>> >> the partitions operated on by executors.
>> >>
>> >> I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch
>> >> (but the modifications do not touch the scheduler, and are irrelevant
>> >> for these particular tests). Has something changed radically in 1.6+
>> >> that would make a previously (<=1.5) correct configuration go haywire?
>> >> Have new 

GraphX replication factor

2016-04-05 Thread Khaled Ammar
Hi,

I wonder if it is possible to figure out the replication factor used in
GraphX partitioning from its log files.
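
If the logs do not expose it, one rough way to estimate the replication factor
directly from a partitioned graph is to count, per edge partition, the distinct
vertex ids it references and divide by the vertex count. A sketch (the edge
file path and the partition strategy are placeholders):

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

// Average replication factor = total vertex copies across edge partitions
// divided by the number of vertices.
val unpartitioned = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt")   // placeholder path
val graph = unpartitioned.partitionBy(PartitionStrategy.EdgePartition2D)
val vertexCopies = graph.edges.mapPartitions { iter =>
  val ids = scala.collection.mutable.HashSet.empty[Long]
  iter.foreach { e => ids += e.srcId; ids += e.dstId }
  Iterator(ids.size.toLong)
}.sum()
val replicationFactor = vertexCopies / graph.vertices.count().toDouble
println(s"average replication factor ~ $replicationFactor")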

-- 
Thanks,
-Khaled


Re: Graphx

2016-03-11 Thread Khaled Ammar
This is an interesting discussion.

I have had some success running GraphX on large graphs with more than a
billion edges, using clusters of different sizes up to 64 machines. However,
performance goes down when I double the cluster size to 128 r3.xlarge
machines. Does anyone have experience with very large GraphX clusters?

@Ovidiu-Cristian, @Alexis and @Alexander, could you please share the
configurations for Spark / GraphX that work best for you?

Thanks,
-Khaled
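
(For reference, a minimal sketch of the checkpoint and Kryo suggestions that
come up further down in this thread; the paths and settings are placeholders,
and Graph.checkpoint() assumes Spark 1.3 or later.)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{GraphLoader, GraphXUtils}

val conf = new SparkConf()
  .setAppName("cc-with-kryo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
GraphXUtils.registerKryoClasses(conf)          // register the GraphX-internal classes with Kryo

val sc = new SparkContext(conf)
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // placeholder path

val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt").cache()
graph.checkpoint()                             // cut the lineage before the iterative job

val cc = graph.connectedComponents()
cc.vertices.checkpoint()                       // and again after, before any further joins
println(s"${cc.vertices.map(_._2).distinct().count()} components")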

On Fri, Mar 11, 2016 at 1:25 PM, John Lilley 
wrote:

> We have almost zero node info – just an identifying integer.
>
> *John Lilley*
>
>
>
> *From:* Alexis Roos [mailto:alexis.r...@gmail.com]
> *Sent:* Friday, March 11, 2016 11:24 AM
> *To:* Alexander Pivovarov 
> *Cc:* John Lilley ; Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr>; lihu ; Andrew A <
> andrew.a...@gmail.com>; u...@spark.incubator.apache.org; Geoff Thompson <
> geoff.thomp...@redpoint.net>
> *Subject:* Re: Graphx
>
>
>
> Also we keep the Node info minimal as needed for connected components and
> rejoin later.
>
>
>
> Alexis
>
>
>
> On Fri, Mar 11, 2016 at 10:12 AM, Alexander Pivovarov <
> apivova...@gmail.com> wrote:
>
> we use it in prod
>
>
>
> 70 boxes, 61GB RAM each
>
>
>
> GraphX Connected Components works fine on 250M Vertices and 1B Edges
> (takes about 5-10 min)
>
>
>
> Spark likes memory, so use r3.2xlarge boxes (61GB)
>
> For example 10 x r3.2xlarge (61GB) work much faster than 20 x r3.xlarge
> (30.5 GB) (especially if you have skewed data)
>
>
>
> Also, use checkpoints before and after Connected Components to reduce DAG
> delays
>
>
>
> You can also try to enable Kryo and register classes used in RDD
>
>
>
>
>
> On Fri, Mar 11, 2016 at 8:07 AM, John Lilley 
> wrote:
>
> I suppose for a 2.6bn case we’d need Long:
>
>
>
> public class GenCCInput {
>   public static void main(String[] args) {
>     if (args.length != 2) {
>       System.err.println("Usage: \njava GenCCInput <edges> <groupSize>");
>       System.exit(-1);
>     }
>     long edges = Long.parseLong(args[0]);
>     long groupSize = Long.parseLong(args[1]);
>     long currentEdge = 1;
>     long currentGroupSize = 0;
>     for (long i = 0; i < edges; i++) {
>       System.out.println(currentEdge + " " + (currentEdge + 1));
>       if (currentGroupSize == 0) {
>         currentGroupSize = 2;
>       } else {
>         currentGroupSize++;
>       }
>       if (currentGroupSize >= groupSize) {
>         currentGroupSize = 0;
>         currentEdge += 2;
>       } else {
>         currentEdge++;
>       }
>     }
>   }
> }
>
>
>
> *John Lilley*
>
> Chief Architect, RedPoint Global Inc.
>
> T: +1 303 541 1516  *| *M: +1 720 938 5761 *|* F: +1 781-705-2077
>
> Skype: jlilley.redpoint *|* john.lil...@redpoint.net *|* www.redpoint.net
>
>
>
> *From:* John Lilley [mailto:john.lil...@redpoint.net]
> *Sent:* Friday, March 11, 2016 8:46 AM
> *To:* Ovidiu-Cristian MARCU 
> *Cc:* lihu ; Andrew A ;
> u...@spark.incubator.apache.org; Geoff Thompson <
> geoff.thomp...@redpoint.net>
> *Subject:* RE: Graphx
>
>
>
> Ovidiu,
>
>
>
> IMHO, this is one of the biggest issues facing GraphX and Spark.  There
> are a lot of knobs and levers to pull to affect performance, with very
> little guidance about which settings work in general.  We cannot ship
> software that requires end-user tuning; it just has to work.  Unfortunately
> GraphX seems very sensitive to working set size relative to available RAM
> and fails catastrophically as opposed to gracefully when working set is too
> large.  It is also very sensitive to the nature of the data.  For example,
> if we build a test file with input-edge representation like:
>
> 1 2
>
> 2 3
>
> 3 4
>
> 5 6
>
> 6 7
>
> 7 8
>
> …
>
> this represents a graph with connected components in groups of four.  We
> found experimentally that when this data is input in clustered order, the
> required memory is lower and runtime is much faster than when data is input
> in random order.  This makes intuitive sense because of the additional
> communication required for the random order.
>
>
>
> Our 1bn-edge test case was of this same form, input in clustered order,
> with groups of 10 vertices per component.  It failed at 8 x 60GB.  This is
> the kind of data that our application processes, so it is a realistic test
> for us.  I’ve found that social media test data sets tend to follow
> power-law distributions, and that GraphX has much less problem with them.
>
>
>
> A comparable test scaled to your cluster (16 x 80GB) would be 2.6bn edges
> in 10-vertex components using the synthetic test input I describe above.  I
> would be curious to know if this works and what settings you use to
> succeed, and if it continues to succeed for random input order.
>
>
>
> As for 

GraphX stopped without finishing and with no ERRORs !

2015-11-18 Thread Khaled Ammar
Hi all,

I have a problem running some algorithms on GraphX. Occasionally, a run stops
without any errors: the task state is FINISHED, but the executors' state is
KILLED. However, I can see that one job has not finished yet; it took too much
time (minutes), while every other job/iteration typically finished in a few
seconds.

I am using Spark 1.5.2. I would appreciate any suggestions or recommendations
on how to fix this or investigate it further.

-- 
Thanks,
-Khaled


"Master: got disassociated, removing it."

2015-11-05 Thread Khaled Ammar
Hi,

I am using GraphX on standalone Spark 1.5.1 in a medium-size cluster (64+1).

I can execute PageRank with a large number of iterations on this cluster.
However, when I run SSSP, it always fails at iteration 23 or 24, always after
about 11 minutes. Note that PageRank runs for longer than that.

*These are the messages in the log file:*

15/11/06 04:52:41 INFO Master: 172.31.27.133:59109 got disassociated,
removing it.
15/11/06 04:52:41 INFO Master: Removing app app-20151106043327-
15/11/06 04:52:41 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkDriver@172.31.27.133:59109] has failed, address is
now gated for [5000] ms. Reason is: [Disassociated].


I tried increasing the Akka and Spark network timeouts to 100 and 600 seconds,
respectively, but it did not help.
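
For reference, these are the kinds of settings meant above, sketched with
illustrative values (Spark 1.5-era configuration keys), not the exact values
used in this run:

import org.apache.spark.SparkConf

// Raise the RPC/network timeouts that govern when a remote endpoint is
// considered disassociated. Example values only.
val conf = new SparkConf()
  .set("spark.network.timeout", "600s")
  .set("spark.akka.timeout", "600")   // seconds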

I appreciate any comment or suggestion !

-- 
Thanks,
-Khaled


Re: Why some executors are lazy?

2015-11-04 Thread Khaled Ammar
Thank you Adrian,

The dataset is indeed skewed. My concern was that some executors did not
participate in the computation at all. I understand that executors finish
tasks sequentially; therefore, using more executors allows for better
parallelism.

I managed to force all executors to participate by increasing the number of
partitions. My guess is that the scheduler preferred to reduce the number of
machines participating in the computation in order to decrease network overhead.
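
Concretely, the change was along these lines (a sketch rather than the exact
code; the path is a placeholder): load the edge list with an explicit number of
edge partitions so that every executor receives several tasks.

import org.apache.spark.graphx.GraphLoader

// Three tasks per executor on 16 executors, matching the 3*16 figure quoted below.
val numParts = 3 * 16
val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt",
  numEdgePartitions = numParts).cache()
// A plain RDD can be spread out the same way with rdd.repartition(numParts).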

Do you think my analysis is correct? How should one decide on the number of
partitions? Does it depend on the workload, the dataset, or both?

Thanks,
-Khaled


On Wed, Nov 4, 2015 at 7:21 AM, Adrian Tanase <atan...@adobe.com> wrote:

> If some of the operations required involve shuffling and partitioning, it
> might mean that the data set is skewed to specific partitions which will
> create hot spotting on certain executors.
>
> -adrian
>
> From: Khaled Ammar
> Date: Tuesday, November 3, 2015 at 11:43 PM
> To: "user@spark.apache.org"
> Subject: Why some executors are lazy?
>
> Hi,
>
> I'm using the most recent Spark version on a standalone setup of 16+1
> machines.
>
> While running GraphX workloads, I found that some executors are lazy: they
> *rarely* participate in computation. This causes some other executors to do
> their work. This behavior is consistent in all iterations and even in the
> data loading step. Only two specific executors do not participate in most
> computations.
>
>
> Does anyone know how to fix that?
>
>
> *More details:*
> Each machine has 4 cores. I set the number of partitions to 3*16. Each
> executor was supposed to do 3 tasks, but a few of them end up working on 4
> tasks instead, which causes delays in computation.
>
>
>
> --
> Thanks,
> -Khaled
>



-- 
Thanks,
-Khaled


Why some executors are lazy?

2015-11-03 Thread Khaled Ammar
Hi,

I'm using the most recent Spark version on a standalone setup of 16+1
machines.

While running GraphX workloads, I found that some executors are lazy: they
*rarely* participate in computation. This causes some other executors to do
their work. This behavior is consistent in all iterations and even in the
data loading step. Only two specific executors do not participate in most
computations.


Does anyone know how to fix that?


*More details:*
Each machine has 4 cores. I set the number of partitions to 3*16. Each
executor was supposed to do 3 tasks, but a few of them end up working on 4
tasks instead, which causes delays in computation.



-- 
Thanks,
-Khaled


What does "write time" mean exactly in Spark UI?

2015-11-03 Thread Khaled Ammar
Hi,

I wonder what "write time" means exactly.

I run GraphX workloads and noticed that the main bottleneck in most stages is
one or two tasks that take too long in "write time" and delay the whole job.
Enabling speculation helps a little, but I am still interested in knowing how
to fix this.

I use MEMORY_ONLY; spill to disk is turned off.
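
For reference, enabling speculation here just means settings along these lines
(a sketch; the multiplier and quantile shown are Spark's documented defaults
rather than values tuned for this job):

import org.apache.spark.SparkConf

// Speculatively re-launch straggler tasks once most tasks in a stage finish.
val conf = new SparkConf()
  .set("spark.speculation", "true")
  .set("spark.speculation.multiplier", "1.5")  // how much slower than the median before re-launching
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish first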


-- 
Thanks,
-Khaled


Performance issues in SSSP using GraphX

2015-10-30 Thread Khaled Ammar
Hi all,

I am seeing some interesting behavior from GraphX while running SSSP. I use
standalone mode with 16+1 machines, each with 30 GB of memory and 4 cores. The
dataset is 63 GB. However, the input for some stages is huge, about 16 TB!

The computation takes a very long time; I eventually stopped it.

For your information, I use the same SSSP code mentioned in the GraphX
documentation:
http://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api
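
For completeness, this is the Pregel-based SSSP from that page, lightly
abridged (the small generated graph below just stands in for the real 63 GB
dataset):

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// Single-source shortest paths via the Pregel API, as in the GraphX guide.
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42L
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),   // vertex program
  triplet => {                                      // send messages along shorter paths
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b)                          // merge messages
)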

I use StorageLevel.MEMORY_ONLY since I have plenty of memory.

I appreciate your comment/help about this issue.

-- 
Thanks,
-Khaled



NaN in GraphX PageRank answer

2015-08-18 Thread Khaled Ammar
Hi all,

I was trying to use GraphX to compute PageRank and found that the PageRank
value for several vertices is NaN.

I am using Spark 1.3. Any idea how to fix that?
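
A quick way to see how many vertices are affected is something like the
following (a sketch; the edge-list path and the tolerance are placeholders):

import org.apache.spark.graphx.GraphLoader

// Count vertices whose PageRank came out as NaN.
val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt")
val ranks = graph.pageRank(0.0001).vertices
val nanCount = ranks.filter { case (_, rank) => rank.isNaN }.count()
println(s"$nanCount vertices have a NaN PageRank")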

-- 
Thanks,
-Khaled


Fwd: Performance questions regarding Spark 1.3 standalone mode

2015-07-27 Thread Khaled Ammar
Hi all,

I wonder if anyone has an explanation for this behavior.

Thank you,
-Khaled

-- Forwarded message --
From: Khaled Ammar khaled.am...@gmail.com
Date: Fri, Jul 24, 2015 at 9:35 AM
Subject: Performance questions regarding Spark 1.3 standalone mode
To: user@spark.apache.org


Hi all,

I have a standalone spark cluster setup on EC2 machines. I did the setup
manually without the ec2 scripts. I have two questions about Spark/GraphX
performance:

1) When I run the PageRank example, the storage tab does not show all RDDs as
cached. Only one RDD is 100% cached; the remaining ones range from 25% to 97%.
Kindly note that there is enough memory to cache all RDDs. (A small sketch for
checking this from the driver follows after question 2.)

2) I noticed that loading the dataset partitions, 25 GB in total, is not
always evenly distributed across executors. Occasionally, one or two executors
become responsible for loading several partitions, while others load only one
partition. Does anyone know the reason behind this behavior? Is it a bug, or is
it possible to fix this using configuration parameters?
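
Regarding question 1), the numbers shown in the storage tab can also be pulled
from the driver, which makes partially cached RDDs easier to log. A sketch
using SparkContext.getRDDStorageInfo (a developer API):

// Print, for each RDD the driver knows about, how many of its partitions are
// cached and how much memory they occupy.
sc.getRDDStorageInfo.foreach { info =>
  val cached = s"${info.numCachedPartitions}/${info.numPartitions}"
  println(s"RDD ${info.id} (${info.name}): $cached partitions cached, " +
    s"${info.memSize / (1024 * 1024)} MB in memory")
}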

-- 
Thanks,
-Khaled



-- 
Thanks,
-Khaled


Performance questions regarding Spark 1.3 standalone mode

2015-07-24 Thread Khaled Ammar
Hi all,

I have a standalone spark cluster setup on EC2 machines. I did the setup
manually without the ec2 scripts. I have two questions about Spark/GraphX
performance:

1) When I run the PageRank example, the storage tab does not show all RDDs as
cached. Only one RDD is 100% cached; the remaining ones range from 25% to 97%.
Kindly note that there is enough memory to cache all RDDs.

2) I noticed that loading the dataset partitions, 25 GB in total, is not
always evenly distributed across executors. Occasionally, one or two executors
become responsible for loading several partitions, while others load only one
partition. Does anyone know the reason behind this behavior? Is it a bug, or is
it possible to fix this using configuration parameters?

-- 
Thanks,
-Khaled


Re: GraphX Synth Benchmark

2015-07-09 Thread Khaled Ammar
Hi,

I am not a Spark expert, but I found that passing a smaller partitions value
might help. Try the option --numEPart=$partitions, where partitions = 3 (the
number of workers) or at most 3*40 (the total number of cores).

Thanks,
-Khaled

On Thu, Jul 9, 2015 at 11:37 AM, AshutoshRaghuvanshi 
ashutosh.raghuvans...@gmail.com wrote:

 I am running a Spark cluster over SSH in standalone mode,

 I have run pagerank LiveJounral example:

 MASTER=spark://172.17.27.12:7077 bin/run-example graphx.SynthBenchmark
 -app=pagerank -niters=100 -nverts=4847571  Output/soc-liveJounral.txt

 it's been running for more than 2 hours; I guess this is not normal. What am I
 doing wrong?

 system details:
 4 nodes (1+3)
 40 cores each, 64G memory, out of which I have given spark.executor 50G

 One more thing: I noticed that one of the servers is used more than the others.

 Please help ASAP.

 Thank you
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n23747/13.png







-- 
Thanks,
-Khaled


Question about master memory requirement and GraphX pagerank performance !

2015-07-07 Thread Khaled Ammar
Hi all,

I am fairly new to Spark and wonder if you can help me. I am exploring
GraphX/Spark by running the PageRank example on a medium-size graph (12 GB)
using the command below.

My cluster is 1+16 machines, the master has 15 GB memory and each worker
has 30 GB. The master has 2 cores and each worker has 4 cores.

/home/ubuntu/spark-1.3.0/bin/spark-submit --master spark://Master IP:7077
--class org.apache.spark.examples.graphx.Analytics
/home/ubuntu/spark-1.3.0/examples/target/scala-2.10/spark-examples-1.3.0-hadoop1.0.4.jar
pagerank /user/ubuntu/input/dataset --numEPart=64
--output=/user/ubuntu/spark/16_pagerank --numIter=30


I have two questions:

1- When I set SPARK_EXECUTOR_MEMORY=25000M, I received errors because the
master cannot allocate this memory, since the launched task includes
-Xms25000M. Based on my understanding, the master does not do any computation,
and this executor memory is only required on the worker machines. Why can't
the application start without allocating all of the required memory on the
master as well as on all workers? (See the sketch after question 2.)

2- I changed the executor memory to 15 GB and the application worked fine.
However, it did not finish the thirty iterations after 7 hours. There is one
that was taking 4+ hours, and its input is 400+ GB. I must be doing something
wrong; any comments?
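
On question 1, executor and driver memory can in principle be sized
independently; below is a sketch with illustrative values only. In practice the
driver memory is usually passed on the spark-submit command line
(--driver-memory), because a SparkConf set in code is read only after the
driver JVM has already started.

import org.apache.spark.SparkConf

// Illustrative values only: size the worker-side executors and the driver JVM
// separately instead of using one SPARK_EXECUTOR_MEMORY setting for everything.
val conf = new SparkConf()
  .set("spark.executor.memory", "25g")   // per-executor memory on the workers
  .set("spark.driver.memory", "4g")      // driver JVM only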

-- 
Thanks,
-Khaled Ammar
www.khaledammar.com


Basic GraphX deployment and usage question

2015-03-16 Thread Khaled Ammar
Hi,

I'm very new to Spark and GraphX. I downloaded and configured Spark on a
cluster, which uses Hadoop 1.x. The master UI shows all workers. The
example command run-example SparkPi works fine and completes
successfully.

I'm interested in GraphX. Although the documentation says it is built into
Spark, I could not find any GraphX jar files under lib/. I also wonder whether
any of the algorithms mentioned in the GraphX programming guide are
pre-compiled and available for testing.

My main objective is to ensure that at least one correct graph application
runs with no errors using GraphX before I start writing my own.
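
For instance, a smoke test along these lines in spark-shell would be enough (a
sketch, assuming the followers.txt sample edge list that ships with the Spark
distribution under data/graphx):

import org.apache.spark.graphx.GraphLoader

// Load the small sample edge list bundled with Spark and run the built-in PageRank.
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
val ranks = graph.pageRank(0.0001).vertices
ranks.take(5).foreach(println)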

-- 
Thanks,
-Khaled