Re: Spark - Timeout Issues - OutOfMemoryError
Hello Dean & others,

Thanks for the response. I tried 100, 200, 400, 600 and 1200 repartitions with 100, 200, 400 and 800 executors. Each time, all the tasks of the join complete in less than a minute except one, and that one task runs forever. I have a huge cluster at my disposal. Each of the 1199 normal tasks processes around 40 MB / 30k records, while the one never-ending task processes 1.5 GB / 98 million records, so there is clear data skew among the tasks. I observed this a week ago and have no clue how to fix it; someone suggested that repartitioning might make things more parallel, but the problem persists. Please suggest how to get that task to complete. All I want to do is join two datasets (dataset1 is a sequence file and dataset2 is in Avro format).

Ex: Tasks

Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | 3771 | 0 | RUNNING | PROCESS_LOCAL | 114 / host1 | 2015/05/04 01:27:44 | 7.3 min | 19 s | 1591.2 MB / 98,931,767 | 0.0 B | 0.0 B
1 | 3772 | 0 | SUCCESS | PROCESS_LOCAL | 226 / host2 | 2015/05/04 01:27:44 | 28 s | 2 s | 39.2 MB / 29,754 | 0.0 B | 0.0 B
2 | 3773 | 0 | SUCCESS | PROCESS_LOCAL | 283 / host3 | 2015/05/04 01:27:44 | 26 s | 2 s | 39.0 MB / 29,646 | 0.0 B | 0.0 B
5 | 3776 | 0 | SUCCESS | PROCESS_LOCAL | 320 / host4 | 2015/05/04 01:27:44 | 31 s | 3 s | 38.8 MB / 29,512 | 0.0 B | 0.0 B
4 | 3775 | 0 | SUCCESS | PROCESS_LOCAL | 203 / host5 | 2015/05/04 01:27:44 | 41 s | 3 s | 38.4 MB / 29,169 | 0.0 B | 0.0 B
3 | 3774 | 0 | SUCCESS | PROCESS_LOCAL | 84 / host6 | 2015/05/04 01:27:44 | 24 s | 2 s | 38.5 MB / 29,258 | 0.0 B | 0.0 B
8 | 3779 | 0 | SUCCESS | PROCESS_LOCAL | 309 / host7 | 2015/05/04 01:27:44 | 31 s | 4 s | 39.5 MB / 30,008 | 0.0 B | 0.0 B

There are 1200 tasks in total.
Re: Spark - Timeout Issues - OutOfMemoryError
IMHO, if your data or your algorithm is prone to data skew, you have to fix this at the application level; Spark itself cannot overcome the problem (one key having a very large number of values). You may be able to change your algorithm to choose another shuffle key, or something along those lines, to avoid shuffling on the skewed keys. A sketch of one common application-level trick, key salting, follows.
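For context, the salting trick splits each key into several sub-keys on the large side and replicates the matching rows on the small side, so one hot key spreads over many tasks. A minimal sketch with hypothetical RDD names (bigRdd has many values per key, smallRdd has one value per key), not code from this thread:

    import scala.util.Random

    val SALT = 48 // buckets per key; illustrative tuning knob

    // Large, skewed side: append a random salt to every key so one hot
    // key spreads across SALT reduce tasks instead of one.
    val saltedBig = bigRdd.map { case (k, v) => ((k, Random.nextInt(SALT)), v) }

    // Small side (one record per key): replicate each record once per
    // salt value so every salted key on the big side still finds a match.
    val saltedSmall = smallRdd.flatMap { case (k, v) =>
      (0 until SALT).map(s => ((k, s), v))
    }

    // Join on the salted key, then strip the salt back off.
    val joined = saltedBig.join(saltedSmall)
      .map { case ((k, _), (bigV, smallV)) => (k, (bigV, smallV)) }

The cost is replicating the small side SALT times, which is why it only pays off when the skew is severe.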
Re: Spark - Timeout Issues - OutOfMemoryError
Hello Shao,

Can you say more about the shuffle key, or point me to APIs that let me change it? I will try different keys and compare the performance. What is the shuffle key by default?
Re: Spark - Timeout Issues - OutOfMemoryError
The shuffle key depends on your implementation. If you are familiar with MapReduce: the mapper output is a key-value pair, and that key is the shuffle key used for shuffling. Spark works the same way; a small example follows.
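In RDD terms, for a pair RDD the first element of each tuple is the key the partitioner hashes during a shuffle, so "changing the shuffle key" just means keying the RDD differently before the shuffle. A tiny sketch with hypothetical field names:

    // Keyed by item id: all records for one item land in one reduce task.
    val byItem = records.map(r => (r.getItemId, r))

    // Re-keyed by (item id, user id): the same records now spread across
    // many tasks, at the cost of joining on a different key.
    val byItemAndUser = records.map(r => ((r.getItemId, r.getUserId), r))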
Re: Spark - Timeout Issues - OutOfMemoryError
One dataset (RDD Pair) val lstgItem = listings.map { lstg = (lstg.getItemId().toLong, lstg) } Second Dataset (RDDPair) val viEvents = viEventsRaw.map { vi = (vi.get(14).asInstanceOf[Long], vi) } As i want to join based on item Id that is used as first element in the tuple in both cases and i think thats what is shuffle key. listings == Data set contains all the unique item ids that are ever listed on the ecommerce site. viEvents === List of items viewed by user in last day. This will always be a subset of the total set. So i do not understand what is data skewness. When my long running task is working on 1591.2 MB / 98,931,767 does that mean 98 million reocrds contain all the same item ID ? How can millions of user look at the same item in last day ? Or does this dataset contain records across item ids ? Regards, Deepak On Mon, May 4, 2015 at 3:08 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Shuffle key is depending on your implementation, I'm not sure if you are familiar with MapReduce, the mapper output is a key-value pair, where the key is the shuffle key for shuffling, Spark is also the same. 2015-05-04 17:31 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: Hello Shao, Can you talk more about shuffle key or point me to APIs that allow me to change shuffle key. I will try with different keys and see the performance. What is the shuffle key by default ? On Mon, May 4, 2015 at 2:37 PM, Saisai Shao sai.sai.s...@gmail.com wrote: IMHO If your data or your algorithm is prone to data skew, I think you have to fix this from application level, Spark itself cannot overcome this problem (if one key has large amount of values), you may change your algorithm to choose another shuffle key, somethings like this to avoid shuffle on skewed keys. 2015-05-04 16:41 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: Hello Dean Others, Thanks for the response. I tried with 100,200, 400, 600 and 1200 repartitions with 100,200,400 and 800 executors. Each time all the tasks of join complete in less than a minute except one and that one tasks runs forever. I have a huge cluster at my disposal. The data for each of 1199 tasks is around 40MB/30k records and for 1 never ending task is 1.5G/98million records. I see that there is data skew among tasks. I had observed this a week earlier and i have no clue on how to fix it and when someone suggested that repartition might make things more parallel, but the problem is still persistent. Please suggest on how to get the task to complete. All i want to do is join two datasets. (dataset1 is in sequence file and dataset2 is in avro format). Ex: Tasks IndexIDAttemptStatusLocality LevelExecutor ID / HostLaunch Time DurationGC TimeShuffle Read Size / RecordsShuffle Spill (Memory)Shuffle Spill (Disk)Errors 0 3771 0 RUNNING PROCESS_LOCAL 114 / host1 2015/05/04 01:27:44 7.3 min 19 s 1591.2 MB / 98931767 0.0 B 0.0 B 1 3772 0 SUCCESS PROCESS_LOCAL 226 / host2 2015/05/04 01:27:44 28 s 2 s 39.2 MB / 29754 0.0 B 0.0 B 2 3773 0 SUCCESS PROCESS_LOCAL 283 / host3 2015/05/04 01:27:44 26 s 2 s 39.0 MB / 29646 0.0 B 0.0 B 5 3776 0 SUCCESS PROCESS_LOCAL 320 / host4 2015/05/04 01:27:44 31 s 3 s 38.8 MB / 29512 0.0 B 0.0 B 4 3775 0 SUCCESS PROCESS_LOCAL 203 / host5 2015/05/04 01:27:44 41 s 3 s 38.4 MB / 29169 0.0 B 0.0 B 3 3774 0 SUCCESS PROCESS_LOCAL 84 / host6 2015/05/04 01:27:44 24 s 2 s 38.5 MB / 29258 0.0 B 0.0 B 8 3779 0 SUCCESS PROCESS_LOCAL 309 / host7 2015/05/04 01:27:44 31 s 4 s 39.5 MB / 30008 0.0 B 0.0 B There are 1200 tasks in total. 
On Sun, May 3, 2015 at 9:53 PM, Dean Wampler deanwamp...@gmail.com wrote: I don't know the full context of what you're doing, but serialization errors usually mean you're attempting to serialize something that can't be serialized, like the SparkContext. Kryo won't help there. The arguments to spark-submit you posted previously look good: 2) --num-executors 96 --driver-memory 12g --driver-java-options -XX:MaxPermSize=10G --executor-memory 12g --executor-cores 4 I suspect you aren't getting the parallelism you need. For partitioning, if your data is in HDFS and your block size is 128MB, then you'll get ~195 partitions anyway. If it takes 7 hours to do a join over 25GB of data, you have some other serious bottleneck. You should examine the web console and the logs to determine where all the time is going. Questions you might pursue: - How long does each task take to complete? - How many of those 195 partitions/tasks are processed at the same time? That is, how many slots are available? Maybe you need more nodes if the number of slots is too low. Based on your command arguments, you should be able to process 1/2 of them at a time, unless the cluster is busy. - Is the cluster swamped with other work? - How much data does each task process? Is the data roughly the same from one task to the next? If not, then you
Re: Spark - Timeout Issues - OutOfMemoryError
Four tasks are now failing with:

Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Errors
0 | 3771 | 0 | FAILED | PROCESS_LOCAL | 114 / host1 | 2015/05/04 01:27:44 | ExecutorLostFailure (executor 114 lost)
1007 | 4973 | 1 | FAILED | PROCESS_LOCAL | 420 / host2 | 2015/05/04 02:13:14 | FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007)
371 | 4972 | 1 | FAILED | PROCESS_LOCAL | 563 / host3 | 2015/05/04 02:13:14 | FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371)

Each FetchFailed task carries the same stack trace:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
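A side note on this failure mode, not from the thread itself: ExecutorLostFailure followed by "Missing an output location for shuffle" typically means the executor holding the map output died (often killed by YARN for exceeding its container limit) and its shuffle files went with it, so the reducers' fetches fail. If that is the cause, giving the container headroom beyond the JVM heap can help; the value below is illustrative, not a recommendation from this thread:

    import org.apache.spark.SparkConf

    // Hypothetical mitigation: extra off-heap headroom (MB) for YARN
    // containers, so executors are not killed mid-shuffle for exceeding
    // their memory limit.
    val conf = new SparkConf()
      .set("spark.yarn.executor.memoryOverhead", "2048")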
Re: Spark - Timeout Issues - OutOfMemoryError
From the symptom you describe, one task's shuffle data being much larger than all the others', this looks like typical data-skew behavior. I can only give advice based on your description; you should detect whether the data is actually skewed or not. The shuffle assigns data to tasks by the partitioner strategy (the default is a hash partitioner), so all records with the same key go to the same task, but one task does not hold only one key. A minimal way to check the key distribution is sketched below.
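Concretely, a minimal skew check on the join key of the big side (names follow the code posted earlier in the thread; the cutoff of 20 keys is arbitrary):

    // Count records per join key and print the heaviest keys.
    // For very large inputs, sample() first to keep this cheap.
    val topKeys = viEvents
      .mapValues(_ => 1L)
      .reduceByKey(_ + _)
      .top(20)(Ordering.by { kv: (Long, Long) => kv._2 })

    topKeys.foreach { case (itemId, n) => println(s"$itemId -> $n records") }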
Re: Spark - Timeout Issues - OutOfMemoryError
I ran it against one file instead of 10 files, and I still see one task running after 33 minutes; its shuffle read size is 780 MB / 50 million records.

I did a count of records per item id for dataset-2 [one file] (the second dataset (pair RDD): val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }). This is the dataset that contains the list of items viewed by users in one day.

Item Id | Count
201335783004 | 537
111654496030 | 353
141640703798 | 287
191568402102 | 258
111654479898 | 217
231521843148 | 211
251931716094 | 201
111654493548 | 181
181503913062 | 181
121635453050 | 152
261798565828 | 140
151494617682 | 139
251927181728 | 127
231516683056 | 119
141640492864 | 117
161677270656 | 117
171771073616 | 113
111649942124 | 109
191516989450 | 97
231539161292 | 94
221555628408 | 88
131497785968 | 87
121632233872 | 84
131335379184 | 83
281531363490 | 83
131492727742 | 79
231174157820 | 79
161666914810 | 77
251699753072 | 77
161683664300 | 76

I was assuming that data skew would mean the top item (201335783004) had a count of a million, but it is only a few hundred, so why is Spark skewing the join? What should I do so that Spark distributes the records evenly? In MapReduce we can change the Partitioner between mapper and reducer; how can I do that for a join in Spark? (See the sketch after this message.)

Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | 3618 | 0 | RUNNING | PROCESS_LOCAL | 4 / host1 | 2015/05/04 05:09:53 | 33 min | 8.5 min | 783.9 MB / 50,761,322 | 4.6 GB | 47.5 MB
433 | 4051 | 0 | SUCCESS | PROCESS_LOCAL | 1 / host2 | 2015/05/04 05:16:27 | 1.1 min | 20 s | 116.0 MB / 4,505,143 | 1282.3 MB | 10.1 MB
218 | 3836 | 0 | SUCCESS | PROCESS_LOCAL | 3 / host3 | 2015/05/04 05:13:01 | 53 s | 11 s | 76.4 MB / 2,865,143 | 879.6 MB | 6.9 MB
113 | 3731 | 0 | SUCCESS | PROCESS_LOCAL | 2 / host4 | 2015/05/04 05:11:30 | 31 s | 8 s | 6.9 MB / 5,187 | 0.0 B | 0.0 B
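On the Partitioner question above: in Spark the pair-RDD join takes an explicit Partitioner argument, which plays the same role as a custom Partitioner between map and reduce in MapReduce. A minimal sketch using the RDD names from this thread:

    import org.apache.spark.HashPartitioner

    // Explicit partitioner for the join: controls how keys are routed to
    // the 1200 reduce-side tasks, like a MapReduce Partitioner would.
    val joined = lstgItem.join(viEvents, new HashPartitioner(1200))

The catch is that a Partitioner must map each key to one partition deterministically, so by itself it cannot split a single hot key across tasks; that requires changing the key itself (salting, as sketched earlier in the thread).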
Re: Spark - Timeout Issues - OutOfMemoryError
1) I tried this:

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      lstgItem.join(viEvents, new org.apache.spark.RangePartitioner(partitions = 1200, rdd = viEvents)).map { ...

It fired two jobs, and I still have one task that never completes:

Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | 4818 | 0 | RUNNING | PROCESS_LOCAL | 5 / host1 | 2015/05/04 07:24:25 | 1.1 h | 13 min | 778.0 MB / 50,314,161 | 4.5 GB | 47.4 MB
955 | 5773 | 0 | SUCCESS | PROCESS_LOCAL | 5 / host2 | 2015/05/04 07:47:16 | 2.2 min | 1.5 min | 106.3 MB / 4,197,539 | 0.0 B | 0.0 B
1199 | 6017 | 0 | SUCCESS | PROCESS_LOCAL | 3 / host3 | 2015/05/04 07:51:51 | 48 s | 2 s | 94.2 MB / 3,618,335 | 2.8 GB | 8.6 MB

2) I tried reversing the datasets in the join:

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = viEvents.join(lstgItem)

This led to the same problem of a long-running task.

3) Next, I am trying this:

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = lstgItem.join(viEvents, 1200).map { ...

I have exhausted all my options.

Regards,
Deepak
Re: Spark - Timeout Issues - OutOfMemoryError
Data set 1 (viEvents): the event activity data of one day. I took 10 files out of it; record counts for the top item ids:

Item ID | Count
201335783004 | 3419
191568402102 | 1793
111654479898 | 1362
181503913062 | 1310
261798565828 | 1028
111654493548 | 994
231516683056 | 862
131497785968 | 746
161666914810 | 633
221749455474 | 432
201324502754 | 410
201334042634 | 402
191562605592 | 380
271841178238 | 362
161663339210 | 344
251615941886 | 313
261855748678 | 309
271821726658 | 255
111657099518 | 224
261868369938 | 218
181725710132 | 216
171766164072 | 215
221757076934 | 213
171763906872 | 212
111650132368 | 206
181629904282 | 204
261867932788 | 198
161668475280 | 194
191398227282 | 194

Data set 2:

Item ID | Count
2217305702 | 1
3842604614 | 1
4463421160 | 1
4581260446 | 1
4632783223 | 1
4645316947 | 1
4760829454 | 1
4786989430 | 1
5530758430 | 1
5610056107 | 1
5661929425 | 1
5953801612 | 1
6141607456 | 1
6197204992 | 1
6220704442 | 1
6271022614 | 1
6282402871 | 1
6525123621 | 1
6554834772 | 1
6566297541 | 1

This dataset will always have exactly one element per item, as it contains metadata information.

Given the nature of these two datasets, if there is any skewness it must be in dataset 1. But in dataset 1 the top 20-30 item ids (the shuffle key) each have a record count of no more than about 3,000, which is very small. Why am I still *not* able to join these two datasets, given that I have effectively unlimited capacity and repartitioning, with a 12G memory limit on each node? Each time I get one task that runs forever, processing roughly 1.5 GB of data while the others process a few MB; and 1.5 GB of shuffle read is itself very small. Please suggest.
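One more trick worth noting here, as a sketch rather than a confirmed fix for this job: since lstgItem has exactly one record per item id, the listings for a handful of hot keys are tiny and can be broadcast, so the hot keys get joined map-side while everything else goes through the normal shuffle join. The threshold is illustrative; sc is the SparkContext.

    // 1. Find the hot join keys on the big side (threshold illustrative).
    val hotKeys = viEvents
      .mapValues(_ => 1L)
      .reduceByKey(_ + _)
      .filter { case (_, n) => n > 1000000 }
      .keys.collect().toSet
    val hotKeysBc = sc.broadcast(hotKeys)

    // 2. Broadcast the listings for just those keys (one record per item,
    //    so this map stays small).
    val hotListings = sc.broadcast(
      lstgItem.filter { case (k, _) => hotKeysBc.value(k) }.collectAsMap())

    // 3. Join the hot keys map-side, shuffle-join the rest, and union.
    val hotJoined = viEvents
      .filter { case (k, _) => hotKeysBc.value(k) }
      .flatMap { case (k, ev) => hotListings.value.get(k).map(l => (k, (l, ev))) }
    val coldJoined = lstgItem.join(
      viEvents.filter { case (k, _) => !hotKeysBc.value(k) })
    val joined = hotJoined.union(coldJoined)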
Re: Spark - Timeout Issues - OutOfMemoryError
I don't know the full context of what you're doing, but serialization errors usually mean you're attempting to serialize something that can't be serialized, like the SparkContext. Kryo won't help there.

The arguments to spark-submit you posted previously look good:

    --num-executors 96 --driver-memory 12g --driver-java-options -XX:MaxPermSize=10G --executor-memory 12g --executor-cores 4

I suspect you aren't getting the parallelism you need. For partitioning, if your data is in HDFS and your block size is 128 MB, then you'll get ~195 partitions anyway. If it takes 7 hours to do a join over 25 GB of data, you have some other serious bottleneck. You should examine the web console and the logs to determine where all the time is going. Questions you might pursue:

- How long does each task take to complete?
- How many of those 195 partitions/tasks are processed at the same time? That is, how many slots are available? Maybe you need more nodes if the number of slots is too low. Based on your command arguments, you should be able to process half of them at a time, unless the cluster is busy.
- Is the cluster swamped with other work?
- How much data does each task process? Is the data roughly the same from one task to the next? If not, you might have serious key skew.

You may also need to research the details of how joins are implemented and some of the common tricks for organizing data to minimize having to shuffle all N by M records.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

Hello Dean, if I don't use the Kryo serializer I get a serialization error, hence I am using it. If I don't use partitionBy/repartition, then the simple join never completes even after 7 hours; and in fact, as a next step, I need to run it against 250G, which is my full dataset size. Someone here suggested I use repartition. Assuming repartition is mandatory, how do I decide what the right number is? Strangely, when I use 400 partitions I do not get the NullPointerException I mentioned. I never saw that exception against a small random dataset, I see it with 25G, and again with 400 partitions I do not see it.
Re: Spark - Timeout Issues - OutOfMemoryError
How big is the data you're returning to the driver with collectAsMap? You are probably running out of memory trying to copy too much data back to it.

If you're trying to force a map-side join, Spark can do that for you in some cases within the regular DataFrame/RDD context. See http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning and this talk by Michael Armbrust, http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.

dean

On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

Full exception:

15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at VISummaryDataProvider.scala:37) failed in 884.087 s
15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s
15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)])

Code at line 37:

    val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap

The listings data set size is 26G (10 files), and my driver memory is 12G (I can't go beyond that). The reason I do collectAsMap is to broadcast it and do a map-side join instead of a regular join. Please suggest?

On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

My Spark job is failing and I see:

15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
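For completeness, a broadcast-based map-side join only works when the lookup side genuinely fits in driver and executor memory, which a 26G listings set does not. A sketch of the pattern when the lookup side is first cut down to something small; the filtering step and names are assumptions, not code from the thread:

    // Shrink the lookup side before collecting: keep only the item ids
    // that actually occur in the day's events. Assumes the distinct id
    // set is small enough to collect; if it is not, this does not apply.
    val neededIds = sc.broadcast(viEvents.keys.distinct().collect().toSet)

    val lstgMapBc = sc.broadcast(
      listings.map(l => (l.getItemId().toLong, l))
              .filter { case (id, _) => neededIds.value(id) }
              .collectAsMap())

    // Map-side join: the events are never shuffled at all.
    val joined = viEvents.flatMap { case (id, ev) =>
      lstgMapBc.value.get(id).map(l => (id, (l, ev)))
    }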
Re: Spark - Timeout Issues - OutOfMemoryError
IMHO, you are trying waaay too hard to optimize work on what is really a small data set. 25G, even 250G, is not that much data, especially if you've spent a month trying to get something to work that should be simple. All these errors are from optimization attempts.

Kryo is great, but if it's not working reliably for some reason, then don't use it. Rather than force 200 partitions, let Spark try to figure out a good-enough number. (If you really need to force a partition count, use the repartition method instead, unless you're overriding the partitioner.)

So, I recommend that you eliminate all the optimizations: Kryo, partitionBy, etc. Just use the simplest code you can. Make it work first. Then, if it really isn't fast enough, look for actual evidence of bottlenecks and optimize those.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com
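As a rough sketch of what that stripped-down version might look like, with hypothetical loadListings/loadEvents standing in for the actual sequence-file and Avro readers:

    import org.apache.spark.{SparkConf, SparkContext}

    // defaults only: no Kryo, no partitionBy, no forced partition count
    val conf = new SparkConf().setAppName("simple-equi-join")
    val sc = new SparkContext(conf)

    // loadListings/loadEvents are hypothetical stand-ins for the real readers
    val lstgById   = loadListings(sc).map(lstg => (lstg.getItemId().toLong, lstg))
    val eventsById = loadEvents(sc).map(vi => (vi.get(14).asInstanceOf[Long], vi))

    // plain equi-join; Spark derives a partition count from the inputs
    val joined = eventsById.join(lstgById)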
Re: Spark - Timeout Issues - OutOfMemoryError
Hello Dean & others, Thanks for your suggestions. I have two data sets, and all I want to do is a simple equi-join. I have a 10G limit, and as my dataset_1 exceeded that, it was throwing an OOM error. Hence I switched back to the .join() API instead of a map-side broadcast join. I am repartitioning the data with 100, 200, and I see a NullPointerException now.

When I run against 25G of each input with .partitionBy(new org.apache.spark.HashPartitioner(200)), I see a NullPointerException. The trace does not include a line from my code, hence I do not know what is causing the error. I do have a Kryo serializer registered:

    val conf = new SparkConf()
      .setAppName(detail)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
      .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
      .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
      .set("spark.yarn.maxAppAttempts", "0")
      .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
    val sc = new SparkContext(conf)

I see the exception when this task runs:

    val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

It's a simple mapping of input records to (itemId, record). I found this: http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist and http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html

Looks like Kryo (v2.21) changed something to stop using default constructors:

    ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

Please suggest. Trace:

15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in stage 2.0 (TID 774)
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
datum (org.apache.avro.mapred.AvroKey)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

Any suggestions? I have not been able to get this thing to work for over a month now; it's kind of getting frustrating.
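A hedged sketch of how that StackOverflow fix could be wired into Spark: a custom KryoRegistrator (the class name here is made up) that installs Objenesis as the fallback instantiator, so Kryo can build objects such as Avro records that lack usable no-arg constructors. Untested against this exact setup:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator
    import org.objenesis.strategy.StdInstantiatorStrategy

    class AvroFriendlyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        // try normal construction first, fall back to Objenesis for classes
        // without a default constructor (e.g. Avro generated records)
        kryo.setInstantiatorStrategy(
          new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()))
        kryo.register(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum])
      }
    }

    // then, in the SparkConf above:
    //   .set("spark.kryo.registrator", classOf[AvroFriendlyRegistrator].getName)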
Re: Spark - Timeout Issues - OutOfMemoryError
Hello Dean, If I don't use the Kryo serializer I get a serialization error, hence I am using it. If I don't use partitionBy/repartition then the simple join never completed, even after 7 hours; and in fact, as the next step, I need to run it against 250G, as that is my full data set size. Someone here suggested I use repartition. Assuming repartition is mandatory, how do I decide what's the right number? When I use 400 partitions I do not get the NullPointerException I talked about, which is strange: I never saw that exception against a small random data set, I do see it with 25G, and yet with 400 partitions it disappears.
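On "how do I decide what's the right number": one common rule of thumb (an editorial assumption, not something stated in this thread) is to target roughly an HDFS block's worth of data per partition, with at least a few tasks per available core. Sketched below with made-up sizes; sc and eventsById are from the earlier sketch:

    val targetBytesPerPartition = 128L * 1024 * 1024          // roughly one HDFS block
    val totalInputBytes         = 250L * 1024 * 1024 * 1024   // the 250G full data set
    val byDataSize = (totalInputBytes / targetBytesPerPartition).toInt
    val byCores    = sc.defaultParallelism * 3                // a few waves of tasks per slot
    val numPartitions = math.max(byDataSize, byCores)

    val repartitioned = eventsById.repartition(numPartitions)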
Re: Spark - Timeout Issues - OutOfMemoryError
You could try repartitioning your listings RDD. Also, doing a collectAsMap will bring all of your data back to the driver; in that case you might want to set the storage level to memory-and-disk, though I am not sure that will help on the driver side. Thanks Best Regards
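For the storage-level part of that suggestion, a minimal sketch (listings as in the code quoted below; note that persisting only changes how executors cache the RDD's partitions, it does not shrink what collectAsMap copies to the driver):

    import org.apache.spark.storage.StorageLevel

    val lstgPairs = listings.map(lstg => (lstg.getItemId().toLong, lstg))
    // spill partitions to disk rather than recomputing them when memory is tight
    lstgPairs.persist(StorageLevel.MEMORY_AND_DISK)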
Re: Spark - Timeout Issues - OutOfMemoryError
You could try increasing your heap space explicitly, like export _JAVA_OPTIONS=-Xmx10g. It's not the correct approach, but give it a try. Thanks Best Regards
Re: Spark - Timeout Issues - OutOfMemoryError
Did not work. Same problem.
Re: Spark - Timeout Issues - OutOfMemoryError
Full exception:

15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at VISummaryDataProvider.scala:37) failed in 884.087 s
15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s
15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)])

Code at line 37:

    val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap

The listing data set size is 26G (10 files), and my driver memory is 12G (I can't go beyond that). The reason I do collectAsMap is to broadcast it and do a map-side join instead of a regular join. Please suggest?

On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: My Spark Job is failing, and I see:

15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]

java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

I see multiple of these:

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

And finally I see this:

java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
at org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
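For completeness, the broadcast map-side join being attempted here usually looks something like the sketch below; lstgById (small side) and eventsById (big side) are hypothetical pair RDDs, not names from this thread. The catch is exactly the one hit above: the collected side must fit comfortably in the driver heap, and 26G of listings against a 12G driver cannot:

    // small side: collect to the driver, then broadcast once to every executor
    val lstgItemMap = lstgById.collectAsMap()   // must fit in the driver heap!
    val bcLstg      = sc.broadcast(lstgItemMap)

    // map-side join: look each event's itemId up in the broadcast map, no shuffle
    val joined = eventsById.flatMap { case (itemId, event) =>
      bcLstg.value.get(itemId).map(lstg => (itemId, (event, lstg)))
    }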
Spark - Timeout Issues - OutOfMemoryError
I have a Spark app that runs to completion in 45 mins for 5 files (5*750MB) and takes 16 executors to do so. I wanted to run it against 10 files of each input type (10*3 files, as there are three inputs that are transformed): [Input1 = 10*750MB, Input2 = 10*2.5GB, Input3 = 10*1.5GB]. Hence I used 32 executors.

I see multiple of these:

15/04/28 09:23:31 WARN executor.Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22, phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
... 1 more

When I searched deeper, I found an OOM error:

15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(17, phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com, 48304) with no recent heart beats: 121200ms exceeds 120000ms
15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in thread task-result-getter-3
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2245)
at java.util.Arrays.copyOf(Arrays.java:2219)
at java.util.ArrayList.grow(ArrayList.java:242)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
at java.util.ArrayList.add(ArrayList.java:440)
at com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread task-result-getter-3 java.lang.OutOfMemoryError: Java heap space
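An editorial aside on failures like the task-result OOM above, not advice given in this thread: Spark 1.2+ exposes spark.driver.maxResultSize to cap the total size of task results the driver will accept, and the 1.x timeout settings can be raised while a GC-bound cluster is being debugged. Keys and units are from the 1.x configuration docs; the values below are arbitrary:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("timeout-tuning-sketch")
      // make oversized task results fail with a clear error instead of an OOM
      .set("spark.driver.maxResultSize", "2g")
      // give GC-bound JVMs more slack before peers give up on them (seconds)
      .set("spark.core.connection.ack.wait.timeout", "600")
      .set("spark.akka.timeout", "300")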