GEODE-2449: changes in response to review.

* Move HyperLogLog back into geode-core.
* Bring back deprecated GeodeRedisServer for backwards compatibility.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/f79beb1e
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/f79beb1e
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/f79beb1e

Branch: refs/heads/feature/GEODE-2449
Commit: f79beb1e9d5ec08d25bb02f93eb96468b3253a72
Parents: 5562547
Author: Galen O'Sullivan <gosulli...@pivotal.io>
Authored: Mon Feb 13 10:53:05 2017 -0800
Committer: Galen O'Sullivan <gosulli...@pivotal.io>
Committed: Mon Feb 13 15:04:08 2017 -0800

----------------------------------------------------------------------
 .../internal/DistributionConfig.java            |    4 +-
 .../geode/internal/cache/GemFireCacheImpl.java  |   17 +-
 .../org/apache/geode/internal/hll/Bits.java     |   45 +
 .../internal/hll/CardinalityMergeException.java |   22 +
 .../apache/geode/internal/hll/HyperLogLog.java  |  337 ++++
 .../geode/internal/hll/HyperLogLogPlus.java     | 1864 +++++++++++++++++
 .../org/apache/geode/internal/hll/IBuilder.java |   22 +
 .../apache/geode/internal/hll/ICardinality.java |   74 +
 .../apache/geode/internal/hll/MurmurHash.java   |  214 ++
 .../apache/geode/internal/hll/RegisterSet.java  |  108 +
 .../org/apache/geode/internal/hll/Varint.java   |  238 +++
 .../apache/geode/redis/GeodeRedisService.java   |    6 +
 .../apache/geode/redis/GeodeRedisServer.java    |  189 ++
 .../geode/redis/GeodeRedisServiceImpl.java      |   23 +-
 .../geode/redis/internal/RegionProvider.java    |    2 +-
 .../internal/executor/hll/HllExecutor.java      |    4 -
 .../internal/executor/hll/PFAddExecutor.java    |    4 +-
 .../internal/executor/hll/PFCountExecutor.java  |    4 +-
 .../internal/executor/hll/PFMergeExecutor.java  |    6 +-
 .../redis/internal/executor/hll/Varint.java     |  238 ---
 .../apache/geode/redis/internal/hll/Bits.java   |   45 -
 .../internal/hll/CardinalityMergeException.java |   22 -
 .../geode/redis/internal/hll/HyperLogLog.java   |  336 ----
 .../redis/internal/hll/HyperLogLogPlus.java     | 1866 ------------------
 .../geode/redis/internal/hll/IBuilder.java      |   22 -
 .../geode/redis/internal/hll/ICardinality.java  |   74 -
 .../geode/redis/internal/hll/MurmurHash.java    |  214 --
 .../geode/redis/internal/hll/RegisterSet.java   |  108 -
 .../apache/geode/redis/ConcurrentStartTest.java |  136 +-
 29 files changed, 3224 insertions(+), 3020 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
index c2a395d..84d52df 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
@@ -3126,9 +3126,9 @@ public interface DistributionConfig extends Config, 
LogConfig {
   @ConfigAttributeSetter(name = REDIS_PORT)
   void setRedisPort(int value);
 
-  @ConfigAttribute(type = Integer.class, min = 0, max = 65535)
+  @ConfigAttribute(type = Integer.class, min = -1, max = 65535)
   String REDIS_PORT_NAME = REDIS_PORT;
-  int DEFAULT_REDIS_PORT = 0;
+  int DEFAULT_REDIS_PORT = -1;
 
   /**
    * Returns the value of the {@link 
ConfigurationProperties#REDIS_BIND_ADDRESS} property

http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 1df20dc..09584b6 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -1359,17 +1359,12 @@ public class GemFireCacheImpl
   }
 
   private void startRedisServer() {
-    // TODO this needs to be fixed up. Maybe we don't want to leave the 
starting of redis to the
-    // setting of the port.
-    int port = system.getConfig().getRedisPort();
-    if (port != 0) {
-      GeodeRedisService geodeRedisService = 
getService(GeodeRedisService.class);
-      if (geodeRedisService != null) {
-        geodeRedisService.start();
-      } else {
-        throw new GemFireConfigException(
-            "Geode Redis Service could not be started because it was not 
registered as a service");
-      }
+    GeodeRedisService geodeRedisService = getService(GeodeRedisService.class);
+    if (geodeRedisService != null) {
+      geodeRedisService.start();
+    } else {
+      throw new GemFireConfigException(
+          "Geode Redis Service could not be started because it was not 
registered as a service");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/hll/Bits.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/Bits.java 
b/geode-core/src/main/java/org/apache/geode/internal/hll/Bits.java
new file mode 100644
index 0000000..595fb57
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/hll/Bits.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not 
use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.hll;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+public class Bits {
+
+  public static int[] getBits(byte[] mBytes) throws IOException {
+    int bitSize = mBytes.length / 4;
+    int[] bits = new int[bitSize];
+    DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(mBytes));
+    for (int i = 0; i < bitSize; i++) {
+      bits[i] = dis.readInt();
+    }
+    return bits;
+  }
+
+  /**
+   * This method might be better described as "byte array to int array" or 
"data input to int array"
+   */
+  public static int[] getBits(DataInput dataIn, int byteLength) throws 
IOException {
+    int bitSize = byteLength / 4;
+    int[] bits = new int[bitSize];
+    for (int i = 0; i < bitSize; i++) {
+      bits[i] = dataIn.readInt();
+    }
+    return bits;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/hll/CardinalityMergeException.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/hll/CardinalityMergeException.java
 
b/geode-core/src/main/java/org/apache/geode/internal/hll/CardinalityMergeException.java
new file mode 100644
index 0000000..c3d1ab5
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/hll/CardinalityMergeException.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not 
use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.hll;
+
+@SuppressWarnings("serial")
+public abstract class CardinalityMergeException extends Exception {
+
+  public CardinalityMergeException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/hll/HyperLogLog.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/hll/HyperLogLog.java 
b/geode-core/src/main/java/org/apache/geode/internal/hll/HyperLogLog.java
new file mode 100644
index 0000000..421de61
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/hll/HyperLogLog.java
@@ -0,0 +1,337 @@
+/*
+ * Copyright (C) 2012 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not 
use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.hll;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * Java implementation of HyperLogLog (HLL) algorithm from this paper:
+ * <p/>
+ * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
+ * <p/>
+ * HLL is an improved version of LogLog that is capable of estimating the 
cardinality of a set with
+ * accuracy = 1.04/sqrt(m) where m = 2^b. So we can control accuracy vs space 
usage by increasing or
+ * decreasing b.
+ * <p/>
+ * The main benefit of using HLL over LL is that it only requires 64% of the 
space that LL does to
+ * get the same accuracy.
+ * <p/>
+ * This implementation implements a single counter. If a large (millions) 
number of counters are
+ * required you may want to refer to:
+ * <p/>
+ * http://dsiutils.dsi.unimi.it/
+ * <p/>
+ * It has a more complex implementation of HLL that supports multiple counters 
in a single object,
+ * drastically reducing the java overhead from creating a large number of 
objects.
+ * <p/>
+ * This implementation leveraged a javascript implementation that Yammer has 
been working on:
+ * <p/>
+ * https://github.com/yammer/probablyjs
+ * <p>
+ * Note that this implementation does not include the long range correction 
function defined in the
+ * original paper. Empirical evidence shows that the correction function 
causes more harm than good.
+ * </p>
+ * <p/>
+ * <p>
+ * Users have different motivations to use different types of hashing 
functions. Rather than try to
+ * keep up with all available hash functions and to remove the concern of 
causing future binary
+ * incompatibilities this class allows clients to offer the value in hashed 
int or long form. This
+ * way clients are free to change their hash function on their own time line. 
We recommend using
+ * Google's Guava Murmur3_128 implementation as it provides good performance 
and speed when high
+ * precision is required. In our tests the 32bit MurmurHash function included 
in this project is
+ * faster and produces better results than the 32 bit murmur3 implementation 
google provides.
+ * </p>
+ */
+public class HyperLogLog implements ICardinality, Serializable {
+
+  private static final long serialVersionUID = -4661220245111112301L;
+  private final RegisterSet registerSet;
+  private final int log2m;
+  private final double alphaMM;
+
+  public static final Double DEFAULT_HLL_STD_DEV = 0.081;
+  public static final Integer DEFAULT_HLL_DENSE = 18;
+  public static final Integer DEFAULT_HLL_SPARSE = 32;
+
+  /**
+   * Create a new HyperLogLog instance using the specified standard deviation.
+   *
+   * @param rsd - the relative standard deviation for the counter. smaller 
values create counters
+   *        that require more space.
+   */
+  public HyperLogLog(double rsd) {
+    this(log2m(rsd));
+  }
+
+  private static int log2m(double rsd) {
+    return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2));
+  }
+
+  /**
+   * Create a new HyperLogLog instance. The log2m parameter defines the 
accuracy of the counter. The
+   * larger the log2m the better the accuracy.
+   * <p/>
+   * accuracy = 1.04/sqrt(2^log2m)
+   *
+   * @param log2m - the number of bits to use as the basis for the HLL instance
+   */
+  public HyperLogLog(int log2m) {
+    this(log2m, new RegisterSet(1 << log2m));
+  }
+
+  /**
+   * Creates a new HyperLogLog instance using the given registers. Used for 
unmarshalling a
+   * serialized instance and for merging multiple counters together.
+   *
+   * @param registerSet - the initial values for the register set
+   */
+  @Deprecated
+  public HyperLogLog(int log2m, RegisterSet registerSet) {
+    if (log2m < 0 || log2m > 30) {
+      throw new IllegalArgumentException(
+          "log2m argument is " + log2m + " and is outside the range [0, 30]");
+    }
+    this.registerSet = registerSet;
+    this.log2m = log2m;
+    int m = 1 << this.log2m;
+
+    alphaMM = getAlphaMM(log2m, m);
+  }
+
+  @Override
+  public boolean offerHashed(long hashedValue) {
+    // j becomes the binary address determined by the first b log2m of x
+    // j will be between 0 and 2^log2m
+    final int j = (int) (hashedValue >>> (Long.SIZE - log2m));
+    final int r =
+        Long.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << 
(this.log2m - 1)) + 1) + 1;
+    return registerSet.updateIfGreater(j, r);
+  }
+
+  @Override
+  public boolean offerHashed(int hashedValue) {
+    // j becomes the binary address determined by the first b log2m of x
+    // j will be between 0 and 2^log2m
+    final int j = hashedValue >>> (Integer.SIZE - log2m);
+    final int r =
+        Integer.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << 
(this.log2m - 1)) + 1) + 1;
+    return registerSet.updateIfGreater(j, r);
+  }
+
+  @Override
+  public boolean offer(Object o) {
+    final int x = MurmurHash.hash(o);
+    return offerHashed(x);
+  }
+
+
+  @Override
+  public long cardinality() {
+    double registerSum = 0;
+    int count = registerSet.count;
+    double zeros = 0.0;
+    for (int j = 0; j < registerSet.count; j++) {
+      int val = registerSet.get(j);
+      registerSum += 1.0 / (1 << val);
+      if (val == 0) {
+        zeros++;
+      }
+    }
+
+    double estimate = alphaMM * (1 / registerSum);
+
+    if (estimate <= (5.0 / 2.0) * count) {
+      // Small Range Estimate
+      return Math.round(linearCounting(count, zeros));
+    } else {
+      return Math.round(estimate);
+    }
+  }
+
+  @Override
+  public int sizeof() {
+    return registerSet.size * 4;
+  }
+
+  @Override
+  public byte[] getBytes() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput dos = new DataOutputStream(baos);
+    writeBytes(dos);
+
+    return baos.toByteArray();
+  }
+
+  private void writeBytes(DataOutput serializedByteStream) throws IOException {
+    serializedByteStream.writeInt(log2m);
+    serializedByteStream.writeInt(registerSet.size * 4);
+    for (int x : registerSet.readOnlyBits()) {
+      serializedByteStream.writeInt(x);
+    }
+  }
+
+  /**
+   * Add all the elements of the other set to this set.
+   * <p/>
+   * This operation does not imply a loss of precision.
+   *
+   * @param other A compatible Hyperloglog instance (same log2m)
+   * @throws CardinalityMergeException if other is not compatible
+   */
+  public void addAll(HyperLogLog other) throws CardinalityMergeException {
+    if (this.sizeof() != other.sizeof()) {
+      throw new HyperLogLogMergeException("Cannot merge estimators of 
different sizes");
+    }
+
+    registerSet.merge(other.registerSet);
+  }
+
+  @Override
+  public ICardinality merge(ICardinality... estimators) throws 
CardinalityMergeException {
+    HyperLogLog merged = new HyperLogLog(DEFAULT_HLL_STD_DEV);// new 
HyperLogLog(log2m,
+                                                              // new
+                                                              // 
RegisterSet(this.registerSet.count));
+    merged.addAll(this);
+
+    if (estimators == null) {
+      return merged;
+    }
+
+    for (ICardinality estimator : estimators) {
+      if (!(estimator instanceof HyperLogLog)) {
+        throw new HyperLogLogMergeException("Cannot merge estimators of 
different class");
+      }
+      HyperLogLog hll = (HyperLogLog) estimator;
+      merged.addAll(hll);
+    }
+
+    return merged;
+  }
+
+  private Object writeReplace() {
+    return new SerializationHolder(this);
+  }
+
+  /**
+   * This class exists to support Externalizable semantics for HyperLogLog 
objects without having to
+   * expose a public constructor, public write/read methods, or pretend final 
fields aren't final.
+   *
+   * In short, Externalizable allows you to skip some of the more verbose 
meta-data default
+   * Serializable gets you, but still includes the class name. In that sense, 
there is some cost to
+   * this holder object because it has a longer class name. I imagine people 
who care about
+   * optimizing for that have their own work-around for long class names in 
general, or just use a
+   * custom serialization framework. Therefore we make no attempt to optimize 
that here (eg. by
+   * raising this from an inner class and giving it an unhelpful name).
+   */
+  private static class SerializationHolder implements Externalizable {
+
+    HyperLogLog hyperLogLogHolder;
+
+    public SerializationHolder(HyperLogLog hyperLogLogHolder) {
+      this.hyperLogLogHolder = hyperLogLogHolder;
+    }
+
+    /**
+     * required for Externalizable
+     */
+    @SuppressWarnings("unused")
+    public SerializationHolder() {
+
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+      hyperLogLogHolder.writeBytes(out);
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+      hyperLogLogHolder = Builder.build(in);
+    }
+
+    private Object readResolve() {
+      return hyperLogLogHolder;
+    }
+  }
+
+  public static class Builder implements IBuilder<ICardinality>, Serializable {
+
+    private static final long serialVersionUID = -979314356097156719L;
+    private double rsd;
+
+    public Builder(double rsd) {
+      this.rsd = rsd;
+    }
+
+    @Override
+    public HyperLogLog build() {
+      return new HyperLogLog(rsd);
+    }
+
+    @Override
+    public int sizeof() {
+      int log2m = log2m(rsd);
+      int k = 1 << log2m;
+      return RegisterSet.getBits(k) * 4;
+    }
+
+    public static HyperLogLog build(byte[] bytes) throws IOException {
+      ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+      return build(new DataInputStream(bais));
+    }
+
+    public static HyperLogLog build(DataInput serializedByteStream) throws 
IOException {
+      int log2m = serializedByteStream.readInt();
+      int byteArraySize = serializedByteStream.readInt();
+      return new HyperLogLog(log2m,
+          new RegisterSet(1 << log2m, Bits.getBits(serializedByteStream, 
byteArraySize)));
+    }
+  }
+
+  @SuppressWarnings("serial")
+  protected static class HyperLogLogMergeException extends 
CardinalityMergeException {
+
+    public HyperLogLogMergeException(String message) {
+      super(message);
+    }
+  }
+
+  protected static double getAlphaMM(final int p, final int m) {
+    // See the paper.
+    switch (p) {
+      case 4:
+        return 0.673 * m * m;
+      case 5:
+        return 0.697 * m * m;
+      case 6:
+        return 0.709 * m * m;
+      default:
+        return (0.7213 / (1 + 1.079 / m)) * m * m;
+    }
+  }
+
+  protected static double linearCounting(int m, double V) {
+    return m * Math.log(m / V);
+  }
+}

Reply via email to