Rodrigo,

I tried the following to reproduce the problem. In my UDF /*GroupTest*/, I was getting data related to one key at a time (see *LOG* below). I have added the UDF code, Pig code, input data, and log output below. Let me know in case I have missed anything.

*UDF Code :* (I intentionally added System.out.println calls in the UDF to check the bag size in the log)
package com.pigtutorial.ch01;

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;

public class GroupTest extends EvalFunc<DataBag> {

    @Override
    public DataBag exec(Tuple input) throws IOException {

        DataBag returnBag = BagFactory.getInstance().newDefaultBag();
        if (input == null || input.size() == 0 || input.get(0) == null)
            return null;
        try {
            DataBag bag = DataType.toBag(input.get(0));
            System.out.println("Calling UDF with databag size : " + bag.size());
            Iterator it = bag.iterator();
            while (it.hasNext()) {
                Tuple t = (Tuple) it.next();
                System.out.println(t);
                returnBag.add(t);
            }
        } catch (Exception e) {
            throw new IOException("Caught exception processing input row ", e);
        }
        return returnBag;
    }

}

*Pig Code :*
a =
    LOAD 'src/resources/data/input/data_input'
    USING PigStorage(',')
    AS (KEY:chararray, VAL1:chararray, VAL2:chararray);

b = GROUP a
    BY KEY
    PARALLEL 2;

c = FILTER b
    BY NOT IsEmpty(a);

d = FOREACH c
    GENERATE FLATTEN(com.pigtutorial.ch01.GroupTest(a));
STORE
    d
    INTO 'src/resources/data/actual_output/group_output'
    USING PigStorage(',');
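
To double-check what actually reaches the UDF, a minimal sketch (the jar name in the REGISTER line is hypothetical, adjust it to your build):

REGISTER pigtutorial-ch01.jar;  -- hypothetical jar name
DESCRIBE c;                     -- shows the (group, bag) schema handed to the UDF
DUMP c;                         -- prints the grouped records, one bag per key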

*INPUT :*
key1,d1,d2
key2,d1,d3
key1,d1,d4
key1,d1,d5

*LOG :*
Calling UDF with databag size : 3
(key1,d1,d2)
(key1,d1,d4)
(key1,d1,d5)
Calling UDF with databag size : 1
(key2,d1,d3)

--
Thanks
Suraj Nayak

On Sunday 13 July 2014 04:36 AM, Rodrigo Ferreira wrote:
(answering again but now including the mailing list :P)

Thank you for your answer Suraj.

What you said is exactly what I expect, but I get something different.

Using your example (the specific data is not important here), I get more than one key, in order, in my UDF. Here's a sample of my UDF's code:


DataBag bag = DataType.toBag(input.get(0));
Iterator it = bag.iterator();
while (it.hasNext()) {
    Tuple t = (Tuple) it.next();
    // Here I print the attribute used as the grouping key
}


What I get in the output is:

key1
key1
key1
key2

The point is that I'm using test data that is not really big (less than 64MB). Anyhow, Pig shouldn't be putting these keys together in the same bag! Maybe this is a kind of optimization that I should turn off.
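
If it is the combiner, one way to rule it out would be to disable it for a test run (a sketch; pig.exec.nocombiner is the standard Pig property for this, set at the top of the script):

SET pig.exec.nocombiner 'true';  -- disable the combiner for this script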


2014-07-12 23:29 GMT+02:00 Suraj Nayak <[email protected]>:

    Are you processing the bag in the UDF?

    Can you send sample records that are going into the UDF, using the
    dump command on alias C?

    If the data is (alias A):
    (key1,d1,d2)
    (key2,d1,d3)
    (key1,d1,d4)
    (key1,d1,d5)

    On grouping by the 1st column, the data should be grouped as below:

    (key1, { (key1,d1,d2), (key1,d1,d4), (key1,d1,d5) })
    (key2, { (key2,d1,d3) })

    If you are providing the data in A to the UDF, you should get all the
    records for the same key in the same bag.
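
    A minimal sketch of handing each per-key bag to a UDF (the UDF name below
    is only illustrative, not an existing class):

    -- your.package.YourUDF is a placeholder for your registered UDF
    d = FOREACH b GENERATE group, your.package.YourUDF(a);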

    --
    Suraj Nayak


