Unexpected data type -1 found in stream error ---------------------------------------------
Key: PIG-1826 URL: https://issues.apache.org/jira/browse/PIG-1826 Project: Pig Issue Type: Bug Affects Versions: 0.8.0 Environment: This is pig 0.8.0 on a linux box Reporter: Jonathan Coveney Attachments: numgraph.java When running the attached udf I get the title error. By inserting printlns extensively, the script is functioning properly and returning a DataBag, but for whatever reason, pig does not detect it as such. package squeal.fun; import java.util.Iterator; import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.HashMap; import java.util.Set; import java.util.HashSet; import java.io.IOException; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.EvalFunc; import org.apache.pig.data.Tuple; import org.apache.pig.data.DataBag; import org.apache.pig.data.BagFactory; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.util.WrappedIOException; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.data.DataType; import squeal.com.MutableInt; public class numgraph extends EvalFunc<DataBag>{ TupleFactory mTupleFactory = TupleFactory.getInstance(); BagFactory mBagFactory = BagFactory.getInstance(); public DataBag exec(Tuple input) throws IOException { try { accumulate(input); DataBag bag = getValue(); System.out.println(input.get(0).toString()); System.out.println(bag.toString()); return bag; } catch (Exception e) { int errCode = 31415; String msg = "Error while accumulating graphs (exec) " + this.getClass().getSimpleName(); throw new ExecException(msg, errCode, PigException.BUG, e); } } public void accumulate(Tuple input) throws IOException { try { buildgraph(input); } catch (Exception e) { int errCode = 31415; String msg = "Error while accumulating graphs (accumulate) " + this.getClass().getSimpleName(); throw new ExecException(msg, errCode, PigException.BUG, e); } } //public void cleanup() { thegraph.clear(); } public DataBag getValue() throws IOException { try { return thegraph.toBag(); } catch (Exception e) { int errCode = 31415; String msg = "Error while accumulating graphs (getValue) " + this.getClass().getSimpleName(); throw new ExecException(msg, errCode, PigException.BUG, e); } } Graph thegraph = null; private class Graph { Map<numpair, MutableInt> graph; Graph() { graph = null; } Graph(Map<numpair,MutableInt> gs) { graph = gs; } Map<numpair,MutableInt> getGraph() { return graph; } void setGraph(Map<numpair,MutableInt> gs) { graph = gs; } void inc(numpair look) { MutableInt val = graph.get(look); if (val == null) { val = new MutableInt(); graph.put(look,val); } else { val.inc(); } } void clear() { graph = null; } @Override public String toString() { return graph.toString(); } void addPairsBag(DataBag c2s) throws IOException { try { List<String> c2list = new ArrayList<String>(); for (Tuple tup : c2s) { String cur = (String)tup.get(0); for (String ne : c2list) inc(new numpair(ne, cur)); c2list.add(cur); } } catch (Exception e) { int errCode = 31415; String msg = "Error while accumulating graphs (addPairsBag) " + this.getClass().getSimpleName(); throw new ExecException(msg, errCode, PigException.BUG, e); } } //This creates a databag in the form of (c2, c2, hits) DataBag toBag() throws IOException { try { DataBag outBag = mBagFactory.newDefaultBag(); for (Map.Entry<numpair,MutableInt> pairs : graph.entrySet()) { List inList = new ArrayList(); Iterator<String> sIt = pairs.getKey().getPartsIt(); inList.add(sIt.next()); inList.add(sIt.next()); inList.add(pairs.getValue()); outBag.add(mTupleFactory.newTuple(inList)); } return outBag; } catch (Exception e) { int errCode = 31415; String msg = "Error while accumulating graphs (toBag) " + this.getClass().getSimpleName(); throw new ExecException(msg, errCode, PigException.BUG, e); } } } private class numpair { Set<String> pair; numpair(String p1, String p2) { pair = new HashSet<String>(2,1); pair.add(p1); pair.add(p2); } Set<String> getPair() { return pair; } Iterator<String> getPartsIt() { return pair.iterator(); } @Override public boolean equals(Object p) { return p instanceof numpair && ((numpair)p).getPair().equals(pair); } @Override public int hashCode() { return pair.hashCode(); } public String toString() { return pair.toString(); } } private void buildgraph(Tuple input) throws IOException { if (input == null || input.size() == 0) return; try { if (thegraph == null) thegraph = new Graph(new HashMap<numpair,MutableInt>()); if (thegraph.getGraph() == null) thegraph.setGraph(new HashMap<numpair, MutableInt>()); DataBag bag = (DataBag)input.get(0); for (Tuple ne : bag) thegraph.addPairsBag((DataBag)ne.get(0)); } catch (ExecException ee) { throw ee; } catch (Exception e) { int errCode = 31415; String msg = "Error while accumulating graphs in " + this.getClass().getSimpleName(); throw new ExecException(msg, errCode, PigException.BUG, e); } } @Override public Schema outputSchema(Schema input) { try { Schema bagSchema = new Schema(); bagSchema.add(new Schema.FieldSchema("c2_1",DataType.CHARARRAY)); bagSchema.add(new Schema.FieldSchema("c2_2",DataType.CHARARRAY)); bagSchema.add(new Schema.FieldSchema("hits",DataType.INTEGER)); return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), bagSchema, DataType.BAG)); } catch (Exception e) { return null; } } } -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.