Re: Java Heap Space error - Spark ML
What is the size of your data and of your cluster? Are you running through spark-submit or an IDE, and which Spark version are you using? Try spark-submit and increase the memory of the driver or the executors.

On 22/3/19 17:19, KhajaAsmath Mohammed wrote:
> Hi,
> I am getting the below exception when using Spark KMeans. Any solutions from the experts would be really helpful.
>
> val kMeans = new KMeans().setK(reductionCount).setMaxIter(30)
> val kMeansModel = kMeans.fit(df)
>
> The error occurs when calling kMeans.fit:
> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>   at org.apache.spark.mllib.linalg.SparseVector.toArray(Vectors.scala:760)
>   at org.apache.spark.mllib.clustering.VectorWithNorm.toDense(KMeans.scala:614)
> [...]
>
> Thanks,
> Asmath

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol
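For reference, a minimal sketch of the suggested spark-submit tuning; the class name, jar name, and memory sizes below are placeholders, not values from this thread:

# Raise driver and executor heaps; adjust sizes to your data and cluster.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --driver-memory 8g \
  --executor-memory 8g \
  --class com.example.KMeansJob \
  kmeans-job.jar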
Java Heap Space error - Spark ML
Hi,

I am getting the below exception when using Spark KMeans. Any solutions from the experts would be really helpful.

val kMeans = new KMeans().setK(reductionCount).setMaxIter(30)
val kMeansModel = kMeans.fit(df)

The error occurs when calling kMeans.fit:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
  at org.apache.spark.mllib.linalg.SparseVector.toArray(Vectors.scala:760)
  at org.apache.spark.mllib.clustering.VectorWithNorm.toDense(KMeans.scala:614)
  at org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
  at org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:382)
  at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:256)
  at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
  at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:319)
  at com.datamantra.spark.DataBalancing$.createBalancedDataframe(DataBalancing.scala:25)
  at com.datamantra.spark.jobs.IftaMLTraining$.trainML$1(IftaMLTraining.scala:182)
  at com.datamantra.spark.jobs.IftaMLTraining$.main(IftaMLTraining.scala:94)
  at com.datamantra.spark.jobs.IftaMLTraining.main(IftaMLTraining.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks,
Asmath
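The allocation site in the trace is the k-means|| initialization step (initKMeansParallel) densifying sparse vectors. Besides raising driver/executor memory, one option, offered here as an assumption rather than advice from this thread, is random initialization, which avoids that pass:

// Sketch only: switching the init mode is an assumption, not a fix
// confirmed in this thread. "random" skips the k-means|| pass that
// converts sparse vectors to dense (the OOM site in the trace above).
val kMeans = new KMeans()
  .setK(reductionCount)
  .setMaxIter(30)
  .setInitMode("random")
val kMeansModel = kMeans.fit(df)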
Java Heap Space Error
Hello,

I am trying to debug a PySpark program and, quite frankly, I am stumped. I see the following error in the logs. I verified the input parameters; all appear to be in order. The driver and executors appear to be fine, with about 3 MB of 7 GB in use on each node. I do see that the DAG plan being created is huge. Could it be due to that?

Thanks!
Vinay

18/02/17 00:59:02 ERROR Utils: throw uncaught fatal error in thread SparkListenerBus
java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOfRange(Arrays.java:3664)
  at java.lang.String.<init>(String.java:207)
  at java.lang.StringBuilder.toString(StringBuilder.java:407)
  at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:356)
  at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:235)
  at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:20)
  at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:42)
  at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:35)
  at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
  at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
  at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:20)
  at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:50)
  at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:103)
  at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:134)
  at org.apache.spark.scheduler.EventLoggingListener.onOtherEvent(EventLoggingListener.scala:202)
  at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:67)
  at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
  at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
  at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
  at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
  at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
  at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
  at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
  at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245)
  at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
Exception in thread "SparkListenerBus" java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOfRange(Arrays.java:3664)
  at java.lang.String.<init>(String.java:207)
  at java.lang.StringBuilder.toString(StringBuilder.java:407)
  at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:356)
  at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:235)
  at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:20)
  at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:42)
  at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:35)
  at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
  at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
  at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:20)
  at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:50)
  at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:103)
  at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:134)
  at org.apache.spark.scheduler.EventLoggingListener.onOtherEvent(EventLoggingListener.scala:202)
  at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:67)
  at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
  at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
  at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
  at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
  at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94) at
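The trace shows the event-logging listener running out of heap while turning a Spark event into JSON, which fits the observation about the huge DAG plan. A sketch of possible mitigations, with placeholder values; note that disabling the event log (spark.eventLog.enabled) sacrifices the history-server view of the job:

# Give the driver, which hosts the listener bus, more heap, and/or
# stop serializing events to the JSON event log.
spark-submit \
  --driver-memory 8g \
  --conf spark.eventLog.enabled=false \
  my_program.py

If the lineage really is enormous, truncating it with checkpointing (sc.setCheckpointDir(...) followed by rdd.checkpoint() at a suitable point) may also shrink the plans being logged.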
Re: Java Heap Space Error
Hello,

It worked like a charm, thank you very much. Some userids were 'null', which is why so many records went to the userid 'null'. Adding a where clause, userid != 'null', solved the problem.

> On 24 Sep 2015, at 22:43, java8964 <java8...@hotmail.com> wrote:
>
> I can understand why your first query will finish without OOM, but the new one will fail with OOM.
>
> In the new query, you are asking for a groupByKey/cogroup operation, which forces all the productName + productCategory values per user id to the same reducer. This can easily blow out a reducer's memory if one user id has a lot of productNames and productCategories.
> [...]
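A sketch of the fix described above, using the table and column names from this thread; that the offending values were the literal string 'null' (rather than SQL NULLs) is my reading of the message:

-- Exclude empty, SQL NULL, and literal 'null' userids before grouping.
select userid
from landing
where userid is not null
  and userid != ''
  and userid != 'null'
  and pagetype = 'productDetail'
group by userid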
Re: Java Heap Space Error
@Jingyu
Yes, it works without the regex and concatenation, as in the query below. So what can we conclude from this? When I run it that way, the shuffle read sizes are equally distributed between partitions.

val usersInputDF = sqlContext.sql(
  s"""
     | select userid from landing where dt='2015-9' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid
   """.stripMargin)

@java8964
I tried with sql.shuffle.partitions = 1 but no luck. Again, one partition's shuffle read size is huge and the others are very small.

So how can I balance this shuffle read size between partitions? (One general technique is sketched after this message.)

> On 24 Sep 2015, at 03:35, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
>
> Does your SQL work if it does not run a regex on the strings and concatenate them? I mean, just selecting the columns without the string operations.
>
> On 24 September 2015 at 10:11, java8964 <java8...@hotmail.com> wrote:
> Try increasing the partition count; that will give each partition less data.
>
> Yong
> [...]
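One general way to spread a skewed shuffle, mentioned here as a common technique rather than something proposed in this thread, is two-phase aggregation with a salt key. A sketch in Scala, assuming an RDD of (userid, text) pairs named pairs; note that for a collect-everything aggregation the second phase still concentrates the hot key, so this helps most when the aggregation genuinely reduces data:

// Phase 1: add a random salt so one hot userid spreads over 10 reducers.
val salted = pairs.map { case (userid, text) =>
  ((userid, scala.util.Random.nextInt(10)), List(text))
}
val partial = salted.reduceByKey(_ ++ _)
// Phase 2: drop the salt and merge the pre-combined partial lists.
val merged = partial
  .map { case ((userid, _), texts) => (userid, texts) }
  .reduceByKey(_ ++ _)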
RE: Java Heap Space Error
This is interesting.

So you mean that this query:

"select userid from landing where dt='2015-9' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid"

works in your cluster?

In that case, do you also see one task with far more data than the rest, as happened when you used the regex and concatenation?

It is hard to believe that just adding "regex" and "concatenation" would change how evenly the data is distributed across partitions. In your query, the distribution across partitions simply depends on the hash partitioner of "userid".

Can you show us the query after you add the "regex" and "concatenation"?

Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 15:34:48 +0300
CC: user@spark.apache.org
To: jingyu.zh...@news.com.au; java8...@hotmail.com

@Jingyu
Yes, it works without the regex and concatenation, as in the query below. So what can we conclude from this? When I run it that way, the shuffle read sizes are equally distributed between partitions.

val usersInputDF = sqlContext.sql(
  s"""
     | select userid from landing where dt='2015-9' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid
   """.stripMargin)

@java8964
I tried with sql.shuffle.partitions = 1 but no luck. Again, one partition's shuffle read size is huge and the others are very small.

So how can I balance this shuffle read size between partitions?

> On 24 Sep 2015, at 03:35, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
> [...]
Re: Java Heap Space Error
Yes, right: the query you wrote worked in the same cluster. In that case the partitions were equally distributed, but when I use the regex and concatenations they are not, as I said before. The query with the concatenation is below:

val usersInputDF = sqlContext.sql(
  s"""
     | select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",' ') inputlist from landing where dt='${dateUtil.getYear}-${dateUtil.getMonth}' and day >= '${day}' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid
   """.stripMargin)

> On 24 Sep 2015, at 16:52, java8964 <java8...@hotmail.com> wrote:
>
> This is interesting.
>
> So you mean that this query:
>
> "select userid from landing where dt='2015-9' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid"
>
> works in your cluster?
>
> In that case, do you also see one task with far more data than the rest, as happened when you used the regex and concatenation?
>
> It is hard to believe that just adding "regex" and "concatenation" would change how evenly the data is distributed across partitions. In your query, the distribution across partitions simply depends on the hash partitioner of "userid".
> [...]
Re: Java Heap Space Error
Thank you very much. This makes sense. I will write back after trying your solution.

> On 24 Sep 2015, at 22:43, java8964 <java8...@hotmail.com> wrote:
>
> I can understand why your first query will finish without OOM, but the new one will fail with OOM.
>
> In the new query, you are asking for a groupByKey/cogroup operation, which forces all the productName + productCategory values per user id to the same reducer. This can easily blow out a reducer's memory if one user id has a lot of productNames and productCategories.
> [...]
RE: Java Heap Space Error
I can understand why your first query finishes without OOM but the new one fails with OOM.

In the new query, you are asking for a groupByKey/cogroup operation, which forces all the productName + productCategory values per user id to the same reducer. This can easily blow out a reducer's memory if one user id has a lot of productNames and productCategories.

Keep in mind that on the reducer side Spark still uses a hash structure to merge the data coming from the different mappers, so the reduce-side memory has to be able to merge all the productNames + productCategories for (at least) the most frequent user id. And I don't know why you want all the productNames and productCategories per user id in the first place (maybe a distinct would be enough?).

Imagine one user id that shows up 1M times in your dataset, with 0.5M product names of 'A' and 0.5M of 'B': your query will push 1M 'A's and 'B's into the same reducer and ask Spark to merge them in the hash map for that user id. This will cause OOM.

Before anything else, you need to find out the max count per user id in your data: select max(count(*)) from land where . group by userid

Your memory has to support that amount of productName and productCategory data even if your partition count is as high as your unique user id count, if consolidating all the product names and categories together, without even considering de-duplication, is really what you want.

Both queries should still push a similar record count to each partition, but with very different data volumes.

Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 18:56:51 +0300
CC: jingyu.zh...@news.com.au; user@spark.apache.org
To: java8...@hotmail.com

Yes, right: the query you wrote worked in the same cluster. In that case the partitions were equally distributed, but when I use the regex and concatenations they are not, as I said before. The query with the concatenation is below:

val usersInputDF = sqlContext.sql(
  s"""
     | select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",' ') inputlist from landing where dt='${dateUtil.getYear}-${dateUtil.getMonth}' and day >= '${day}' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid
   """.stripMargin)

> On 24 Sep 2015, at 16:52, java8964 <java8...@hotmail.com> wrote:
> [...]
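The max-count diagnostic above cannot be written as a nested aggregate in one step. A sketch of an equivalent two-level form; I have used the table name "landing" from the rest of the thread (Yong abbreviated it to "land") and left his elided where clause out:

-- Largest number of rows landing on any single userid,
-- i.e. the worst-case load for one reducer.
select max(cnt)
from (
  select userid, count(*) as cnt
  from landing
  group by userid
) t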
Re: Java Heap Space Error
Yes, it’s possible. I use S3 as the data source, and my external tables are partitioned. The task below is 193/200: the job has 2 stages, and this is task 193 of 200 in stage 2, because of sql.shuffle.partitions.

How can I avoid this situation? This is my query:

select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",' ') inputlist from landing where dt='2015-9' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid

> On 23 Sep 2015, at 23:55, java8964 <java8...@hotmail.com> wrote:
>
> Based on your description, your job shouldn't have any shuffle then, as you just apply a regex and concatenation on the column; but there is one partition with 4.3M records to be read, vs. less than 1M records for the other partitions.
>
> Is that possible? It depends on what the source of your data is.
>
> If there is a shuffle in your query (more than 2 stages generated by your query, which is my guess of what is happening), then it simply means that one partition has way more data than the rest of the partitions.
>
> Yong
>
> From: yu...@useinsider.com
> Subject: Java Heap Space Error
> Date: Wed, 23 Sep 2015 23:07:17 +0300
> To: user@spark.apache.org
>
> What can cause the issue in the attached picture? I'm running an SQL query that runs a regex on strings and concatenates them. Because of this task, my job gives a Java heap space error.
RE: Java Heap Space Error
Try increasing the partition count; that will give each partition less data.

Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 00:32:47 +0300
CC: user@spark.apache.org
To: java8...@hotmail.com

Yes, it's possible. I use S3 as the data source, and my external tables are partitioned. The task below is 193/200: the job has 2 stages, and this is task 193 of 200 in stage 2, because of sql.shuffle.partitions.

How can I avoid this situation? This is my query:

select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",' ') inputlist from landing where dt='2015-9' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid

On 23 Sep 2015, at 23:55, java8964 <java8...@hotmail.com> wrote:
[...]
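A sketch of this suggestion for a Spark SQL job like the one in this thread; the value is a placeholder (the job above ran with the default of 200):

// Raise the reduce-side partition count so each partition holds less data.
sqlContext.setConf("spark.sql.shuffle.partitions", "1000")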
Re: Java Heap Space Error
Does your SQL work if it does not run a regex on the strings and concatenate them? I mean, just selecting the columns without the string operations.

On 24 September 2015 at 10:11, java8964 <java8...@hotmail.com> wrote:

> Try increasing the partition count; that will give each partition less data.
>
> Yong
>
> ------
> Subject: Re: Java Heap Space Error
> From: yu...@useinsider.com
> Date: Thu, 24 Sep 2015 00:32:47 +0300
> CC: user@spark.apache.org
> To: java8...@hotmail.com
>
> Yes, it's possible. I use S3 as the data source, and my external tables are partitioned. The task below is 193/200: the job has 2 stages, and this is task 193 of 200 in stage 2, because of sql.shuffle.partitions.
>
> How can I avoid this situation? This is my query:
>
> select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",' ') inputlist from landing where dt='2015-9' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid
>
> On 23 Sep 2015, at 23:55, java8964 <java8...@hotmail.com> wrote:
> [...]
'Java heap space' error occurred when querying a 4G data file from HDFS
In my dev-test env I have 3 virtual machines; every machine has 12G memory and 8 CPU cores. Here are spark-defaults.conf and spark-env.sh; maybe some config is not right.

I run this command: *spark-submit --master yarn-client --driver-memory 7g --executor-memory 6g /home/hadoop/spark/main.py* and the exception below is raised.

*spark-defaults.conf*
spark.master spark://cloud1:7077
spark.default.parallelism 100
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 5g
spark.driver.maxResultSize 6g
spark.kryoserializer.buffer.mb 256
spark.kryoserializer.buffer.max.mb 512
spark.executor.memory 4g
spark.rdd.compress true
spark.storage.memoryFraction 0
spark.akka.frameSize 50
spark.shuffle.compress true
spark.shuffle.spill.compress false
spark.local.dir /home/hadoop/tmp

*spark-env.sh*
export SCALA=/home/hadoop/softsetup/scala
export JAVA_HOME=/home/hadoop/softsetup/jdk1.7.0_71
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=4g
export HADOOP_CONF_DIR=/opt/cloud/hadoop/etc/hadoop
export SPARK_EXECUTOR_MEMORY=4g
export SPARK_DRIVER_MEMORY=4g

*Exception:*
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_28 on disk on cloud3:38109 (size: 162.7 MB)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_28 on disk on cloud3:38109 (size: 162.7 MB)
15/04/07 18:11:03 INFO TaskSetManager: Starting task 31.0 in stage 1.0 (TID 31, cloud3, NODE_LOCAL, 1296 bytes)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_29 on disk on cloud2:49451 (size: 163.7 MB)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_29 on disk on cloud2:49451 (size: 163.7 MB)
15/04/07 18:11:03 INFO TaskSetManager: Starting task 30.0 in stage 1.0 (TID 32, cloud2, NODE_LOCAL, 1296 bytes)
15/04/07 18:11:03 ERROR Utils: Uncaught exception in thread task-result-getter-0
java.lang.OutOfMemoryError: Java heap space
  at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:61)
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
  at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:58)
  at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:81)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:73)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:48)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Exception in thread "task-result-getter-0" java.lang.OutOfMemoryError: Java heap space
  at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:61)
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
  at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:58)
  at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:81)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:73)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:48)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_28 on disk on cloud3:38109 (size: 162.7 MB)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_29 on disk on
Re: 'Java heap space' error occurred when querying a 4G data file from HDFS
Any help, please? Help me get the configuration right.

李铖 <lidali...@gmail.com> wrote on Tuesday, 7 April 2015:

> In my dev-test env I have 3 virtual machines; every machine has 12G memory and 8 CPU cores. Here are spark-defaults.conf and spark-env.sh; maybe some config is not right.
>
> I run this command: *spark-submit --master yarn-client --driver-memory 7g --executor-memory 6g /home/hadoop/spark/main.py* and the exception below is raised.
> [...]
RE: 'Java heap space' error occurred when querying a 4G data file from HDFS
It is hard to guess why the OOM happens without knowing your application's logic and the data size. Without knowing that, I can only guess based on some common experience:

1) Increase spark.default.parallelism.
2) Increase your executor-memory; maybe 6g is just not enough.
3) Your environment is kind of unbalanced between CPU cores and available memory (8 cores vs 12G). Each core should have 3G for Spark.
4) If you cache RDDs, use MEMORY_ONLY_SER instead of MEMORY_ONLY (a sketch follows this message).
5) Since your core count is high compared with your available memory, lower the cores per executor by setting -Dspark.deploy.defaultCores=. When you don't have enough memory, reducing the concurrency of your executor lowers the memory requirement, at the cost of running more slowly.

Yong

Date: Wed, 8 Apr 2015 04:57:22 +0800
Subject: Re: 'Java heap space' error occurred when querying a 4G data file from HDFS
From: lidali...@gmail.com
To: user@spark.apache.org

Any help, please? Help me get the configuration right.

李铖 <lidali...@gmail.com> wrote on Tuesday, 7 April 2015:

> In my dev-test env I have 3 virtual machines; every machine has 12G memory and 8 CPU cores. Here are spark-defaults.conf and spark-env.sh; maybe some config is not right.
>
> I run this command: *spark-submit --master yarn-client --driver-memory 7g --executor-memory 6g /home/hadoop/spark/main.py* and the exception below is raised.
> [...]
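A sketch of tip 4 in Scala; the RDD name is a placeholder. MEMORY_ONLY_SER stores each cached partition as one serialized byte array instead of a graph of Java objects, trading CPU for a much smaller heap footprint:

import org.apache.spark.storage.StorageLevel

// Cache serialized: compact byte arrays instead of object graphs.
val cached = rdd.persist(StorageLevel.MEMORY_ONLY_SER)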
Re: 'Java heap space' error occurred when querying a 4G data file from HDFS
李铖: w.r.t. #5, you can use --executor-cores when invoking spark-submit Cheers On Tue, Apr 7, 2015 at 2:35 PM, java8964 java8...@hotmail.com wrote: It is hard to guess why OOM happens without knowing your application's logic and the data size. Without knowing that, I can only guess based on some common experiences: 1) increase spark.default.parallelism 2) Increase your executor-memory, maybe 6g is not just enough 3) Your environment is kind of unbalance between cup cores and available memory (8 cores vs 12G). Each core should have 3G for Spark. 4) If you cache RDD, using MEMORY_ONLY_SER instead of MEMORY_ONLY 5) Since your cores is much more compared with your available memory, lower the cores for executor by set -Dspark.deploy.defaultCores=. When you have not enough memory, reduce the concurrency of your executor, it will lower the memory requirement, with running in a slower speed. Yong -- Date: Wed, 8 Apr 2015 04:57:22 +0800 Subject: Re: 'Java heap space' error occured when query 4G data file from HDFS From: lidali...@gmail.com To: user@spark.apache.org Any help?please. Help me do a right configure. 李铖 lidali...@gmail.com于2015年4月7日星期二写道: In my dev-test env .I have 3 virtual machines ,every machine have 12G memory,8 cpu core. Here is spark-defaults.conf,and spark-env.sh.Maybe some config is not right. I run this command :*spark-submit --master yarn-client --driver-memory 7g --executor-memory 6g /home/hadoop/spark/main.py* exception rised. *spark-defaults.conf* spark.master spark://cloud1:7077 spark.default.parallelism 100 spark.eventLog.enabled true spark.serializer org.apache.spark.serializer.KryoSerializer spark.driver.memory 5g spark.driver.maxResultSize 6g spark.kryoserializer.buffer.mb 256 spark.kryoserializer.buffer.max.mb 512 spark.executor.memory 4g spark.rdd.compress true spark.storage.memoryFraction 0 spark.akka.frameSize 50 spark.shuffle.compress true spark.shuffle.spill.compress false spark.local.dir /home/hadoop/tmp * spark-evn.sh* export SCALA=/home/hadoop/softsetup/scala export JAVA_HOME=/home/hadoop/softsetup/jdk1.7.0_71 export SPARK_WORKER_CORES=1 export SPARK_WORKER_MEMORY=4g export HADOOP_CONF_DIR=/opt/cloud/hadoop/etc/hadoop export SPARK_EXECUTOR_MEMORY=4g export SPARK_DRIVER_MEMORY=4g *Exception:* 15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_28 on disk on cloud3:38109 (size: 162.7 MB) 15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_28 on disk on cloud3:38109 (size: 162.7 MB) 15/04/07 18:11:03 INFO TaskSetManager: Starting task 31.0 in stage 1.0 (TID 31, cloud3, NODE_LOCAL, 1296 bytes) 15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_29 on disk on cloud2:49451 (size: 163.7 MB) 15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_29 on disk on cloud2:49451 (size: 163.7 MB) 15/04/07 18:11:03 INFO TaskSetManager: Starting task 30.0 in stage 1.0 (TID 32, cloud2, NODE_LOCAL, 1296 bytes) 15/04/07 18:11:03 ERROR Utils: Uncaught exception in thread task-result-getter-0 java.lang.OutOfMemoryError: Java heap space at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:61) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:58) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:81) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:73) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:48) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Exception in thread task-result-getter-0 java.lang.OutOfMemoryError: Java heap space at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:61) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:58
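As a concrete illustration of suggestions 1), 2), 4) and 5) above, here is a minimal Scala sketch; the application name, input path, and the specific values (200 partitions, 8g, 2 cores) are illustrative assumptions, not settings taken from this thread, and the executor-core count can equally be passed to spark-submit as --executor-cores 2:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Sketch only: every value below is an assumption chosen to illustrate the advice.
val conf = new SparkConf()
  .setAppName("HeapTuningSketch")              // hypothetical app name
  .set("spark.default.parallelism", "200")     // 1) more, smaller tasks
  .set("spark.executor.memory", "8g")          // 2) more heap per executor
  .set("spark.executor.cores", "2")            // 5) fewer concurrent tasks per executor
val sc = new SparkContext(conf)

val lines = sc.textFile("hdfs:///path/to/data")  // illustrative path
// 4) MEMORY_ONLY_SER stores each partition as a serialized byte array,
//    which is usually far smaller than the deserialized Java objects
lines.persist(StorageLevel.MEMORY_ONLY_SER)
println(lines.count())

Serialized caching trades CPU (deserialization on every access) for heap, which is normally the right trade when the executors are memory-starved.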
Re: OutOfMemory: Java heap space error
I am facing the same issue and have posted a new thread. Please respond. On Wed, Jul 9, 2014 at 1:56 AM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hi, My code was running properly but then it suddenly gave this error. Can you shed some light on it? ### 0 KB, free: 38.7 MB) 14/07/09 01:46:12 INFO BlockManagerMaster: Updated info of block rdd_2212_4 14/07/09 01:46:13 INFO PythonRDD: Times: total = 1486, boot = 698, init = 626, finish = 162 Exception in thread "stdin writer for python" 14/07/09 01:46:14 INFO MemoryStore: ensureFreeSpace(61480) called with curMem=270794224, maxMem=311387750 java.lang.OutOfMemoryError: Java heap space at java.io.BufferedOutputStream.<init>(Unknown Source) at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:62) 14/07/09 01:46:15 INFO MemoryStore: Block rdd_2212_0 stored as values to memory (estimated size 60.0 KB, free 38.7 MB) Exception in thread "stdin writer for python" java.lang.OutOfMemoryError: Java heap space at java.io.BufferedOutputStream.<init>(Unknown Source) at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:62) 14/07/09 01:46:18 INFO BlockManagerMasterActor$BlockManagerInfo: Added rdd_2212_0 in memory on shawn-PC:51451 (size: 60.0 KB, free: 38.7 MB) PySpark worker failed with exception: Traceback (most recent call last): File "F:\spark-0.9.1\spark-0.9.1\bin\..\/python/pyspark/worker.py", line 50, in main split_index = read_int(infile) File "F:\spark-0.9.1\spark-0.9.1\python\pyspark\serializers.py", line 328, in read_int raise EOFError EOFError 14/07/09 01:46:25 INFO BlockManagerMaster: Updated info of block rdd_2212_0 Exception in thread "stdin writer for python" java.lang.OutOfMemoryError: Java heap space PySpark worker failed with exception: Traceback (most recent call last): File "F:\spark-0.9.1\spark-0.9.1\bin\..\/python/pyspark/worker.py", line 50, in main split_index = read_int(infile) File "F:\spark-0.9.1\spark-0.9.1\python\pyspark\serializers.py", line 328, in read_int raise EOFError EOFError Exception in thread "Executor task launch worker-3" java.lang.OutOfMemoryError: Java heap space Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "spark-akka.actor.default-dispatcher-15" Exception in thread "Executor task launch worker-1" Exception in thread "Executor task launch worker-2" java.lang.OutOfMemoryError: Java heap space java.lang.OutOfMemoryError: Java heap space Exception in thread "Executor task launch worker-0" Exception in thread "Executor task launch worker-5" java.lang.OutOfMemoryError: Java heap space java.lang.OutOfMemoryError: Java heap space 14/07/09 01:46:52 WARN BlockManagerMaster: Error sending message to BlockManagerMaster in 1 attempts akka.pattern.AskTimeoutException: Recipient[Actor[akka://spark/user/BlockManagerMaster#920823400]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:161) at org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:52) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:97) at org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:135) at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80) at akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) 14/07/09 01:46:56 WARN BlockManagerMaster: Error sending message to BlockManagerMaster in 2 attempts akka.pattern.AskTimeoutException: Recipient[Actor[akka://spark/user/BlockManagerMaster#920823400]] had already been terminated. at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:161) at org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:52) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:97) at org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:135) at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80) at akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) 14/07/09
Large dataset, reduceByKey - java heap space error
I'm trying to process a large dataset; mapping/filtering works OK, but as soon as I try to reduceByKey, I get out-of-memory errors: http://pastebin.com/70M5d0Bn Any ideas how I can fix that? Thanks.
Re: Large dataset, reduceByKey - java heap space error
Hi Kane - http://spark.apache.org/docs/latest/tuning.html has excellent information that may be helpful. In particular, increasing the number of tasks may help, as well as confirming that you don't have more data than you're expecting landing on a single key. Also, if you are on a release before 1.2.0, setting spark.shuffle.manager=sort was a huge help for many of our shuffle-heavy workloads (it is the default as of 1.2.0). Cheers, Sean On Jan 22, 2015, at 3:15 PM, Kane Kim kane.ist...@gmail.com wrote: I'm trying to process a large dataset; mapping/filtering works OK, but as soon as I try to reduceByKey, I get out-of-memory errors: http://pastebin.com/70M5d0Bn Any ideas how I can fix that? Thanks.
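To make the "increase the number of tasks" suggestion concrete: reduceByKey accepts an explicit partition count, spreading the shuffle over more, smaller reduce tasks. A rough sketch, assuming an existing SparkContext sc; the input path, the key extraction, and the count of 400 are placeholders, not values from Kane's job:

import org.apache.spark.SparkContext._    // pair-RDD implicits on pre-1.3 releases

// More reduce tasks means less data per task, so each one needs less heap.
val pairs = sc.textFile("hdfs:///path/to/input")   // illustrative path
  .map(line => (line.split("\t")(0), 1L))          // stand-in key extraction
val counts = pairs.reduceByKey(_ + _, 400)         // explicit partition count
counts.saveAsTextFile("hdfs:///path/to/counts")    // illustrative output path

// Pre-1.2.0, the sort-based shuffle mentioned above is opt-in; enable it
// before the context is created, e.g. --conf spark.shuffle.manager=sort

Sampling the keys, e.g. pairs.sample(false, 0.001).countByKey(), is a cheap way to check whether one key is accumulating far more values than the rest.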
Re: OOM Java heap space error on saveAsTextFile
What operation are you performing before the saveAsTextFile? If you are doing a groupBy/sortBy/mapPartitions/reduceByKey operation then you can specify the number of partitions (a concrete sketch follows this thread). We were facing these kinds of problems, and specifying the right number of partitions solved the issue. Thanks Best Regards On Fri, Aug 22, 2014 at 2:06 AM, Daniil Osipov daniil.osi...@shazam.com wrote: Hello, My job keeps failing on saveAsTextFile stage (frustrating after a 3 hour run) with an OOM exception. The log is below. I'm running the job on an input of ~8Tb gzipped JSON files, executing on 15 m3.xlarge instances. Executor is given 13Gb memory, and I'm setting two custom preferences in the job: spark.akka.frameSize: 50 (otherwise it fails due to exceeding the limit of 10Mb), spark.storage.memoryFraction: 0.2 Any suggestions? 14/08/21 19:29:26 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-99-160-181.ec2.internal:36962 14/08/21 19:29:31 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 17541459 bytes 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-144-221-26.ec2.internal:49973 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-69-31-121.ec2.internal:34569 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-165-70-221.ec2.internal:49193 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-218-181-93.ec2.internal:57648 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-142-187-230.ec2.internal:48115 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-101-178-68.ec2.internal:51931 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-99-165-121.ec2.internal:38153 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-179-187-182.ec2.internal:55645 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-182-231-107.ec2.internal:54088 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-165-79-9.ec2.internal:40112 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-111-169-138.ec2.internal:40394 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-203-161-222.ec2.internal:47447 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-153-141-230.ec2.internal:53906 14/08/21 19:29:32 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-20] shutting down ActorSystem [spark] java.lang.OutOfMemoryError: Java heap space at com.google.protobuf_spark.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62) at akka.remote.transport.AkkaPduProtobufCodec$.constructPayload(AkkaPduCodec.scala:145) at akka.remote.transport.AkkaProtocolHandle.write(AkkaProtocolTransport.scala:156) at
akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:569) at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:544) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) at akka.actor.FSM$class.processEvent(FSM.scala:595) at akka.remote.EndpointWriter.processEvent(Endpoint.scala:443) at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:589) at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:583) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/08/21 19:29:32 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at RecRateApp.scala:88 Exception in
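Following up on the partition advice above, here is a sketch of what an explicit partition count looks like in a job that ends in saveAsTextFile. It assumes an existing SparkContext sc, and the paths, the key extraction, and the count of 2000 are placeholders rather than details of Daniil's actual job:

import org.apache.spark.SparkContext._    // pair-RDD implicits on pre-1.3 releases

// Give the shuffle that feeds saveAsTextFile an explicit, generous partition
// count so that no single task has to hold too much data in memory.
val records = sc.textFile("hdfs:///path/to/json")       // illustrative input path
  .map(line => (line.take(8), 1L))                      // stand-in key extraction
val aggregated = records.reduceByKey(_ + _, 2000)       // explicit partition count
aggregated.saveAsTextFile("hdfs:///path/to/output")     // illustrative output path

// If the final RDD is produced without a shuffle, repartition it before saving:
// aggregated.repartition(2000).saveAsTextFile("hdfs:///path/to/output")

Note also that gzip files are not splittable, so each input file arrives as a single partition; repartitioning early in the job keeps individual tasks small.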
OOM Java heap space error on saveAsTextFile
Hello, My job keeps failing on saveAsTextFile stage (frustrating after a 3 hour run) with an OOM exception. The log is below. I'm running the job on an input of ~8Tb gzipped JSON files, executing on 15 m3.xlarge instances. Executor is given 13Gb memory, and I'm setting two custom preferences in the job: spark.akka.frameSize: 50 (otherwise it fails due to exceeding the limit of 10Mb), spark.storage.memoryFraction: 0.2 Any suggestions? 14/08/21 19:29:26 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-99-160-181.ec2.internal:36962 14/08/21 19:29:31 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 17541459 bytes 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-144-221-26.ec2.internal:49973 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-69-31-121.ec2.internal:34569 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-165-70-221.ec2.internal:49193 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-218-181-93.ec2.internal:57648 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-142-187-230.ec2.internal:48115 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-101-178-68.ec2.internal:51931 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-99-165-121.ec2.internal:38153 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-179-187-182.ec2.internal:55645 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-182-231-107.ec2.internal:54088 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-165-79-9.ec2.internal:40112 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-111-169-138.ec2.internal:40394 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-203-161-222.ec2.internal:47447 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-153-141-230.ec2.internal:53906 14/08/21 19:29:32 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-20] shutting down ActorSystem [spark] java.lang.OutOfMemoryError: Java heap space at com.google.protobuf_spark.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62) at akka.remote.transport.AkkaPduProtobufCodec$.constructPayload(AkkaPduCodec.scala:145) at akka.remote.transport.AkkaProtocolHandle.write(AkkaProtocolTransport.scala:156) at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:569) at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:544) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) at akka.actor.FSM$class.processEvent(FSM.scala:595) at akka.remote.EndpointWriter.processEvent(Endpoint.scala:443) at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:589) at 
akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:583) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/08/21 19:29:32 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at RecRateApp.scala:88 Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at