Rodrigo,
I tried the following to reproduce the problem. In my UDF /*GroupTest*/, I
received data for only one key at a time (see *LOG* below). I have added the
UDF code, Pig code, input data, and log output below. Let me know if I
have missed anything.
*UDF Code:* (I intentionally added System.out.println calls to the UDF to
check the bag size in the log.)
package com.pigtutorial.ch01;

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;

// Copies every tuple of the input bag into a new bag, printing the bag size
// and each tuple so the log shows exactly what a single UDF call receives.
public class GroupTest extends EvalFunc<DataBag> {
    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0 || input.get(0) == null) {
            return null;
        }
        DataBag returnBag = BagFactory.getInstance().newDefaultBag();
        try {
            DataBag bag = DataType.toBag(input.get(0));
            // One line per UDF call: the size of the bag for this group
            System.out.println("Calling UDF with databag size : " + bag.size());
            Iterator<Tuple> it = bag.iterator();
            while (it.hasNext()) {
                Tuple t = it.next();
                System.out.println(t); // each tuple in this call's bag
                returnBag.add(t);
            }
        } catch (Exception e) {
            throw new IOException("Caught exception processing input row ", e);
        }
        return returnBag;
    }
}
*Pig Code:*
a = LOAD 'src/resources/data/input/data_input'
    USING PigStorage(',')
    AS (KEY:chararray, VAL1:chararray, VAL2:chararray);
b = GROUP a BY KEY PARALLEL 2;
c = FILTER b BY NOT IsEmpty(a);
d = FOREACH c GENERATE FLATTEN(com.pigtutorial.ch01.GroupTest(a));
STORE d
    INTO 'src/resources/data/actual_output/group_output'
    USING PigStorage(',');
*INPUT:*
key1,d1,d2
key2,d1,d3
key1,d1,d4
key1,d1,d5
*LOG:*
Calling UDF with databag size : 3
(key1,d1,d2)
(key1,d1,d4)
(key1,d1,d5)
Calling UDF with databag size : 1
(key2,d1,d3)
--
Thanks
Suraj Nayak
On Sunday 13 July 2014 04:36 AM, Rodrigo Ferreira wrote:
(Answering again, but now including the mailing list :P)
Thank you for your answer, Suraj.
What you describe is exactly what I expect, but I get something different.
Using your example (the specific data is not important here), the bag my
UDF receives contains more than one key, in sorted order. Here is a sample
of my UDF code:
DataBag bag = DataType.toBag(input.get(0));
Iterator<Tuple> it = bag.iterator();
while (it.hasNext()) {
    Tuple t = it.next();
    // Here I print the attribute used as the grouping key
}
What I get in the output is:
key1
key1
key1
key2
Note that my test data is not very big (less than 64 MB). Even so, Pig
should not put these keys together in the same bag! Maybe this is some
kind of optimization that I should turn off.
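Without a per-call marker, a printout such as key1 key1 key1 key2 could come either from one bag or from two consecutive UDF calls; the "Calling UDF with databag size" line in the GroupTest UDF above separates the calls. A further check is to collect the distinct key values seen in each call. The sketch below is plain Java and only models the idea: it assumes each record is a List<String> standing in for Pig's Tuple and each bag a List of records standing in for DataBag, and the class and method names are hypothetical. Inside a real UDF the same loop would read t.get(0) from each tuple of the bag.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: records are modelled as List<String> (stand-in for Tuple),
// bags as List<List<String>> (stand-in for DataBag).
class BagKeyCheck {
    // Returns the distinct values found at keyIndex, in first-seen order.
    static Set<String> distinctKeys(List<List<String>> bag, int keyIndex) {
        Set<String> keys = new LinkedHashSet<>();
        for (List<String> record : bag) {
            keys.add(record.get(keyIndex));
        }
        return keys;
    }

    // True when every record carries the same key, as expected for a bag produced by GROUP ... BY.
    static boolean isSingleKey(List<List<String>> bag, int keyIndex) {
        return distinctKeys(bag, keyIndex).size() <= 1;
    }

    public static void main(String[] args) {
        List<List<String>> key1Bag = List.of(
            List.of("key1", "d1", "d2"),
            List.of("key1", "d1", "d4"),
            List.of("key1", "d1", "d5"));
        List<List<String>> mixedBag = List.of(
            List.of("key1", "d1", "d2"),
            List.of("key2", "d1", "d3"));
        System.out.println(isSingleKey(key1Bag, 0));   // prints true
        System.out.println(distinctKeys(mixedBag, 0)); // prints [key1, key2]
    }
}
```

Printing the distinct-key set once per call makes it unambiguous whether a single bag really mixes keys.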
2014-07-12 23:29 GMT+02:00 Suraj Nayak:
Are you processing the bag in the UDF?
Can you send sample records that are going into the UDF, using the DUMP
command on alias C?
If the data (alias A) is:
(key1,d1,d2)
(key2,d1,d3)
(key1,d1,d4)
(key1,d1,d5)
After grouping on the first column, the data should look like this:
(key1,{(key1,d1,d2),(key1,d1,d4),(key1,d1,d5)})
(key2,{(key2,d1,d3)})
If you pass the grouped bag A to the UDF, you should get all records
with the same key in the same bag.
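The grouping described above can be simulated in plain Java to show what each bag should contain. This is a sketch of GROUP BY semantics only, not Pig's implementation: the class and method names are hypothetical, the rendering merely approximates how Pig's DUMP prints a (group, bag) tuple, and Pig itself guarantees neither the order of groups nor the order of tuples inside a bag.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical simulation of "GROUP a BY KEY" on comma-separated records.
class GroupSimulation {
    // Groups records by the field at keyIndex; LinkedHashMap keeps first-seen key order.
    static Map<String, List<String>> groupBy(List<String> lines, int keyIndex) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String line : lines) {
            String key = line.split(",")[keyIndex];
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(line);
        }
        return groups;
    }

    // Renders one group in a DUMP-like form: (group,{(t1),(t2),...}).
    static String render(String key, List<String> bag) {
        StringJoiner tuples = new StringJoiner(",", "{", "}");
        for (String line : bag) {
            tuples.add("(" + line + ")");
        }
        return "(" + key + "," + tuples + ")";
    }

    public static void main(String[] args) {
        List<String> input = List.of("key1,d1,d2", "key2,d1,d3", "key1,d1,d4", "key1,d1,d5");
        // Each key gets its own bag, so a UDF applied per group sees one key per call.
        groupBy(input, 0).forEach((key, bag) -> System.out.println(render(key, bag)));
    }
}
```

Running main prints (key1,{(key1,d1,d2),(key1,d1,d4),(key1,d1,d5)}) followed by (key2,{(key2,d1,d3)}), matching the two groups above.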
--
Suraj Nayak