[
https://issues.apache.org/jira/browse/PIG-1826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jonathan Coveney updated PIG-1826:
----------------------------------
Attachment: numgraph.java
This is the UDF I wrote that triggers the failure.
The form of the script is:
register /path/to/myudf.jar;
A = LOAD 'test.txt' as (a:chararray, b:chararray);
B = GROUP A BY a;
C = FOREACH B GENERATE A.b;
D = GROUP C ALL;
E = FOREACH D GENERATE myudf.fun.udf(C.b);
> Unexpected data type -1 found in stream error
> ---------------------------------------------
>
> Key: PIG-1826
> URL: https://issues.apache.org/jira/browse/PIG-1826
> Project: Pig
> Issue Type: Bug
> Affects Versions: 0.8.0
> Environment: This is pig 0.8.0 on a linux box
> Reporter: Jonathan Coveney
> Attachments: numgraph.java
>
>
> When running the attached UDF I get the error in the title. After inserting
> printlns extensively, I can see that the script is functioning properly and
> returning a DataBag, but for whatever reason, Pig does not detect it as such.
> package squeal.fun;
> import java.util.Iterator;
> import java.util.List;
> import java.util.ArrayList;
> import java.util.Map;
> import java.util.HashMap;
> import java.util.Set;
> import java.util.HashSet;
> import java.io.IOException;
> import org.apache.pig.PigException;
> import org.apache.pig.backend.executionengine.ExecException;
> import org.apache.pig.EvalFunc;
> import org.apache.pig.data.Tuple;
> import org.apache.pig.data.DataBag;
> import org.apache.pig.data.BagFactory;
> import org.apache.pig.data.TupleFactory;
> import org.apache.pig.impl.util.WrappedIOException;
> import org.apache.pig.impl.logicalLayer.schema.Schema;
> import org.apache.pig.data.DataType;
> import squeal.com.MutableInt;
> public class numgraph extends EvalFunc<DataBag>{
>
> TupleFactory mTupleFactory = TupleFactory.getInstance();
> BagFactory mBagFactory = BagFactory.getInstance();
> public DataBag exec(Tuple input) throws IOException {
> try {
> accumulate(input);
> DataBag bag = getValue();
> System.out.println(input.get(0).toString());
> System.out.println(bag.toString());
> return bag;
> } catch (Exception e) {
> int errCode = 31415;
> String msg = "Error while accumulating graphs (exec) "
> + this.getClass().getSimpleName();
> throw new ExecException(msg, errCode, PigException.BUG,
> e);
> }
> }
> public void accumulate(Tuple input) throws IOException {
> try {
> buildgraph(input);
> } catch (Exception e) {
> int errCode = 31415;
> String msg = "Error while accumulating graphs
> (accumulate) " + this.getClass().getSimpleName();
> throw new ExecException(msg, errCode, PigException.BUG,
> e);
> }
> }
> //public void cleanup() { thegraph.clear(); }
> public DataBag getValue() throws IOException {
> try {
> return thegraph.toBag();
> } catch (Exception e) {
> int errCode = 31415;
> String msg = "Error while accumulating graphs
> (getValue) " + this.getClass().getSimpleName();
> throw new ExecException(msg, errCode, PigException.BUG,
> e);
> }
> }
> Graph thegraph = null;
> private class Graph {
> Map<numpair, MutableInt> graph;
> Graph() { graph = null; }
> Graph(Map<numpair,MutableInt> gs) { graph = gs; }
> Map<numpair,MutableInt> getGraph() { return graph; }
> void setGraph(Map<numpair,MutableInt> gs) { graph = gs; }
>
> void inc(numpair look) {
> MutableInt val = graph.get(look);
> if (val == null) {
> val = new MutableInt();
> graph.put(look,val);
> } else {
> val.inc();
> }
> }
> void clear() { graph = null; }
> @Override
> public String toString() { return graph.toString(); }
> void addPairsBag(DataBag c2s) throws IOException {
> try {
> List<String> c2list = new ArrayList<String>();
> for (Tuple tup : c2s) {
> String cur = (String)tup.get(0);
> for (String ne : c2list)
> inc(new numpair(ne, cur));
> c2list.add(cur);
> }
> } catch (Exception e) {
> int errCode = 31415;
> String msg = "Error while accumulating graphs
> (addPairsBag) " + this.getClass().getSimpleName();
> throw new ExecException(msg, errCode,
> PigException.BUG, e);
> }
> }
> //This creates a databag in the form of (c2, c2, hits)
> DataBag toBag() throws IOException {
> try {
> DataBag outBag = mBagFactory.newDefaultBag();
> for (Map.Entry<numpair,MutableInt> pairs :
> graph.entrySet()) {
> List inList = new ArrayList();
> Iterator<String> sIt =
> pairs.getKey().getPartsIt();
> inList.add(sIt.next());
> inList.add(sIt.next()); inList.add(pairs.getValue());
>
> outBag.add(mTupleFactory.newTuple(inList));
> }
> return outBag;
> } catch (Exception e) {
> int errCode = 31415;
> String msg = "Error while accumulating graphs
> (toBag) " + this.getClass().getSimpleName();
> throw new ExecException(msg, errCode,
> PigException.BUG, e);
> }
> }
> }
>
> private class numpair {
> Set<String> pair;
> numpair(String p1, String p2) {
> pair = new HashSet<String>(2,1);
> pair.add(p1);
> pair.add(p2);
> }
> Set<String> getPair() { return pair; }
> Iterator<String> getPartsIt() { return pair.iterator(); }
>
> @Override
> public boolean equals(Object p) {
> return p instanceof numpair &&
> ((numpair)p).getPair().equals(pair);
> }
>
> @Override
> public int hashCode() {
> return pair.hashCode();
> }
> public String toString() { return pair.toString(); }
> }
> private void buildgraph(Tuple input) throws IOException {
> if (input == null || input.size() == 0)
> return;
> try {
> if (thegraph == null)
> thegraph = new Graph(new
> HashMap<numpair,MutableInt>());
> if (thegraph.getGraph() == null)
> thegraph.setGraph(new HashMap<numpair,
> MutableInt>());
> DataBag bag = (DataBag)input.get(0);
> for (Tuple ne : bag)
> thegraph.addPairsBag((DataBag)ne.get(0));
> } catch (ExecException ee) {
> throw ee;
> } catch (Exception e) {
> int errCode = 31415;
> String msg = "Error while accumulating graphs in " +
> this.getClass().getSimpleName();
> throw new ExecException(msg, errCode, PigException.BUG,
> e);
> }
> }
> @Override
> public Schema outputSchema(Schema input) {
> try {
> Schema bagSchema = new Schema();
> bagSchema.add(new
> Schema.FieldSchema("c2_1",DataType.CHARARRAY));
> bagSchema.add(new
> Schema.FieldSchema("c2_2",DataType.CHARARRAY));
> bagSchema.add(new
> Schema.FieldSchema("hits",DataType.INTEGER));
> return new Schema(new
> Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(),
> input), bagSchema, DataType.BAG));
> } catch (Exception e) {
> return null;
> }
> }
> }
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.