This actually caused a rather nasty bug today.

In another UDF that returns a bag of tuples, I originally wrapped the tuple
schema in a FieldSchema inside the bag, and the schema for FLATTEN(myudf())
came out as mytuple::field1, mytuple::field2. But the values of all the fields
were actually expanded to the root level, and overwrote another field that had
the same name, but without the "mytuple::" part.

This is on 0.8.1.

On Tue, Jul 17, 2012 at 11:25 PM, Jonathan Coveney <[email protected]> wrote:

> In 0.10 you should have to have bag -> tuple -> elements
>
> 2012/7/17 Yang <[email protected]>
>
> > OK, found the issue.
> >
> > Now I do not create an explicit FieldSchema for the inside tuple Schema,
> > but directly insert the tuple schema into the bag. Then it works.
> >
> > This is indeed a difference between 0.8.1 and 0.10, because the original
> > works on 0.10, and the new one only works on 0.8.1.
> >
> > On Tue, Jul 17, 2012 at 4:59 PM, Yang <[email protected]> wrote:
> >
> > > I created a UDF that returns a bag of tuples. The syntax is all fine,
> > > but when I run it in Pig, Pig gives this error:
> > >
> > > 12/07/17 16:51:58 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
> > > 12/07/17 16:51:58 WARN mapred.LocalJobRunner: job_local_0001
> > > java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.pig.data.Tuple
> > >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:392)
> > >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:342)
> > >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
> > >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:237)
> > >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:232)
> > >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53)
> > >     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
> > >     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
> > >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
> > >     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
> > > 12/07/17 16:51:58 INFO mapReduceLayer.MapReduceLauncher: HadoopJobId: job_local_0001
> > >
> > > It looks like the returned value is wrong somehow, but I checked the
> > > outputSchema() method, and it is exactly the same as in the online docs.
> > > Where am I wrong?
> > >
> > > ---- This is Pig 0.8.1. I posted a question about a month ago, stating
> > > that the 0.8.1 FLATTEN(bag_of_tuples) behavior is different from 0.10.0,
> > > in that it keeps the enclosing tuple, while 0.10.0 strips it and places
> > > the fields at the root level.
> > >
> > > Thanks!
> > > yang
> > >
> > > ///// DemoUdf.java
> > >
> > > import java.io.IOException;
> > >
> > > import org.apache.pig.EvalFunc;
> > > import org.apache.pig.data.DataBag;
> > > import org.apache.pig.data.DataType;
> > > import org.apache.pig.data.DefaultDataBag;
> > > import org.apache.pig.data.DefaultTuple;
> > > import org.apache.pig.data.Tuple;
> > > import org.apache.pig.impl.logicalLayer.FrontendException;
> > > import org.apache.pig.impl.logicalLayer.schema.Schema;
> > >
> > > public class DemoUdf extends EvalFunc<DataBag> {
> > >
> > >     @Override
> > >     public DataBag exec(Tuple args) throws IOException {
> > >
> > >         Tuple t1 = new DefaultTuple();
> > >         t1.append("xx");
> > >         t1.append("yy");
> > >         Tuple t2 = new DefaultTuple();
> > >         t2.append("xxx");
> > >         t2.append("yyy");
> > >         DataBag b = new DefaultDataBag();
> > >         b.add(t1);
> > >         b.add(t2);
> > >         return b;
> > >     }
> > >
> > >     // schema is bagContent:bag{bagContentTuple:tuple(x, y)}
> > >     @Override
> > >     public Schema outputSchema(Schema input) {
> > >         try {
> > >
> > >             Schema insideTuple = new Schema(); // this is a tuple
> > >             insideTuple.add(new Schema.FieldSchema("x", DataType.CHARARRAY));
> > >             insideTuple.add(new Schema.FieldSchema("y", DataType.CHARARRAY));
> > >             Schema out = new Schema();
> > >             out.add(new Schema.FieldSchema("bagContent",
> > >                     bagOfTuples("bagContent", insideTuple), DataType.BAG));
> > >             return out;
> > >         } catch (FrontendException e) {
> > >             e.printStackTrace();
> > >             return null;
> > >         }
> > >     }
> > >
> > >     private Schema bagOfTuples(String bagName, Schema tupleSchema)
> > >             throws FrontendException {
> > >         Schema bagSchema = new Schema();
> > >         // the name does not really matter here, you will see it only on describe output
> > >         bagSchema.add(new Schema.FieldSchema(bagName + "Tuple", tupleSchema,
> > >                 DataType.TUPLE));
> > >
> > >         return bagSchema;
> > >     }
> > >
> > > }
> > >
> > > //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
> > > /// src/test/java/DemoTest.java
> > >
> > > import org.apache.pig.pigunit.PigTest;
> > > import org.junit.Test;
> > >
> > > public class DemoTest {
> > >     @Test
> > >     public void blah() {}
> > >
> > >     @Test
> > >     public void testSimple() throws Exception {
> > >
> > >         PigTest test = new PigTest("src/test/resources/test_demo.pig");
> > >         // sample input data schema
> > >         // x : bag{(vertex:int, cliques:bag{tuple(id:int, privateId:int)})}
> > >
> > >         String [] inputData = { "1" };
> > >
> > >         String [] expectedOutput = { "({xxx,yyy})"};
> > >
> > >         // don't really verify anything, too long
> > >         test.assertOutput("inputdata", inputData, "tuples", expectedOutput);
> > >
> > >     }
> > >
> > > }
> > > /////////////////////////////////////////////////////////////////
> > >
> > > /// src/test/resources/test_demo.pig
> > >
> > > DEFINE demo DemoUdf();
> > >
> > > inputdata = load 'src/test/resources/test_demo.pig' as (x:chararray);
> > > tuples = FOREACH inputdata GENERATE FLATTEN(demo(1)) as kkk;
> > >
> > > tuples = FOREACH tuples GENERATE $0;
> > > STORE tuples INTO 'fake_output';
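
For anyone hitting the same thing, the field-overwrite symptom from the top of this message can be pictured with a small toy model in plain Java. This is only a sketch and not Pig's implementation: the class FlattenCollisionSketch, the method flattenInto, and the use of a map as a "row" are all made up for illustration. It just shows why flattened fields that land at the root level under their bare names can replace an existing field, while fields kept under a "mytuple::" prefix cannot.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only (hypothetical names, not Pig code): a row is a map from
// field name to value, and "flattening" copies a tuple's fields into it.
public class FlattenCollisionSketch {

    /**
     * Returns a copy of the row with the tuple's fields merged in.
     * With a non-empty prefix the fields are stored as "prefix::name";
     * with an empty prefix they land at the root level under their bare names.
     */
    static Map<String, Object> flattenInto(Map<String, Object> row,
                                           Map<String, Object> tuple,
                                           String prefix) {
        Map<String, Object> out = new LinkedHashMap<>(row);
        for (Map.Entry<String, Object> e : tuple.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "::" + e.getKey();
            out.put(key, e.getValue()); // put() silently replaces an existing key
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("field1", "outer");

        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("field1", "inner");
        tuple.put("field2", "other");

        // Prefixed names: the original field1 survives next to mytuple::field1.
        System.out.println(flattenInto(row, tuple, "mytuple"));
        // Bare names: the original field1 is overwritten by the tuple's field1.
        System.out.println(flattenInto(row, tuple, ""));
    }
}
```

In this model the declared schema corresponds to the prefixed case and the actual data to the bare-name case, which is the mismatch described above for 0.8.1.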
