[ https://issues.apache.org/jira/browse/PIG-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Daniel Dai updated PIG-1374: ---------------------------- Summary: PushDownForeachFlatten shall not push ForEach below Join if the flattened fields is used in the next statement (was: Order by fails with java.lang.String cannot be cast to org.apache.pig.data.DataBag) Description: A reproducible sample: {code} a = load '2.txt' as (b{t(a0:chararray,a1:int)}); b = foreach a generate flatten($0); c = order b by $1 desc; dump c; {code} 2.txt {code} {(a,1),(b,2)} {code} Error message: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.pig.data.DataBag at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:479) at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:197) at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:332) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:233) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:228) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177) The problem is we push foreach below order by, but the foreach generated field is used by order by. Original report: Subject: Order by fails with java.lang.String cannot be cast to org.apache.pig.data.DataBag Script loads data from BinStorage(), then flattens columns and then sorts on the second column with order descending. The order by fails with the ClassCastException {code} register loader.jar; a = load 'c2' using BinStorage(); b = foreach a generate org.apache.pig.CCMLoader(*); describe b; c = foreach b generate flatten($0); describe c; d = order c by $1 desc; dump d; {code} The sampling job fails with the following error: =============================================================================================================== java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.pig.data.DataBag at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:407) at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:188) at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:329) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:232) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:227) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:52) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305) at org.apache.hadoop.mapred.Child.main(Child.java:159) =============================================================================================================== The schema for b, c and d are as follows: b: {bag_of_tuples: {tuple: (uuid: chararray,velocity: double)}} c: {bag_of_tuples::uuid: chararray,bag_of_tuples::velocity: double} d: {bag_of_tuples::uuid: chararray,bag_of_tuples::velocity: double} If we modify this script to order on the first column it seems to work {code} register loader.jar; a = load 'c2' using BinStorage(); b = foreach a generate org.apache.pig.CCMLoader(*); describe b; c = foreach b generate flatten($0); describe c; d = order c by $0 desc; dump d; {code} (gc639c60-4267-11df-9879-0800200c9a66,2.4227339503478493) (ec639c60-4267-11df-9879-0800200c9a66,1.140175425099138) There is a workaround to do a projection before ORDER {code} register loader.jar; a = load 'c2' using BinStorage(); b = foreach a generate org.apache.pig.CCMLoader(*); describe b; c = foreach b generate flatten($0); describe c; newc = foreach c generate $0 as uuid, $1 as velocity; newd = order newc by velocity desc; dump newd; {code} (gc639c60-4267-11df-9879-0800200c9a66,2.4227339503478493) (ec639c60-4267-11df-9879-0800200c9a66,1.140175425099138) The schema for the Loader is as follows: {code} public Schema outputSchema(Schema input) { try{ List<Schema.FieldSchema> list = new ArrayList<Schema.FieldSchema>(); list.add(new Schema.FieldSchema("uuid", DataType.CHARARRAY)); list.add(new Schema.FieldSchema("velocity", DataType.DOUBLE)); Schema tupleSchema = new Schema(list); Schema.FieldSchema tupleFs = new Schema.FieldSchema("tuple", tupleSchema, DataType.TUPLE); Schema bagSchema = new Schema(tupleFs); bagSchema.setTwoLevelAccessRequired(true); Schema.FieldSchema bagFs = new Schema.FieldSchema("bag_of_tuples",bagSchema, DataType.BAG); return new Schema(bagFs); }catch (Exception e){ return null; } } {code} was: Script loads data from BinStorage(), then flattens columns and then sorts on the second column with order descending. The order by fails with the ClassCastException {code} register loader.jar; a = load 'c2' using BinStorage(); b = foreach a generate org.apache.pig.CCMLoader(*); describe b; c = foreach b generate flatten($0); describe c; d = order c by $1 desc; dump d; {code} The sampling job fails with the following error: =============================================================================================================== java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.pig.data.DataBag at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:407) at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:188) at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:329) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:232) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:227) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:52) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305) at org.apache.hadoop.mapred.Child.main(Child.java:159) =============================================================================================================== The schema for b, c and d are as follows: b: {bag_of_tuples: {tuple: (uuid: chararray,velocity: double)}} c: {bag_of_tuples::uuid: chararray,bag_of_tuples::velocity: double} d: {bag_of_tuples::uuid: chararray,bag_of_tuples::velocity: double} If we modify this script to order on the first column it seems to work {code} register loader.jar; a = load 'c2' using BinStorage(); b = foreach a generate org.apache.pig.CCMLoader(*); describe b; c = foreach b generate flatten($0); describe c; d = order c by $0 desc; dump d; {code} (gc639c60-4267-11df-9879-0800200c9a66,2.4227339503478493) (ec639c60-4267-11df-9879-0800200c9a66,1.140175425099138) There is a workaround to do a projection before ORDER {code} register loader.jar; a = load 'c2' using BinStorage(); b = foreach a generate org.apache.pig.CCMLoader(*); describe b; c = foreach b generate flatten($0); describe c; newc = foreach c generate $0 as uuid, $1 as velocity; newd = order newc by velocity desc; dump newd; {code} (gc639c60-4267-11df-9879-0800200c9a66,2.4227339503478493) (ec639c60-4267-11df-9879-0800200c9a66,1.140175425099138) The schema for the Loader is as follows: {code} public Schema outputSchema(Schema input) { try{ List<Schema.FieldSchema> list = new ArrayList<Schema.FieldSchema>(); list.add(new Schema.FieldSchema("uuid", DataType.CHARARRAY)); list.add(new Schema.FieldSchema("velocity", DataType.DOUBLE)); Schema tupleSchema = new Schema(list); Schema.FieldSchema tupleFs = new Schema.FieldSchema("tuple", tupleSchema, DataType.TUPLE); Schema bagSchema = new Schema(tupleFs); bagSchema.setTwoLevelAccessRequired(true); Schema.FieldSchema bagFs = new Schema.FieldSchema("bag_of_tuples",bagSchema, DataType.BAG); return new Schema(bagFs); }catch (Exception e){ return null; } } {code} > PushDownForeachFlatten shall not push ForEach below Join if the flattened > fields is used in the next statement > -------------------------------------------------------------------------------------------------------------- > > Key: PIG-1374 > URL: https://issues.apache.org/jira/browse/PIG-1374 > Project: Pig > Issue Type: Bug > Components: impl > Affects Versions: 0.6.0, 0.7.0 > Reporter: Viraj Bhat > Assignee: Daniel Dai > Fix For: 0.7.0 > > > A reproducible sample: > {code} > a = load '2.txt' as (b{t(a0:chararray,a1:int)}); > b = foreach a generate flatten($0); > c = order b by $1 desc; > dump c; > {code} > 2.txt > {code} > {(a,1),(b,2)} > {code} > Error message: > java.lang.ClassCastException: java.lang.String cannot be cast to > org.apache.pig.data.DataBag > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:479) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:197) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:332) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:233) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:228) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53) > at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) > at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621) > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305) > at > org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177) > The problem is we push foreach below order by, but the foreach generated > field is used by order by. > Original report: > Subject: Order by fails with java.lang.String cannot be cast to > org.apache.pig.data.DataBag > Script loads data from BinStorage(), then flattens columns and then sorts on > the second column with order descending. The order by fails with the > ClassCastException > {code} > register loader.jar; > a = load 'c2' using BinStorage(); > b = foreach a generate org.apache.pig.CCMLoader(*); > describe b; > c = foreach b generate flatten($0); > describe c; > d = order c by $1 desc; > dump d; > {code} > The sampling job fails with the following error: > =============================================================================================================== > java.lang.ClassCastException: java.lang.String cannot be cast to > org.apache.pig.data.DataBag > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:407) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:188) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:329) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:232) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:227) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:52) > at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) > at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621) > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305) > at org.apache.hadoop.mapred.Child.main(Child.java:159) > =============================================================================================================== > The schema for b, c and d are as follows: > b: {bag_of_tuples: {tuple: (uuid: chararray,velocity: double)}} > c: {bag_of_tuples::uuid: chararray,bag_of_tuples::velocity: double} > d: {bag_of_tuples::uuid: chararray,bag_of_tuples::velocity: double} > If we modify this script to order on the first column it seems to work > {code} > register loader.jar; > a = load 'c2' using BinStorage(); > b = foreach a generate org.apache.pig.CCMLoader(*); > describe b; > c = foreach b generate flatten($0); > describe c; > d = order c by $0 desc; > dump d; > {code} > (gc639c60-4267-11df-9879-0800200c9a66,2.4227339503478493) > (ec639c60-4267-11df-9879-0800200c9a66,1.140175425099138) > There is a workaround to do a projection before ORDER > {code} > register loader.jar; > a = load 'c2' using BinStorage(); > b = foreach a generate org.apache.pig.CCMLoader(*); > describe b; > c = foreach b generate flatten($0); > describe c; > newc = foreach c generate $0 as uuid, $1 as velocity; > newd = order newc by velocity desc; > dump newd; > {code} > (gc639c60-4267-11df-9879-0800200c9a66,2.4227339503478493) > (ec639c60-4267-11df-9879-0800200c9a66,1.140175425099138) > The schema for the Loader is as follows: > {code} > public Schema outputSchema(Schema input) { > try{ > List<Schema.FieldSchema> list = new > ArrayList<Schema.FieldSchema>(); > list.add(new Schema.FieldSchema("uuid", > DataType.CHARARRAY)); > list.add(new Schema.FieldSchema("velocity", > DataType.DOUBLE)); > Schema tupleSchema = new Schema(list); > Schema.FieldSchema tupleFs = new > Schema.FieldSchema("tuple", tupleSchema, DataType.TUPLE); > Schema bagSchema = new Schema(tupleFs); > bagSchema.setTwoLevelAccessRequired(true); > Schema.FieldSchema bagFs = new > Schema.FieldSchema("bag_of_tuples",bagSchema, DataType.BAG); > return new Schema(bagFs); > }catch (Exception e){ > return null; > } > } > {code} -- This message is automatically generated by JIRA. - If you think it was sent incorrectly contact one of the administrators: https://issues.apache.org/jira/secure/Administrators.jspa - For more information on JIRA, see: http://www.atlassian.com/software/jira