Allow SingleTupleBag to be serialized
-------------------------------------
Key: PIG-1285
URL: https://issues.apache.org/jira/browse/PIG-1285
Project: Pig
Issue Type: Improvement
Reporter: Dmitriy V. Ryaboy
Currently, Pig uses a SingleTupleBag for efficiency when a full-blown spillable
bag implementation is not needed in the Combiner optimization.
Unfortunately this can create problems. The below Initial.exec() code fails at
run-time with the message that a SingleTupleBag cannot be serialized:
{code}
@Override
public Tuple exec(Tuple in) throws IOException {
// single record. just copy.
if (in == null) return null;
try {
Tuple resTuple = tupleFactory_.newTuple(in.size());
for (int i=0; i< in.size(); i++) {
resTuple.set(i, in.get(i));
}
return resTuple;
} catch (IOException e) {
log.warn(e);
return null;
}
}
{code}
The code below can fix the problem in the UDF, but it seems like something that
should be handled transparently, not requiring UDF authors to know about
SingleTupleBags.
{code}
@Override
public Tuple exec(Tuple in) throws IOException {
// single record. just copy.
if (in == null) return null;
/*
* Unfortunately SingleTupleBags are not serializable. We cache whether a
given index contains a bag
* in the map below, and copy all bags into DefaultBags before returning
to avoid serialization exceptions.
*/
Map<Integer, Boolean> isBagAtIndex = Maps.newHashMap();
try {
Tuple resTuple = tupleFactory_.newTuple(in.size());
for (int i=0; i< in.size(); i++) {
Object obj = in.get(i);
if (!isBagAtIndex.containsKey(i)) {
isBagAtIndex.put(i, obj instanceof SingleTupleBag);
}
if (isBagAtIndex.get(i)) {
DataBag newBag = bagFactory_.newDefaultBag();
newBag.addAll((DataBag)obj);
obj = newBag;
}
resTuple.set(i, obj);
}
return resTuple;
} catch (IOException e) {
log.warn(e);
return null;
}
}
{code}
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.