Looks like a bug. Create a Jira for it:
https://issues.apache.org/jira/browse/PIG-1866
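Until that's resolved, one thing you could try (an untested sketch — it sidesteps dereferencing the bag through the tuple in a single t.b step by flattening the tuple first; the disambiguated field names t::i and t::b after flatten(t) are an assumption based on your AS schema):

```
-- replaces the final foreach in the script below
c2 = foreach w generate flatten(t);      -- assumed to yield (t::i, t::b)
d  = foreach c2 generate flatten(t::b);
dump d;
```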
Thanks,
Daniel
Ryan Tecco wrote:
This seems like it should work:
register '/tmp/test-udfs.jar';
/*
package test.udfs;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class ReturnNestedBag extends EvalFunc<Tuple> {
    @Override
    public Tuple exec(Tuple input) throws IOException {
        if (input == null || input.size() < 1) {
            return null;
        }
        try {
            Tuple t = TupleFactory.getInstance().newTuple(2);
            t.set(0, 1);
            DataBag segments = DefaultBagFactory.getInstance().newDefaultBag();
            Tuple segmentTuple1 = TupleFactory.getInstance().newTuple(1);
            segmentTuple1.set(0, "str1");
            Tuple segmentTuple2 = TupleFactory.getInstance().newTuple(1);
            segmentTuple2.set(0, "str2");
            segments.add(segmentTuple1);
            segments.add(segmentTuple2);
            t.set(1, segments);
            return t;
        } catch (IOException e) {
            return null;
        }
    }

    // @Override
    // public Schema outputSchema(Schema input) {
    // }
}
/tmp/dummy_data.txt
one 1
two 2
three 3
four 4
five 5
*/
a = load '/tmp/dummy_data.txt' using PigStorage as (x: chararray, y: int);
c = foreach a generate test.udfs.ReturnNestedBag(x) as t : tuple (
        i: int,
        b: bag {
            b_tuple : tuple (
                b_str: chararray
            )
        });
w = limit c 15;
d = foreach w generate flatten(t.b);
dump d;
But it throws a ClassCastException under a stock 0.8 release:
java.lang.ClassCastException: org.apache.pig.data.BinSedesTuple cannot be cast to org.apache.pig.data.DataBag
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:482)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:197)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:480)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:197)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:339)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:291)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.runPipeline(PigMapReduce.java:433)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:401)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:381)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:251)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216)
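The exception itself is just the JVM refusing a downcast: at POProject.processInputBag, the projected field is cast to DataBag, but the deserialized object sitting in that field is a BinSedesTuple, which is not a DataBag. A minimal, Pig-free model of that failure (the type names here are hypothetical stand-ins for illustration, not the real org.apache.pig.data classes):

```java
// "Bag" stands in for org.apache.pig.data.DataBag and "SerializedTuple"
// for org.apache.pig.data.BinSedesTuple (illustrative names only).
interface Bag {}

class SerializedTuple {}

public class CastFailureDemo {
    // Mimics the unchecked downcast the projection operator performs:
    // it assumes the projected field already holds a bag.
    static boolean castsToBag(Object projectedField) {
        try {
            Bag b = (Bag) projectedField;
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // After (de)serialization the field holds a tuple, so the cast fails.
        System.out.println(castsToBag(new SerializedTuple()));  // prints false
    }
}
```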
I also tried trunk, but it fails with an unrelated error in the logical planner.
Thoughts?
Thanks,
rt