I searched online, and although someone else reported the same error, I don't think their cause is related to mine. The error log shows this:
Caused by: java.lang.RuntimeException: Final function of squeal.map.makecountalgq is not of the expected type.
    at org.apache.pig.EvalFunc.<init>(EvalFunc.java:146)
    at squeal.map.makecountalgq.<init>(makecountalgq.java:32)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
    at java.lang.Class.newInstance0(Class.java:355)
    at java.lang.Class.newInstance(Class.java:308)
    at org.apache.pig.impl.PigContext.instantiateFuncFromSpec(PigContext.java:472)
But here is my UDF (I've truncated it a bit, but it works if it isn't Algebraic):
package squeal.map;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// MapWrapper is my own class; its import and the MutableInt import are omitted.
public class makecountalgq extends EvalFunc<Map<String,Integer>>
        implements Algebraic, Accumulator<Map<String,Integer>> {
    static TupleFactory mTupleFactory = TupleFactory.getInstance();
    static BagFactory mBagFactory = BagFactory.getInstance();
    private MapWrapper ourWrap = new MapWrapper();

    // Algebraic: tell Pig which class implements each stage
    public String getInitial() { return Initial.class.getName(); }
    public String getIntermed() { return Intermed.class.getName(); }
    public String getFinal() { return Final.class.getName(); }

    // exec() and the Accumulator methods are truncated from this post

    // Initial: creates partial results from a bag of input tuples
    public static class Initial extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            MapWrapper aWrap = new MapWrapper(new HashMap<String,MutableInt>());
            DataBag bag = (DataBag) input.get(0);
            aWrap.addSBag(bag);
            return mTupleFactory.newTuple(aWrap);
        }
    }

    // Intermed: called on the output of Initial, or of another Intermed
    public static class Intermed extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return mTupleFactory.newTuple(mergeWrappers((DataBag) input.get(0)));
        }
    }

    // Final: takes the tuples that Initial or Intermed create
    public static class Final extends EvalFunc<Map<String,Integer>> {
        public Map<String,Integer> exec(Tuple input) throws IOException {
            return mergeWrappers((DataBag) input.get(0)).convertMap();
        }
    }

    // Takes a bag of single-field tuples holding MapWrappers and merges them
    public static MapWrapper mergeWrappers(DataBag bag) throws IOException {
        Iterator<Tuple> bagIt = bag.iterator();
        MapWrapper aWrap = (MapWrapper) bagIt.next().get(0);
        while (bagIt.hasNext()) {
            aWrap.merge((MapWrapper) bagIt.next().get(0));
        }
        return aWrap;
    }
}
Any idea why I get this error? It compiles fine, and if I just take Algebraic out of the implements clause, the UDF works (it then only uses the Accumulator). I don't see how the type is off.
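One standalone way to look at "how the type is off" is to ask reflection which type argument each class declares. The sketch below is not Pig code: Func<T> is a hypothetical stand-in for EvalFunc<T>, and Outer/Final mimic makecountalgq and its Final class. It assumes (not verified against the Pig source for this version) that the check behind EvalFunc.java:146 compares the Final class's type argument with the outer class's. With Map<String,Integer>, each class typically gets its own ParameterizedType object, so the two compare equal with equals() but may not be the same object; with the raw type Map, both resolve to Map.class.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

public class GenericReturnTypeCheck {
    // Hypothetical stand-in for org.apache.pig.EvalFunc<T>; only T matters here.
    static abstract class Func<T> {}

    // Parameterized return types, as in makecountalgq and its Final class.
    static class Outer extends Func<Map<String, Integer>> {}
    static class Final extends Func<Map<String, Integer>> {}

    // Raw return types, for comparison.
    @SuppressWarnings("rawtypes")
    static class RawOuter extends Func<Map> {}
    @SuppressWarnings("rawtypes")
    static class RawFinal extends Func<Map> {}

    // Returns the T in "extends Func<T>" for a direct subclass of Func.
    static Type returnTypeOf(Class<?> c) {
        ParameterizedType sup = (ParameterizedType) c.getGenericSuperclass();
        return sup.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        Type outer = returnTypeOf(Outer.class);
        Type fin = returnTypeOf(Final.class);
        System.out.println("outer = " + outer + ", final = " + fin);
        System.out.println("equals: " + outer.equals(fin));
        System.out.println("same object: " + (outer == fin));

        // Raw type arguments are Class objects, and Map.class is a single instance.
        Type rawOuter = returnTypeOf(RawOuter.class);
        Type rawFinal = returnTypeOf(RawFinal.class);
        System.out.println("raw same object: " + (rawOuter == rawFinal));
    }
}
```

If the output shows "equals: true" but "same object: false", a reference comparison would reject the parameterized signatures even though they read identically in the source.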
Any ideas? (In case the context matters: the script takes a list of (a, b:chararray) pairs and gives you, for each key a, a map from each b to the number of times that b occurred.)
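The computation itself is per-key counting followed by a merge that sums counts. Summing is associative and commutative, so partial maps can be combined in any order and grouping, which is what makes the Initial/Intermed/Final split valid. The plain-Java sketch below illustrates this with HashMap<String,Integer> standing in for MapWrapper (whose source isn't shown); countChunk and merge are hypothetical names for what Initial and Intermed/Final do, not Pig APIs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountMapSketch {
    // Like Initial: count each b value in one chunk of a group's bag.
    static Map<String, Integer> countChunk(List<String> bs) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String b : bs) {
            Integer old = counts.get(b);
            counts.put(b, old == null ? 1 : old + 1);
        }
        return counts;
    }

    // Like Intermed/Final: merge partial count maps by summing per key.
    static Map<String, Integer> merge(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new HashMap<String, Integer>();
        for (Map<String, Integer> p : partials) {
            for (Map.Entry<String, Integer> e : p.entrySet()) {
                Integer old = total.get(e.getKey());
                total.put(e.getKey(), old == null ? e.getValue() : old + e.getValue());
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // b values for one key a, split across two hypothetical map-side chunks.
        Map<String, Integer> p1 = countChunk(Arrays.asList("x", "y", "x"));
        Map<String, Integer> p2 = countChunk(Arrays.asList("y", "z"));
        Map<String, Integer> result = merge(Arrays.asList(p1, p2));
        System.out.println(result.get("x") + " " + result.get("y") + " " + result.get("z")); // prints "2 2 1"
    }
}
```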
Here is how I invoke it:
register squeal.jar;
A = LOAD 'test.txt' AS (a:chararray, b:chararray);
B = GROUP A BY a;
C = FOREACH B GENERATE group, squeal.map.makecountalgq(A.b) as mapz;
And I get the error.
Thanks in advance. I guess I could settle for the Accumulator, but the Algebraic solution is so close. (MapWrapper is a class I wrote that lets me pass Maps and the like around.)