In 0.10 you should have to have bag -> tuple -> elments 2012/7/17 Yang <teddyyyy...@gmail.com>
> ok, found the issue, > > now I do not create an explicit FieldSchema for the inside tuple Schema, > but directly insert the tuple schema into > the bag. then it works. > > this is indeed some difference between 081 and 0.10, cuz the original works > on 0.10, and the new one only works on 0.8.1 > > On Tue, Jul 17, 2012 at 4:59 PM, Yang <teddyyyy...@gmail.com> wrote: > > > I created a Udf that returns a Bag of Tuples. the syntax is all fine, > but > > when I run it in pig, > > Pig gives error: > > 2/07/17 16:51:58 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with > > processName=JobTracker, sessionId= - already initialized > > 12/07/17 16:51:58 WARN mapred.LocalJobRunner: job_local_0001 > > java.lang.ClassCastException: java.lang.String cannot be cast to > > org.apache.pig.data.Tuple > > at > > > org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:392) > > at > > > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:342) > > at > > > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290) > > at > > > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:237) > > at > > > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:232) > > at > > > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53) > > at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) > > at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621) > > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305) > > at > > org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177) > > 12/07/17 16:51:58 INFO mapReduceLayer.MapReduceLauncher: HadoopJobId: > > job_local_0001 > > > > > > > > it looks that the returned value is wrong somehow. but I checked the > > outputSchema() method, and it is exactly the same as > > online docs. where am I wrong? > > ---- this is pig 0.8.1 . I posted a question about 1 month ago, > > stating that 0.8.1 FLATTEN(bag_of_tuples) behavior is different from > > 0.10.0, in that > > it keeps the enclosing tuple, while 0.10.0 strips it and places the > fields > > at the root level. > > > > > > > > Thanks! > > yang > > > > ///// DemoUdf.java > > > > import java.io.IOException; > > > > import org.apache.pig.EvalFunc; > > import org.apache.pig.data.DataBag; > > import org.apache.pig.data.DataType; > > import org.apache.pig.data.DefaultDataBag; > > import org.apache.pig.data.DefaultTuple; > > import org.apache.pig.data.Tuple; > > import org.apache.pig.impl.logicalLayer.FrontendException; > > import org.apache.pig.impl.logicalLayer.schema.Schema; > > > > public class DemoUdf extends EvalFunc<DataBag> { > > > > @Override > > public DataBag exec(Tuple args) throws IOException { > > > > Tuple t1 = new DefaultTuple(); > > t1.append("xx"); > > t1.append("yy"); > > Tuple t2 = new DefaultTuple(); > > t2.append("xxx"); > > t2.append("yyy"); > > DataBag b = new DefaultDataBag(); > > b.add(t1); > > b.add(t2); > > return b; > > } > > > > // schema is bagContent:bag{bagContentTuple:tuple(x, y)} > > @Override > > public Schema outputSchema(Schema input) { > > try { > > > > Schema insideTuple = new Schema();// this is a tuple > > insideTuple.add(new Schema.FieldSchema("x", DataType.CHARARRAY)); > > insideTuple.add(new Schema.FieldSchema("y", DataType.CHARARRAY)); > > Schema out = new Schema(); > > out.add(new Schema.FieldSchema("bagContent", bagOfTuples("bagContent", > > insideTuple), DataType.BAG)); > > return out; > > } catch (FrontendException e) { > > e.printStackTrace(); > > return null; > > } > > } > > > > private Schema bagOfTuples(String bagName, Schema tupleSchema) throws > > FrontendException { > > Schema bagSchema = new Schema(); > > // the name does not really matter here, you will see it only on describe > > output > > bagSchema.add(new Schema.FieldSchema(bagName + "Tuple", tupleSchema, > > DataType.TUPLE )); > > > > return bagSchema; > > } > > > > } > > > > > > > > > > > > > ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// > > /// src/test/java/DemoTest.java > > > > import org.apache.pig.pigunit.PigTest; > > import org.junit.Test; > > > > public class DemoTest { > > @Test > > public void blah() {} > > @Test > > public void testSimple() throws Exception { > > > > > > PigTest test = new PigTest("src/test/resources/test_demo.pig"); > > // sample input data schema > > // x : bag{(vertex:int, cliques:bag{tuple(id:int, privateId:int)}) > } > > > > String [] inputData = { "1" }; > > > > > > String [] expectedOutput = { "({xxx,yyy})"}; > > > > > > // don't really verify anything, too long > > test.assertOutput("inputdata", inputData, "tuples", expectedOutput); > > > > } > > > > } > > ///////////////////////////////////////////////////////////////// > > > > > > /// src/test/resources/test_demo.pig > > > > DEFINE demo DemoUdf(); > > > > inputdata = load 'src/test/resources/test_demo.pig' as (x:chararray); > > tuples = FOREACH inputdata GENERATE FLATTEN(demo(1)) as kkk; > > > > tuples = FOREACH tuples GENERATE $0; > > STORE tuples INTO 'fake_output'; > > > > > > > > > > >