[ https://issues.apache.org/jira/browse/PIG-1895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13005604#comment-13005604 ]
Vivek Padmanabhan commented on PIG-1895: ---------------------------------------- UDF Source code : {code} import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import org.apache.pig.EvalFunc; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.DefaultBagFactory; import org.apache.pig.data.DefaultTupleFactory; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.schema.Schema; public class TestEvalFunc extends EvalFunc<Tuple>{ public Tuple exec(Tuple input) throws IOException { ArrayList<Tuple> tupleList = new ArrayList<Tuple>(2); DataBag values = (DataBag)(input.get(0)); for (Iterator<Tuple> vit = values.iterator(); vit.hasNext();) { tupleList.add(vit.next()); } DataBag sampleBag = DefaultBagFactory.getInstance().newDefaultBag(tupleList); Tuple output = DefaultTupleFactory.getInstance().newTuple(3); output.set(0, sampleBag); output.set(1, new Long(3)); output.set(2, new Integer(2)); return output; } public Schema outputSchema(Schema input) { Schema udfSchema = new Schema(); udfSchema.add(new Schema.FieldSchema("sampled",DataType.BAG)); udfSchema.add(new Schema.FieldSchema("k",DataType.LONG)); udfSchema.add(new Schema.FieldSchema("i",DataType.INTEGER)); return udfSchema; } } {code} Test Case to Verify; {code} import static org.apache.pig.ExecType.LOCAL; import java.util.ArrayList; import java.util.Iterator; import junit.framework.TestCase; import org.apache.pig.PigServer; import org.apache.pig.data.Tuple; public class MyPigUnitTests extends TestCase { private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)"; public static ArrayList<String[]> data = new ArrayList<String[]>(); static { data.add(new String[] { "1"}); data.add(new String[] { "3"}); } private static String [] script = new String []{ "Data = load 'file:/home/pvivek/Desktop/input' using PigStorage() as ( i: int );", "AllData = group Data all parallel 1;", "SampledData = foreach AllData generate 
org.vivek.TestEvalFunc(Data, 5) as rs;", "SampledData1 = foreach SampledData generate rs.sampled;", }; public void test () throws Exception { String filename = TestHelper.createTempFile(data, ""); PigServer pig = new PigServer(LOCAL); filename = filename.replace("\\", "\\\\"); patternString = patternString.replace("\\", "\\\\"); for (String query : script) { pig.registerQuery(query); } Iterator<?> it = pig.openIterator("SampledData1"); int tupleCount = 0; while (it.hasNext()) { Tuple tuple = (Tuple) it.next(); if (tuple == null) break; else { if (tuple.size() > 0) { tupleCount++; } } } assertEquals(1, tupleCount); } } {code} > Class cast exception while projecting udf result > ------------------------------------------------ > > Key: PIG-1895 > URL: https://issues.apache.org/jira/browse/PIG-1895 > Project: Pig > Issue Type: Bug > Components: impl > Affects Versions: 0.7.0, 0.8.0, 0.9.0 > Reporter: Vivek Padmanabhan > > Class cast exception is thrown when I try to project the result from my udf. > The udf has a defined schema DataType.BAG,DataType.LONG and DataType.INTEGER > The below is my script > {code} > Data = load 'file:/home/pvivek/Desktop/input' using PigStorage() as ( i: int > ); > AllData = group Data all parallel 1; > SampledData = foreach AllData generate org.vivek.TestEvalFunc(Data, 5) as rs; > SampledData1 = foreach SampledData generate rs.sampled; > {code} > Even though the output schema defines "sampled" as a data bag, while > processing, instead of sending only the data bag generated from the UDF , the > entire tuple was sent to the projection as result. 
> {code} > Exception received : > java.lang.ClassCastException: org.apache.pig.data.BinSedesTuple cannot be > cast to org.apache.pig.data.DataBag > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:484) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:197) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:480) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:197) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:339) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:291) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.runPipeline(PigMapReduce.java:434) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:402) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:382) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:1) > at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176) > at > org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566) > at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408) > at > org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216) > {code} > This issue is happening with 0.9/0.8 and 0.7 -- This message is automatically generated by JIRA. For more information on JIRA, see: http://www.atlassian.com/software/jira