Hey guys,
The patch is attached and passes the unit tests; we are also testing it on a
1000-node production Hadoop cluster as we speak.
Do you want us to create a jira issue for this, or is this good enough?
Thanks, Ilia and Ido
On 7 March 2015 at 23:09, Matthew Hayes <[email protected]>
wrote:
> I don't remember if there was a particular reason I didn't implement this
> as AlgebraicEvalFunc. It seems like it could be. I believe the Java
> MapReduce version leverages the combiner. If you want to try making this
> Algebraic we would be happy to accept a patch :)
>
> -Matt
>
> > On Mar 7, 2015, at 12:11 PM, Ido Hadanny <[email protected]> wrote:
> >
> > data.fu has a nice implementation of HyperLogLog for estimating
> cardinality
> > here
> > <
> https://github.com/apache/incubator-datafu/blob/master/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
> >
> >
> > However, it's implemented as Accumulator which means it will run only at
> > the reducer and not in the combiner (but it will never load the entire
> set
> > into memory as in normal EvalFunc). Why couldn't data.fu implement it as
> > Algebraic - and fill the registers at every combiner, then merge and
> reduce
> > the result? Am I missing something here?
> > also available here:
> >
> http://stackoverflow.com/questions/28908217/why-is-data-fu-implementing-hyperloglog-as-an-accumulator-and-not-as-algebraic
> >
> > thanks!
> >
> >
> > --
> > Sent from my androido
>
--
Sent from my Android
diff --git a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
index 95c5b0e..bb5810f 100644
--- a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
+++ b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
@@ -20,14 +20,24 @@
package datafu.pig.stats;
import java.io.IOException;
+import java.util.Iterator;
import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import com.clearspring.analytics.hash.MurmurHash;
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+
/**
* A UDF that applies the HyperLogLog++ cardinality estimation algorithm.
*
@@ -45,8 +55,9 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long>
{
private com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
estimator;
-
- private final int p;
+ private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+ private static int p;
/**
* Constructs a HyperLogLog++ estimator.
@@ -61,9 +72,9 @@ public class HyperLogLogPlusPlus extends
AccumulatorEvalFunc<Long>
*
* @param p precision value
*/
- public HyperLogLogPlusPlus(String p)
+ public HyperLogLogPlusPlus(String par)
{
- this.p = Integer.parseInt(p);
+ p = Integer.parseInt(par);
cleanup();
}
@@ -111,4 +122,94 @@ public class HyperLogLogPlusPlus extends
AccumulatorEvalFunc<Long>
throw new RuntimeException(e);
}
}
+
+ public String getInitial() {
+ return Initial.class.getName();
+ }
+
+ public String getIntermed() {
+ return Intermediate.class.getName();
+ }
+
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ static public class Initial extends EvalFunc<Tuple> {
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ // Since Initial is guaranteed to be called
+ // only in the map, it will be called with an
+ // input of a bag with a single tuple - the
+ // count should always be 1 if bag is non empty
+ DataBag bag = (DataBag) input.get(0);
+ Iterator<Tuple> it = bag.iterator();
+ if (it.hasNext()) {
+ Tuple t = (Tuple) it.next();
+ if (t != null && t.size() > 0 && t.get(0) !=
null) {
+ long x = MurmurHash.hash64(t);
+ return mTupleFactory.newTuple((Object)
x);
+ }
+ }
+ return mTupleFactory.newTuple((Object)
MurmurHash.hash64(null));
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
+ DataByteArray data = new
DataByteArray(countDisctinct(input).getBytes());
+ return mTupleFactory.newTuple(data);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing count in "
+ +
this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode,
PigException.BUG, e);
+ }
+ }
+ }
+
+ static public class Final extends EvalFunc<Long> {
+ @Override
+ public Long exec(Tuple input) throws IOException {
+ try {
+ return countDisctinct(input).cardinality();
+ } catch (Exception ee) {
+ int errCode = 2106;
+ String msg = "Error while computing count in "
+ +
this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode,
PigException.BUG, ee);
+ }
+ }
+ }
+
+ static protected HyperLogLogPlus countDisctinct(Tuple input)
+ throws NumberFormatException, IOException {
+ HyperLogLogPlus estimator = new HyperLogLogPlus(p);
+ DataBag values = (DataBag) input.get(0);
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ Object data = t.get(0);
+ if (data instanceof Long) {
+ estimator.offer(data);
+ } else if (data instanceof DataByteArray) {
+ DataByteArray bytes = (DataByteArray) data;
+ HyperLogLogPlus newEstimator;
+ try {
+ newEstimator =
HyperLogLogPlus.Builder.build(bytes.get());
+ estimator = (HyperLogLogPlus)
estimator.merge(newEstimator);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (CardinalityMergeException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return estimator;
+ }
+
}