The listserv strips attachments. You'll have to host it somewhere else and link to it.
On 2012/7/11, Haitao Yao <yao.e...@gmail.com> wrote:

Sorry, I sent the mail only to Thejas. Resending it for all.

Haitao Yao
yao.e...@gmail.com
weibo: @haitao_yao
Skype: haitao.yao.final

On 2012-7-12, at 10:41 AM, Haitao Yao wrote:

> Is your query using combiner?
I don't know how to explicitly use the combiner.

> Can you send the explain plan output?
The explain result is in the attachment. It's a little long.
<aa.explain>

> Does the heap information say how many entries are there in the InternalCachedBag's ArrayList?
There are 6 big ArrayLists, with a size of about 372,692 entries.
Here are the screenshots of the heap dump:

Screenshot 1: you can see there are 6 big POForEach instances.
<aa.jpg>

Screenshot 2: you can see the memory is mostly retained by the big ArrayList.
<bb.jpg>

Screenshot 3: you can see the big ArrayList is referenced by InternalCachedBag.
<cc.jpg>

> What version of pig are you using?
pig-0.9.2. I've read the latest source code of Pig from GitHub, and I don't find any improvements on InternalCachedBag.

On 2012-7-12, at 8:56 AM, Thejas Nair wrote:

Haitao,
Is your query using combiner? Can you send the explain plan output?
Does the heap information say how many entries are there in the InternalCachedBag's ArrayList?
What version of pig are you using?

Thanks,
Thejas

On 7/10/12 11:50 PM, Haitao Yao wrote:

Oh, new discovery: we cannot set pig.cachedbag.memusage = 0, because every time the InternalCachedBag spills, it creates a new tmp file in java.io.tmpdir. If we set pig.cachedbag.memusage to 0, every new tuple added into InternalCachedBag will create a new tmp file. And the tmp files are only deleted on exit. So, if you're unlucky like me, you will get an OOM exception caused by java.io.DeleteOnExitHook!
Here's the evidence: [screenshot stripped by the list]

God, we really need a full description of how every parameter works.

On 2012-7-10, at 4:20 PM, Haitao Yao wrote:

I found the solution.

After analyzing a heap dump taken while the reducer was OOMing, I found out the memory is consumed by org.apache.pig.data.InternalCachedBag; here's the diagram:
<cc.jpg>

In the source code of org.apache.pig.data.InternalCachedBag, I found there's a parameter for the cache limit:
    public InternalCachedBag(int bagCount) {
        float percent = 0.2F;

        if (PigMapReduce.sJobConfInternal.get() != null) {
            // here, the cache limit comes from the job conf!
            String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
            if (usage != null) {
                percent = Float.parseFloat(usage);
            }
        }

        init(bagCount, percent);
    }

    private void init(int bagCount, float percent) {
        factory = TupleFactory.getInstance();
        mContents = new ArrayList<Tuple>();

        long max = Runtime.getRuntime().maxMemory();
        maxMemUsage = (long) (((float) max * percent) / (float) bagCount);
        cacheLimit = Integer.MAX_VALUE;

        // set limit to 0 if memusage is 0 or really, really small;
        // then all tuples are put onto disk
        if (maxMemUsage < 1) {
            cacheLimit = 0;
        }
        log.warn("cacheLimit: " + this.cacheLimit);
        addDone = false;
    }

So, after writing pig.cachedbag.memusage=0 into $PIG_HOME/conf/pig.properties, my job succeeds!

You can also set it to an appropriate value to fully utilize your memory as a cache.

Hope this is useful for others.
Thanks.

On 2012-7-10, at 1:06 PM, Haitao Yao wrote:

My reducers get 512 MB: -Xms512M -Xmx512M.
The reducer does not get an OOM when I manually invoke spill, in my case.

Can you explain more about your solution?
And can your solution fit into a 512 MB reducer process?
Thanks very much.

On 2012-7-10, at 12:26 PM, Jonathan Coveney wrote:

I have something in the mix that should reduce bag memory :) Question: how much memory are your reducers getting? In my experience, you'll get OOMs on spilling if you have allocated less than a gig to the JVM.

On 2012/7/9, Haitao Yao <yao.e...@gmail.com> wrote:

I have encountered a similar problem, and I got an OOM while running the reducer. I think the reason is that the data bag generated after group all is too big to fit into the reducer's memory.

I have written a new COUNT implementation that explicitly invokes System.gc() and spills after the COUNT function finishes its job, but it still gets an OOM.

Here's the code of the new COUNT implementation:

    @Override
    public Long exec(Tuple input) throws IOException {
        DataBag bag = (DataBag) input.get(0);
        Long result = super.exec(input);
        LOG.warn("before spill, free memory: " + Runtime.getRuntime().freeMemory());
        bag.spill();
        System.gc();
        LOG.warn("after spill, free memory: " + Runtime.getRuntime().freeMemory());
        LOG.warn("big bag size: " + bag.size() + ", hashcode: " + bag.hashCode());
        return result;
    }

I think we have to redesign the data bag implementation to consume less memory.
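To try the tuning discussed above in your own script: per the constructor quoted earlier, pig.cachedbag.memusage is the fraction of the JVM heap shared by all cached bags (0.2 by default, divided by the bag count). A minimal sketch follows; the 0.05 value is purely illustrative, and per Haitao's DeleteOnExitHook warning above, avoid 0, since that makes every added tuple spill to its own delete-on-exit tmp file.

    -- illustrative value, not a recommendation; the same key can go into
    -- $PIG_HOME/conf/pig.properties instead, as described above
    set pig.cachedbag.memusage '0.05';

    longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
    grpall = group longDesc all;
    cnt = foreach grpall generate COUNT(longDesc);
    dump cnt;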
On 2012-7-10, at 6:54 AM, Sheng Guo wrote:

The pig script:

    longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
    grpall = group longDesc all;
    cnt = foreach grpall generate COUNT(longDesc) as allNumber;
    explain cnt;

The explain result:

    #-----------------------------------------------
    # New Logical Plan:
    #-----------------------------------------------
    cnt: (Name: LOStore Schema: allNumber#65:long)
    |
    |---cnt: (Name: LOForEach Schema: allNumber#65:long)
        |   |
        |   (Name: LOGenerate[false] Schema: allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
        |   |   |
        |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid: 65)
        |   |   |
        |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 Column: (*))
        |   |
        |   |---longDesc: (Name: LOInnerLoad[1] Schema: DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
        |
        |---grpall: (Name: LOCogroup Schema: group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
            |   |
            |   (Name: Constant Type: chararray Uid: 62)
            |
            |---longDesc: (Name: LOLoad Schema: DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null

    #-----------------------------------------------
    # Physical Plan:
    #-----------------------------------------------
    cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
    |
    |---cnt: New For Each(false)[bag] - scope-8
        |   |
        |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
        |   |
        |   |---Project[bag][1] - scope-5
        |
        |---grpall: Package[tuple]{chararray} - scope-2
            |
            |---grpall: Global Rearrange[tuple] - scope-1
                |
                |---grpall: Local Rearrange[tuple]{chararray}(false) - scope-3
                    |   |
                    |   Constant(all) - scope-4
                    |
                    |---longDesc: Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
    2012-07-09 15:47:02,441 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
    2012-07-09 15:47:02,448 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer - Choosing to move algebraic foreach to combiner
    2012-07-09 15:47:02,581 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
    2012-07-09 15:47:02,581 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1

    #--------------------------------------------------
    # Map Reduce Plan
    #--------------------------------------------------
    MapReduce node scope-10
    Map Plan
    grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
    |   |
    |   Project[chararray][0] - scope-23
    |
    |---cnt: New For Each(false,false)[bag] - scope-11
        |   |
        |   Project[chararray][0] - scope-12
        |   |
        |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
        |   |
        |   |---Project[bag][1] - scope-14
        |
        |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
            |
            |---longDesc: Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
    --------
    Combine Plan
    grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
    |   |
    |   Project[chararray][0] - scope-27
    |
    |---cnt: New For Each(false,false)[bag] - scope-15
        |   |
        |   Project[chararray][0] - scope-16
        |   |
        |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-17
        |   |
        |   |---Project[bag][1] - scope-18
        |
        |---POCombinerPackage[tuple]{chararray} - scope-20
    --------
    Reduce Plan
    cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
    |
    |---cnt: New For Each(false)[bag] - scope-8
        |   |
        |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
        |   |
        |   |---Project[bag][1] - scope-19
        |
        |---POCombinerPackage[tuple]{chararray} - scope-28
    --------
    Global sort: false
    ----------------

On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney <jcove...@gmail.com> wrote:

Instead of doing "dump relation," do "explain relation" (then run identically) and paste the output here. It will show whether the combiner is being used.
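To make Jonathan's check concrete, a minimal sketch using the aliases from Sheng's original question (quoted below; the load path and AvroStorage schema are illustrative). explain runs no job, and with an algebraic UDF like COUNT, the Map and Combine plans should contain COUNT$Initial and COUNT$Intermediate, exactly as in the plan pasted above.

    m_skills_filter = load '/user/xx/skills' USING AvroStorage();
    m_skill_group = group m_skills_filter by member_id;
    grpd = group m_skill_group all;
    cnt = foreach grpd generate COUNT(m_skill_group);
    explain cnt;
    -- in the Map Reduce Plan, look for COUNT$Initial in the Map Plan and
    -- COUNT$Intermediate in the Combine Plan; if they are missing, the
    -- combiner was not applied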
On 2012/7/3, Ruslan Al-Fakikh <ruslan.al-fak...@jalent.ru> wrote:

Hi,

As it was said, COUNT is algebraic and should be fast, because it forces the combiner. You should make sure that the combiner is really used here; it can be disabled in some situations. I've encountered such situations many times, where a job is far too heavy when no combiner is applied.

Ruslan

On Tue, Jul 3, 2012 at 1:35 AM, Subir S <subir.sasiku...@gmail.com> wrote:

Right!!

Since it is mentioned that the job is hanging, my wild guess is it must be the 'group all'. How can that be confirmed?

On 7/3/12, Jonathan Coveney <jcove...@gmail.com> wrote:

group all uses a single reducer, but COUNT is algebraic and, as such, will use combiners, so it is generally quite fast.

On 2012/7/2, Subir S <subir.sasiku...@gmail.com> wrote:

Group all uses a single reducer, AFAIU. You can try to count per group and sum, maybe (see the sketch at the end of this thread).

You may also try COUNT_STAR to include NULL fields.

On 7/3/12, Sheng Guo <enigma...@gmail.com> wrote:

Hi all,

I used to use the following pig script to do the counting of the records:

    m_skill_group = group m_skills_filter by member_id;
    grpd = group m_skill_group all;
    cnt = foreach grpd generate COUNT(m_skill_group);

    cnt_filter = limit cnt 10;
    dump cnt_filter;

But sometimes, when the records get larger, it takes a lot of time and hangs, or dies. I thought counting should be simple enough, so what is the best way to do a counting in Pig?

Thanks!

Sheng
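To make Subir's count-per-group-and-sum suggestion concrete, here is a sketch against Sheng's relations (the load path and alias names are illustrative): the final group all still runs on a single reducer, but it sees one small count per group instead of the full data bags.

    m_skills_filter = load '/user/xx/skills' USING AvroStorage();
    per_member = group m_skills_filter by member_id;           -- count per group...
    counts = foreach per_member generate COUNT(m_skills_filter) as c;
    all_counts = group counts all;                             -- ...then sum the counts
    total = foreach all_counts generate SUM(counts.c);
    dump total;
    -- note: COUNT skips tuples whose first field is null; COUNT_STAR,
    -- as Subir mentions, counts them too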