GEODE-2449: changes in response to review. * Move HyperLogLog back into geode-core. * Bring back deprecated GeodeRedisServer for backwards compatibility.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/f79beb1e Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/f79beb1e Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/f79beb1e Branch: refs/heads/feature/GEODE-2449 Commit: f79beb1e9d5ec08d25bb02f93eb96468b3253a72 Parents: 5562547 Author: Galen O'Sullivan <gosulli...@pivotal.io> Authored: Mon Feb 13 10:53:05 2017 -0800 Committer: Galen O'Sullivan <gosulli...@pivotal.io> Committed: Mon Feb 13 15:04:08 2017 -0800 ---------------------------------------------------------------------- .../internal/DistributionConfig.java | 4 +- .../geode/internal/cache/GemFireCacheImpl.java | 17 +- .../org/apache/geode/internal/hll/Bits.java | 45 + .../internal/hll/CardinalityMergeException.java | 22 + .../apache/geode/internal/hll/HyperLogLog.java | 337 ++++ .../geode/internal/hll/HyperLogLogPlus.java | 1864 +++++++++++++++++ .../org/apache/geode/internal/hll/IBuilder.java | 22 + .../apache/geode/internal/hll/ICardinality.java | 74 + .../apache/geode/internal/hll/MurmurHash.java | 214 ++ .../apache/geode/internal/hll/RegisterSet.java | 108 + .../org/apache/geode/internal/hll/Varint.java | 238 +++ .../apache/geode/redis/GeodeRedisService.java | 6 + .../apache/geode/redis/GeodeRedisServer.java | 189 ++ .../geode/redis/GeodeRedisServiceImpl.java | 23 +- .../geode/redis/internal/RegionProvider.java | 2 +- .../internal/executor/hll/HllExecutor.java | 4 - .../internal/executor/hll/PFAddExecutor.java | 4 +- .../internal/executor/hll/PFCountExecutor.java | 4 +- .../internal/executor/hll/PFMergeExecutor.java | 6 +- .../redis/internal/executor/hll/Varint.java | 238 --- .../apache/geode/redis/internal/hll/Bits.java | 45 - .../internal/hll/CardinalityMergeException.java | 22 - .../geode/redis/internal/hll/HyperLogLog.java | 336 ---- .../redis/internal/hll/HyperLogLogPlus.java | 1866 ------------------ .../geode/redis/internal/hll/IBuilder.java | 22 - 
.../geode/redis/internal/hll/ICardinality.java | 74 - .../geode/redis/internal/hll/MurmurHash.java | 214 -- .../geode/redis/internal/hll/RegisterSet.java | 108 - .../apache/geode/redis/ConcurrentStartTest.java | 136 +- 29 files changed, 3224 insertions(+), 3020 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java index c2a395d..84d52df 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java @@ -3126,9 +3126,9 @@ public interface DistributionConfig extends Config, LogConfig { @ConfigAttributeSetter(name = REDIS_PORT) void setRedisPort(int value); - @ConfigAttribute(type = Integer.class, min = 0, max = 65535) + @ConfigAttribute(type = Integer.class, min = -1, max = 65535) String REDIS_PORT_NAME = REDIS_PORT; - int DEFAULT_REDIS_PORT = 0; + int DEFAULT_REDIS_PORT = -1; /** * Returns the value of the {@link ConfigurationProperties#REDIS_BIND_ADDRESS} property http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index 1df20dc..09584b6 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -1359,17 +1359,12 @@ public class GemFireCacheImpl
   }
 
   private void startRedisServer() {
-    // TODO this needs to be fixed up. Maybe we don't want to leave the starting of redis to the
-    // setting of the port.
-    int port = system.getConfig().getRedisPort();
-    if (port != 0) {
-      GeodeRedisService geodeRedisService = getService(GeodeRedisService.class);
-      if (geodeRedisService != null) {
-        geodeRedisService.start();
-      } else {
-        throw new GemFireConfigException(
-            "Geode Redis Service could not be started because it was not registered as a service");
-      }
+    GeodeRedisService geodeRedisService = getService(GeodeRedisService.class);
+    if (geodeRedisService != null) {
+      geodeRedisService.start();
+    } else {
+      throw new GemFireConfigException(
+          "Geode Redis Service could not be started because it was not registered as a service");
     }
   }
 
http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/hll/Bits.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/Bits.java b/geode-core/src/main/java/org/apache/geode/internal/hll/Bits.java
new file mode 100644
index 0000000..595fb57
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/hll/Bits.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.hll;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Helpers that decode big-endian 32-bit words into an {@code int[]}. Despite the method names, each
+ * element of the result is one whole word read with {@link DataInput#readInt()}, not a single bit.
+ */
+public class Bits {
+
+  /**
+   * Decodes {@code mBytes} into {@code mBytes.length / 4} big-endian ints. Trailing bytes that do
+   * not form a complete int are ignored.
+   *
+   * @param mBytes the encoded words
+   * @return the decoded words, in input order
+   * @throws IOException declared for symmetry with {@link #getBits(DataInput, int)}; not expected
+   *         when reading from an in-memory array
+   */
+  public static int[] getBits(byte[] mBytes) throws IOException {
+    // Delegate so both overloads share a single decoding loop.
+    return getBits(new DataInputStream(new ByteArrayInputStream(mBytes)), mBytes.length);
+  }
+
+  /**
+   * Reads {@code byteLength / 4} big-endian ints from {@code dataIn}. If {@code byteLength} is not
+   * a multiple of 4, the remaining bytes are not consumed.
+   *
+   * @param dataIn the source to read from
+   * @param byteLength number of bytes the caller says are available; must not be negative
+   * @return the decoded words, in read order
+   * @throws IOException if {@code byteLength} is negative, or if {@code dataIn} fails or ends
+   *         before all words are read
+   */
+  public static int[] getBits(DataInput dataIn, int byteLength) throws IOException {
+    if (byteLength < 0) {
+      // The length may come from serialized data; report corruption as an I/O error instead of
+      // letting the array allocation fail with NegativeArraySizeException.
+      throw new IOException("Negative byte length: " + byteLength);
+    }
+    int wordCount = byteLength / 4;
+    int[] words = new int[wordCount];
+    for (int i = 0; i < wordCount; i++) {
+      words[i] = dataIn.readInt();
+    }
+    return words;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/hll/CardinalityMergeException.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/CardinalityMergeException.java b/geode-core/src/main/java/org/apache/geode/internal/hll/CardinalityMergeException.java
new file mode 100644
index 0000000..c3d1ab5
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/hll/CardinalityMergeException.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.hll;
+
+/**
+ * Checked exception raised when two cardinality estimators cannot be merged. It is abstract;
+ * estimators throw their own subclass (for example {@code HyperLogLog.HyperLogLogMergeException}).
+ */
+@SuppressWarnings("serial")
+public abstract class CardinalityMergeException extends Exception {
+
+  /**
+   * @param message description of why the merge was rejected
+   */
+  public CardinalityMergeException(String message) {
+    super(message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/hll/HyperLogLog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/HyperLogLog.java b/geode-core/src/main/java/org/apache/geode/internal/hll/HyperLogLog.java
new file mode 100644
index 0000000..421de61
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/hll/HyperLogLog.java
@@ -0,0 +1,337 @@
+/*
+ * Copyright (C) 2012 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.hll;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * Java implementation of HyperLogLog (HLL) algorithm from this paper:
+ * <p>
+ * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
+ * <p>
+ * HLL is an improved version of LogLog that is capable of estimating the cardinality of a set with
+ * accuracy = 1.04/sqrt(m) where m = 2^b. So we can control accuracy vs space usage by increasing or
+ * decreasing b.
+ * <p>
+ * The main benefit of using HLL over LL is that it only requires 64% of the space that LL does to
+ * get the same accuracy.
+ * <p>
+ * This implementation implements a single counter. If a large number (millions) of counters is
+ * required you may want to refer to:
+ * <p>
+ * http://dsiutils.dsi.unimi.it/
+ * <p>
+ * It has a more complex implementation of HLL that supports multiple counters in a single object,
+ * drastically reducing the java overhead from creating a large number of objects.
+ * <p>
+ * This implementation leveraged a javascript implementation that Yammer has been working on:
+ * <p>
+ * https://github.com/yammer/probablyjs
+ * <p>
+ * Note that this implementation does not include the long range correction function defined in the
+ * original paper. Empirical evidence shows that the correction function causes more harm than good.
+ * <p>
+ * Users have different motivations to use different types of hashing functions. Rather than try to
+ * keep up with all available hash functions and to remove the concern of causing future binary
+ * incompatibilities this class allows clients to offer the value in hashed int or long form. This
+ * way clients are free to change their hash function on their own time line. We recommend using
+ * Google's Guava Murmur3_128 implementation as it provides good performance and speed when high
+ * precision is required. In our tests the 32bit MurmurHash function included in this project is
+ * faster and produces better results than the 32 bit murmur3 implementation google provides.
+ */
+public class HyperLogLog implements ICardinality, Serializable {
+
+  private static final long serialVersionUID = -4661220245111112301L;
+  private final RegisterSet registerSet;
+  private final int log2m;
+  private final double alphaMM;
+
+  public static final Double DEFAULT_HLL_STD_DEV = 0.081;
+  public static final Integer DEFAULT_HLL_DENSE = 18;
+  public static final Integer DEFAULT_HLL_SPARSE = 32;
+
+  /**
+   * Create a new HyperLogLog instance using the specified standard deviation.
+   *
+   * @param rsd - the relative standard deviation for the counter. smaller values create counters
+   *        that require more space.
+   */
+  public HyperLogLog(double rsd) {
+    this(log2m(rsd));
+  }
+
+  /**
+   * Converts a relative standard deviation into a register-index width: log2((1.106 / rsd)^2),
+   * truncated to an int.
+   */
+  private static int log2m(double rsd) {
+    return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2));
+  }
+
+  /**
+   * Create a new HyperLogLog instance. The log2m parameter defines the accuracy of the counter. The
+   * larger the log2m the better the accuracy.
+   * <p>
+   * accuracy = 1.04/sqrt(2^log2m)
+   *
+   * @param log2m - the number of bits to use as the basis for the HLL instance
+   */
+  public HyperLogLog(int log2m) {
+    this(log2m, new RegisterSet(1 << log2m));
+  }
+
+  /**
+   * Creates a new HyperLogLog instance using the given registers. Used for unmarshalling a
+   * serialized instance and for merging multiple counters together.
+   *
+   * @param log2m - the number of index bits; must be in the range [0, 30]
+   * @param registerSet - the initial values for the register set
+   * @throws IllegalArgumentException if {@code log2m} is outside [0, 30]
+   */
+  @Deprecated
+  public HyperLogLog(int log2m, RegisterSet registerSet) {
+    // NOTE(review): log2m == 0 passes this check, but offerHashed then shifts by Long.SIZE or
+    // Integer.SIZE, which Java reduces to a shift of 0, so the "index" would be the whole hash.
+    if (log2m < 0 || log2m > 30) {
+      throw new IllegalArgumentException(
+          "log2m argument is " + log2m + " and is outside the range [0, 30]");
+    }
+    this.registerSet = registerSet;
+    this.log2m = log2m;
+    int m = 1 << this.log2m;
+
+    alphaMM = getAlphaMM(log2m, m);
+  }
+
+  @Override
+  public boolean offerHashed(long hashedValue) {
+    // j: register index taken from the top log2m bits of the hash, in [0, 2^log2m).
+    final int j = (int) (hashedValue >>> (Long.SIZE - log2m));
+    // r: 1 + leading zeros of the remaining bits. '+' binds tighter than '|', so the OR'd constant
+    // is ((1 << (log2m - 1)) + 1); it sets low sentinel bit(s) so the count stays bounded even when
+    // every remaining hash bit is zero.
+    final int r =
+        Long.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1;
+    return registerSet.updateIfGreater(j, r);
+  }
+
+  @Override
+  public boolean offerHashed(int hashedValue) {
+    // Same scheme as offerHashed(long), on a 32-bit hash.
+    final int j = hashedValue >>> (Integer.SIZE - log2m);
+    final int r =
+        Integer.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1;
+    return registerSet.updateIfGreater(j, r);
+  }
+
+  @Override
+  public boolean offer(Object o) {
+    final int x = MurmurHash.hash(o);
+    return offerHashed(x);
+  }
+
+
+  /**
+   * Estimates the number of distinct values offered so far. Uses the raw HLL estimate, or linear
+   * counting when the estimate is at most 2.5 * m and at least one register is still zero.
+   */
+  @Override
+  public long cardinality() {
+    final int count = registerSet.count;
+    double registerSum = 0;
+    double zeros = 0.0;
+    for (int j = 0; j < count; j++) {
+      int val = registerSet.get(j);
+      // Add 2^-val. Math.scalb is exact for powers of two; an int shift (1 << val) would turn
+      // negative at val == 31 and wrap for larger values.
+      registerSum += Math.scalb(1.0, -val);
+      if (val == 0) {
+        zeros++;
+      }
+    }
+
+    double estimate = alphaMM * (1 / registerSum);
+
+    // Small-range correction (linear counting) only applies while some register is still empty.
+    // With zeros == 0, linearCounting would compute m * ln(m / 0) = Infinity, which Math.round
+    // turns into Long.MAX_VALUE.
+    if (zeros > 0 && estimate <= (5.0 / 2.0) * count) {
+      return Math.round(linearCounting(count, zeros));
+    } else {
+      return Math.round(estimate);
+    }
+  }
+
+  /** Size of the register words in bytes (4 bytes per word). */
+  @Override
+  public int sizeof() {
+    return registerSet.size * 4;
+  }
+
+  /**
+   * Serializes this estimator as: log2m (int), register byte length (int), then each register word
+   * (int). {@link Builder#build(byte[])} reads this format back.
+   */
+  @Override
+  public byte[] getBytes() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput dos = new DataOutputStream(baos);
+    writeBytes(dos);
+
+    return baos.toByteArray();
+  }
+
+  private void writeBytes(DataOutput serializedByteStream) throws IOException {
+    serializedByteStream.writeInt(log2m);
+    serializedByteStream.writeInt(registerSet.size * 4);
+    for (int x : registerSet.readOnlyBits()) {
+      serializedByteStream.writeInt(x);
+    }
+  }
+
+  /**
+   * Add all the elements of the other set to this set.
+   * <p>
+   * This operation does not imply a loss of precision.
+   *
+   * @param other A compatible Hyperloglog instance (same log2m)
+   * @throws CardinalityMergeException if other is not compatible
+   */
+  public void addAll(HyperLogLog other) throws CardinalityMergeException {
+    if (this.sizeof() != other.sizeof()) {
+      throw new HyperLogLogMergeException("Cannot merge estimators of different sizes");
+    }
+
+    registerSet.merge(other.registerSet);
+  }
+
+  /**
+   * Returns a new HyperLogLog with the same log2m as this one, into which this estimator and every
+   * element of {@code estimators} have been merged via {@link #addAll(HyperLogLog)}.
+   *
+   * @param estimators HyperLogLog instances with the same log2m as this one; may be null
+   * @throws CardinalityMergeException if an estimator is not a HyperLogLog or has a different size
+   */
+  @Override
+  public ICardinality merge(ICardinality... estimators) throws CardinalityMergeException {
+    // Match this estimator's precision: addAll rejects operands whose register sets differ in size,
+    // so a fixed default precision would make merge fail whenever log2m differs from it.
+    HyperLogLog merged = new HyperLogLog(log2m);
+    merged.addAll(this);
+
+    if (estimators == null) {
+      return merged;
+    }
+
+    for (ICardinality estimator : estimators) {
+      if (!(estimator instanceof HyperLogLog)) {
+        throw new HyperLogLogMergeException("Cannot merge estimators of different class");
+      }
+      HyperLogLog hll = (HyperLogLog) estimator;
+      merged.addAll(hll);
+    }
+
+    return merged;
+  }
+
+  private Object writeReplace() {
+    return new SerializationHolder(this);
+  }
+
+  /**
+   * This class exists to support Externalizable semantics for HyperLogLog objects without having to
+   * expose a public constructor, public write/read methods, or pretend final fields aren't final.
+   * <p>
+   * In short, Externalizable allows you to skip some of the more verbose meta-data default
+   * Serializable gets you, but still includes the class name. In that sense, there is some cost to
+   * this holder object because it has a longer class name. I imagine people who care about
+   * optimizing for that have their own work-around for long class names in general, or just use a
+   * custom serialization framework. Therefore we make no attempt to optimize that here (e.g. by
+   * raising this from an inner class and giving it an unhelpful name).
+   */
+  private static class SerializationHolder implements Externalizable {
+
+    HyperLogLog hyperLogLogHolder;
+
+    public SerializationHolder(HyperLogLog hyperLogLogHolder) {
+      this.hyperLogLogHolder = hyperLogLogHolder;
+    }
+
+    /**
+     * required for Externalizable
+     */
+    @SuppressWarnings("unused")
+    public SerializationHolder() {
+
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+      hyperLogLogHolder.writeBytes(out);
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+      hyperLogLogHolder = Builder.build(in);
+    }
+
+    private Object readResolve() {
+      return hyperLogLogHolder;
+    }
+  }
+
+  /** Creates HyperLogLog instances from a target relative standard deviation or serialized bytes. */
+  public static class Builder implements IBuilder<ICardinality>, Serializable {
+
+    private static final long serialVersionUID = -979314356097156719L;
+    private double rsd;
+
+    public Builder(double rsd) {
+      this.rsd = rsd;
+    }
+
+    @Override
+    public HyperLogLog build() {
+      return new HyperLogLog(rsd);
+    }
+
+    @Override
+    public int sizeof() {
+      int log2m = log2m(rsd);
+      int k = 1 << log2m;
+      return RegisterSet.getBits(k) * 4;
+    }
+
+    /** Rebuilds an estimator from the bytes produced by {@link HyperLogLog#getBytes()}. */
+    public static HyperLogLog build(byte[] bytes) throws IOException {
+      ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+      return build(new DataInputStream(bais));
+    }
+
+    /** Reads log2m, the register byte length, then the register words, and rebuilds the estimator. */
+    public static HyperLogLog build(DataInput serializedByteStream) throws IOException {
+      int log2m = serializedByteStream.readInt();
+      int byteArraySize = serializedByteStream.readInt();
+      return new HyperLogLog(log2m,
+          new RegisterSet(1 << log2m, Bits.getBits(serializedByteStream, byteArraySize)));
+    }
+  }
+
+  @SuppressWarnings("serial")
+  protected static class HyperLogLogMergeException extends CardinalityMergeException {
+
+    public HyperLogLogMergeException(String message) {
+      super(message);
+    }
+  }
+
+  /**
+   * Returns alpha_m * m^2, the bias-correction factor from the paper: tabulated constants for
+   * p = 4, 5, 6 and the approximation 0.7213 / (1 + 1.079 / m) otherwise.
+   *
+   * @param p the number of index bits (log2m)
+   * @param m the number of registers (2^p)
+   */
+  protected static double getAlphaMM(final int p, final int m) {
+    // See the paper.
+    switch (p) {
+      case 4:
+        return 0.673 * m * m;
+      case 5:
+        return 0.697 * m * m;
+      case 6:
+        return 0.709 * m * m;
+      default:
+        return (0.7213 / (1 + 1.079 / m)) * m * m;
+    }
+  }
+
+  /**
+   * Linear-counting estimate m * ln(m / zeros).
+   *
+   * @param m the number of registers
+   * @param zeros the number of registers still equal to zero; the result is infinite when this is 0
+   */
+  protected static double linearCounting(int m, double zeros) {
+    return m * Math.log(m / zeros);
+  }
+}