[ https://issues.apache.org/jira/browse/PIG-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel Dai updated PIG-1374:
----------------------------

        Summary: PushDownForeachFlatten shall not push ForEach below Join if the flattened fields is used in the next statement
                 (was: Order by fails with java.lang.String cannot be cast to org.apache.pig.data.DataBag)
    Description: 
A reproducible sample:
{code}
a = load '2.txt' as (b{t(a0:chararray,a1:int)});
b = foreach a generate flatten($0);
c = order b by $1 desc;
dump c;
{code}
Contents of 2.txt:
{code}
{(a,1),(b,2)}
{code}
Error message:
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.pig.data.DataBag
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:479)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:197)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:332)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:233)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:228)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)

The problem is that the optimizer pushes the foreach below the order by, even
though the order by uses a field generated by that foreach.
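
The condition in the new summary can be sketched as a simple guard. This is
only an illustration in plain Java, not Pig's actual PushDownForeachFlatten
code: the class PushDownGuard, the method canPushForeachBelow, and the
representation of columns as integer positions are all hypothetical. The idea
is that the foreach may move below the next operator only if that operator
references none of the columns produced by the flatten.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; this is not a class from the Pig code base.
public class PushDownGuard {

    /**
     * Returns true only when the successor operator (for example an order by)
     * references none of the output columns produced by the flatten.
     * Columns are modelled as integer positions, which is an assumption
     * made for this sketch.
     */
    static boolean canPushForeachBelow(Set<Integer> flattenedColumns,
                                       Set<Integer> columnsUsedBySuccessor) {
        return Collections.disjoint(flattenedColumns, columnsUsedBySuccessor);
    }

    public static void main(String[] args) {
        // In the sample, flatten($0) on a bag of (a0, a1) yields columns 0 and 1.
        Set<Integer> flattened = new HashSet<Integer>(Arrays.asList(0, 1));

        // "order b by $1 desc" sorts on column 1, which the flatten produces.
        Set<Integer> orderKeys = Collections.singleton(1);
        System.out.println(canPushForeachBelow(flattened, orderKeys)); // prints false

        // A successor that only touches some other column (hypothetical case).
        Set<Integer> otherKeys = Collections.singleton(2);
        System.out.println(canPushForeachBelow(flattened, otherKeys)); // prints true
    }
}
```

With the sample script, the sort key $1 is one of the flattened columns, so a
guard of this kind would leave the foreach above the order by.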

Original report:
Subject: Order by fails with java.lang.String cannot be cast to 
org.apache.pig.data.DataBag
The script loads data with BinStorage(), flattens the columns, and then sorts
on the second column in descending order. The order by fails with a
ClassCastException.

{code}
register loader.jar;
a = load 'c2' using BinStorage();
b = foreach a generate org.apache.pig.CCMLoader(*);
describe b;
c = foreach b generate flatten($0);
describe c;
d = order c by $1 desc;
dump d;
{code}

The sampling job fails with the following error:
===============================================================================================================
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.pig.data.DataBag
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:407)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:188)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:329)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:232)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:227)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:52)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
        at org.apache.hadoop.mapred.Child.main(Child.java:159)
===============================================================================================================

The schemas for b, c and d are as follows:

b: {bag_of_tuples: {tuple: (uuid: chararray,velocity: double)}}

c: {bag_of_tuples::uuid: chararray,bag_of_tuples::velocity: double}

d: {bag_of_tuples::uuid: chararray,bag_of_tuples::velocity: double}

If we modify this script to order on the first column, it seems to work:

{code}
register loader.jar;
a = load 'c2' using BinStorage();
b = foreach a generate org.apache.pig.CCMLoader(*);
describe b;
c = foreach b generate flatten($0);
describe c;
d = order c by $0 desc;
dump d;
{code}

(gc639c60-4267-11df-9879-0800200c9a66,2.4227339503478493)
(ec639c60-4267-11df-9879-0800200c9a66,1.140175425099138)


A workaround is to add a projection before the ORDER:

{code}
register loader.jar;
a = load 'c2' using BinStorage();
b = foreach a generate org.apache.pig.CCMLoader(*);
describe b;
c = foreach b generate flatten($0);
describe c;
newc = foreach c generate $0 as uuid, $1 as velocity;
newd = order newc by velocity desc;
dump newd;
{code}

(gc639c60-4267-11df-9879-0800200c9a66,2.4227339503478493)
(ec639c60-4267-11df-9879-0800200c9a66,1.140175425099138)


The schema for the Loader is as follows:

{code}
public Schema outputSchema(Schema input) {
    try {
        // Inner tuple: (uuid: chararray, velocity: double)
        List<Schema.FieldSchema> list = new ArrayList<Schema.FieldSchema>();
        list.add(new Schema.FieldSchema("uuid", DataType.CHARARRAY));
        list.add(new Schema.FieldSchema("velocity", DataType.DOUBLE));
        Schema tupleSchema = new Schema(list);
        Schema.FieldSchema tupleFs = new Schema.FieldSchema("tuple", tupleSchema, DataType.TUPLE);

        // Wrap the tuple in a bag named "bag_of_tuples"
        Schema bagSchema = new Schema(tupleFs);
        bagSchema.setTwoLevelAccessRequired(true);
        Schema.FieldSchema bagFs = new Schema.FieldSchema("bag_of_tuples", bagSchema, DataType.BAG);
        return new Schema(bagFs);
    } catch (Exception e) {
        // No schema is reported if construction fails
        return null;
    }
}
{code}


> PushDownForeachFlatten shall not push ForEach below Join if the flattened 
> fields is used in the next statement
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: PIG-1374
>                 URL: https://issues.apache.org/jira/browse/PIG-1374
>             Project: Pig
>          Issue Type: Bug
>          Components: impl
>    Affects Versions: 0.6.0, 0.7.0
>            Reporter: Viraj Bhat
>            Assignee: Daniel Dai
>             Fix For: 0.7.0

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: 
https://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
