Hi all, the patch is attached; it passes the existing unit tests, and we are also running it on a 1,000-node production Hadoop cluster as we speak. Would you like us to create a JIRA issue for this, or is this good enough? Thanks, Ilia and Ido
On 7 March 2015 at 23:09, Matthew Hayes <matthew.terence.ha...@gmail.com> wrote: > I don't remember if there was a particular reason I didn't implement this > as AlgebraicEvalFunc. It seems like it could be. I believe the Java > MapReduce version leverages the combiner. If you want to try making this > Algebraic we would be happy to accept a patch :) > > -Matt > > > On Mar 7, 2015, at 12:11 PM, Ido Hadanny <ido.hada...@gmail.com> wrote: > > > > data.fu has a nice implementation of HyperLogLog for estimating > cardinality > > here > > < > https://github.com/apache/incubator-datafu/blob/master/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java > > > > > > However, it's implemented as Accumulator which means it will run only at > > the reducer and not in the combiner (but it will never load the entire > set > > into memory as in normal EvalFunc). Why couldn't data.fu implement it as > > Algebraic - and fill the registers at every combiner, then merge and > reduce > > the result? Am I missing something here? > > also available here: > > > http://stackoverflow.com/questions/28908217/why-is-data-fu-implementing-hyperloglog-as-an-accumulator-and-not-as-algebraic > > > > thanks! > > > > > > -- > > Sent from my androido > -- Sent from my androido
diff --git a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java index 95c5b0e..bb5810f 100644 --- a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java +++ b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java @@ -20,14 +20,24 @@ package datafu.pig.stats; import java.io.IOException; +import java.util.Iterator; import org.apache.pig.AccumulatorEvalFunc; +import org.apache.pig.EvalFunc; +import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; +import com.clearspring.analytics.hash.MurmurHash; +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; + /** * A UDF that applies the HyperLogLog++ cardinality estimation algorithm. * @@ -45,8 +55,9 @@ import org.apache.pig.impl.logicalLayer.schema.Schema; public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long> { private com.clearspring.analytics.stream.cardinality.HyperLogLogPlus estimator; - - private final int p; + private static TupleFactory mTupleFactory = TupleFactory.getInstance(); + + private static int p; /** * Constructs a HyperLogLog++ estimator. 
@@ -61,9 +72,9 @@ public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long> * * @param p precision value */ - public HyperLogLogPlusPlus(String p) + public HyperLogLogPlusPlus(String par) { - this.p = Integer.parseInt(p); + p = Integer.parseInt(par); cleanup(); } @@ -111,4 +122,94 @@ public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long> throw new RuntimeException(e); } } + + public String getInitial() { + return Initial.class.getName(); + } + + public String getIntermed() { + return Intermediate.class.getName(); + } + + public String getFinal() { + return Final.class.getName(); + } + + static public class Initial extends EvalFunc<Tuple> { + + @Override + public Tuple exec(Tuple input) throws IOException { + // Since Initial is guaranteed to be called + // only in the map, it will be called with an + // input of a bag with a single tuple - the + // count should always be 1 if bag is non empty + DataBag bag = (DataBag) input.get(0); + Iterator<Tuple> it = bag.iterator(); + if (it.hasNext()) { + Tuple t = (Tuple) it.next(); + if (t != null && t.size() > 0 && t.get(0) != null) { + long x = MurmurHash.hash64(t); + return mTupleFactory.newTuple((Object) x); + } + } + return mTupleFactory.newTuple((Object) MurmurHash.hash64(null)); + } + } + + static public class Intermediate extends EvalFunc<Tuple> { + @Override + public Tuple exec(Tuple input) throws IOException { + try { + DataByteArray data = new DataByteArray(countDisctinct(input).getBytes()); + return mTupleFactory.newTuple(data); + } catch (ExecException ee) { + throw ee; + } catch (Exception e) { + int errCode = 2106; + String msg = "Error while computing count in " + + this.getClass().getSimpleName(); + throw new ExecException(msg, errCode, PigException.BUG, e); + } + } + } + + static public class Final extends EvalFunc<Long> { + @Override + public Long exec(Tuple input) throws IOException { + try { + return countDisctinct(input).cardinality(); + } catch (Exception ee) { + int errCode = 
2106; + String msg = "Error while computing count in " + + this.getClass().getSimpleName(); + throw new ExecException(msg, errCode, PigException.BUG, ee); + } + } + } + + static protected HyperLogLogPlus countDisctinct(Tuple input) + throws NumberFormatException, IOException { + HyperLogLogPlus estimator = new HyperLogLogPlus(p); + DataBag values = (DataBag) input.get(0); + for (Iterator<Tuple> it = values.iterator(); it.hasNext();) { + Tuple t = it.next(); + Object data = t.get(0); + if (data instanceof Long) { + estimator.offer(data); + } else if (data instanceof DataByteArray) { + DataByteArray bytes = (DataByteArray) data; + HyperLogLogPlus newEstimator; + try { + newEstimator = HyperLogLogPlus.Builder.build(bytes.get()); + estimator = (HyperLogLogPlus) estimator.merge(newEstimator); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (CardinalityMergeException e) { + throw new RuntimeException(e); + } + } + } + return estimator; + } + }