After the above changes:

1) The filter stage shows this:

Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Input Size / Records | Write Time | Shuffle Write Size / Records | Errors
0 | 1 | 0 | SUCCESS | ANY | 1 / phxaishdc9dn1571.stratus.phx.ebay.com | 2015/04/20 20:55:31 | 7.4 min | 21 s | 129.7 MB (hadoop) / 1000000 | 18 s | 1106.2 MB / 718687 |

2) lstgItem.join(viEvents).map [Equi Join] shows this:
Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
0 | 17 | 0 | RUNNING | PROCESS_LOCAL | 8 / phxaishdc9dn0556.phx.ebay.com | 2015/04/20 21:02:56 | 4.3 min | 20 s | 1097.3 MB / 55906817 | - | 0.0 B / 0 | 0.0 B | 0.0 B |
1 | 18 | 0 | SUCCESS | PROCESS_LOCAL | 3 / phxaishdc9dn0374.phx.ebay.com | 2015/04/20 21:02:56 | 1.4 min | 1 s | 251.0 MB / 831341 | 2 ms | 377.8 KB / 226 | 9.6 GB | 173.3 MB |
2 | 19 | 0 | SUCCESS | PROCESS_LOCAL | 9 / phxaishdc9dn0121.phx.ebay.com | 2015/04/20 21:02:56 | 2.1 min | 4 s | 250.6 MB / 830896 | 89 ms | 280.4 KB / 168 | 4.4 GB | 267.9 MB |
3 | 20 | 0 | SUCCESS | PROCESS_LOCAL | 4 / phxaishdc9dn1703.stratus.phx.ebay.com | 2015/04/20 21:02:56 | 1.9 min | 1.0 s | 250.6 MB / 831180 | 2 ms | 330.3 KB / 198 | 7.4 GB | 285.2 MB |
4 | 21 | 0 | SUCCESS | PROCESS_LOCAL | 5 / phxaishdc9dn1350.stratus.phx.ebay.com | 2015/04/20 21:02:56 | 2.1 min | 3 s | 249.7 MB / 830966 | 3 ms | 303.9 KB / 182 | 3.7 GB | 282.8 MB |

Task 0 (ID 17) will run for 30 minutes: it is reading 55.9 million records while each of the other tasks reads roughly 831K. I was wondering whether increasing the input data size and the number of executors would solve this problem. Will a map-side join help? (A rough sketch of what I have in mind is below.)
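For the map-side join, this is only a sketch, and it assumes lstgItem is the smaller side and fits comfortably in driver and executor memory (I still need to confirm that). It reuses the variable names from the code further down the thread; the broadcast variable name is just illustrative:

    // Sketch of a map-side (broadcast) join. ASSUMPTION: lstgItem is small
    // enough to be collected to the driver and broadcast to every executor;
    // if it is not, this approach does not apply.
    val lstgItemLocal = sc.broadcast(lstgItem.collectAsMap())

    // Equivalent of lstgItem.join(viEvents), but with no shuffle, so there is
    // no single reduce task that receives all the records for a hot itemId.
    val joined = viEvents.flatMap { case (itemId, viDetail) =>
      lstgItemLocal.value.get(itemId).map(listing => (itemId, (listing, viDetail)))
    }

The existing .map { case (itemId, (listing, viDetail)) => ... } that builds the VISummary (quoted below) could then be applied to joined unchanged.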
On Tue, Apr 21, 2015 at 9:23 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> It also is a little more evidence for Jonathan's suggestion that there is a
> null / 0 record that is getting grouped together.
>
> To fix this, do I need to run a filter?
>
>     val viEventsRaw = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>     val viEvents = viEventsRaw.filter { case (itemId, viEvent) => itemId != 0 }
>
> On Wed, Apr 15, 2015 at 2:04 AM, Imran Rashid <iras...@cloudera.com> wrote:
>
>> Shuffle write could be a good indication of skew, but it looks like the
>> task in question hasn't generated any shuffle write yet, because it's still
>> working on the shuffle-read side. So I wouldn't read too much into the
>> fact that the shuffle write is 0 for a task that is still running.
>>
>> The shuffle read is larger than for the other tasks (3.0 GB vs. 2.2 GB, or,
>> more importantly, 55M records vs. 1M records). So it might not be that the
>> raw data volume is much higher on that task, but it's getting a ton more
>> small records, which will also generate a lot of work. It also is a little
>> more evidence for Jonathan's suggestion that there is a null / 0 record
>> that is getting grouped together.
>>
>> On Mon, Apr 13, 2015 at 12:40 PM, Jonathan Coveney <jcove...@gmail.com> wrote:
>>
>>> I'm not 100% sure of Spark's implementation, but in the MR frameworks it
>>> would have a much larger shuffle write size because that node is dealing
>>> with a lot more data and as a result has a lot more to shuffle.
>>>
>>> 2015-04-13 13:20 GMT-04:00 java8964 <java8...@hotmail.com>:
>>>
>>>> If it is really due to data skew, will the hanging task have a much
>>>> bigger Shuffle Write Size in this case?
>>>>
>>>> In this case, the shuffle write size for that task is 0, and the rest of
>>>> the IO of this task is not much larger than that of the fast finished
>>>> tasks. Is that normal?
>>>>
>>>> I am also interested in this case: from the statistics on the UI, how
>>>> does it indicate that the task could have skewed data?
>>>>
>>>> Yong
>>>>
>>>> ------------------------------
>>>> Date: Mon, 13 Apr 2015 12:58:12 -0400
>>>> Subject: Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete
>>>> From: jcove...@gmail.com
>>>> To: deepuj...@gmail.com
>>>> CC: user@spark.apache.org
>>>>
>>>> I can promise you that this is also a problem in the pig world :) not
>>>> sure why it's not a problem for this data set, though... are you sure
>>>> that the two are doing the exact same code?
>>>>
>>>> You should inspect your source data. Make a histogram for each and see
>>>> what the data distribution looks like. If there is a value or bucket with
>>>> a disproportionate set of values, you know you have an issue.
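A quick way to get that histogram on the join key, as a sketch only: count records per itemId on the events side and print the hottest keys (the same check can be run on lstgItem; variable names are the thread's own):

    // Per-key record counts on the events side, hottest keys first.
    val hotKeys = viEvents
      .map { case (itemId, _) => (itemId, 1L) }
      .reduceByKey(_ + _)
      .map(_.swap)   // (count, itemId), so the default tuple ordering sorts by count
      .top(20)
    hotKeys.foreach(println)

If one itemId still accounts for tens of millions of records even after the itemId != 0 filter, that would explain the single long-running task.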
>>>> 2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>
>>>> You mean there is a tuple in either RDD that has itemId = 0 or null?
>>>> And what is a catch-all?
>>>>
>>>> Does that imply it is a good idea to run a filter on each RDD first? We
>>>> do not do this using Pig on M/R. Is it required in the Spark world?
>>>>
>>>> On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney <jcove...@gmail.com> wrote:
>>>>
>>>> My guess would be data skew. Do you know if there is some item id that
>>>> is a catch-all? Can it be null? Item id 0? Lots of data sets have this
>>>> sort of value, and it always kills joins.
>>>>
>>>> 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>
>>>> Code:
>>>>
>>>>     val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
>>>>       lstgItem.join(viEvents).map {
>>>>         case (itemId, (listing, viDetail)) =>
>>>>           val viSummary = new VISummary
>>>>           viSummary.leafCategoryId = listing.getLeafCategId().toInt
>>>>           viSummary.itemSiteId = listing.getItemSiteId().toInt
>>>>           viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
>>>>           viSummary.sellerCountryId = listing.getSlrCntryId().toInt
>>>>           viSummary.buyerSegment = "0"
>>>>           viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
>>>>           val sellerId = listing.getSlrId.toLong
>>>>           (sellerId, (viDetail, viSummary, itemId))
>>>>       }
>>>>
>>>> Running Tasks:
>>>>
>>>> Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
>>>> 0 | 216 | 0 | RUNNING | PROCESS_LOCAL | 181 / phxaishdc9dn0474.phx.ebay.com | 2015/04/13 06:43:53 | 1.7 h | 13 min | 3.0 GB / 56964921 | - | 0.0 B / 0 | 21.2 GB | 1902.6 MB |
>>>> 2 | 218 | 0 | SUCCESS | PROCESS_LOCAL | 582 / phxaishdc9dn0235.phx.ebay.com | 2015/04/13 06:43:53 | 15 min | 31 s | 2.2 GB / 1666851 | 0.1 s | 3.0 MB / 2062 | 54.8 GB | 1924.5 MB |
>>>> 1 | 217 | 0 | SUCCESS | PROCESS_LOCAL | 202 / phxdpehdc9dn2683.stratus.phx.ebay.com | 2015/04/13 06:43:53 | 19 min | 1.3 min | 2.2 GB / 1687086 | 75 ms | 3.9 MB / 2692 | 33.7 GB | 1960.4 MB |
>>>> 4 | 220 | 0 | SUCCESS | PROCESS_LOCAL | 218 / phxaishdc9dn0855.phx.ebay.com | 2015/04/13 06:43:53 | 15 min | 56 s | 2.2 GB / 1675654 | 40 ms | 3.3 MB / 2260 | 26.2 GB | 1928.4 MB |
>>>>
>>>> Command:
>>>>
>>>>     ./bin/spark-submit -v --master yarn-cluster \
>>>>       --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar \
>>>>       --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar \
>>>>       --num-executors 3000 --driver-memory 12g --driver-java-options "-XX:MaxPermSize=6G" \
>>>>       --executor-memory 12g --executor-cores 1 --queue hdmi-express \
>>>>       --class com.ebay.ep.poc.spark.reporting.SparkApp \
>>>>       /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar \
>>>>       startDate=2015-04-6 endDate=2015-04-7 \
>>>>       input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem \
>>>>       output=/user/dvasthimal/epdatasets/viewItem buffersize=128 maxbuffersize=1068 maxResultSize=2G
>>>>
>>>> What do I do? I killed the job twice, and it is stuck again. Where is it stuck?
>>>>
>>>> --
>>>> Deepak
>>>>
>>>> --
>>>> Deepak
>>>
>>
>
> --
> Deepak

--
Deepak