It was actually zlib compression in Python, but you can certainly use any
other compression library. Unfortunately, this is not a built-in feature in
Pig, but I agree it would be useful.
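
For anyone who wants a starting point, below is a minimal sketch of such a
UDF written as a Pig Jython UDF. It is only an illustration, not the exact
code mentioned above: the function names, the pickle serialization, and the
base64/chararray encoding of the compressed blob are placeholder choices you
would adapt to your own schema.

    # compress_bag.py (sketch). Register in Pig with something like:
    #   REGISTER 'compress_bag.py' USING jython AS bagutil;
    # and call compress_bag() in a FOREACH right after the GROUP BY.
    import base64
    import pickle
    import zlib

    # The outputSchema decorator is supplied by Pig's Jython script engine
    # when the file is registered USING jython, so no import is needed here.

    @outputSchema('compressed:chararray')
    def compress_bag(bag):
        """zlib-compress a bag (passed to Jython as a list of tuples) and
        return it base64-encoded, so downstream operators move a small blob
        around instead of the whole bag."""
        if bag is None:
            return None
        raw = pickle.dumps(list(bag), pickle.HIGHEST_PROTOCOL)
        return base64.b64encode(zlib.compress(raw))

    @outputSchema('restored:bag{t:tuple(value:chararray)}')  # adjust to your bag's real schema
    def decompress_bag(blob):
        """Inverse of compress_bag: restore the original list of tuples."""
        if blob is None:
            return None
        return pickle.loads(zlib.decompress(base64.b64decode(blob)))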


On Fri, Feb 7, 2014 at 11:45 AM, praveenesh kumar <praveen...@gmail.com> wrote:

> Hi Park,
>
> Your explanation makes perfect sense in my case. Thanks for explaining
> what is happening behind the scenes. I am wondering whether you used plain
> Java compression/decompression, whether there is a UDF already available to
> do this, or whether there is some kind of property we need to enable to
> tell Pig to compress bags before spilling.
>
> Regards
> Prav
>
>
> On Fri, Feb 7, 2014 at 4:37 PM, Cheolsoo Park <piaozhe...@gmail.com> wrote:
>
>> Hi Prav,
>>
>> You're thinking correctly, and it's true that Pig bags are spillable.
>>
>> However, spilling is no magic, meaning you can still run into OOM with
>> huge bags like the ones you have here. Pig runs the Spillable Memory
>> Manager (SMM) in a separate thread. When spilling is triggered, the SMM
>> locks the bags that it is trying to spill to disk. After the spilling is
>> finished, GC frees up the memory. The problem is that more bags may be
>> loaded into memory while the spilling is still in progress. Now the JVM
>> triggers GC, but GC cannot free up memory because the SMM is locking the
>> bags, resulting in an OOM error. This happens quite often.
>>
>> It sounds like you do the group-by to reduce the number of rows before the
>> join and don't immediately run any aggregation function on the grouped
>> bags. If that's the case, can you compress those bags? For example, you
>> could add a foreach after the group-by and run a UDF that compresses a bag
>> and returns it as a bytearray. From there on, you're moving around small
>> blobs rather than big bags. Of course, you will need to decompress them
>> when you restore the data out of those bags at some point. This trick
>> saved me several times in the past, particularly when I dealt with bags of
>> large chararrays.
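
A quick, self-contained way to see the saving on bags of large chararrays is
a toy round trip in plain Python with made-up data; pickle and zlib here
stand in for whatever serialization the actual UDF uses.

    # Toy demonstration: a bag of large, repetitive chararrays shrinks a lot
    # once serialized and zlib-compressed. The data below is entirely made up.
    import pickle
    import zlib

    bag = [("row %06d: " % i + "some long repetitive chararray value " * 50,)
           for i in range(1000)]

    raw = pickle.dumps(bag, pickle.HIGHEST_PROTOCOL)
    blob = zlib.compress(raw)

    print(len(raw), len(blob))  # raw is a couple of MB; blob is a small fraction of that
    assert pickle.loads(zlib.decompress(blob)) == bag  # the round trip is lossless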
>>
>> Just a thought. Hope this is helpful.
>>
>> Thanks,
>> Cheolsoo
>>
>>
>> On Fri, Feb 7, 2014 at 7:37 AM, praveenesh kumar <praveen...@gmail.com> wrote:
>>
>> > Thanks, Park, for sharing the above configs.
>> >
>> > But I am wondering whether those config changes would make any big
>> > difference in my case. Looking at my logs, I am most worried about this
>> > line:
>> >
>> >  INFO org.apache.hadoop.mapred.MapTask: Record too large for in-memory buffer: 644245358 bytes
>> >
>> > If I am understanding it properly, one of my records is too large to fit
>> > into memory, which is what is causing the issue. In that case none of the
>> > above changes would make a big impact; please correct me if I am taking
>> > it totally wrong.
>> >
>> > Adding the Hadoop user group here as well, hoping for some valuable input
>> > on the question above.
>> >
>> > Since I am doing a join on a grouped bag, do you think that might be the
>> > cause?
>> >
>> > But if that is the issue, then as far as I understand, bags in Pig are
>> > spillable, so it shouldn't have caused this problem.
>> >
>> > I can't easily get rid of the group-by; grouping first should ideally
>> > improve my join. But if this is the root cause, and if I am understanding
>> > it correctly, do you think I should get rid of the group-by?
>> >
>> > My question in that case would be: what happens if I do the group-by
>> > later, after the join? It would result in a much bigger bag, because
>> > there would be more records after the join.
>> >
>> > Am I thinking about this correctly?
>> >
>> > Regards
>> >
>> > Prav
>> >
>> >
>> >
>> > On Fri, Feb 7, 2014 at 3:11 AM, Cheolsoo Park <piaozhe...@gmail.com> wrote:
>> >
>> >> Looks like you're running out of space in MapOutputBuffer. Two
>> >> suggestions:
>> >>
>> >> 1)
>> >> You said that io.sort.mb is already set to 768 MB, but did you try to
>> >> lower io.sort.spill.percent in order to spill earlier and more often?
>> >>
>> >> See page 12:
>> >> http://www.slideshare.net/Hadoop_Summit/optimizing-mapreduce-job-performance
>> >>
>> >> 2)
>> >> Can't you increase the parallelism of the mappers so that each mapper
>> >> handles a smaller amount of data? Pig determines the number of mappers
>> >> as total input size / pig.maxCombinedSplitSize (128 MB by default). So
>> >> you can try to lower pig.maxCombinedSplitSize.
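
To make the arithmetic in suggestion 2) concrete, here is a rough sketch in
Python with a made-up input size, showing how lowering
pig.maxCombinedSplitSize raises the mapper count and shrinks each mapper's
share of the data.

    # Back-of-the-envelope only: mapper count is roughly
    # total input size / pig.maxCombinedSplitSize.

    def estimated_mappers(total_input_bytes, max_combined_split_bytes):
        """Ceiling of total input size divided by the combined split size."""
        return -(-total_input_bytes // max_combined_split_bytes)

    TOTAL_INPUT = 200 * 1024**3    # hypothetical ~200 GB of input
    DEFAULT_SPLIT = 128 * 1024**2  # Pig's default pig.maxCombinedSplitSize
    SMALLER_SPLIT = 32 * 1024**2   # e.g. lower it to 32 MB

    print(estimated_mappers(TOTAL_INPUT, DEFAULT_SPLIT))   # 1600 mappers, ~128 MB each
    print(estimated_mappers(TOTAL_INPUT, SMALLER_SPLIT))   # 6400 mappers, ~32 MB each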
>> >>
>> >> But I admit that Pig's internal data types are not memory-efficient,
>> >> and that is an optimization opportunity. Contribute!
>> >>
>> >>
>> >>
>> >> > On Thu, Feb 6, 2014 at 2:54 PM, praveenesh kumar <praveen...@gmail.com> wrote:
>> >>
>> >> > It's a normal join. I can't use a replicated join, as the data is very large.
>> >> >
>> >> > Regards
>> >> > Prav
>> >> >
>> >> >
>> >> > On Thu, Feb 6, 2014 at 7:52 PM, abhishek <abhishek.dod...@gmail.com> wrote:
>> >> >
>> >> > > Hi Praveenesh,
>> >> > >
>> >> > > Did you use a "replicated join" in your Pig script, or is it a regular join?
>> >> > >
>> >> > > Regards
>> >> > > Abhishek
>> >> > >
>> >> > > Sent from my iPhone
>> >> > >
>> >> > > > On Feb 6, 2014, at 11:25 AM, praveenesh kumar <praveen...@gmail.com> wrote:
>> >> > > >
>> >> > > > Hi all,
>> >> > > >
>> >> > > > I am running a Pig script that works fine for small data. But when
>> >> > > > I scale up the data, I get the following error at my map stage.
>> >> > > > Please refer to the map logs below.
>> >> > > >
>> >> > > > My Pig script does a group-by first, followed by a join on the
>> >> > > > grouped data.
>> >> > > >
>> >> > > > Any clues on where I should look or how I should deal with this
>> >> > > > situation? I don't want to just keep increasing the heap space. My
>> >> > > > map JVM heap space is already 3 GB, with io.sort.mb = 768 MB.
>> >> > > >
>> >> > > > 2014-02-06 19:15:12,243 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>> >> > > > 2014-02-06 19:15:15,025 INFO org.apache.hadoop.util.ProcessTree: setsid exited with exit code 0
>> >> > > > 2014-02-06 19:15:15,123 INFO org.apache.hadoop.mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@2bd9e282
>> >> > > > 2014-02-06 19:15:15,546 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 768
>> >> > > > 2014-02-06 19:15:19,846 INFO org.apache.hadoop.mapred.MapTask: data buffer = 612032832/644245088
>> >> > > > 2014-02-06 19:15:19,846 INFO org.apache.hadoop.mapred.MapTask: record buffer = 9563013/10066330
>> >> > > > 2014-02-06 19:15:20,037 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
>> >> > > > 2014-02-06 19:15:21,083 INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader: Created input record counter: Input records from _1_tmp1327641329
>> >> > > > 2014-02-06 19:15:52,894 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: buffer full= true
>> >> > > > 2014-02-06 19:15:52,895 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 611949600; bufvoid = 644245088
>> >> > > > 2014-02-06 19:15:52,895 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 576; length = 10066330
>> >> > > > 2014-02-06 19:16:06,182 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
>> >> > > > 2014-02-06 19:16:16,169 INFO org.apache.pig.impl.util.SpillableMemoryManager: first memory handler call - Collection threshold init = 328728576(321024K) used = 1175055104(1147514K) committed = 1770848256(1729344K) max = 2097152000(2048000K)
>> >> > > > 2014-02-06 19:16:20,446 INFO org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 308540402 bytes from 1 objects. init = 328728576(321024K) used = 1175055104(1147514K) committed = 1770848256(1729344K) max = 2097152000(2048000K)
>> >> > > > 2014-02-06 19:17:22,246 INFO org.apache.pig.impl.util.SpillableMemoryManager: first memory handler call - Usage threshold init = 328728576(321024K) used = 1768466512(1727018K) committed = 1770848256(1729344K) max = 2097152000(2048000K)
>> >> > > > 2014-02-06 19:17:35,597 INFO org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 1073462600 bytes from 1 objects. init = 328728576(321024K) used = 1768466512(1727018K) committed = 1770848256(1729344K) max = 2097152000(2048000K)
>> >> > > > 2014-02-06 19:18:01,276 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: buffer full= true
>> >> > > > 2014-02-06 19:18:01,288 INFO org.apache.hadoop.mapred.MapTask: bufstart = 611949600; bufend = 52332788; bufvoid = 644245088
>> >> > > > 2014-02-06 19:18:01,288 INFO org.apache.hadoop.mapred.MapTask: kvstart = 576; kvend = 777; length = 10066330
>> >> > > > 2014-02-06 19:18:03,377 INFO org.apache.hadoop.mapred.MapTask: Finished spill 1
>> >> > > > 2014-02-06 19:18:05,494 INFO org.apache.hadoop.mapred.MapTask: Record too large for in-memory buffer: 644246693 bytes
>> >> > > > 2014-02-06 19:18:36,008 INFO org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 306271368 bytes from 1 objects. init = 328728576(321024K) used = 1449267128(1415299K) committed = 2097152000(2048000K) max = 2097152000(2048000K)
>> >> > > > 2014-02-06 19:18:44,448 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
>> >> > > > 2014-02-06 19:18:44,780 FATAL org.apache.hadoop.mapred.Child: Error running child : java.lang.OutOfMemoryError: Java heap space
>> >> > > >     at java.util.Arrays.copyOf(Arrays.java:2786)
>> >> > > >     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
>> >> > > >     at java.io.DataOutputStream.write(DataOutputStream.java:90)
>> >> > > >     at java.io.DataOutputStream.writeUTF(DataOutputStream.java:384)
>> >> > > >     at java.io.DataOutputStream.writeUTF(DataOutputStream.java:306)
>> >> > > >     at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:454)
>> >> > > >     at org.apache.pig.data.BinInterSedes.writeTuple(BinInterSedes.java:542)
>> >> > > >     at org.apache.pig.data.BinInterSedes.writeBag(BinInterSedes.java:523)
>> >> > > >     at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:361)
>> >> > > >     at org.apache.pig.data.BinInterSedes.writeTuple(BinInterSedes.java:542)
>> >> > > >     at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:357)
>> >> > > >     at org.apache.pig.data.BinSedesTuple.write(BinSedesTuple.java:57)
>> >> > > >     at org.apache.pig.impl.io.PigNullableWritable.write(PigNullableWritable.java:123)
>> >> > > >     at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:90)
>> >> > > >     at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:77)
>> >> > > >     at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:179)
>> >> > > >     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.spillSingleRecord(MapTask.java:1501)
>> >> > > >     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1091)
>> >> > > >     at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:691)
>> >> > > >     at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>> >> > > >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Map.collect(PigGenericMapReduce.java:128)
>> >> > > >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:269)
>> >> > > >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:262)
>> >> > > >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
>> >> > > >     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>> >> > > >     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
>> >> > > >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
>> >> > > >     at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
>> >> > > >     at java.security.AccessController.doPrivileged(Native Method)
>> >> > > >     at javax.security.auth.Subject.doAs(Subject.java:396)
>> >> > > >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>> >> > > >     at org.apache.hadoop.mapred.Child.main(Child.java:249)
>> >> > >
>> >> >
>> >>
>> >
>> >
>>
>
>
