After the above changes

1) filter shows this:

Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Input Size / Records | Write Time | Shuffle Write Size / Records | Errors
0 | 1 | 0 | SUCCESS | ANY | 1 / phxaishdc9dn1571.stratus.phx.ebay.com | 2015/04/20 20:55:31 | 7.4 min | 21 s | 129.7 MB (hadoop) / 1000000 | 18 s | 1106.2 MB / 718687 | -
2) lstgItem.join(viEvents).map [Equi Join] shows this:

Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
0 | 17 | 0 | RUNNING | PROCESS_LOCAL | 8 / phxaishdc9dn0556.phx.ebay.com | 2015/04/20 21:02:56 | 4.3 min | 20 s | 1097.3 MB / 55906817 | - | 0.0 B / 0 | 0.0 B | 0.0 B | -
1 | 18 | 0 | SUCCESS | PROCESS_LOCAL | 3 / phxaishdc9dn0374.phx.ebay.com | 2015/04/20 21:02:56 | 1.4 min | 1 s | 251.0 MB / 831341 | 2 ms | 377.8 KB / 226 | 9.6 GB | 173.3 MB | -
2 | 19 | 0 | SUCCESS | PROCESS_LOCAL | 9 / phxaishdc9dn0121.phx.ebay.com | 2015/04/20 21:02:56 | 2.1 min | 4 s | 250.6 MB / 830896 | 89 ms | 280.4 KB / 168 | 4.4 GB | 267.9 MB | -
3 | 20 | 0 | SUCCESS | PROCESS_LOCAL | 4 / phxaishdc9dn1703.stratus.phx.ebay.com | 2015/04/20 21:02:56 | 1.9 min | 1.0 s | 250.6 MB / 831180 | 2 ms | 330.3 KB / 198 | 7.4 GB | 285.2 MB | -
4 | 21 | 0 | SUCCESS | PROCESS_LOCAL | 5 / phxaishdc9dn1350.stratus.phx.ebay.com | 2015/04/20 21:02:56 | 2.1 min | 3 s | 249.7 MB / 830966 | 3 ms | 303.9 KB / 182 | 3.7 GB | 282.8 MB | -

Task 0 (ID 17) looks like it will run for about 30 minutes.
I was wondering if increasing the input data size & the number of executors would solve this problem?
Would a map-side join help?
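
Something like this is what I have in mind for the map-side join (only a sketch, assuming lstgItem is small enough to collect and broadcast; the names just mirror the code further down in this thread, and sc is the SparkContext):

    // Broadcast the smaller side and do the join in a map, so the large RDD is never shuffled.
    // Assumes lstgItem fits comfortably in driver and executor memory.
    val lstgItemMap = sc.broadcast(lstgItem.collectAsMap())

    val joined = viEvents.flatMap { case (itemId, viDetail) =>
      lstgItemMap.value.get(itemId).map { listing =>
        (itemId, (listing, viDetail))
      }
    }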


On Tue, Apr 21, 2015 at 9:23 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> It is also a little more evidence for Jonathan's suggestion that there is a
> null / 0 record that is getting grouped together.
>
> To fix this, do I need to run a filter?
>
>     val viEventsRaw = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>
>     val viEvents = viEventsRaw.filter { case (itemId, viEvent) => itemId != 0 }
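>
> (And to confirm how bad the skew actually is before filtering, a quick count of the suspect key; just a sketch, assuming viEventsRaw as defined above:)
>
>     // Sketch: count how many records carry the suspect itemId = 0 key.
>     val zeroKeyCount = viEventsRaw.filter { case (itemId, _) => itemId == 0 }.count()
>     println(s"records with itemId == 0: $zeroKeyCount")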
>
>
>
> On Wed, Apr 15, 2015 at 2:04 AM, Imran Rashid <iras...@cloudera.com>
> wrote:
>
>> Shuffle write could be a good indication of skew, but it looks like the
>> task in question hasn't generated any shuffle write yet, because it's still
>> working on the shuffle-read side. So I wouldn't read too much into the
>> fact that the shuffle write is 0 for a task that is still running.
>>
>> The shuffle read is larger than for the other tasks (3.0 GB vs. 2.2 GB, or,
>> more importantly, 55M records vs. 1M records). So it might not be that the
>> raw data volume is much higher on that task, but it's getting a ton more
>> small records, which will also generate a lot of work. It is also a little
>> more evidence for Jonathan's suggestion that there is a null / 0 record that
>> is getting grouped together.
>>
>>
>> On Mon, Apr 13, 2015 at 12:40 PM, Jonathan Coveney <jcove...@gmail.com>
>> wrote:
>>
>>> I'm not 100% sure of Spark's implementation, but in the MR frameworks it
>>> would have a much larger shuffle write size because that node is dealing
>>> with a lot more data and as a result has a lot more to shuffle.
>>>
>>> 2015-04-13 13:20 GMT-04:00 java8964 <java8...@hotmail.com>:
>>>
>>>> If it is really due to data skew, would the hanging task have a much
>>>> bigger Shuffle Write Size in this case?
>>>>
>>>> In this case, the shuffle write size for that task is 0, and the rest of
>>>> the I/O for this task is not much larger than that of the tasks that
>>>> finished quickly; is that normal?
>>>>
>>>> I am also interested in this case: from the statistics on the UI, how can
>>>> we tell that a task might have skewed data?
>>>>
>>>> Yong
>>>>
>>>> ------------------------------
>>>> Date: Mon, 13 Apr 2015 12:58:12 -0400
>>>> Subject: Re: Equi Join is taking for ever. 1 Task is Running while
>>>> other 199 are complete
>>>> From: jcove...@gmail.com
>>>> To: deepuj...@gmail.com
>>>> CC: user@spark.apache.org
>>>>
>>>>
>>>> I can promise you that this is also a problem in the Pig world :) Not
>>>> sure why it's not a problem for this data set, though... are you sure that
>>>> the two are running exactly the same code?
>>>>
>>>> You should inspect your source data. Make a histogram of the join key for
>>>> each side and see what the data distribution looks like. If there is a
>>>> value or bucket with a disproportionate share of the records, you know you
>>>> have an issue.
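>>>>
>>>> (A rough sketch of such a key histogram, assuming the keyed RDDs named in
>>>> the code below; the top-20 cut-off is arbitrary:)
>>>>
>>>>     // Sketch: count records per join key and show the heaviest keys.
>>>>     val topViKeys = viEvents
>>>>       .map { case (itemId, _) => (itemId, 1L) }
>>>>       .reduceByKey(_ + _)
>>>>       .top(20)(Ordering.by(_._2))
>>>>     topViKeys.foreach(println)
>>>>     // Repeat the same check on lstgItem to compare both sides.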
>>>>
>>>> 2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>
>>>> You mean there is a tuple in one of the RDDs that has itemId = 0 or null?
>>>> And what is a catch-all?
>>>>
>>>> Does that imply it is a good idea to run a filter on each RDD first? We
>>>> do not do this using Pig on M/R. Is it required in the Spark world?
>>>>
>>>> On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney <jcove...@gmail.com>
>>>> wrote:
>>>>
>>>> My guess would be data skew. Do you know if there is some item id that is
>>>> a catch-all? Can it be null? Item id 0? Lots of data sets have this sort
>>>> of value, and it always kills joins.
>>>>
>>>> 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>
>>>> Code:
>>>>
>>>> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
>>>>   lstgItem.join(viEvents).map {
>>>>     case (itemId, (listing, viDetail)) =>
>>>>       val viSummary = new VISummary
>>>>       viSummary.leafCategoryId = listing.getLeafCategId().toInt
>>>>       viSummary.itemSiteId = listing.getItemSiteId().toInt
>>>>       viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
>>>>       viSummary.sellerCountryId = listing.getSlrCntryId().toInt
>>>>       viSummary.buyerSegment = "0"
>>>>       viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
>>>>       val sellerId = listing.getSlrId.toLong
>>>>       (sellerId, (viDetail, viSummary, itemId))
>>>>   }
>>>>
>>>> Running Tasks:
>>>> Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
>>>> 0 | 216 | 0 | RUNNING | PROCESS_LOCAL | 181 / phxaishdc9dn0474.phx.ebay.com | 2015/04/13 06:43:53 | 1.7 h | 13 min | 3.0 GB / 56964921 | - | 0.0 B / 0 | 21.2 GB | 1902.6 MB | -
>>>> 2 | 218 | 0 | SUCCESS | PROCESS_LOCAL | 582 / phxaishdc9dn0235.phx.ebay.com | 2015/04/13 06:43:53 | 15 min | 31 s | 2.2 GB / 1666851 | 0.1 s | 3.0 MB / 2062 | 54.8 GB | 1924.5 MB | -
>>>> 1 | 217 | 0 | SUCCESS | PROCESS_LOCAL | 202 / phxdpehdc9dn2683.stratus.phx.ebay.com | 2015/04/13 06:43:53 | 19 min | 1.3 min | 2.2 GB / 1687086 | 75 ms | 3.9 MB / 2692 | 33.7 GB | 1960.4 MB | -
>>>> 4 | 220 | 0 | SUCCESS | PROCESS_LOCAL | 218 / phxaishdc9dn0855.phx.ebay.com | 2015/04/13 06:43:53 | 15 min | 56 s | 2.2 GB / 1675654 | 40 ms | 3.3 MB / 2260 | 26.2 GB | 1928.4 MB | -
>>>>
>>>>
>>>>
>>>> Command:
>>>> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
>>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>>>> --jars
>>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>>>>  --num-executors 3000 --driver-memory 12g --driver-java-options
>>>> "-XX:MaxPermSize=6G" --executor-memory 12g --executor-cores 1 --queue
>>>> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
>>>> /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
>>>> startDate=2015-04-6 endDate=2015-04-7
>>>> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
>>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>>>> maxbuffersize=1068 maxResultSize=2G
>>>>
>>>>
>>>> What do I do? I killed the job twice and it's stuck again. Where is it
>>>> stuck?
>>>>
>>>> --
>>>> Deepak
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Deepak
>>>>
>>>>
>>>>
>>>
>>
>
>
> --
> Deepak
>
>


-- 
Deepak
