Hey guys,
The patch is attached and passes the unit tests. We're also testing it on a real 1000-node
Hadoop cluster as we speak.
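In case it helps with review, here is roughly how the new Initial/Intermediate/Final classes chain together outside of Pig. This is only an illustrative driver sketch; the driver class, member values, and precision below are made up and are not part of the patch:

import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import datafu.pig.stats.HyperLogLogPlusPlus;

public class HllAlgebraicDriverSketch {
  public static void main(String[] args) throws Exception {
    TupleFactory tf = TupleFactory.getInstance();
    BagFactory bf = BagFactory.getInstance();

    // Constructing the outer UDF sets the (static) precision used by the inner classes.
    new HyperLogLogPlusPlus("20");

    // Map side: Initial hashes each single-tuple bag into a 64-bit value.
    HyperLogLogPlusPlus.Initial initial = new HyperLogLogPlusPlus.Initial();
    DataBag mapOutput = bf.newDefaultBag();
    for (int i = 0; i < 1000; i++) {
      DataBag single = bf.newDefaultBag();
      single.add(tf.newTuple("member" + i));
      mapOutput.add(initial.exec(tf.newTuple(single)));
    }

    // Combiner: Intermediate folds the hashes into one serialized HLL+ sketch.
    HyperLogLogPlusPlus.Intermediate intermed = new HyperLogLogPlusPlus.Intermediate();
    Tuple partial = intermed.exec(tf.newTuple(mapOutput));

    // Reducer: Final merges the partial sketches and reports the estimate.
    DataBag partials = bf.newDefaultBag();
    partials.add(partial);
    HyperLogLogPlusPlus.Final fin = new HyperLogLogPlusPlus.Final();
    System.out.println("estimated cardinality: " + fin.exec(tf.newTuple(partials)));
  }
}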
Do you want us to create a JIRA issue for this, or is this good enough?
Thanks, Ilia and Ido

On 7 March 2015 at 23:09, Matthew Hayes <matthew.terence.ha...@gmail.com> wrote:

> I don't remember if there was a particular reason I didn't implement this
> as AlgebraicEvalFunc. It seems like it could be. I believe the Java
> MapReduce version leverages the combiner. If you want to try making this
> Algebraic we would be happy to accept a patch :)
>
> -Matt
>
> > On Mar 7, 2015, at 12:11 PM, Ido Hadanny <ido.hada...@gmail.com> wrote:
> >
> > DataFu has a nice implementation of HyperLogLog for estimating cardinality here:
> > <https://github.com/apache/incubator-datafu/blob/master/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java>
> >
> > However, it's implemented as an Accumulator, which means it will run only at
> > the reducer and not in the combiner (though it will never load the entire set
> > into memory the way a plain EvalFunc would). Why couldn't DataFu implement it
> > as Algebraic - and fill the registers at every combiner, then merge and reduce
> > the result? Am I missing something here?
> > also available here:
> > http://stackoverflow.com/questions/28908217/why-is-data-fu-implementing-hyperloglog-as-an-accumulator-and-not-as-algebraic
> >
> > thanks!
> >
> >
> > --
> > Sent from my androido
>
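To make the "fill the registers at every combiner, then merge" step in the quoted question concrete, here is a tiny standalone sketch of the two stream-lib calls the patch relies on (HyperLogLogPlus.Builder.build to rebuild a serialized sketch, and merge to combine registers). The precision and member counts below are just illustrative:

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;

public class HllMergeSketch {
  public static void main(String[] args) throws Exception {
    // Two independent sketches, e.g. produced by two separate combiner invocations.
    HyperLogLogPlus a = new HyperLogLogPlus(20);
    HyperLogLogPlus b = new HyperLogLogPlus(20);
    for (int i = 0; i < 50000; i++) a.offer("user" + i);
    for (int i = 25000; i < 75000; i++) b.offer("user" + i);

    // What the patch does at the reducer: rebuild one sketch from its
    // serialized bytes, then merge the registers into a single estimator.
    HyperLogLogPlus rebuilt = HyperLogLogPlus.Builder.build(b.getBytes());
    HyperLogLogPlus merged = (HyperLogLogPlus) a.merge(rebuilt);

    // ~75,000 distinct members, within the usual HLL++ error bounds.
    System.out.println(merged.cardinality());
  }
}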



-- 
Sent from my androido
diff --git a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
index 95c5b0e..bb5810f 100644
--- a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
+++ b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
@@ -20,14 +20,24 @@
 package datafu.pig.stats;
 
 import java.io.IOException;
+import java.util.Iterator;
 
 import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
+import com.clearspring.analytics.hash.MurmurHash;
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+
 /**
  * A UDF that applies the HyperLogLog++ cardinality estimation algorithm.
  * 
@@ -45,8 +55,9 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
 public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long>
 {
  private com.clearspring.analytics.stream.cardinality.HyperLogLogPlus estimator;
-  
-  private final int p;
+       private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+       private static int p;
   
   /**
    * Constructs a HyperLogLog++ estimator.
@@ -61,9 +72,9 @@ public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long>
    * 
    * @param p precision value
    */
-  public HyperLogLogPlusPlus(String p)
+  public HyperLogLogPlusPlus(String par)
   {
-    this.p = Integer.parseInt(p);
+    p = Integer.parseInt(par);
     cleanup();
   }
   
@@ -111,4 +122,94 @@ public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long>
       throw new RuntimeException(e);
     }
   }
+  
+       public String getInitial() {
+               return Initial.class.getName();
+       }
+
+       public String getIntermed() {
+               return Intermediate.class.getName();
+       }
+
+       public String getFinal() {
+               return Final.class.getName();
+       }
+
+       static public class Initial extends EvalFunc<Tuple> {
+
+               @Override
+               public Tuple exec(Tuple input) throws IOException {
+                       // Initial is guaranteed to run only on the map side,
+                       // with an input bag containing a single tuple. Emit
+                       // the 64-bit hash of that tuple (or of null when the
+                       // bag is empty) for the downstream estimator.
+                       DataBag bag = (DataBag) input.get(0);
+                       Iterator<Tuple> it = bag.iterator();
+                       if (it.hasNext()) {
+                               Tuple t = (Tuple) it.next();
+                               if (t != null && t.size() > 0 && t.get(0) != null) {
+                                       long x = MurmurHash.hash64(t);
+                                       return mTupleFactory.newTuple((Object) x);
+                               }
+                       }
+                       return mTupleFactory.newTuple((Object) MurmurHash.hash64(null));
+               }
+       }
+
+       static public class Intermediate extends EvalFunc<Tuple> {
+               @Override
+               public Tuple exec(Tuple input) throws IOException {
+                       try {
+                               DataByteArray data = new DataByteArray(countDistinct(input).getBytes());
+                               return mTupleFactory.newTuple(data);
+                       } catch (ExecException ee) {
+                               throw ee;
+                       } catch (Exception e) {
+                               int errCode = 2106;
+                               String msg = "Error while computing count in "
+                                               + this.getClass().getSimpleName();
+                               throw new ExecException(msg, errCode, PigException.BUG, e);
+                       }
+               }
+       }
+
+       static public class Final extends EvalFunc<Long> {
+               @Override
+               public Long exec(Tuple input) throws IOException {
+                       try {
+                               return countDistinct(input).cardinality();
+                       } catch (Exception ee) {
+                               int errCode = 2106;
+                               String msg = "Error while computing count in "
+                                               + this.getClass().getSimpleName();
+                               throw new ExecException(msg, errCode, PigException.BUG, ee);
+                       }
+               }
+       }
+
+       static protected HyperLogLogPlus countDistinct(Tuple input)
+                       throws NumberFormatException, IOException {
+               HyperLogLogPlus estimator = new HyperLogLogPlus(p);
+               DataBag values = (DataBag) input.get(0);
+               for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+                       Tuple t = it.next();
+                       Object data = t.get(0);
+                       if (data instanceof Long) {
+                               estimator.offer(data);
+                       } else if (data instanceof DataByteArray) {
+                               DataByteArray bytes = (DataByteArray) data;
+                               HyperLogLogPlus newEstimator;
+                               try {
+                                       newEstimator = HyperLogLogPlus.Builder.build(bytes.get());
+                                       estimator = (HyperLogLogPlus) estimator.merge(newEstimator);
+                               } catch (IOException e) {
+                                       throw new RuntimeException(e);
+                               } catch (CardinalityMergeException e) {
+                                       throw new RuntimeException(e);
+                               }
+                       }
+               }
+               return estimator;
+       }
+  
 }
