Hi, I'm new to Pig and I wrote a Pig UDF to generate a bag of tuples. It seems to be correct; I've tested it and it works perfectly. What have I missed? Thanks
Davide B.

When I apply it to my data (220 million rows) using this script:

1 REGISTER '/mnt5/pig/udf_date.jar';
2 REGISTER '/opt/cloudera/parcels/CDH-4.5.0-1.cdh4.5.0.p0.30/lib/pig/piggybank.jar';
3 jlsraw = load './data/mydata.dat' USING PigStorage(' ') as (ts:chararray, user:chararray, d1:chararray, item:chararray, d2:chararray, duration:long, d3:chararray);
4 jlsfilter = FILTER jlsraw BY $5/1000 >= 120;
5 jlsfilters = ORDER jlsfilter by user;
6 jlsslots = FOREACH jlsfilters GENERATE $1 as user, $3 as item, com.moviri.pig.udf.GenerateProfileBag(ts, (chararray)$5, (chararray)30) as t;
7 jlsunroll = FOREACH jlsslots GENERATE $0 as user, $1 as item, FLATTEN(t) as (slot:chararray, duration:long);
8 jlsgroup = GROUP jlsunroll BY (user, item, slot);
9 jlsfinal = FOREACH jlsgroup GENERATE group.user, group.item, group.slot, SUM(jlsunroll.duration) as duration;
10 jls = ORDER jlsfinal BY user;
11 STORE jls INTO '/user/hdfs/mydata_10M.dat';

the reducer step that processes "M: jlsfilters[3,13] C: R: jlsslots[4,11],jlsunroll[5,12]" uses a lot of memory and is killed by the framework because it fails to report its status. Here are the logs:

=== Killing log ===
Task attempt_201401261145_0013_r_000000_0 failed to report status for 600 seconds. Killing!

=== Reducer log ===
2014-01-26 14:13:09,138 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
2014-01-26 14:13:09,138 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
2014-01-26 14:13:09,139 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
2014-01-26 14:13:09,139 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 0 files left.
2014-01-26 14:13:09,139 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 5 files left.
2014-01-26 14:13:09,159 INFO org.apache.hadoop.mapred.Merger: Merging 5 sorted segments
2014-01-26 14:13:09,159 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 5 segments left of total size: 480601243 bytes
2014-01-26 14:13:09,162 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new compressor [.snappy]
2014-01-26 14:13:15,327 INFO org.apache.hadoop.mapred.ReduceTask: Merged 5 segments, 480601243 bytes to disk to satisfy reduce memory limit
2014-01-26 14:13:15,328 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 files, 142586009 bytes from disk
2014-01-26 14:13:15,329 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
2014-01-26 14:13:15,329 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
2014-01-26 14:13:15,334 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 480601235 bytes
2014-01-26 14:13:15,641 INFO org.apache.pig.data.SchemaTupleBackend: Key [pig.schematuple] was not set... will not generate code.
2014-01-26 14:13:15,739 INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce: Aliases being processed per job phase (AliasName[line,offset]): M: jlsfilters[3,13] C: R: jlsslots[4,11],jlsunroll[5,12]
2014-01-26 14:15:59,265 INFO org.apache.pig.impl.util.SpillableMemoryManager: first memory handler call - Collection threshold init = 162594816(158784K) used = 5079178808(4960135K) committed = 6274088960(6127040K) max = 8388608000(8192000K)
2014-01-26 14:16:33,907 INFO org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 4959196460 bytes from 1 objects.
init = 162594816(158784K) used = 5079178808(4960135K) committed = 6274088960(6127040K) max = 8388608000(8192000K)
2014-01-26 14:18:10,567 INFO org.apache.pig.impl.util.SpillableMemoryManager: first memory handler call - Usage threshold init = 162594816(158784K) used = 5878913448(5741126K) committed = 6274088960(6127040K) max = 8388608000(8192000K)
2014-01-26 14:19:16,325 INFO org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 6492776420 bytes from 1 objects.
init = 162594816(158784K) used = 5878913448(5741126K) committed = 6274088960(6127040K) max = 8388608000(8192000K)

==== MY UDF CLASS ====

public class GenerateProfileBag extends EvalFunc<DataBag> {

    private static final String PROFILE_SEPARATOR = "@";
    private BagFactory instance = BagFactory.getInstance();
    private TupleFactory tupleInstance = TupleFactory.getInstance();

    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() < 3) {
            return null;
        }
        DataBag res = instance.newDefaultBag();
        Tuple datetime = tupleInstance.newTuple();
        datetime.append(input.get(0));
        DateTime startDT = ISOHelper.parseDateTime(datetime);
        long duration = Long.parseLong((String) input.get(1)); // milliseconds
        DateTime endDT = startDT.plus(duration);
        int minutes = Integer.parseInt((String) input.get(2)); // slot size in minutes
        int dowStart = getDayOfWeek(startDT);
        int dowEnd = getDayOfWeek(endDT);
        int slotStart = getMinSlot(startDT, minutes);
        int slotEnd = getMinSlot(endDT, minutes);
        Tuple t = null;
        if (dowStart == dowEnd) {
            // Start and end fall in the same day of week
            if (slotEnd == slotStart) {
                t = tupleInstance.newTuple();
                t.append(dowStart + PROFILE_SEPARATOR + slotStart);
                t.append((endDT.getMillis() - startDT.getMillis()) / 1000);
                res.add(t);
            } else if (slotStart < slotEnd) {
                // First (partial) slot
                t = tupleInstance.newTuple();
                t.append(dowStart + PROFILE_SEPARATOR + slotStart);
                t.append(slotStart + minutes - startDT.minuteOfDay().get());
                res.add(t);
                slotStart += minutes;
                // Full intermediate slots
                while (slotStart < slotEnd) {
                    t = tupleInstance.newTuple();
                    t.append(dowStart + PROFILE_SEPARATOR + slotStart);
                    t.append(minutes);
                    res.add(t);
                    slotStart += minutes;
                }
                // Last (partial) slot
                t = tupleInstance.newTuple();
                t.append(dowStart + PROFILE_SEPARATOR + slotEnd);
                t.append(endDT.minuteOfDay().get() - slotEnd);
                res.add(t);
            }
        } else {
            // The interval spans two or more days
            t = tupleInstance.newTuple();
            t.append(dowStart + PROFILE_SEPARATOR + slotStart);
            t.append(slotStart + minutes - startDT.minuteOfDay().get());
            res.add(t);
            slotStart += minutes;
            if (slotStart == 1440) {
                slotStart = 0;
                dowStart += 1;
                if (dowStart == 8) {
                    // days run 1..7; the original "dowStart % 8" left an invalid day 0 here
                    dowStart = 1;
                }
            }
            // Emit full slots until we reach the end day and end slot
            while (!(dowStart == dowEnd && slotStart == slotEnd)) {
                if (slotStart == 1440) {
                    slotStart = 0;
                    dowStart += 1;
                    if (dowStart == 8) {
                        dowStart = 1;
                    }
                }
                t = tupleInstance.newTuple();
                t.append(dowStart + PROFILE_SEPARATOR + slotStart);
                t.append(minutes);
                res.add(t);
                slotStart += minutes;
            }
            t = tupleInstance.newTuple();
            t.append(dowStart + PROFILE_SEPARATOR + slotEnd);
            t.append(endDT.minuteOfDay().get() - slotEnd);
            res.add(t);
        }
        return res;
    }

    // Floor the minute-of-day to a slot boundary
    private int getMinSlot(DateTime dt, int minutes) {
        int slot = dt.minuteOfDay().get();
        slot = (slot / minutes) * minutes;
        return slot;
    }

    // Remap Joda-Time's 1=Monday..7=Sunday to 1=Sunday..7=Saturday
    private int getDayOfWeek(DateTime dt) {
        int dow = dt.getDayOfWeek();
        if (dow == 7)
            dow = 1;
        else
            dow = dow + 1;
        return dow;
    }

    // Required when a Tuple or Bag is returned
    @Override
    public Schema outputSchema(Schema input) {
        Schema s = new Schema();
        s.add(new Schema.FieldSchema(null, DataType.BAG));
        return s;
    }

    // Used when a basic type is returned and to resolve overloads
    @Override
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
        Schema s = new Schema();
        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
        funcList.add(new FuncSpec(this.getClass().getName(), s));
        return funcList;
    }
}

----------------------------------------------------------------------------------------
Davide Brambilla
ContentWise R&D
Moviri
davide.brambi...@moviri.com
phone: +39 02 49517001
mobile: 345 71 13 800
Moviri S.p.A. - Via Schiaffino, 11 - 20158 Milano (MI) – ITALY
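PS: to put a number on how large the bag from a single input row can get, here is a standalone sketch of the same slot arithmetic (plain Java, no Pig or Joda-Time; the class and method names are illustrative, not part of the UDF, and it assumes the slot size divides 1440 as in my script). The UDF emits one tuple per slot the interval touches, so this count is also the bag size:

```java
// Standalone sketch, not the UDF itself: counts how many slot tuples
// one (start, duration) pair produces, mirroring the UDF's loops.
public class SlotCountSketch {

    // Same computation as the UDF's getMinSlot: floor a minute-of-day
    // value to the start of its slot.
    static int minSlot(int minuteOfDay, int slotSize) {
        return (minuteOfDay / slotSize) * slotSize;
    }

    // Number of slots touched by an interval that starts at startMin
    // (minutes since midnight) and lasts durationMin minutes; the end
    // may fall on a later day, so the "last" boundary is not wrapped.
    static int tuplesEmitted(int startMin, int durationMin, int slotSize) {
        int first = minSlot(startMin, slotSize);
        int last = minSlot(startMin + durationMin, slotSize);
        return (last - first) / slotSize + 1;
    }

    public static void main(String[] args) {
        // A 2-minute view inside one 30-minute slot: a single tuple.
        System.out.println(tuplesEmitted(10, 2, 30));        // 1
        // A duration of a full week (bad input data?) at 30-minute
        // slots: 337 tuples from one row, all held in one bag.
        System.out.println(tuplesEmitted(0, 7 * 1440, 30));  // 337
    }
}
```

If a few rows carry huge durations (the filter only enforces a lower bound on $5), each of them inflates its bag by hundreds of tuples, which would match the SpillableMemoryManager spilling gigabytes "from 1 objects".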