[ 
https://issues.apache.org/jira/browse/PIG-1826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Coveney updated PIG-1826:
----------------------------------

    Attachment: numgraph.java

This is the UDF that the script fails on.

The script has the following form:

register /path/to/myudf.jar;
A = LOAD 'test.txt' as (a:chararray, b:chararray);
B = GROUP A BY a;
C = FOREACH B GENERATE A.b;
D = GROUP C ALL;
E = FOREACH D GENERATE myudf.fun.udf(C.b);

> Unexpected data type -1 found in stream error
> ---------------------------------------------
>
>                 Key: PIG-1826
>                 URL: https://issues.apache.org/jira/browse/PIG-1826
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: 0.8.0
>         Environment: This is pig 0.8.0 on a linux box
>            Reporter: Jonathan Coveney
>         Attachments: numgraph.java
>
>
> When running the attached UDF, I get the error in the title. Extensive
> println debugging shows that the UDF runs correctly and returns a DataBag,
> but for some reason Pig does not recognize the result as a bag.
>
> Attached UDF source (numgraph.java):
> package squeal.fun;
> import java.util.Iterator;
> import java.util.List;
> import java.util.ArrayList;
> import java.util.Map;
> import java.util.HashMap;
> import java.util.Set;
> import java.util.HashSet;
> import java.io.IOException;
> import org.apache.pig.PigException;
> import org.apache.pig.backend.executionengine.ExecException;
> import org.apache.pig.EvalFunc;
> import org.apache.pig.data.Tuple;
> import org.apache.pig.data.DataBag;
> import org.apache.pig.data.BagFactory;
> import org.apache.pig.data.TupleFactory;
> import org.apache.pig.impl.util.WrappedIOException;
> import org.apache.pig.impl.logicalLayer.schema.Schema;
> import org.apache.pig.data.DataType;
> import squeal.com.MutableInt;
> /**
>  * Pig EvalFunc that counts co-occurring pairs of strings and returns them as a
>  * DataBag of (value, value, count) tuples.
>  *
>  * Input (per the casts in buildgraph/addPairsBag): field 0 is a bag whose
>  * tuples each carry an inner bag in field 0; each inner tuple carries a String
>  * in field 0. Within one inner bag, every pair of positions contributes one
>  * sighting of the unordered pair {a, b} (numpair wraps a HashSet).
>  *
>  * NOTE(review): counts are held in the instance field thegraph and nothing
>  * visible here resets them, so repeated exec() calls on the same instance keep
>  * accumulating.
>  */ public class numgraph extends EvalFunc<DataBag>{
>       
>       TupleFactory mTupleFactory = TupleFactory.getInstance();
>       BagFactory mBagFactory = BagFactory.getInstance();
>       public DataBag exec(Tuple input) throws IOException { // Folds input into the pair counts, then returns all counts gathered so far as a bag.
>               try {
>                       accumulate(input);
>                       DataBag bag = getValue(); // NOTE(review): a null/empty input on a first call leaves thegraph null, so this fails with a wrapped NullPointerException
>                       System.out.println(input.get(0).toString()); // debugging output
>                       System.out.println(bag.toString());
>                       return bag;
>               } catch (Exception e) {
>                       int errCode = 31415;
>                       String msg = "Error while accumulating graphs (exec) " 
> + this.getClass().getSimpleName();
>                       throw new ExecException(msg, errCode, PigException.BUG, 
> e);
>               }
>       }
>       public void accumulate(Tuple input) throws IOException { // Adds one input tuple's pairs to thegraph.
>               try {
>                       buildgraph(input);
>               } catch (Exception e) {
>                       int errCode = 31415;
>                       String msg = "Error while accumulating graphs 
> (accumulate) " + this.getClass().getSimpleName();
>                       throw new ExecException(msg, errCode, PigException.BUG, 
> e);
>               }
>       }
>       //public void cleanup() { thegraph.clear(); }
>       public DataBag getValue() throws IOException { // Snapshot of all counts gathered so far, as (value, value, count) tuples.
>               try {
>                       return thegraph.toBag();
>               } catch (Exception e) {
>                       int errCode = 31415;
>                       String msg = "Error while accumulating graphs 
> (getValue) " + this.getClass().getSimpleName();
>                       throw new ExecException(msg, errCode, PigException.BUG, 
> e);
>               }
>       }
>       Graph thegraph = null; // NOTE(review): never cleared (cleanup() above is commented out), so counts persist across exec() calls on this instance
>       private class Graph { // Map from unordered string pair to its occurrence counter.
>               Map<numpair, MutableInt> graph;
>               Graph() { graph = null; }
>               Graph(Map<numpair,MutableInt> gs) { graph = gs; }
>               Map<numpair,MutableInt> getGraph() { return graph; }
>               void setGraph(Map<numpair,MutableInt> gs) { graph = gs; }
>               
>               void inc(numpair look) { // first sighting stores a fresh MutableInt (assumed to start at 1 - confirm); later sightings call inc()
>                       MutableInt val = graph.get(look);
>                       if (val == null) {
>                               val = new MutableInt();
>                               graph.put(look,val);
>                       } else {
>                               val.inc();
>                       }
>               }
>               void clear() { graph = null; }
>               @Override
>               public String toString() { return graph.toString(); }
>               void addPairsBag(DataBag c2s) throws IOException { // counts every (earlier value, current value) pair within one bag
>                       try {
>                               List<String> c2list = new ArrayList<String>();
>                               for (Tuple tup : c2s) {
>                                       String cur = (String)tup.get(0);
>                                       for (String ne : c2list)
>                                               inc(new numpair(ne, cur));
>                                       c2list.add(cur);
>                               }
>                       } catch (Exception e) {
>                               int errCode = 31415;
>                               String msg = "Error while accumulating graphs 
> (addPairsBag) " + this.getClass().getSimpleName();
>                               throw new ExecException(msg, errCode, 
> PigException.BUG, e);
>                       }
>               }
>               //This creates a databag in the form of (c2, c2, hits)
>               DataBag toBag() throws IOException {
>                       try {
>                               DataBag outBag = mBagFactory.newDefaultBag();
>                               for (Map.Entry<numpair,MutableInt> pairs : 
> graph.entrySet()) {
>                                       List inList = new ArrayList();
>                                       Iterator<String> sIt = 
> pairs.getKey().getPartsIt();
>                                       inList.add(sIt.next()); // NOTE(review): if both strings of a pair are equal, the HashSet holds one element and the second next() below throws NoSuchElementException
> inList.add(sIt.next()); inList.add(pairs.getValue()); // NOTE(review): getValue() is a squeal.com.MutableInt, not a Pig data type, although outputSchema declares "hits" as INTEGER; Pig presumably cannot serialize it, which is the likely cause of "Unexpected data type -1" (likely DataType.ERROR) - add the counter's int value instead
>                                       
> outBag.add(mTupleFactory.newTuple(inList));
>                               }
>                               return outBag;
>                       } catch (Exception e) {
>                               int errCode = 31415;
>                               String msg = "Error while accumulating graphs 
> (toBag) " + this.getClass().getSimpleName();
>                               throw new ExecException(msg, errCode, 
> PigException.BUG, e);
>                       }
>               }
>       }
>       
>       private class numpair { // Unordered pair of strings; equality and hash delegate to the backing set.
>               Set<String> pair;
>               numpair(String p1, String p2) {
>                       pair = new HashSet<String>(2,1); // initial capacity 2, load factor 1
>                       pair.add(p1);
>                       pair.add(p2);
>               }
>               Set<String> getPair() { return pair; }
>               Iterator<String> getPartsIt() { return pair.iterator(); }
>               
>               @Override
>               public boolean equals(Object p) {
>                       return p instanceof numpair && 
> ((numpair)p).getPair().equals(pair); 
>               }
>               
>               @Override
>               public int hashCode() {
>                       return pair.hashCode();
>               }
>               public String toString() { return pair.toString(); }
>       }
>       private void buildgraph(Tuple input) throws IOException { // Lazily creates the map, then feeds each inner bag of input field 0 to addPairsBag.
>               if (input == null || input.size() == 0) // nothing to count; on a first call thegraph stays null
>                       return;
>               try {
>                       if (thegraph == null)
>                               thegraph = new Graph(new 
> HashMap<numpair,MutableInt>());
>                       if (thegraph.getGraph() == null)
>                               thegraph.setGraph(new HashMap<numpair, 
> MutableInt>());
>                       DataBag bag = (DataBag)input.get(0); // expects field 0 to be a bag of tuples whose field 0 is itself a bag (see cast below)
>                       for (Tuple ne : bag)
>                               thegraph.addPairsBag((DataBag)ne.get(0));
>               } catch (ExecException ee) { // already wrapped (e.g. by addPairsBag); rethrow as-is
>                       throw ee;
>               } catch (Exception e) {
>                       int errCode = 31415;
>                       String msg = "Error while accumulating graphs in " + 
> this.getClass().getSimpleName();
>                       throw new ExecException(msg, errCode, PigException.BUG, 
> e);
>               }
>       }
>       @Override
>       public Schema outputSchema(Schema input) { // Declares a bag of (c2_1:chararray, c2_2:chararray, hits:int).
>               try {
>                       Schema bagSchema = new Schema(); // NOTE(review): fields are added directly, not inside a tuple schema; confirm Pig 0.8 accepts this for a bag
>                       bagSchema.add(new 
> Schema.FieldSchema("c2_1",DataType.CHARARRAY));
>                       bagSchema.add(new 
> Schema.FieldSchema("c2_2",DataType.CHARARRAY));
>                       bagSchema.add(new 
> Schema.FieldSchema("hits",DataType.INTEGER)); // NOTE(review): toBag() actually puts a MutableInt in this position
>                       return new Schema(new 
> Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), 
> input), bagSchema, DataType.BAG)); 
>               } catch (Exception e) {
>                       return null; // any failure silently yields "no schema"
>               }
>       }
> }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to