Re: Java Heap Space error - Spark ML
What is the size of your data, what is the size of the cluster, are you using spark-submit or an IDE, and which Spark version are you using? Try spark-submit and increase the memory of the driver or the executors.

On 22/3/19 17:19, KhajaAsmath Mohammed wrote:

Hi,

I am getting the exception below when using Spark KMeans. Any solutions from the experts would be really helpful.

    val kMeans = new KMeans().setK(reductionCount).setMaxIter(30)
    val kMeansModel = kMeans.fit(df)

The error occurs when calling kMeans.fit:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.mllib.linalg.SparseVector.toArray(Vectors.scala:760)
        at org.apache.spark.mllib.clustering.VectorWithNorm.toDense(KMeans.scala:614)
        at org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
        at org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:382)
        at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:256)
        at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
        at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:319)
        at com.datamantra.spark.DataBalancing$.createBalancedDataframe(DataBalancing.scala:25)
        at com.datamantra.spark.jobs.IftaMLTraining$.trainML$1(IftaMLTraining.scala:182)
        at com.datamantra.spark.jobs.IftaMLTraining$.main(IftaMLTraining.scala:94)
        at com.datamantra.spark.jobs.IftaMLTraining.main(IftaMLTraining.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks,
Asmath

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol
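Increasing driver and executor memory, as suggested above, is done at submit time. A minimal sketch (the memory values and the jar name are illustrative placeholders, not from the thread; the class name is taken from the stack trace):

```shell
# Illustrative spark-submit invocation with more driver/executor memory.
# Tune the values to your data and cluster; the jar path is a placeholder.
spark-submit \
  --class com.datamantra.spark.jobs.IftaMLTraining \
  --driver-memory 8g \
  --executor-memory 8g \
  target/ifta-ml-training.jar
```

Running inside an IDE usually ignores these flags, which is one reason the advice above is to try spark-submit directly.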
Re: Java Heap Space Error
Hello,

It worked like a charm, thank you very much. Some userids were null, which is why so many records went to userid 'null'. When I added a where clause, userid != 'null', it solved the problem.

> On 24 Sep 2015, at 22:43, java8964 <java8...@hotmail.com> wrote:
>
> I can understand why your first query will finish without OOM, but the new
> one will fail with OOM.
>
> In the new query, you are asking for a groupByKey/cogroup operation, which
> will force all the productName + productCategory values per userid to the
> same reducer. This can easily blow out the reducer's memory if one userid
> has a lot of productName and productCategory values.
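The fix described above, filtering out the literal string 'null' (which is distinct from SQL NULL), amounts to one extra predicate on the thread's query. A hedged sketch, using the table and column names quoted in this thread:

```sql
-- Sketch of the predicate set after the fix. Here 'null' is a literal
-- string produced upstream, not SQL NULL, so it needs its own filter.
select userid /* , aggregated columns */
from landing
where userid is not null     -- drop real NULLs
  and userid != ''           -- drop empty strings
  and userid != 'null'       -- drop the literal 'null' hot key that overloaded one reducer
group by userid
```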
Re: Java Heap Space Error
@Jingyu
Yes, it works without regex and concatenation, as in the query below. So what can we understand from this? When I run it like that, shuffle read sizes are equally distributed between partitions.

    val usersInputDF = sqlContext.sql(
      s"""
         | select userid from landing where dt='2015-9' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid
       """.stripMargin)

@java8964
I tried with sql.shuffle.partitions = 1 but no luck. Again, one of the partitions' shuffle size is huge and the others are very small. So how can I balance this shuffle read size between partitions?

> On 24 Sep 2015, at 03:35, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
>
> Does your SQL work if it does not run a regex on strings and concatenate
> them, I mean just select the columns without string operations?
>
> On 24 September 2015 at 10:11, java8964 <java8...@hotmail.com> wrote:
> Try to increase the partition count; that will make each partition hold
> less data.
>
> Yong

This message and its attachments may contain legally privileged or confidential information. It is intended solely for the named addressee. If you are not the addressee indicated in this message or responsible for delivery of the message to the addressee, you may not copy or deliver this message or its attachments to anyone. Rather, you should permanently delete this message and its attachments and kindly notify the sender by reply e-mail.
Any content of this message and its attachments which does not relate > to the official business of the sending company must be taken not to have > been sent or endorsed by that company or any of its related entities. No > warranty is made that the e-mail or attachments are free from computer virus > or other defect.
RE: Java Heap Space Error
This is interesting.

So you mean that the query

"select userid from landing where dt='2015-9' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid"

works in your cluster?

In this case, do you also see this one task with way more data than the rest, as happened when you used regex and concatenation?

It is hard to believe that just adding "regex" and "concatenation" would make the distribution across partitions more equal. In your query, the distribution across partitions simply depends on the hash partitioner of "userid".

Can you show us the query after you add "regex" and "concatenation"?

Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 15:34:48 +0300
CC: user@spark.apache.org
To: jingyu.zh...@news.com.au; java8...@hotmail.com

@Jingyu
Yes, it works without regex and concatenation, as in the query below. So what can we understand from this? When I run it like that, shuffle read sizes are equally distributed between partitions.
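Yong's point about the hash partitioner can be illustrated with a plain-Scala sketch (not from the thread; the partition count mirrors the default and the sample userids are made up). Spark's HashPartitioner assigns each row to nonNegativeMod(key.hashCode, numPartitions), so every row sharing one userid lands in the same shuffle partition, regardless of what string operations run on other columns:

```scala
// Illustrative only: how a hash partitioner routes rows by userid.
// All rows with the same key hash to the same partition, so one hot
// key (e.g. the literal string "null") overloads a single reducer.
object HashSkewSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 200                       // spark.sql.shuffle.partitions default
    def partitionFor(key: String): Int = {
      val mod = key.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod   // nonNegativeMod, as in Spark's Utils
    }
    val userids = Seq("u1", "null", "null", "u2", "null", "u3")
    val sizes = userids.groupBy(partitionFor).mapValues(_.size)
    println(sizes)   // the partition holding "null" receives all three 'null' rows
  }
}
```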
Re: Java Heap Space Error
Yes right, the query you wrote worked in the same cluster. In this case partitions were equally distributed, but when I used regex and concatenations they were not, as I said before. The query with concatenation is below:

    val usersInputDF = sqlContext.sql(
      s"""
         | select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",' '))))) inputlist from landing where dt='${dateUtil.getYear}-${dateUtil.getMonth}' and day >= '${day}' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid
       """.stripMargin)

> On 24 Sep 2015, at 16:52, java8964 <java8...@hotmail.com> wrote:
>
> This is interesting.
>
> So you mean that the query
>
> "select userid from landing where dt='2015-9' and userid != '' and userid is
> not null and userid is not NULL and pagetype = 'productDetail' group by
> userid"
>
> works in your cluster?
>
> Can you show us the query after you add "regex" and "concatenation"?
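For readers following along, the nested substr/regexp_replace chain in the query above strips the brackets, quotes, and commas from a JSON-array-like productcategory value. A plain-Scala sketch of the same transformation (the sample value is hypothetical, not from the thread):

```scala
// What the SQL expression does to one productcategory value, step by step.
// Hypothetical sample input: ["Shoes","Sneakers"]
object CategoryCleanupSketch {
  def main(args: Array[String]): Unit = {
    val productcategory = "[\"Shoes\",\"Sneakers\"]"
    val inner   = productcategory.substring(1, productcategory.length - 1) // substr(col, 2, length(col)-2)
    val noQuote = inner.replace("\"", "")                                  // regexp_replace(..., '"', '')
    val spaced  = noQuote.replace(",", " ")                                // regexp_replace(..., ',', ' ')
    val result  = spaced.toLowerCase                                      // lower(...)
    println(result)  // shoes sneakers
  }
}
```

The per-row work here is cheap; as the rest of the thread shows, the cost comes from collect_list gathering every transformed value for a userid onto one reducer.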
Re: Java Heap Space Error
Thank you very much. This makes sense. I will write back after I try your solution.

> On 24 Sep 2015, at 22:43, java8964 <java8...@hotmail.com> wrote:
>
> I can understand why your first query will finish without OOM, but the new
> one will fail with OOM.
>
> In the new query, you are asking for a groupByKey/cogroup operation, which
> will force all the productName + productCategory values per userid to the
> same reducer. This can easily blow out the reducer's memory if one userid
> has a lot of productName and productCategory values.
RE: Java Heap Space Error
I can understand why your first query will finish without OOM, but the new one will fail with OOM.

In the new query, you are asking for a groupByKey/cogroup operation, which will force all the productName + productCategory values per userid to the same reducer. This can easily blow out the reducer's memory if one userid has a lot of productName and productCategory values.

Keep in mind that Spark on the reducer side still uses a hash to merge all the data from the different mappers, so the memory on the reduce side has to be able to merge all the productName + productCategory values for (at least) the most frequently occurring userid. And I don't know why you want all the productName and productCategory values per userid (maybe a distinct could be enough?).

Imagine one userid shows up 1M times in your dataset, with 0.5M productname values of 'A' and 0.5M of 'B'. Your query will push 1M 'A's and 'B's into the same reducer and ask Spark to merge them in the HashMap for that userid. This will cause OOM.

Above all, you need to find out the max count per userid in your data: select max(count(*)) from land where ... group by userid

Your memory has to support that volume of productName and productCategory values, even if your partition number is high (even as high as your unique count of userids), if consolidating all the productName and productCategory values together is really what you want, without even considering removing duplicates.

Both queries should still push a similar record count per partition, but with very different data volumes.

Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 18:56:51 +0300
CC: jingyu.zh...@news.com.au; user@spark.apache.org
To: java8...@hotmail.com

Yes right, the query you wrote worked in the same cluster. In this case partitions were equally distributed, but when I used regex and concatenations they were not, as I said before.
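As an aside, `max(count(*))` nests two aggregates and is not valid SQL/HiveQL as written; the check Yong describes can be expressed with a subquery. A hedged sketch, reusing the table name and filters quoted in this thread:

```sql
-- Find the heaviest userid, i.e. the per-key row volume that a single
-- reducer must hold in memory during the collect_list aggregation.
select max(cnt)
from (
  select userid, count(*) as cnt
  from landing
  where dt = '2015-9' and pagetype = 'productDetail'
  group by userid
) per_user
```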
Re: Java Heap Space Error
Yes, it's possible. I use S3 as the data source, and my external tables are partitioned. The task below is 193/200: the job has 2 stages, and it is task 193 of 200 in the 2nd stage because of sql.shuffle.partitions.

How can I avoid this situation? This is my query:

select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",' '))))) inputlist from landing where dt='2015-9' and userid != '' and userid is not null and userid is not NULL and pagetype = 'productDetail' group by userid

> On 23 Sep 2015, at 23:55, java8964 <java8...@hotmail.com> wrote:
>
> Based on your description, your job shouldn't have any shuffle then, as you
> just apply regex and concatenation on the column, but there is one partition
> having 4.3M records to be read, vs less than 1M records for other partitions.
>
> Is that possible? It depends on what the source of your data is.
>
> If there is a shuffle in your query (more than 2 stages generated by your
> query, and this is my guess of what is happening), then it simply means that
> one partition has way more data than the rest of the partitions.
>
> Yong
>
> From: yu...@useinsider.com
> Subject: Java Heap Space Error
> Date: Wed, 23 Sep 2015 23:07:17 +0300
> To: user@spark.apache.org
>
> What can cause the issue in the attached picture? I'm running an SQL query
> which runs a regex on strings and concatenates them. Because of this task,
> my job gives a java heap space error.
RE: Java Heap Space Error
Try to increase the partition count; that will make each partition hold less data.

Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 00:32:47 +0300
CC: user@spark.apache.org
To: java8...@hotmail.com

Yes, it's possible. I use S3 as the data source, and my external tables are partitioned. The task below is 193/200: the job has 2 stages, and it is task 193 of 200 in the 2nd stage because of sql.shuffle.partitions.
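The suggestion above maps to Spark SQL's shuffle-partition setting; a hedged sketch (the value 400 is illustrative only, and the right value depends on data volume):

```scala
// Raise the number of shuffle partitions so each partition holds less data.
// 200 is the default, hence the "task 193 of 200" seen in this thread;
// 400 below is only an example value.
sqlContext.setConf("spark.sql.shuffle.partitions", "400")
```

Note, though, that a higher partition count cannot split one hot key: every row with the same userid still hashes to the same partition, which is why the skew persists later in the thread.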
Re: Java Heap Space Error
Does your SQL work if you do not run the regex and concatenation, I mean just select the columns without the string operations?

On 24 September 2015 at 10:11, java8964 <java8...@hotmail.com> wrote:
> Try to increase the partition count; that will give each partition less data.
>
> Yong
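One reason the string operations matter here: collect_list followed by concat_ws materializes one string per group key, so a single heavy userid becomes a single huge in-memory value. A rough plain-Python illustration (the row size and row count are assumptions, not the poster's actual data):

```python
row = "some product name and category"     # assumed ~30-character product row
rows_for_one_user = [row] * 1_000_000      # one heavy userid's collected rows

# This is, in effect, what concat_ws(' ', collect_list(...)) builds per key.
joined = " ".join(rows_for_one_user)

# Roughly 31 MB of characters for a single group, before any JVM overhead.
assert len(joined) > 30_000_000
```

Selecting the raw columns streams rows through; the aggregation is what forces a whole group into memory at once.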
Re: 'Java heap space' error occurred when querying a 4 GB data file from HDFS
Any help, please? Help me get the configuration right.

李铖 <lidali...@gmail.com> wrote on Tuesday, 7 April 2015:

In my dev/test environment I have 3 virtual machines; every machine has 12 GB memory and 8 CPU cores. Here are spark-defaults.conf and spark-env.sh; maybe some config is not right.

I run this command:

spark-submit --master yarn-client --driver-memory 7g --executor-memory 6g /home/hadoop/spark/main.py

and the exception is raised.

spark-defaults.conf:

spark.master                        spark://cloud1:7077
spark.default.parallelism           100
spark.eventLog.enabled              true
spark.serializer                    org.apache.spark.serializer.KryoSerializer
spark.driver.memory                 5g
spark.driver.maxResultSize          6g
spark.kryoserializer.buffer.mb      256
spark.kryoserializer.buffer.max.mb  512
spark.executor.memory               4g
spark.rdd.compress                  true
spark.storage.memoryFraction        0
spark.akka.frameSize                50
spark.shuffle.compress              true
spark.shuffle.spill.compress        false
spark.local.dir                     /home/hadoop/tmp

spark-env.sh:

export SCALA=/home/hadoop/softsetup/scala
export JAVA_HOME=/home/hadoop/softsetup/jdk1.7.0_71
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=4g
export HADOOP_CONF_DIR=/opt/cloud/hadoop/etc/hadoop
export SPARK_EXECUTOR_MEMORY=4g
export SPARK_DRIVER_MEMORY=4g

Exception:

15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_28 on disk on cloud3:38109 (size: 162.7 MB)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_28 on disk on cloud3:38109 (size: 162.7 MB)
15/04/07 18:11:03 INFO TaskSetManager: Starting task 31.0 in stage 1.0 (TID 31, cloud3, NODE_LOCAL, 1296 bytes)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_29 on disk on cloud2:49451 (size: 163.7 MB)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_29 on disk on cloud2:49451 (size: 163.7 MB)
15/04/07 18:11:03 INFO TaskSetManager: Starting task 30.0 in stage 1.0 (TID 32, cloud2, NODE_LOCAL, 1296 bytes)
15/04/07 18:11:03 ERROR Utils: Uncaught exception in thread task-result-getter-0
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:61)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
    at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:58)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:81)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:73)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:48)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread task-result-getter-0 java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:61)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
    at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:58)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:81)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:73)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:48)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
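The trace points at task-result-getter, i.e. the driver deserializing task results, and the log shows individual results around 163 MB. A back-of-envelope check (the task count below is an assumption for illustration, not from the log) shows why the 7g driver heap is at risk even before spark.driver.maxResultSize (6g here) would abort the job:

```python
task_result_mb = 163      # per-task result size reported by BlockManagerInfo
num_tasks = 40            # assumed number of result-bearing tasks
driver_heap_gb = 7        # from --driver-memory 7g

total_results_gb = task_result_mb * num_tasks / 1024

# Raw results alone approach the whole driver heap, and Java deserialization
# needs extra headroom on top of the serialized bytes.
assert total_results_gb > 6
```

Results this large usually mean something like a wide collect back to the driver; keeping results on the executors (or shrinking them) avoids the problem regardless of heap settings.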
RE: 'Java heap space' error occurred when querying a 4 GB data file from HDFS
It is hard to guess why OOM happens without knowing your application's logic and the data size. Without knowing that, I can only guess based on some common experiences:

1) Increase spark.default.parallelism.
2) Increase your executor memory; maybe 6g is just not enough.
3) Your environment is somewhat unbalanced between CPU cores and available memory (8 cores vs 12 GB); each core should have about 3 GB for Spark.
4) If you cache RDDs, use MEMORY_ONLY_SER instead of MEMORY_ONLY.
5) Since you have many cores relative to your available memory, lower the cores per executor by setting -Dspark.deploy.defaultCores=. When you do not have enough memory, reducing the concurrency of your executors lowers the memory requirement, at the cost of running more slowly.

Yong

Date: Wed, 8 Apr 2015 04:57:22 +0800
Subject: Re: 'Java heap space' error occurred when querying a 4 GB data file from HDFS
From: lidali...@gmail.com
To: user@spark.apache.org

> Any help, please? Help me get the configuration right.
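Points 3 and 5 in the advice above are two views of the same arithmetic: the heap available to each concurrently running task is roughly executor memory divided by executor cores. A quick sketch with this cluster's numbers:

```python
def heap_per_task_gb(executor_mem_gb, executor_cores):
    """Rough heap share per concurrently running task within one executor."""
    return executor_mem_gb / executor_cores

# 6g executors on 8-core machines: each task gets well under 1 GB.
assert heap_per_task_gb(6, 8) == 0.75

# Capping cores raises the per-task share, at the cost of parallelism.
assert heap_per_task_gb(6, 2) == 3.0
```

This is why lowering cores per executor can fix an OOM without adding any memory: fewer tasks compete for the same heap at once.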
Re: 'Java heap space' error occurred when querying a 4 GB data file from HDFS
李铖: w.r.t. #5, you can use --executor-cores when invoking spark-submit.

Cheers

On Tue, Apr 7, 2015 at 2:35 PM, java8964 <java8...@hotmail.com> wrote:

> It is hard to guess why OOM happens without knowing your application's logic
> and the data size. Without knowing that, I can only guess based on some
> common experiences:
>
> 1) Increase spark.default.parallelism.
> 2) Increase your executor memory; maybe 6g is just not enough.
> 3) Your environment is somewhat unbalanced between CPU cores and available
> memory (8 cores vs 12 GB); each core should have about 3 GB for Spark.
> 4) If you cache RDDs, use MEMORY_ONLY_SER instead of MEMORY_ONLY.
> 5) Since you have many cores relative to your available memory, lower the
> cores per executor by setting -Dspark.deploy.defaultCores=. When you do not
> have enough memory, reducing the concurrency of your executors lowers the
> memory requirement, at the cost of running more slowly.
>
> Yong
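Regarding point 4 in the quoted advice: caching serialized trades CPU for a much smaller memory footprint. Spark's MEMORY_ONLY_SER uses Java or Kryo serialization on the JVM; the plain-Python sketch below (pickle standing in for the serializer, so the exact sizes are only illustrative) shows the effect:

```python
import pickle
import sys

# A graph of live objects carries per-object overhead; a serialized blob does not.
records = [(i, f"user{i}") for i in range(10_000)]

# Shallow size of each tuple plus its int and str members (approximation:
# ignores interning and shared small-int caching).
object_bytes = sum(
    sys.getsizeof(r) + sys.getsizeof(r[0]) + sys.getsizeof(r[1])
    for r in records
)
serialized_bytes = len(pickle.dumps(records))

assert serialized_bytes < object_bytes
```

The same trade-off applies on the JVM, where per-object header and pointer overhead is typically even larger, which is why MEMORY_ONLY_SER helps when executors are memory-starved.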