[ https://issues.apache.org/jira/browse/PIG-1826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jonathan Coveney updated PIG-1826: ---------------------------------- Attachment: numgraph.java This is the UDF I made that it fails on. The form of the script is: register /path/to/myudf.jar; A = LOAD 'test.txt' as (a:chararray, b:chararray); B = GROUP A BY a; C = FOREACH B GENERATE A.b; D = GROUP C ALL; E = FOREACH D GENERATE myudf.fun.udf(C.b); > Unexpected data type -1 found in stream error > --------------------------------------------- > > Key: PIG-1826 > URL: https://issues.apache.org/jira/browse/PIG-1826 > Project: Pig > Issue Type: Bug > Affects Versions: 0.8.0 > Environment: This is pig 0.8.0 on a linux box > Reporter: Jonathan Coveney > Attachments: numgraph.java > > > When running the attached udf I get the title error. By inserting printlns > extensively, the script is functioning properly and returning a DataBag, but > for whatever reason, pig does not detect it as such. > package squeal.fun; > import java.util.Iterator; > import java.util.List; > import java.util.ArrayList; > import java.util.Map; > import java.util.HashMap; > import java.util.Set; > import java.util.HashSet; > import java.io.IOException; > import org.apache.pig.PigException; > import org.apache.pig.backend.executionengine.ExecException; > import org.apache.pig.EvalFunc; > import org.apache.pig.data.Tuple; > import org.apache.pig.data.DataBag; > import org.apache.pig.data.BagFactory; > import org.apache.pig.data.TupleFactory; > import org.apache.pig.impl.util.WrappedIOException; > import org.apache.pig.impl.logicalLayer.schema.Schema; > import org.apache.pig.data.DataType; > import squeal.com.MutableInt; > public class numgraph extends EvalFunc<DataBag>{ > > TupleFactory mTupleFactory = TupleFactory.getInstance(); > BagFactory mBagFactory = BagFactory.getInstance(); > public DataBag exec(Tuple input) throws IOException { > try { > accumulate(input); > DataBag bag = getValue(); > System.out.println(input.get(0).toString()); > System.out.println(bag.toString()); > return bag; > } catch 
(Exception e) { > int errCode = 31415; > String msg = "Error while accumulating graphs (exec) " > + this.getClass().getSimpleName(); > throw new ExecException(msg, errCode, PigException.BUG, > e); > } > } > public void accumulate(Tuple input) throws IOException { > try { > buildgraph(input); > } catch (Exception e) { > int errCode = 31415; > String msg = "Error while accumulating graphs > (accumulate) " + this.getClass().getSimpleName(); > throw new ExecException(msg, errCode, PigException.BUG, > e); > } > } > //public void cleanup() { thegraph.clear(); } > public DataBag getValue() throws IOException { > try { > return thegraph.toBag(); > } catch (Exception e) { > int errCode = 31415; > String msg = "Error while accumulating graphs > (getValue) " + this.getClass().getSimpleName(); > throw new ExecException(msg, errCode, PigException.BUG, > e); > } > } > Graph thegraph = null; > private class Graph { > Map<numpair, MutableInt> graph; > Graph() { graph = null; } > Graph(Map<numpair,MutableInt> gs) { graph = gs; } > Map<numpair,MutableInt> getGraph() { return graph; } > void setGraph(Map<numpair,MutableInt> gs) { graph = gs; } > > void inc(numpair look) { > MutableInt val = graph.get(look); > if (val == null) { > val = new MutableInt(); > graph.put(look,val); > } else { > val.inc(); > } > } > void clear() { graph = null; } > @Override > public String toString() { return graph.toString(); } > void addPairsBag(DataBag c2s) throws IOException { > try { > List<String> c2list = new ArrayList<String>(); > for (Tuple tup : c2s) { > String cur = (String)tup.get(0); > for (String ne : c2list) > inc(new numpair(ne, cur)); > c2list.add(cur); > } > } catch (Exception e) { > int errCode = 31415; > String msg = "Error while accumulating graphs > (addPairsBag) " + this.getClass().getSimpleName(); > throw new ExecException(msg, errCode, > PigException.BUG, e); > } > } > //This creates a databag in the form of (c2, c2, hits) > DataBag toBag() throws IOException { > try { > DataBag 
outBag = mBagFactory.newDefaultBag(); > for (Map.Entry<numpair,MutableInt> pairs : > graph.entrySet()) { > List inList = new ArrayList(); > Iterator<String> sIt = > pairs.getKey().getPartsIt(); > inList.add(sIt.next()); > inList.add(sIt.next()); inList.add(pairs.getValue()); > > outBag.add(mTupleFactory.newTuple(inList)); > } > return outBag; > } catch (Exception e) { > int errCode = 31415; > String msg = "Error while accumulating graphs > (toBag) " + this.getClass().getSimpleName(); > throw new ExecException(msg, errCode, > PigException.BUG, e); > } > } > } > > private class numpair { > Set<String> pair; > numpair(String p1, String p2) { > pair = new HashSet<String>(2,1); > pair.add(p1); > pair.add(p2); > } > Set<String> getPair() { return pair; } > Iterator<String> getPartsIt() { return pair.iterator(); } > > @Override > public boolean equals(Object p) { > return p instanceof numpair && > ((numpair)p).getPair().equals(pair); > } > > @Override > public int hashCode() { > return pair.hashCode(); > } > public String toString() { return pair.toString(); } > } > private void buildgraph(Tuple input) throws IOException { > if (input == null || input.size() == 0) > return; > try { > if (thegraph == null) > thegraph = new Graph(new > HashMap<numpair,MutableInt>()); > if (thegraph.getGraph() == null) > thegraph.setGraph(new HashMap<numpair, > MutableInt>()); > DataBag bag = (DataBag)input.get(0); > for (Tuple ne : bag) > thegraph.addPairsBag((DataBag)ne.get(0)); > } catch (ExecException ee) { > throw ee; > } catch (Exception e) { > int errCode = 31415; > String msg = "Error while accumulating graphs in " + > this.getClass().getSimpleName(); > throw new ExecException(msg, errCode, PigException.BUG, > e); > } > } > @Override > public Schema outputSchema(Schema input) { > try { > Schema bagSchema = new Schema(); > bagSchema.add(new > Schema.FieldSchema("c2_1",DataType.CHARARRAY)); > bagSchema.add(new > Schema.FieldSchema("c2_2",DataType.CHARARRAY)); > bagSchema.add(new > 
Schema.FieldSchema("hits",DataType.INTEGER)); > return new Schema(new > Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), > input), bagSchema, DataType.BAG)); > } catch (Exception e) { > return null; > } > } > } -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.