[
https://issues.apache.org/jira/browse/PIG-1285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12845458#action_12845458
]
Pradeep Kamath commented on PIG-1285:
-------------------------------------
A couple of comments:
* I think that, instead of the code below, the implementation of write() should be
inlined into SingleTupleBag.write() (I guess DefaultDataBag.write() and
SingleTupleBag.write() could both call a common method that implements write()).
{noformat}
+ DataBag bag = bagFactory.newDefaultBag();
+ bag.addAll(this);
+ bag.write(out);
{noformat}
The reason is that bagFactory.newDefaultBag() registers the bag with the
SpillableMemoryManager, which in turn puts a weak reference to the bag on a
linked list. In the past we have seen this list grow in size and cause memory
issues, and that was one of the main motivations for creating SingleTupleBag.
* There is an implementation for write() but not for read(). Reading through the
code, I guess this is because during deserialization SingleTupleBag.read() will
not be called; DefaultDataBag.read() will be called instead. I am wondering
whether leaving SingleTupleBag.read() as-is is confusing, since it throws an
exception with the message "SingleTupleBag should never be serialized or
deserialized."
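The first suggestion (a shared write routine, with no temporary DefaultDataBag) and the read-side observation can be sketched together. This is a minimal, self-contained Java sketch under stated assumptions: DefaultBag, SingleTupleBag, writeTuples and the wire layout (a long count, then each tuple as an int field count followed by UTF strings) are hypothetical stand-ins, not Pig's actual classes or serialization format. It shows the single-tuple bag writing the same layout directly, so a reader written for the default bag can deserialize its bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins, NOT Pig's classes: a "tuple" is a List<String>, and the
// wire layout (long count, then int field count + UTF fields per tuple) is assumed.
public class BagWriteSketch {

    /** Shared routine both bag types call, so their serialized layouts cannot diverge. */
    static void writeTuples(DataOutput out, long size, Iterable<List<String>> tuples)
            throws IOException {
        out.writeLong(size);
        for (List<String> t : tuples) {
            out.writeInt(t.size());
            for (String field : t) {
                out.writeUTF(field);
            }
        }
    }

    /** Stand-in for the spillable default bag (memory-manager registration omitted). */
    static class DefaultBag {
        final List<List<String>> tuples = new ArrayList<>();

        void write(DataOutput out) throws IOException {
            writeTuples(out, tuples.size(), tuples);
        }

        /** Reads the layout produced by writeTuples, whichever bag type wrote it. */
        static DefaultBag read(DataInput in) throws IOException {
            DefaultBag bag = new DefaultBag();
            long size = in.readLong();
            for (long i = 0; i < size; i++) {
                int n = in.readInt();
                List<String> t = new ArrayList<>(n);
                for (int j = 0; j < n; j++) {
                    t.add(in.readUTF());
                }
                bag.tuples.add(t);
            }
            return bag;
        }
    }

    /** Stand-in for SingleTupleBag: writes directly, never allocating a DefaultBag. */
    static class SingleTupleBag {
        final List<String> item;

        SingleTupleBag(List<String> item) {
            this.item = item;
        }

        void write(DataOutput out) throws IOException {
            writeTuples(out, 1, Collections.singletonList(item));
        }
    }

    /** Serializes a single-tuple bag, then deserializes the bytes as a default bag. */
    public static List<List<String>> roundTrip(List<String> tuple) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            new SingleTupleBag(tuple).write(out);
            out.flush();
            DataInput in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return DefaultBag.read(in).tuples;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("a", "b"))); // prints [[a, b]]
    }
}
```

Because both write() methods delegate to writeTuples, the layout cannot drift between the two bag types, and in this model SingleTupleBag never needs a read() of its own.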
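The memory concern behind SingleTupleBag can be illustrated with a small model. The sketch below is hypothetical and is not Pig's SpillableMemoryManager: it simply keeps one WeakReference per registered bag in a LinkedList, as the first comment describes. The bags themselves can be garbage collected, but each registration leaves a list entry behind until something prunes cleared references, so creating a temporary bag per record grows the list in proportion to the number of records.

```java
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.LinkedList;

// Hypothetical model, NOT Pig's SpillableMemoryManager: every registered bag gets a
// WeakReference entry in a LinkedList.
public class WeakRegistrySketch {
    private final LinkedList<WeakReference<Object>> registered = new LinkedList<>();

    /** Called once per new bag; the list entry remains even after the bag is collected. */
    public void register(Object bag) {
        registered.add(new WeakReference<>(bag));
    }

    /** Removes entries whose referent has been cleared; walks the whole list each time. */
    public int pruneCleared() {
        int removed = 0;
        for (Iterator<WeakReference<Object>> it = registered.iterator(); it.hasNext(); ) {
            if (it.next().get() == null) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public int size() {
        return registered.size();
    }

    /** Registers n throwaway "bags", one per record, and returns the entry count before pruning. */
    public static int entriesAfterRegistering(int n) {
        WeakRegistrySketch registry = new WeakRegistrySketch();
        for (int i = 0; i < n; i++) {
            // new Object() stands in for a temporary bag that is unreachable right after this call
            registry.register(new Object());
        }
        return registry.size();
    }

    public static void main(String[] args) {
        System.out.println(entriesAfterRegistering(100_000)); // prints 100000
    }
}
```

In this model pruning walks the entire list, so its cost also grows with the number of registrations; not registering single-tuple bags at all avoids both the entries and the pruning work.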
> Allow SingleTupleBag to be serialized
> -------------------------------------
>
> Key: PIG-1285
> URL: https://issues.apache.org/jira/browse/PIG-1285
> Project: Pig
> Issue Type: Improvement
> Reporter: Dmitriy V. Ryaboy
> Assignee: Dmitriy V. Ryaboy
> Fix For: 0.7.0
>
> Attachments: PIG-1285.patch
>
>
> Currently, Pig uses a SingleTupleBag for efficiency when a full-blown
> spillable bag implementation is not needed in the Combiner optimization.
> Unfortunately, this can cause problems. The following Initial.exec() code
> fails at run time with an error saying that a SingleTupleBag cannot be serialized:
> {code}
> @Override
> public Tuple exec(Tuple in) throws IOException {
>     // single record. just copy.
>     if (in == null) return null;
>     try {
>         Tuple resTuple = tupleFactory_.newTuple(in.size());
>         for (int i = 0; i < in.size(); i++) {
>             resTuple.set(i, in.get(i));
>         }
>         return resTuple;
>     } catch (IOException e) {
>         log.warn(e);
>         return null;
>     }
> }
> {code}
> The code below can fix the problem in the UDF, but it seems like something
> that should be handled transparently, not requiring UDF authors to know about
> SingleTupleBags.
> {code}
> @Override
> public Tuple exec(Tuple in) throws IOException {
>     // single record. just copy.
>     if (in == null) return null;
>
>     /*
>      * Unfortunately SingleTupleBags are not serializable. We cache whether
>      * a given index contains a bag in the map below, and copy all bags into
>      * DefaultBags before returning to avoid serialization exceptions.
>      */
>     Map<Integer, Boolean> isBagAtIndex = Maps.newHashMap();
>
>     try {
>         Tuple resTuple = tupleFactory_.newTuple(in.size());
>         for (int i = 0; i < in.size(); i++) {
>             Object obj = in.get(i);
>             if (!isBagAtIndex.containsKey(i)) {
>                 isBagAtIndex.put(i, obj instanceof SingleTupleBag);
>             }
>             if (isBagAtIndex.get(i)) {
>                 DataBag newBag = bagFactory_.newDefaultBag();
>                 newBag.addAll((DataBag) obj);
>                 obj = newBag;
>             }
>             resTuple.set(i, obj);
>         }
>         return resTuple;
>     } catch (IOException e) {
>         log.warn(e);
>         return null;
>     }
> }
> {code}
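In the workaround above, isBagAtIndex is declared inside exec(), so it is rebuilt on every call and caches nothing between records. Since instanceof is a cheap check and evaluates to false for null, the same copy step can test each field directly. The sketch below uses hypothetical stand-ins rather than Pig's API: a tuple is a List<Object>, SingleBag plays the role of SingleTupleBag, and a plain ArrayList plays the role of a DefaultBag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins, NOT Pig classes: a "tuple" is a List<Object>,
// SingleBag models a non-serializable SingleTupleBag, and ArrayList models a DefaultBag.
public class CopyTupleSketch {
    public static final class SingleBag extends ArrayList<Object> {
        public SingleBag(Object onlyTuple) {
            add(onlyTuple);
        }
    }

    /** Copies the input tuple, replacing every SingleBag field with a plain ArrayList copy. */
    public static List<Object> copyWithPlainBags(List<Object> in) {
        if (in == null) return null;
        List<Object> out = new ArrayList<>(in.size());
        for (Object field : in) {
            // instanceof is cheap and false for null, so no per-index cache is needed
            if (field instanceof SingleBag) {
                field = new ArrayList<Object>((SingleBag) field);
            }
            out.add(field);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> in = Arrays.<Object>asList("key", new SingleBag("t1"), 42);
        List<Object> out = copyWithPlainBags(in);
        System.out.println(out.get(1).getClass().getSimpleName()); // prints ArrayList
    }
}
```

Checking every field also stays correct when the first record has a null where later records have a bag, a case where a cache keyed only on the first value seen would skip the copy.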