Unexpected data type -1 found in stream error
---------------------------------------------

                 Key: PIG-1826
                 URL: https://issues.apache.org/jira/browse/PIG-1826
             Project: Pig
          Issue Type: Bug
    Affects Versions: 0.8.0
         Environment: This is pig 0.8.0 on a linux box
            Reporter: Jonathan Coveney
         Attachments: numgraph.java

When running the attached UDF, I get the error in the title. Extensive printlns show that the UDF runs correctly and returns a DataBag, but for whatever reason Pig does not recognize it as one.

package squeal.fun;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import java.io.IOException;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.data.DataType;
import squeal.com.MutableInt;

public class numgraph extends EvalFunc<DataBag>{
        
        TupleFactory mTupleFactory = TupleFactory.getInstance();
        BagFactory mBagFactory = BagFactory.getInstance();

        public DataBag exec(Tuple input) throws IOException {
                try {
                        accumulate(input);
                        DataBag bag = getValue();
                        System.out.println(input.get(0).toString());
                        System.out.println(bag.toString());
                        return bag;

                } catch (Exception e) {
                        int errCode = 31415;
                        String msg = "Error while accumulating graphs (exec) " 
+ this.getClass().getSimpleName();
                        throw new ExecException(msg, errCode, PigException.BUG, 
e);
                }
        }

        public void accumulate(Tuple input) throws IOException {
                try {
                        buildgraph(input);
                } catch (Exception e) {
                        int errCode = 31415;
                        String msg = "Error while accumulating graphs 
(accumulate) " + this.getClass().getSimpleName();
                        throw new ExecException(msg, errCode, PigException.BUG, 
e);
                }
        }

        //public void cleanup() { thegraph.clear(); }

        public DataBag getValue() throws IOException {
                try {
                        return thegraph.toBag();
                } catch (Exception e) {
                        int errCode = 31415;
                        String msg = "Error while accumulating graphs 
(getValue) " + this.getClass().getSimpleName();
                        throw new ExecException(msg, errCode, PigException.BUG, 
e);
                }
        }

        Graph thegraph = null;
        private class Graph {
                Map<numpair, MutableInt> graph;

                Graph() { graph = null; }
                Graph(Map<numpair,MutableInt> gs) { graph = gs; }

                Map<numpair,MutableInt> getGraph() { return graph; }
                void setGraph(Map<numpair,MutableInt> gs) { graph = gs; }
                
                void inc(numpair look) {
                        MutableInt val = graph.get(look);
                        if (val == null) {
                                val = new MutableInt();
                                graph.put(look,val);
                        } else {
                                val.inc();
                        }
                }

                void clear() { graph = null; }

                @Override
                public String toString() { return graph.toString(); }

                void addPairsBag(DataBag c2s) throws IOException {
                        try {
                                List<String> c2list = new ArrayList<String>();
                                for (Tuple tup : c2s) {
                                        String cur = (String)tup.get(0);
                                        for (String ne : c2list)
                                                inc(new numpair(ne, cur));
                                        c2list.add(cur);
                                }
                        } catch (Exception e) {
                                int errCode = 31415;
                                String msg = "Error while accumulating graphs 
(addPairsBag) " + this.getClass().getSimpleName();
                                throw new ExecException(msg, errCode, 
PigException.BUG, e);
                        }
                }

                //This creates a databag in the form of (c2, c2, hits)
                DataBag toBag() throws IOException {
                        try {
                                DataBag outBag = mBagFactory.newDefaultBag();
                                for (Map.Entry<numpair,MutableInt> pairs : 
graph.entrySet()) {
                                        List inList = new ArrayList();
                                        Iterator<String> sIt = 
pairs.getKey().getPartsIt();
                                        inList.add(sIt.next()); 
inList.add(sIt.next()); inList.add(pairs.getValue());
                                        
outBag.add(mTupleFactory.newTuple(inList));
                                }
                                return outBag;
                        } catch (Exception e) {
                                int errCode = 31415;
                                String msg = "Error while accumulating graphs 
(toBag) " + this.getClass().getSimpleName();
                                throw new ExecException(msg, errCode, 
PigException.BUG, e);
                        }
                }
        }
        
        private class numpair {
                Set<String> pair;

                numpair(String p1, String p2) {
                        pair = new HashSet<String>(2,1);
                        pair.add(p1);
                        pair.add(p2);
                }
                Set<String> getPair() { return pair; }
                Iterator<String> getPartsIt() { return pair.iterator(); }
                
                @Override
                public boolean equals(Object p) {
                        return p instanceof numpair && 
((numpair)p).getPair().equals(pair); 
                }
                
                @Override
                public int hashCode() {
                        return pair.hashCode();
                }

                public String toString() { return pair.toString(); }
        }

        private void buildgraph(Tuple input) throws IOException {
                if (input == null || input.size() == 0) 
                        return;
                try {
                        if (thegraph == null)
                                thegraph = new Graph(new 
HashMap<numpair,MutableInt>());
                        if (thegraph.getGraph() == null)
                                thegraph.setGraph(new HashMap<numpair, 
MutableInt>());
                        DataBag bag = (DataBag)input.get(0);
                        for (Tuple ne : bag)
                                thegraph.addPairsBag((DataBag)ne.get(0));
                } catch (ExecException ee) {
                        throw ee;
                } catch (Exception e) {
                        int errCode = 31415;
                        String msg = "Error while accumulating graphs in " + 
this.getClass().getSimpleName();
                        throw new ExecException(msg, errCode, PigException.BUG, 
e);
                }
        }


        @Override
        public Schema outputSchema(Schema input) {
                try {
                        Schema bagSchema = new Schema();
                        bagSchema.add(new 
Schema.FieldSchema("c2_1",DataType.CHARARRAY));
                        bagSchema.add(new 
Schema.FieldSchema("c2_2",DataType.CHARARRAY));
                        bagSchema.add(new 
Schema.FieldSchema("hits",DataType.INTEGER));
                        return new Schema(new 
Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), 
input), bagSchema, DataType.BAG)); 
                } catch (Exception e) {
                        return null;
                }
        }
}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to