Hi,
   I'm new to Pig and I wrote a Pig UDF to generate a bag of tuples. It
seems to be correct: I've tested it and it works perfectly. What have I
missed?

Thanks

Davide B.

When I apply it to my data (220 million rows) using this script:

1 REGISTER '/mnt5/pig/udf_date.jar';
2 REGISTER '/opt/cloudera/parcels/CDH-4.5.0-1.cdh4.5.0.p0.30/lib/pig/piggybank.jar';
3 jlsraw = LOAD './data/mydata.dat' USING PigStorage(' ') AS
      (ts:chararray, user:chararray, d1:chararray, item:chararray,
       d2:chararray, duration:long, d3:chararray);
4 jlsfilter = FILTER jlsraw BY $5/1000 >= 120;
5 jlsfilters = ORDER jlsfilter BY user;
6 jlsslots = FOREACH jlsfilters GENERATE $1 AS user, $3 AS item,
      com.moviri.pig.udf.GenerateProfileBag(ts, (chararray)$5, (chararray)30) AS t;
7 jlsunroll = FOREACH jlsslots GENERATE $0 AS user, $1 AS item,
      FLATTEN(t) AS (slot:chararray, duration:long);
8 jlsgroup = GROUP jlsunroll BY (user, item, slot);
9 jlsfinal = FOREACH jlsgroup GENERATE group.user, group.item, group.slot,
      SUM(jlsunroll.duration) AS duration;
10 jls = ORDER jlsfinal BY user;
11 STORE jls INTO '/user/hdfs/mydata_10M.dat';
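
To make the intent of line 6 concrete: for a (hypothetical) record with
ts = 2014-01-20T10:05:00Z (a Monday) and a duration of 4500000 ms (75
minutes), the UDF should split the interval into 30-minute
day-of-week/minute-of-day slots and return a bag like

    { (2@600, 25), (2@630, 30), (2@660, 20) }

where 2 is Monday in my encoding (Sunday = 1), 600 is the minute-of-day
of the 10:00 slot, and the second field is the number of minutes spent
in that slot.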

When I run the script, the reduce step that processes the aliases
"M: jlsfilters[3,13] C:  R: jlsslots[4,11],jlsunroll[5,12]" uses a lot
of memory and is eventually killed by the framework because it fails to
report its status. These are the logs:

=== Killing log ===
Task attempt_201401261145_0013_r_000000_0 failed to report status for 600
seconds. Killing!

=== Reducer log ===

2014-01-26 14:13:09,138 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
2014-01-26 14:13:09,138 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
2014-01-26 14:13:09,139 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
2014-01-26 14:13:09,139 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 0 files left.
2014-01-26 14:13:09,139 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 5 files left.
2014-01-26 14:13:09,159 INFO org.apache.hadoop.mapred.Merger: Merging 5 sorted segments
2014-01-26 14:13:09,159 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 5 segments left of total size: 480601243 bytes
2014-01-26 14:13:09,162 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new compressor [.snappy]
2014-01-26 14:13:15,327 INFO org.apache.hadoop.mapred.ReduceTask: Merged 5 segments, 480601243 bytes to disk to satisfy reduce memory limit
2014-01-26 14:13:15,328 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 files, 142586009 bytes from disk
2014-01-26 14:13:15,329 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
2014-01-26 14:13:15,329 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
2014-01-26 14:13:15,334 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 480601235 bytes
2014-01-26 14:13:15,641 INFO org.apache.pig.data.SchemaTupleBackend: Key [pig.schematuple] was not set... will not generate code.
2014-01-26 14:13:15,739 INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce: Aliases being processed per job phase (AliasName[line,offset]): M: jlsfilters[3,13] C:  R: jlsslots[4,11],jlsunroll[5,12]
2014-01-26 14:15:59,265 INFO org.apache.pig.impl.util.SpillableMemoryManager: first memory handler call - Collection threshold init = 162594816(158784K) used = 5079178808(4960135K) committed = 6274088960(6127040K) max = 8388608000(8192000K)
2014-01-26 14:16:33,907 INFO org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 4959196460 bytes from 1 objects. init = 162594816(158784K) used = 5079178808(4960135K) committed = 6274088960(6127040K) max = 8388608000(8192000K)
2014-01-26 14:18:10,567 INFO org.apache.pig.impl.util.SpillableMemoryManager: first memory handler call - Usage threshold init = 162594816(158784K) used = 5878913448(5741126K) committed = 6274088960(6127040K) max = 8388608000(8192000K)
2014-01-26 14:19:16,325 INFO org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 6492776420 bytes from 1 objects. init = 162594816(158784K) used = 5878913448(5741126K) committed = 6274088960(6127040K) max = 8388608000(8192000K)

==== MY UDF CLASS ====

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.piggybank.evaluation.datetime.ISOHelper;
import org.joda.time.DateTime;

public class GenerateProfileBag extends EvalFunc<DataBag> {

    private static final String PROFILE_SEPARATOR = "@";

    private BagFactory instance = BagFactory.getInstance();
    private TupleFactory tupleInstance = TupleFactory.getInstance();

    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() < 3) {
            return null;
        }

        DataBag res = instance.newDefaultBag();

        // ISOHelper.parseDateTime expects the ISO-8601 timestamp wrapped in a tuple.
        Tuple datetime = tupleInstance.newTuple();
        datetime.append(input.get(0));
        DateTime startDT = ISOHelper.parseDateTime(datetime);

        long duration = Long.parseLong((String) input.get(1)); // milliseconds
        DateTime endDT = startDT.plus(duration);

        int minutes = Integer.parseInt((String) input.get(2)); // slot width; must divide 1440

        int dowStart = getDayOfWeek(startDT);
        int dowEnd = getDayOfWeek(endDT);

        int slotStart = getMinSlot(startDT, minutes);
        int slotEnd = getMinSlot(endDT, minutes);

        Tuple t = null;
        if (dowStart == dowEnd) { // start and end fall on the same day of week
            if (slotEnd == slotStart) {
                // Whole interval inside a single slot: emit its length in seconds.
                t = tupleInstance.newTuple();
                t.append(dowStart + PROFILE_SEPARATOR + slotStart);
                t.append((endDT.getMillis() - startDT.getMillis()) / 1000);
                res.add(t);
            } else if (slotStart < slotEnd) {
                // Partial first slot: minutes left until the next slot boundary.
                t = tupleInstance.newTuple();
                t.append(dowStart + PROFILE_SEPARATOR + slotStart);
                t.append(slotStart + minutes - startDT.minuteOfDay().get());
                res.add(t);
                slotStart += minutes;
                // Fully covered intermediate slots.
                while (slotStart < slotEnd) {
                    t = tupleInstance.newTuple();
                    t.append(dowStart + PROFILE_SEPARATOR + slotStart);
                    t.append(minutes);
                    res.add(t);
                    slotStart += minutes;
                }
                // Partial last slot.
                t = tupleInstance.newTuple();
                t.append(dowStart + PROFILE_SEPARATOR + slotEnd);
                t.append(endDT.minuteOfDay().get() - slotEnd);
                res.add(t);
            }
        } else { // the interval spans two or more days
            t = tupleInstance.newTuple();
            t.append(dowStart + PROFILE_SEPARATOR + slotStart);
            t.append(slotStart + minutes - startDT.minuteOfDay().get());
            res.add(t);
            slotStart += minutes;
            if (slotStart == 1440) { // past midnight: move to the next day
                slotStart = 0;
                dowStart = (dowStart == 7) ? 1 : dowStart + 1; // wrap Saturday (7) to Sunday (1)
            }
            // Walk forward slot by slot until the end day and slot are reached.
            while (!(dowStart == dowEnd && slotStart == slotEnd)) {
                if (slotStart == 1440) {
                    slotStart = 0;
                    dowStart = (dowStart == 7) ? 1 : dowStart + 1;
                }
                t = tupleInstance.newTuple();
                t.append(dowStart + PROFILE_SEPARATOR + slotStart);
                t.append(minutes);
                res.add(t);
                slotStart += minutes;
            }
            // Partial last slot.
            t = tupleInstance.newTuple();
            t.append(dowStart + PROFILE_SEPARATOR + slotEnd);
            t.append(endDT.minuteOfDay().get() - slotEnd);
            res.add(t);
        }
        return res;
    }

    // First minute of the slot containing dt (e.g. 10:05 -> 600 with 30-minute slots).
    private int getMinSlot(DateTime dt, int minutes) {
        return (dt.minuteOfDay().get() / minutes) * minutes;
    }

    // Map Joda-Time's Monday=1..Sunday=7 onto Sunday=1..Saturday=7.
    private int getDayOfWeek(DateTime dt) {
        int dow = dt.getDayOfWeek();
        return (dow == 7) ? 1 : dow + 1;
    }

    // Required because a bag is returned.
    @Override
    public Schema outputSchema(Schema input) {
        Schema s = new Schema();
        s.add(new Schema.FieldSchema(null, DataType.BAG));
        return s;
    }

    // Used to resolve overloads; declares the three chararray arguments.
    @Override
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
        Schema s = new Schema();
        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
        funcList.add(new FuncSpec(this.getClass().getName(), s));
        return funcList;
    }
}
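
For reference, a minimal standalone check looks like this (just a
sketch: GenerateProfileBagTest is an illustrative scratch class in the
same package as the UDF, and it assumes the Pig, Piggybank and
Joda-Time jars are on the classpath):

import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class GenerateProfileBagTest {
    public static void main(String[] args) throws Exception {
        // An interval that crosses midnight: Saturday 2014-01-25 23:50 UTC
        // plus 30 minutes. This exercises the multi-day branch and the
        // Saturday -> Sunday wrap.
        Tuple in = TupleFactory.getInstance().newTuple();
        in.append("2014-01-25T23:50:00Z");           // ts (ISO-8601)
        in.append(String.valueOf(30 * 60 * 1000L));  // duration in milliseconds
        in.append("30");                             // slot width in minutes
        DataBag bag = new GenerateProfileBag().exec(in);
        System.out.println(bag);  // expected: {(7@1410,10),(1@0,20)}
    }
}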
----------------------------------------------------------------------------------------
Davide Brambilla
ContentWise R&D
Moviri
davide.brambi...@moviri.com
phone: +39 02 49517001 mobile: 345 71 13 800
Moviri S.p.A. - Via Schiaffino, 11 - 20158 Milano (MI) – ITALY
