Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
Hello Dean & Others,
Thanks for the response.

I tried with 100, 200, 400, 600 and 1200 repartitions and with 100, 200, 400 and
800 executors. Each time, all of the join tasks complete in less than a
minute except one, and that one task runs forever. I have a huge cluster at
my disposal.

The data for each of the 1199 tasks is around 40 MB / 30k records, while the one
never-ending task is at 1.5 GB / 98 million records, so there is data skew among
tasks. I had observed this a week earlier and I have no clue how to fix it;
someone suggested that repartitioning might make things more parallel, but the
problem is still persistent.

Please suggest how to get the task to complete.
All I want to do is join two datasets (dataset1 is in sequence file format and
dataset2 is in Avro format).



Ex (Spark UI task table):

Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)
0      3771  0        RUNNING  PROCESS_LOCAL   114 / host1         2015/05/04 01:27:44  7.3 min   19 s     1591.2 MB / 98931767         0.0 B                   0.0 B
1      3772  0        SUCCESS  PROCESS_LOCAL   226 / host2         2015/05/04 01:27:44  28 s      2 s      39.2 MB / 29754              0.0 B                   0.0 B
2      3773  0        SUCCESS  PROCESS_LOCAL   283 / host3         2015/05/04 01:27:44  26 s      2 s      39.0 MB / 29646              0.0 B                   0.0 B
5      3776  0        SUCCESS  PROCESS_LOCAL   320 / host4         2015/05/04 01:27:44  31 s      3 s      38.8 MB / 29512              0.0 B                   0.0 B
4      3775  0        SUCCESS  PROCESS_LOCAL   203 / host5         2015/05/04 01:27:44  41 s      3 s      38.4 MB / 29169              0.0 B                   0.0 B
3      3774  0        SUCCESS  PROCESS_LOCAL   84 / host6          2015/05/04 01:27:44  24 s      2 s      38.5 MB / 29258              0.0 B                   0.0 B
8      3779  0        SUCCESS  PROCESS_LOCAL   309 / host7         2015/05/04 01:27:44  31 s      4 s      39.5 MB / 30008              0.0 B                   0.0 B

There are 1200 tasks in total.


Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread Saisai Shao
IMHO, if your data or your algorithm is prone to data skew, you have to fix
this at the application level; Spark itself cannot overcome the problem (one
key having a very large number of values). You may change your algorithm to
choose another shuffle key, or something along those lines, to avoid shuffling
on skewed keys.
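
For example, one common way to act on this advice is to "salt" the join key so
that a single hot item ID is spread across several reduce tasks and recombined
afterwards. A minimal, hedged sketch (the pair RDDs lstgItem/viEvents keyed by
item ID and the salt factor are assumptions for illustration, not code from
this thread; replicating the other side only pays off if it is small or the
salt factor is modest):

// Salt the skewed side: each record gets a random suffix on its key.
import scala.util.Random

val salt = 20  // assumed fan-out per key

val viEventsSalted = viEvents.map { case (itemId, vi) =>
  ((itemId, Random.nextInt(salt)), vi)
}

// Replicate the other side once per salt value so every salted key
// on the big side still finds its match.
val lstgItemSalted = lstgItem.flatMap { case (itemId, lstg) =>
  (0 until salt).map(s => ((itemId, s), lstg))
}

// Join on the salted key, then drop the salt again.
val joined = lstgItemSalted.join(viEventsSalted)
  .map { case ((itemId, _), (lstg, vi)) => (itemId, (lstg, vi)) }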


Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
Hello Shao,
Can you talk more about the shuffle key, or point me to the APIs that allow me
to change it? I will try different keys and compare the performance.

What is the shuffle key by default?


Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread Saisai Shao
The shuffle key depends on your implementation. If you are familiar with
MapReduce: the mapper output is a key-value pair, and that key is the shuffle
key used for shuffling. Spark works the same way.
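
In RDD terms, the key of a pair RDD is what the partitioner hashes during the
shuffle, so "changing the shuffle key" simply means re-keying the RDD before
the join. A small hedged sketch (the record type and field names below are
made up for illustration, not taken from the thread):

// Hypothetical record; only the keying pattern matters here.
case class View(itemId: Long, userId: Long)

val views: org.apache.spark.rdd.RDD[View] = ???  // some input RDD

// Keyed by itemId: itemId is the shuffle key, so one hot item
// lands in a single reduce task.
val byItem = views.map(v => (v.itemId, v))

// Keyed by (itemId, userId): a composite key spreads the same item
// across many tasks, at the cost of a different join shape.
val byItemAndUser = views.map(v => ((v.itemId, v.userId), v))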


Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
First dataset (pair RDD):

val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }

Second dataset (pair RDD):

val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

I want to join on the item ID, which is the first element of the tuple in both
cases, and I think that is what the shuffle key is.

listings == dataset containing all the unique item IDs that were ever listed
on the e-commerce site.

viEvents == the items viewed by users in the last day. This will always be a
subset of the total set.

So I do not understand what the data skewness is here. When my long-running
task is working on 1591.2 MB / 98,931,767 records, does that mean all 98
million records contain the same item ID? How can millions of users look at
the same item in one day?

Or does this dataset contain records spread across many item IDs?


Regards,

Deepak
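
A quick way to answer that question directly (a hedged sketch against the
viEvents pair RDD defined above; not code from the original mails) is to count
records per item ID and inspect the heaviest keys:

// Count records per itemId and show the heaviest keys. If the top
// counts are modest, the 98M-record task is not one hot key but
// many keys hashed into the same partition.
val topKeys = viEvents
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)
  .sortBy(-_._2)   // heaviest keys first
  .take(30)

topKeys.foreach { case (itemId, count) => println(s"$itemId -> $count") }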





Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
Four tasks are now failing with:

Index  ID    Attempt  Status  Locality Level  Executor ID / Host  Launch Time          Errors
0      3771  0        FAILED  PROCESS_LOCAL   114 / host1         2015/05/04 01:27:44  ExecutorLostFailure (executor 114 lost)
1007   4973  1        FAILED  PROCESS_LOCAL   420 / host2         2015/05/04 02:13:14  FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, ...)

FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 1
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
at 
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

)

371    4972  1        FAILED  PROCESS_LOCAL   563 / host3         2015/05/04 02:13:14  FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371, ...)

FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 1
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 

Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread Saisai Shao
From the symptoms you mention, where one task's shuffle data is much larger
than all the other tasks', it looks very much like normal data-skew behavior.
I can only give advice based on your description; you need to check whether
the data is actually skewed or not.

The shuffle puts data into tasks according to the partitioner strategy (the
default is the hash partitioner), so records with the same key always go to
the same task, but one task does not hold just one key.
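
To make that concrete: with the default HashPartitioner, the reduce task for a
record is just a function of its key's hash, so many different item IDs land
in the same task. A hedged sketch (the 1200 partition count comes from the
thread; the viEvents pair RDD is assumed from earlier messages):

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(1200)

// Which reduce task a given itemId would go to:
part.getPartition(201335783004L)   // some index in [0, 1200)

// Records per partition after keying viEvents by itemId; one huge
// partition made of many moderate keys points at partitioning,
// one huge key points at genuine key skew.
val perPartition = viEvents
  .partitionBy(part)
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()
  .sortBy(-_._2)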


Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
I ran it against one file instead of 10 files, and I see one task still
running after 33 minutes; its shuffle read size is 780 MB / 50 million records.

I did a count of records for each itemId from dataset-2 [one file] (second
dataset (pair RDD): val viEvents = viEventsRaw.map { vi =>
(vi.get(14).asInstanceOf[Long], vi) }). This is the dataset that contains the
list of items viewed by users in one day.

Item Id         Count
201335783004 537
111654496030 353
141640703798 287
191568402102 258
111654479898 217
231521843148 211
251931716094 201
111654493548 181
181503913062 181
121635453050 152
261798565828 140
151494617682 139
251927181728 127
231516683056 119
141640492864 117
161677270656 117
171771073616 113
111649942124 109
191516989450 97
231539161292 94
221555628408 88
131497785968 87
121632233872 84
131335379184 83
281531363490 83
131492727742 79
231174157820 79
161666914810 77
251699753072 77
161683664300 76


I was assuming that data skew would mean the top item (201335783004) had a
count of around a million, but it is only a few hundred, so why is Spark
skewing it in the join? What should I do so that Spark distributes the records
evenly?

In M/R we can change the Partitioner between the mapper and the reducer; how
can I do that for a join in Spark?
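
For reference, a hedged sketch of how a partitioner can be supplied to a join
in Spark (illustrative only, not code from the original mails; the RDD names
are the ones defined earlier in the thread):

import org.apache.spark.{HashPartitioner, Partitioner}

// Either hand the join an explicit partitioner...
val joined1 = lstgItem.join(viEvents, new HashPartitioner(1200))

// ...or just a partition count (default hash partitioning):
val joined2 = lstgItem.join(viEvents, 1200)

// A custom partitioner only changes which task a key goes to; all
// records with the same key still end up in a single task.
class ModPartitioner(parts: Int) extends Partitioner {
  def numPartitions: Int = parts
  def getPartition(key: Any): Int = {
    val h = key.asInstanceOf[Long].hashCode % parts
    if (h < 0) h + parts else h
  }
}
val joined3 = lstgItem.join(viEvents, new ModPartitioner(1200))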


Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)
0      3618  0        RUNNING  PROCESS_LOCAL   4 / host1           2015/05/04 05:09:53  33 min    8.5 min  783.9 MB / 50,761,322        4.6 GB                  47.5 MB
433    4051  0        SUCCESS  PROCESS_LOCAL   1 / host2           2015/05/04 05:16:27  1.1 min   20 s     116.0 MB / 4505143           1282.3 MB               10.1 MB
218    3836  0        SUCCESS  PROCESS_LOCAL   3 / host3           2015/05/04 05:13:01  53 s      11 s     76.4 MB / 2865143            879.6 MB                6.9 MB
113    3731  0        SUCCESS  PROCESS_LOCAL   2 / host4           2015/05/04 05:11:30  31 s      8 s      6.9 MB / 5187                0.0 B                   0.0 B


Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
I tried this:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents,
    new org.apache.spark.RangePartitioner(partitions = 1200, rdd = viEvents)).map {


It fired two jobs, and I still have one task that never completes.

Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)
0      4818  0        RUNNING  PROCESS_LOCAL   5 / host1           2015/05/04 07:24:25  1.1 h     13 min   778.0 MB / 50314161          4.5 GB                  47.4 MB
955    5773  0        SUCCESS  PROCESS_LOCAL   5 / host2           2015/05/04 07:47:16  2.2 min   1.5 min  106.3 MB / 4197539           0.0 B                   0.0 B
1199   6017  0        SUCCESS  PROCESS_LOCAL   3 / host3           2015/05/04 07:51:51  48 s      2 s      94.2 MB / 3618335            2.8 GB                  8.6 MB



2)
I tried reversing the datasets in the join:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.join(lstgItem)

This led to the same problem of a long-running task.

3)
Next, I am trying this:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents, 1200).map {

I have exhausted all my options.


Regards,

Deepak
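
One trick that has not been tried yet in this thread (a hedged sketch, not
from the original mails): isolate the handful of heaviest item IDs, join those
via a broadcast map, and join the remaining records normally. The hot-key set
below is an assumption for illustration.

// Assumed: the few heaviest item IDs, e.g. taken from a per-key count.
val hotKeys: Set[Long] = Set(201335783004L, 191568402102L)
val hotKeysBc = sc.broadcast(hotKeys)

// Plain shuffle join for the well-behaved keys.
val normalJoin = lstgItem.filter { case (k, _) => !hotKeysBc.value(k) }
  .join(viEvents.filter { case (k, _) => !hotKeysBc.value(k) })

// Map-side join for the hot keys: the matching listings are tiny
// (one record per item), so they can be collected and broadcast.
val hotLstgBc = sc.broadcast(
  lstgItem.filter { case (k, _) => hotKeysBc.value(k) }.collectAsMap())

val hotJoin = viEvents.filter { case (k, _) => hotKeysBc.value(k) }
  .flatMap { case (k, vi) => hotLstgBc.value.get(k).map(lstg => (k, (lstg, vi))) }

val joined = normalJoin.union(hotJoin)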



Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
Data set 1: viEvents — the event activity data for one day. I took 10 files
out of it; record counts per item ID:

Item ID        Count
201335783004   3419
191568402102   1793
111654479898   1362
181503913062   1310
261798565828   1028
111654493548   994
231516683056   862
131497785968   746
161666914810   633
221749455474   432
201324502754   410
201334042634   402
191562605592   380
271841178238   362
161663339210   344
251615941886   313
261855748678   309
271821726658   255
111657099518   224
261868369938   218
181725710132   216
171766164072   215
221757076934   213
171763906872   212
111650132368   206
181629904282   204
261867932788   198
161668475280   194
191398227282   194





Data set 2:
ItemID Count
2217305702 1
3842604614 1
4463421160 1
4581260446 1
4632783223 1
4645316947 1
4760829454 1
4786989430 1
5530758430 1
5610056107 1
5661929425 1
5953801612 1
6141607456 1
6197204992 1
6220704442 1
6271022614 1
6282402871 1
6525123621 1
6554834772 1
6566297541 1
This data set will always have only one element for each item as it
contains metadata information.

Given the nature of these two datasets, if there is any skew at all it must be
in dataset 1. And in dataset 1, even the top 20-30 item IDs (the shuffle key)
have record counts of only a few thousand, which is very small.

Why am I still not able to join these two datasets, given that I have
unlimited capacity and repartitioning, but a 12G memory limit on each node?
Each time I get a task that runs forever and processes roughly 1.5G of data
while the others process a few MB. And 1.5G of shuffle read is itself not
very large.

Please suggest.



Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread Dean Wampler
I don't know the full context of what you're doing, but serialization
errors usually mean you're attempting to serialize something that can't be
serialized, like the SparkContext. Kryo won't help there.

The arguments to spark-submit you posted previously look good:

2)  --num-executors 96 --driver-memory 12g --driver-java-options
-XX:MaxPermSize=10G --executor-memory 12g --executor-cores 4

I suspect you aren't getting the parallelism you need. For partitioning, if
your data is in HDFS and your block size is 128MB, then you'll get ~195
partitions anyway. If it takes 7 hours to do a join over 25GB of data, you
have some other serious bottleneck. You should examine the web console and
the logs to determine where all the time is going. Questions you might
pursue:

   - How long does each task take to complete?
   - How many of those 195 partitions/tasks are processed at the same time?
   That is, how many slots are available?  Maybe you need more nodes if the
   number of slots is too low. Based on your command arguments, you should be
   able to process 1/2 of them at a time, unless the cluster is busy.
   - Is the cluster swamped with other work?
   - How much data does each task process? Is the data roughly the same
   from one task to the next? If not, then you might have serious key skew?

You may also need to research the details of how joins are implemented and
some of the common tricks for organizing data to minimize having to shuffle
all N by M records.



Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Hello Dean,
 If I don't use the Kryo serializer I get a serialization error, hence I am
 using it.
 If I don't use partitionBy/repartition then the simple join never completed
 even after 7 hours, and in fact as a next step I need to run it against 250G,
 as that is my full dataset size. Someone here suggested to me to use
 repartition.

 Assuming repartition is mandatory, how do I decide what the right number is?
 When I use 400 I do not get the NullPointerException that I talked about,
 which is strange. I never saw that exception against a small random dataset,
 but I do see it with 25G; and yet with 400 partitions I do not see it.


 On Sun, May 3, 2015 at 9:15 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 IMHO, you are trying way too hard to optimize work on what is really a
 small data set. 25G, even 250G, is not that much data, especially if you've
 spent a month trying to get something to work that should be simple. All
 these errors are from optimization attempts.

 Kryo is great, but if it's not working reliably for some reason, then
 don't use it. Rather than force 200 partitions, let Spark try to figure out
 a good-enough number. (If you really need to force a partition count, use
 the repartition method instead, unless you're overriding the partitioner.)

 So. I recommend that you eliminate all the optimizations: Kryo,
 partitionBy, etc. Just use the simplest code you can. Make it work first.
 Then, if it really isn't fast enough, look for actual evidence of
 bottlenecks and optimize those.



 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Hello Dean & Others,
 Thanks for your suggestions.
 I have two data sets and all I want to do is a simple equi-join. I have a 10G
 limit, and as my dataset_1 exceeded that, it was throwing an OOM error. Hence
 I switched back to the .join() API instead of a map-side broadcast join.
 I am repartitioning the data with 100, 200, and I now see a
 NullPointerException.

 When I run against 25G of each input and with .partitionBy(new
 org.apache.spark.HashPartitioner(200)), I see a NullPointerException.


 This trace does not include a line from my code, hence I do not know what is
 causing the error.
 I do have the Kryo serializer registered.

 val conf = new SparkConf()
   .setAppName(detail)
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
   .set("spark.yarn.maxAppAttempts", "0")
   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
 val sc = new SparkContext(conf)

 I see the exception when this task runs

 val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

 It is a simple mapping of input records to (itemId, record).

 I found this

 

Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread Dean Wampler
How big is the data you're returning to the driver with collectAsMap? You
are probably running out of memory trying to copy too much data back to it.

If you're trying to force a map-side join, Spark can do that for you in
some cases within the regular DataFrame/RDD context. See
http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
and this talk by Michael Armbrust for example,
http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.
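
As a concrete illustration of the manual map-side join described above (a
hedged sketch, not code from this thread; it is only viable when the side
being collected comfortably fits in driver and executor memory, which the
26 GB listings dataset discussed below does not):

// Collect the small side as a map and broadcast it; join the big side
// against the broadcast value without any shuffle.
val smallMap: Map[Long, ListingRecord] =          // ListingRecord is assumed
  smallListings.map(l => (l.getItemId().toLong, l)).collectAsMap().toMap

val smallBc = sc.broadcast(smallMap)

val joined = viEvents.flatMap { case (itemId, vi) =>
  smallBc.value.get(itemId).map(lstg => (itemId, (lstg, vi)))
}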


dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Full Exception:
 15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at
 VISummaryDataProvider.scala:37) failed in 884.087 s
 15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap
 at VISummaryDataProvider.scala:37, took 1093.418249 s
 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
 exception: Job aborted due to stage failure: Exception while getting task
 result: org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)]
 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception while getting task result: org.apache.spark.SparkException: Error
 sending message [message = GetLocations(taskresult_112)]
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
 exitCode: 15, (reason: User class threw exception: Job aborted due to stage
 failure: Exception while getting task result:
 org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)])


 *Code at line 37*

 val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
   .collectAsMap

 The listing data set size is 26G (10 files) and my driver memory is 12G (I
 cannot go beyond it). The reason I do collectAsMap is to broadcast it and do
 a map-side join instead of a regular join.


 Please suggest ?


 On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 My Spark job is failing and I see

 ==

 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
 exception: Job aborted due to stage failure: Exception while getting task
 result: org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)]

 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception while getting task result: org.apache.spark.SparkException: Error
 sending message [message = GetLocations(taskresult_112)]

 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

 at scala.Option.foreach(Option.scala:236)

 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)


 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]


 I see 

Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread Dean Wampler
IMHO, you are trying waaay too hard to optimize work on what is really a
small data set. 25G, even 250G, is not that much data, especially if you've
spent a month trying to get something to work that should be simple. All
these errors are from optimization attempts.

Kryo is great, but if it's not working reliably for some reason, then don't
use it. Rather than force 200 partitions, let Spark try to figure out a
good-enough number. (If you really need to force a partition count, use the
repartition method instead, unless you're overriding the partitioner.)
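
For example (a sketch only; rdd stands for any pair RDD, not one from your
job):

import org.apache.spark.HashPartitioner

// repartition only changes how many partitions there are,
val spread = rdd.repartition(200)
// while partitionBy also pins each key to a specific partition
// via the supplied partitioner.
val keyed = rdd.partitionBy(new HashPartitioner(200))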

So. I recommend that you eliminate all the optimizations: Kryo,
partitionBy, etc. Just use the simplest code you can. Make it work first.
Then, if it really isn't fast enough, look for actual evidence of
bottlenecks and optimize those.



Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Hello Dean & Others,
 Thanks for your suggestions.
 I have two data sets and all I want to do is a simple equi join. I have a
 10G limit and as my dataset_1 exceeded that it was throwing an OOM error.
 Hence I switched back to using the .join() API instead of a map-side
 broadcast join.
 I am repartitioning the data into 100 and 200 partitions and I now see a
 NullPointerException.

 When I run against 25G of each input and with .partitionBy(new
 org.apache.spark.HashPartitioner(200)), I see a NullPointerException.


 The trace does not include a line from my code, so I do not know what is
 causing the error.
 I have registered the Kryo serializer.

 val conf = new SparkConf()
   .setAppName(detail)
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
   .set("spark.yarn.maxAppAttempts", "0")
   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
 val sc = new SparkContext(conf)

 I see the exception when this task runs:

 val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

 It's a simple mapping of input records to (itemId, record).

 I found this

 http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
 and

 http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html

 Looks like Kryo (v2.21) changed something and stopped using default
 constructors.

 ((Kryo.DefaultInstantiatorStrategy)
 kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new
 StdInstantiatorStrategy());


 Please suggest


 Trace:
 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in
 stage 2.0 (TID 774)
 com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
 Serialization trace:
 values (org.apache.avro.generic.GenericData$Record)
 datum (org.apache.avro.mapred.AvroKey)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
 at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 Regards,


 Any suggestions?
 I have not been able to get this to work for over a month now, and it is
 getting frustrating.

 On Sun, May 3, 2015 at 8:03 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 How big is the data you're returning to the driver with collectAsMap? You
 are probably running out of memory trying to copy too much data back to it.

 If you're trying to force a map-side join, Spark can do that for you in
 some cases within the regular DataFrame/RDD context. See
 http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
 and this talk by Michael Armbrust for example,
 http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.


 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ 

Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread ๏̯͡๏
Hello Dean & Others,
Thanks for your suggestions.
I have two data sets and all I want to do is a simple equi join. I have a 10G
limit and as my dataset_1 exceeded that it was throwing an OOM error. Hence I
switched back to using the .join() API instead of a map-side broadcast join.
I am repartitioning the data into 100 and 200 partitions and I now see a
NullPointerException.
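
For reference, the plain RDD equi join I am describing is essentially the
following; dataset1, dataset2 and the partition count are placeholders for my
actual inputs:

val left = dataset1.map { r => (r.getItemId().toLong, r) }
val right = dataset2.map { r => (r.get(14).asInstanceOf[Long], r) }
// equi join on the Long item id, asking for 200 output partitions
val joined = left.join(right, 200)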

When I run against 25G of each input and with .partitionBy(new
org.apache.spark.HashPartitioner(200)), I see a NullPointerException.


The trace does not include a line from my code, so I do not know what is
causing the error.
I have registered the Kryo serializer.

val conf = new SparkConf()
  .setAppName(detail)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .set("spark.yarn.maxAppAttempts", "0")
  .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
val sc = new SparkContext(conf)

I see the exception when this task runs:

val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

It's a simple mapping of input records to (itemId, record).

I found this
http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
and
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html

Looks like Kryo (v2.21) changed something and stopped using default
constructors.

((Kryo.DefaultInstantiatorStrategy)
kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new
StdInstantiatorStrategy());
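
A rough sketch of wiring that into Spark through a custom registrator
(MyKryoRegistrator is a made-up name, and this uses the simpler
setInstantiatorStrategy call rather than the fallback form above):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.objenesis.strategy.StdInstantiatorStrategy

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Let Objenesis instantiate classes without a usable no-arg constructor,
    // which is what the NPE during deserialization points at.
    kryo.setInstantiatorStrategy(new StdInstantiatorStrategy())
    kryo.register(
      classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum])
  }
}

// and in the SparkConf:
//   .set("spark.kryo.registrator", classOf[MyKryoRegistrator].getName)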


Please suggest


Trace:
15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in stage
2.0 (TID 774)
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
datum (org.apache.avro.mapred.AvroKey)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
Regards,


Any suggestions?
I have not been able to get this to work for over a month now, and it is
getting frustrating.

On Sun, May 3, 2015 at 8:03 PM, Dean Wampler deanwamp...@gmail.com wrote:

 How big is the data you're returning to the driver with collectAsMap? You
 are probably running out of memory trying to copy too much data back to it.

 If you're trying to force a map-side join, Spark can do that for you in
 some cases within the regular DataFrame/RDD context. See
 http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
 and this talk by Michael Armbrust for example,
 http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.


 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Full Exception
 *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at
 VISummaryDataProvider.scala:37) failed in 884.087 s*
 *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed:
 collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s*
 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
 exception: Job aborted due to stage failure: Exception while getting task
 result: org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)]
 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception while getting task result: org.apache.spark.SparkException: Error
 sending message [message = GetLocations(taskresult_112)]
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at
 

Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread ๏̯͡๏
Hello Dean,
If I don't use the Kryo serializer I get a serialization error, hence I am
using it.
If I don't use partitionBy/repartition then the simple join never completed
even after 7 hours, and in fact as the next step I need to run it against
250G, which is my full dataset size. Someone here suggested that I use
repartition.

Assuming repartition is mandatory, how do I decide what the right number is?
When I use 400 partitions I do not get the NullPointerException that I talked
about, which is strange. I never saw that exception against a small random
dataset, but I see it with 25G; and again, with 400 partitions I do not see it.
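
The only rule of thumb I have found is to aim for roughly an HDFS block worth
of data per partition, e.g. (a back-of-the-envelope sketch, all numbers are
just examples):

val totalInputBytes = 25L * 1024 * 1024 * 1024      // ~25 GB of input
val targetPartitionBytes = 128L * 1024 * 1024       // ~128 MB per partition
val numPartitions = math.max(1, (totalInputBytes / targetPartitionBytes).toInt)
// comes out around 200 here, e.g. rdd.repartition(numPartitions)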


On Sun, May 3, 2015 at 9:15 PM, Dean Wampler deanwamp...@gmail.com wrote:

 IMHO, you are trying waaay too hard to optimize work on what is really a
 small data set. 25G, even 250G, is not that much data, especially if you've
 spent a month trying to get something to work that should be simple. All
 these errors are from optimization attempts.

 Kryo is great, but if it's not working reliably for some reason, then
 don't use it. Rather than force 200 partitions, let Spark try to figure out
 a good-enough number. (If you really need to force a partition count, use
 the repartition method instead, unless you're overriding the partitioner.)

 So. I recommend that you eliminate all the optimizations: Kryo,
 partitionBy, etc. Just use the simplest code you can. Make it work first.
 Then, if it really isn't fast enough, look for actual evidence of
 bottlenecks and optimize those.



 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Hello Dean & Others,
 Thanks for your suggestions.
 I have two data sets and all I want to do is a simple equi join. I have a
 10G limit and as my dataset_1 exceeded that it was throwing an OOM error.
 Hence I switched back to using the .join() API instead of a map-side
 broadcast join.
 I am repartitioning the data into 100 and 200 partitions and I now see a
 NullPointerException.

 When I run against 25G of each input and with .partitionBy(new
 org.apache.spark.HashPartitioner(200)), I see a NullPointerException.


 The trace does not include a line from my code, so I do not know what is
 causing the error.
 I have registered the Kryo serializer.

 val conf = new SparkConf()
   .setAppName(detail)
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
   .set("spark.yarn.maxAppAttempts", "0")
   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
 val sc = new SparkContext(conf)

 I see the exception when this task runs:

 val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

 It's a simple mapping of input records to (itemId, record).

 I found this

 http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
 and

 http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html

 Looks like Kryo (v2.21) changed something and stopped using default
 constructors.

 ((Kryo.DefaultInstantiatorStrategy)
 kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new
 StdInstantiatorStrategy());


 Please suggest


 Trace:
 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in
 stage 2.0 (TID 774)
 com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
 Serialization trace:
 values (org.apache.avro.generic.GenericData$Record)
 datum (org.apache.avro.mapred.AvroKey)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
 at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 Regards,


 Any suggestions?
 I have not been able to get this to work for over a month now, and it is
 getting frustrating.

 On Sun, May 3, 2015 at 8:03 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 How big is the data you're returning to the driver with collectAsMap?
 You are probably running out of memory trying to 

Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-02 Thread Akhil Das
You could try repartitioning your listings RDD. Also, doing a collectAsMap
basically brings all your data to the driver; in that case you might want to
set the storage level to memory-and-disk, though I am not sure that will help
on the driver.
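
A sketch of what that would look like, using the listings RDD from the quoted
code below (whether it helps depends on where the memory actually runs out):

import org.apache.spark.storage.StorageLevel

val persistedListings = listings.persist(StorageLevel.MEMORY_AND_DISK)
val lstgItemMap = persistedListings
  .map { lstg => (lstg.getItemId().toLong, lstg) }
  .collectAsMap()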

Thanks
Best Regards

On Thu, Apr 30, 2015 at 11:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Full Exception
 *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at
 VISummaryDataProvider.scala:37) failed in 884.087 s*
 *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap
 at VISummaryDataProvider.scala:37, took 1093.418249 s*
 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
 exception: Job aborted due to stage failure: Exception while getting task
 result: org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)]
 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception while getting task result: org.apache.spark.SparkException: Error
 sending message [message = GetLocations(taskresult_112)]
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
 exitCode: 15, (reason: User class threw exception: Job aborted due to stage
 failure: Exception while getting task result:
 org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)])


 *Code at line 37*

 val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
   .collectAsMap

 The listing data set size is 26G (10 files) and my driver memory is 12G (I
 cannot go beyond it). The reason I do collectAsMap is to broadcast it and do
 a map-side join instead of a regular join.


 Please suggest ?


 On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 My Spark job is failing and I see

 ==

 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
 exception: Job aborted due to stage failure: Exception while getting task
 result: org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)]

 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception while getting task result: org.apache.spark.SparkException: Error
 sending message [message = GetLocations(taskresult_112)]

 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

 at scala.Option.foreach(Option.scala:236)

 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)


 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]


 I see multiple of these

 Caused by: java.util.concurrent.TimeoutException: Futures timed out after
 [30 seconds]

 And finally I see this
 java.lang.OutOfMemoryError: Java heap space
 at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
 at
 org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
 at
 

Re: Spark - Timeout Issues - OutOfMemoryError

2015-04-30 Thread Akhil Das
You could try increasing your heap space explicitly, like export
_JAVA_OPTIONS=-Xmx10g. It's not the correct approach, but try it.
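
If that does not help, the equivalent knobs on the Spark side would be along
these lines (a sketch; the values are only examples and the exact keys that
matter depend on your Spark version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // executor heap, the supported alternative to _JAVA_OPTIONS
  .set("spark.executor.memory", "12g")
  // the 30-second "Futures timed out" messages come from the Akka ask
  // timeout in Spark 1.x; both values below are in seconds
  .set("spark.akka.askTimeout", "120")
  .set("spark.network.timeout", "300")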

Thanks
Best Regards

On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I have a Spark app that completes in 45 mins for 5 files (5*750MB
 each) and it takes 16 executors to do so.

 I wanted to run it against 10 files of each input type (10*3 files, as
 there are three inputs that are transformed). [Input1 = 10*750 MB,
 Input2 = 10*2.5GB, Input3 = 10*1.5G], hence I used 32 executors.

 I see multiple
 5/04/28 09:23:31 WARN executor.Executor: Issue communicating with driver
 in heartbeater
 org.apache.spark.SparkException: Error sending message [message =
 Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22,
 phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
 at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after
 [30 seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
 ... 1 more


 When I searched deeper, I found an OOM error.
 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: Removing block
 manager BlockManagerId(17, phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: Removing
 BlockManager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com,
 48304) with no recent heart beats: 121200ms exceeds 120000ms
 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: Removing block
 manager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in thread
 task-result-getter-3
 java.lang.OutOfMemoryError: Java heap space
 at java.util.Arrays.copyOf(Arrays.java:2245)
 at java.util.Arrays.copyOf(Arrays.java:2219)
 at java.util.ArrayList.grow(ArrayList.java:242)
 at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
 at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
 at java.util.ArrayList.add(ArrayList.java:440)
 at
 com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
 at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
 at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
 at
 org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
 at
 org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
 at
 org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
 at
 org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
 at
 org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Exception in thread task-result-getter-3 java.lang.OutOfMemoryError:
 Java heap space
 at java.util.Arrays.copyOf(Arrays.java:2245)
 at java.util.Arrays.copyOf(Arrays.java:2219)
 at java.util.ArrayList.grow(ArrayList.java:242)
 at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
 at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
 at java.util.ArrayList.add(ArrayList.java:440)
 at
 com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
 at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
 at
 

Re: Spark - Timeout Issues - OutOfMemoryError

2015-04-30 Thread ๏̯͡๏
Did not work. Same problem.



On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You could try increasing your heap space explicitly, like export
 _JAVA_OPTIONS=-Xmx10g. It's not the correct approach, but try it.

 Thanks
 Best Regards

 On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I have a Spark app that completes in 45 mins for 5 files (5*750MB
 each) and it takes 16 executors to do so.

 I wanted to run it against 10 files of each input type (10*3 files, as
 there are three inputs that are transformed). [Input1 = 10*750 MB,
 Input2 = 10*2.5GB, Input3 = 10*1.5G], hence I used 32 executors.

 I see multiple
 5/04/28 09:23:31 WARN executor.Executor: Issue communicating with driver
 in heartbeater
 org.apache.spark.SparkException: Error sending message [message =
 Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22,
 phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
 at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after
 [30 seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
 ... 1 more


 When I searched deeper, I found an OOM error.
 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: Removing block
 manager BlockManagerId(17, phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: Removing
 BlockManager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com,
 48304) with no recent heart beats: 121200ms exceeds 120000ms
 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: Removing block
 manager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in thread
 task-result-getter-3
 java.lang.OutOfMemoryError: Java heap space
 at java.util.Arrays.copyOf(Arrays.java:2245)
 at java.util.Arrays.copyOf(Arrays.java:2219)
 at java.util.ArrayList.grow(ArrayList.java:242)
 at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
 at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
 at java.util.ArrayList.add(ArrayList.java:440)
 at
 com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
 at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
 at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
 at
 org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
 at
 org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
 at
 org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
 at
 org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
 at
 org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Exception in thread task-result-getter-3 java.lang.OutOfMemoryError:
 Java heap space
 at java.util.Arrays.copyOf(Arrays.java:2245)
 at java.util.Arrays.copyOf(Arrays.java:2219)
 at java.util.ArrayList.grow(ArrayList.java:242)
 at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
 at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
 at java.util.ArrayList.add(ArrayList.java:440)
 at
 com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
 at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)

Re: Spark - Timeout Issues - OutOfMemoryError

2015-04-30 Thread ๏̯͡๏
Full Exception
*15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at
VISummaryDataProvider.scala:37) failed in 884.087 s*
*15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap
at VISummaryDataProvider.scala:37, took 1093.418249 s*
15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception:
Job aborted due to stage failure: Exception while getting task result:
org.apache.spark.SparkException: Error sending message [message =
GetLocations(taskresult_112)]
org.apache.spark.SparkException: Job aborted due to stage failure:
Exception while getting task result: org.apache.spark.SparkException: Error
sending message [message = GetLocations(taskresult_112)]
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception: Job aborted due to stage
failure: Exception while getting task result:
org.apache.spark.SparkException: Error sending message [message =
GetLocations(taskresult_112)])


*Code at line 37*

val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
  .collectAsMap

The listing data set size is 26G (10 files) and my driver memory is 12G (I
cannot go beyond it). The reason I do collectAsMap is to broadcast it and do a
map-side join instead of a regular join.
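
Spelled out, the map-side join I am trying to build is roughly the following
(placeholder structure; it only works if the collected map fits comfortably in
driver and executor memory, which is exactly where it is failing):

val lstgItemMap = listings
  .map { lstg => (lstg.getItemId().toLong, lstg) }
  .collectAsMap()
val lstgBroadcast = sc.broadcast(lstgItemMap)

val joined = viEvents.flatMap { case (itemId, vi) =>
  lstgBroadcast.value.get(itemId).map(lstg => (itemId, (vi, lstg)))
}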


Please suggest ?


On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 My Spark job is failing and I see

 ==

 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
 exception: Job aborted due to stage failure: Exception while getting task
 result: org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)]

 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception while getting task result: org.apache.spark.SparkException: Error
 sending message [message = GetLocations(taskresult_112)]

 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

 at scala.Option.foreach(Option.scala:236)

 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)


 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]


 I see multiple of these

 Caused by: java.util.concurrent.TimeoutException: Futures timed out after
 [30 seconds]

 And finally I see this
 java.lang.OutOfMemoryError: Java heap space
 at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
 at
 org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
 at
 org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
 at
 org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
 at
 

Spark - Timeout Issues - OutOfMemoryError

2015-04-28 Thread ๏̯͡๏
I have a Spark app that completes in 45 mins for 5 files (5*750MB each)
and it takes 16 executors to do so.

I wanted to run it against 10 files of each input type (10*3 files, as there
are three inputs that are transformed). [Input1 = 10*750 MB,
Input2 = 10*2.5GB, Input3 = 10*1.5G], hence I used 32 executors.

I see multiple
5/04/28 09:23:31 WARN executor.Executor: Issue communicating with driver in
heartbeater
org.apache.spark.SparkException: Error sending message [message =
Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22,
phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
... 1 more


When I searched deeper, I found an OOM error.
15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: Removing block
manager BlockManagerId(17, phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: Removing
BlockManager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com,
48304) with no recent heart beats: 121200ms exceeds 120000ms
15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: Removing block
manager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in thread
task-result-getter-3
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2245)
at java.util.Arrays.copyOf(Arrays.java:2219)
at java.util.ArrayList.grow(ArrayList.java:242)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
at java.util.ArrayList.add(ArrayList.java:440)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
at
org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
at
org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread task-result-getter-3 java.lang.OutOfMemoryError: Java
heap space
at java.util.Arrays.copyOf(Arrays.java:2245)
at java.util.Arrays.copyOf(Arrays.java:2219)
at java.util.ArrayList.grow(ArrayList.java:242)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
at java.util.ArrayList.add(ArrayList.java:440)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
at