2010/4/19 hc busy <hc.b...@gmail.com>
That's just the way it is right now: you can't make bags or tuples
directly... Maybe we should have some UDFs in Piggybank for these:
toBag()
toTuple(); --which is kinda like exec(Tuple in){return in;}
TupleToBag(); --sometimes you need it this way for some reason.
OK, I am posting my current code here; maybe I will turn it into a patch
later (if such an implementation is acceptable, of course).
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import java.io.IOException;
/**
 * Converts a flat sequence of fields into a bag of fixed-width tuples.
 *
 * <p>Schema: count:int, fld1 [, fld2, fld3, fld4 ... ].<br>
 * Output for count=2: { (fld1, fld2), (fld3, fld4) ... }.<br>
 * When the number of data fields is not a multiple of count, the last tuple
 * receives the remaining fields and is therefore shorter than the others.
 *
 * @author astepachev
 */
public class ToBag extends EvalFunc<DataBag> {
    // Left public and non-final on purpose: narrowing them would change the
    // class's visible interface for any existing code that touches them.
    public BagFactory bagFactory;
    public TupleFactory tupleFactory;

    /** Obtains the bag and tuple factories used to build the result. */
    public ToBag() {
        bagFactory = BagFactory.getInstance();
        tupleFactory = TupleFactory.getInstance();
    }

    /**
     * Groups the fields that follow the leading count into tuples of
     * {@code count} fields each and returns them as a bag.
     *
     * @param input tuple whose field 0 is the tuple width (an Integer) and
     *              whose remaining fields are the values to regroup
     * @return the bag of grouped tuples (empty when there are no data
     *         fields), or {@code null} when the input is null/empty or the
     *         count field is null
     * @throws IOException if the count is zero or negative, or if reading a
     *                     field from the input fails
     */
    @Override
    public DataBag exec(Tuple input) throws IOException {
        // An empty tuple has no count field to read, so treat it like null input.
        if (input == null || input.isNull() || input.size() == 0) {
            return null;
        }

        final Integer count = (Integer) input.get(0);
        if (count == null) {
            return null;
        }
        // A zero count would otherwise surface as an ArithmeticException from
        // the modulo below; report the real problem instead.
        if (count <= 0) {
            throw new IOException(
                    "ToBag: count (first argument) must be positive, got " + count);
        }

        final DataBag bag = bagFactory.newDefaultBag();
        final int fieldCount = input.size() - 1;
        Tuple tuple = null;
        for (int i = 0; i < fieldCount; i++) {
            // i == 0 always satisfies this test, so tuple is assigned before
            // its first use. The tuple is added to the bag first and filled
            // by this and the following iterations.
            if (i % count == 0) {
                tuple = tupleFactory.newTuple();
                bag.add(tuple);
            }
            // Data fields start at index 1; index 0 is the count.
            tuple.append(input.get(i + 1));
        }
        return bag;
    }
}
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import static org.junit.Assert.assertTrue;
/**
* @author astepachev
*/
public class ToBagTest {
PigServer pigServer;
URL inputTxt;
@Before
public void init() throws IOException, URISyntaxException {
pigServer = new PigServer(ExecType.LOCAL);
inputTxt =
this.getClass().getResource("bagTest.txt").toURI().toURL();
}
@Test
public void testSimple() throws IOException {
pigServer.registerQuery("a = load '" +
inputTxt.toExternalForm() +
"' using PigStorage(',') " +
"as (id:int, a:chararray, b:chararray, c:chararray,
d:chararray);");
pigServer.registerQuery("last = foreach a generate flatten(" +
ToBag.class.getName() + "(2, id, a, id, b, id, c));");
pigServer.deleteFile("target/pigtest/func1.txt");
pigServer.store("last", "target/pigtest/func1.txt");
assertTrue(pigServer.fileSize("target/pigtest/func1.txt") > 0);
}
}