http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/hll/IBuilder.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/IBuilder.java b/geode-core/src/main/java/org/apache/geode/internal/hll/IBuilder.java new file mode 100644 index 0000000..10189c8 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/hll/IBuilder.java @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2011 Clearspring Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.hll; + + +public interface IBuilder<T> { + + T build(); + + int sizeof(); +}
http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/hll/ICardinality.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/ICardinality.java b/geode-core/src/main/java/org/apache/geode/internal/hll/ICardinality.java new file mode 100644 index 0000000..125b621 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/hll/ICardinality.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2011 Clearspring Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.hll; + + +import java.io.IOException; + + +public interface ICardinality { + + /** + * @param o stream element + * @return false if the value returned by cardinality() is unaffected by the appearance of o in + * the stream. 
+ */ + boolean offer(Object o); + + /** + * Offer the value as a hashed long value + * + * @param hashedLong - the hash of the item to offer to the estimator + * @return false if the value returned by cardinality() is unaffected by the appearance of + * hashedLong in the stream + */ + boolean offerHashed(long hashedLong); + + /** + * Offer the value as a hashed int value + * + * @param hashedInt - the hash of the item to offer to the estimator + * @return false if the value returned by cardinality() is unaffected by the appearance of + * hashedInt in the stream + */ + boolean offerHashed(int hashedInt); + + /** + * @return the number of unique elements in the stream or an estimate thereof + */ + long cardinality(); + + /** + * @return size in bytes needed for serialization + */ + int sizeof(); + + /** + * @return byte[] + * @throws IOException + */ + byte[] getBytes() throws IOException; + + /** + * Merges estimators to produce a new estimator for the combined streams of this estimator and + * those passed as arguments. + * <p/> + * Neither this estimator nor the ones passed as parameters are modified. + * + * @param estimators Zero or more compatible estimators + * @throws CardinalityMergeException If at least one of the estimators is not compatible with this + * one + */ + ICardinality merge(ICardinality... estimators) throws CardinalityMergeException; +} http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/hll/MurmurHash.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/MurmurHash.java b/geode-core/src/main/java/org/apache/geode/internal/hll/MurmurHash.java new file mode 100644 index 0000000..be19e29 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/hll/MurmurHash.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements.
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.hll; + + +/** + * This is a very fast, non-cryptographic hash suitable for general hash-based lookup. See + * http://murmurhash.googlepages.com/ for more details. + * <p/> + * <p> + * The C version of MurmurHash 2.0 found at that site was ported to Java by Andrzej Bialecki (ab at + * getopt org). 
+ * </p> + */ +public class MurmurHash { + public static int hash(Object o) { + if (o == null) { + return 0; + } + if (o instanceof Long) { + return hashLong((Long) o); + } + if (o instanceof Integer) { + return hashLong((Integer) o); + } + if (o instanceof Double) { + return hashLong(Double.doubleToRawLongBits((Double) o)); + } + if (o instanceof Float) { + return hashLong(Float.floatToRawIntBits((Float) o)); + } + if (o instanceof String) { + return hash(((String) o).getBytes()); + } + if (o instanceof byte[]) { + return hash((byte[]) o); + } + return hash(o.toString()); + } + + public static int hash(byte[] data) { + return hash(data, 0, data.length, -1); + } + + public static int hash(byte[] data, int seed) { + return hash(data, 0, data.length, seed); + } + + public static int hash(byte[] data, int offset, int length, int seed) { + int m = 0x5bd1e995; + int r = 24; + + int h = seed ^ length; + + int len_4 = length >> 2; + + for (int i = 0; i < len_4; i++) { + int i_4 = i << 2; + int k = data[offset + i_4 + 3]; + k = k << 8; + k = k | (data[offset + i_4 + 2] & 0xff); + k = k << 8; + k = k | (data[offset + i_4 + 1] & 0xff); + k = k << 8; + k = k | (data[offset + i_4 + 0] & 0xff); + k *= m; + k ^= k >>> r; + k *= m; + h *= m; + h ^= k; + } + + // avoid calculating modulo + int len_m = len_4 << 2; + int left = length - len_m; + + if (left != 0) { + if (left >= 3) { + h ^= (int) data[offset + length - 3] << 16; + } + if (left >= 2) { + h ^= (int) data[offset + length - 2] << 8; + } + if (left >= 1) { + h ^= (int) data[offset + length - 1]; + } + + h *= m; + } + + h ^= h >>> 13; + h *= m; + h ^= h >>> 15; + + return h; + } + + public static int hashLong(long data) { + int m = 0x5bd1e995; + int r = 24; + + int h = 0; + + int k = (int) data * m; + k ^= k >>> r; + h ^= k * m; + + k = (int) (data >> 32) * m; + k ^= k >>> r; + h *= m; + h ^= k * m; + + h ^= h >>> 13; + h *= m; + h ^= h >>> 15; + + return h; + } + + public static long hash64(Object o) { + if (o == null) { 
+ return 0l; + } else if (o instanceof String) { + final byte[] bytes = ((String) o).getBytes(); + return hash64(bytes, bytes.length); + } else if (o instanceof byte[]) { + final byte[] bytes = (byte[]) o; + return hash64(bytes, bytes.length); + } + return hash64(o.toString()); + } + + // 64 bit implementation copied from here: https://github.com/tnm/murmurhash-java + + /** + * Generates 64 bit hash from byte array with default seed value. + * + * @param data byte array to hash + * @param length length of the array to hash + * @return 64 bit hash of the given string + */ + public static long hash64(final byte[] data, int length) { + return hash64(data, length, 0xe17a1465); + } + + + /** + * Generates 64 bit hash from byte array of the given length and seed. + * + * @param data byte array to hash + * @param length length of the array to hash + * @param seed initial seed value + * @return 64 bit hash of the given array + */ + public static long hash64(final byte[] data, int length, int seed) { + final long m = 0xc6a4a7935bd1e995L; + final int r = 47; + + long h = (seed & 0xffffffffl) ^ (length * m); + + int length8 = length / 8; + + for (int i = 0; i < length8; i++) { + final int i8 = i * 8; + long k = ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8) + + (((long) data[i8 + 2] & 0xff) << 16) + (((long) data[i8 + 3] & 0xff) << 24) + + (((long) data[i8 + 4] & 0xff) << 32) + (((long) data[i8 + 5] & 0xff) << 40) + + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 + 7] & 0xff) << 56); + + k *= m; + k ^= k >>> r; + k *= m; + + h ^= k; + h *= m; + } + + switch (length % 8) { + case 7: + h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48; + case 6: + h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40; + case 5: + h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32; + case 4: + h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24; + case 3: + h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16; + case 2: + h ^= (long) (data[(length & ~7) + 1] & 0xff) << 
8; + case 1: + h ^= (long) (data[length & ~7] & 0xff); + h *= m; + }; + + h ^= h >>> r; + h *= m; + h ^= h >>> r; + + return h; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/hll/RegisterSet.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/RegisterSet.java b/geode-core/src/main/java/org/apache/geode/internal/hll/RegisterSet.java new file mode 100644 index 0000000..cad691b --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/hll/RegisterSet.java @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2012 Clearspring Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.hll; + + +public class RegisterSet { + + public final static int LOG2_BITS_PER_WORD = 6; + public final static int REGISTER_SIZE = 5; + + public final int count; + public final int size; + + private final int[] M; + + public RegisterSet(int count) { + this(count, null); + } + + public RegisterSet(int count, int[] initialValues) { + this.count = count; + + if (initialValues == null) { + this.M = new int[getSizeForCount(count)]; + } else { + this.M = initialValues; + } + this.size = this.M.length; + } + + public static int getBits(int count) { + return count / LOG2_BITS_PER_WORD; + } + + public static int getSizeForCount(int count) { + int bits = getBits(count); + if (bits == 0) { + return 1; + } else if (bits % Integer.SIZE == 0) { + return bits; + } else { + return bits + 1; + } + } + + public void set(int position, int value) { + int bucketPos = position / LOG2_BITS_PER_WORD; + int shift = REGISTER_SIZE * (position - (bucketPos * LOG2_BITS_PER_WORD)); + this.M[bucketPos] = (this.M[bucketPos] & ~(0x1f << shift)) | (value << shift); + } + + public int get(int position) { + int bucketPos = position / LOG2_BITS_PER_WORD; + int shift = REGISTER_SIZE * (position - (bucketPos * LOG2_BITS_PER_WORD)); + return (this.M[bucketPos] & (0x1f << shift)) >>> shift; + } + + public boolean updateIfGreater(int position, int value) { + int bucket = position / LOG2_BITS_PER_WORD; + int shift = REGISTER_SIZE * (position - (bucket * LOG2_BITS_PER_WORD)); + int mask = 0x1f << shift; + + // Use long to avoid sign issues with the left-most shift + long curVal = this.M[bucket] & mask; + long newVal = value << shift; + if (curVal < newVal) { + this.M[bucket] = (int) ((this.M[bucket] & ~mask) | newVal); + return true; + } else { + return false; + } + } + + public void merge(RegisterSet that) { + for (int bucket = 0; bucket < M.length; bucket++) { + int word = 0; + for (int j = 0; j < LOG2_BITS_PER_WORD; j++) { + int mask = 0x1f << (REGISTER_SIZE * j); + 
+ int thisVal = (this.M[bucket] & mask); + int thatVal = (that.M[bucket] & mask); + word |= (thisVal < thatVal) ? thatVal : thisVal; + } + this.M[bucket] = word; + } + } + + int[] readOnlyBits() { + return M; + } + + public int[] bits() { + int[] copy = new int[size]; + System.arraycopy(M, 0, copy, 0, M.length); + return copy; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/internal/hll/Varint.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/Varint.java b/geode-core/src/main/java/org/apache/geode/internal/hll/Varint.java new file mode 100644 index 0000000..8e0bdea --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/hll/Varint.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.hll; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + + +/** + * <p> + * Encodes signed and unsigned values using a common variable-length scheme, found for example in + * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google's Protocol + * Buffers</a>. It uses fewer bytes to encode smaller values, but will use slightly more bytes to + * encode large values. + * </p> + * <p/> + * <p> + * Signed values are further encoded using so-called zig-zag encoding in order to make them + * "compatible" with variable-length encoding. + * </p> + */ +public final class Varint { + + private Varint() {} + + /** + * Encodes a value using the variable-length encoding from + * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol + * Buffers</a>. It uses zig-zag encoding to efficiently encode signed values. If values are known + * to be nonnegative, {@link #writeUnsignedVarLong(long, DataOutput)} should be used. 
+ * + * @param value value to encode + * @param out to write bytes to + * @throws IOException if {@link DataOutput} throws {@link IOException} + */ + public static void writeSignedVarLong(long value, DataOutput out) throws IOException { + // Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types + writeUnsignedVarLong((value << 1) ^ (value >> 63), out); + } + + /** + * Encodes a value using the variable-length encoding from + * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol + * Buffers</a>. Zig-zag is not used, so input must not be negative. If values can be negative, use + * {@link #writeSignedVarLong(long, DataOutput)} instead. This method treats negative input as + * like a large unsigned value. + * + * @param value value to encode + * @param out to write bytes to + * @throws IOException if {@link DataOutput} throws {@link IOException} + */ + public static void writeUnsignedVarLong(long value, DataOutput out) throws IOException { + while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) { + out.writeByte(((int) value & 0x7F) | 0x80); + value >>>= 7; + } + out.writeByte((int) value & 0x7F); + } + + /** + * @see #writeSignedVarLong(long, DataOutput) + */ + public static void writeSignedVarInt(int value, DataOutput out) throws IOException { + // Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types + writeUnsignedVarInt((value << 1) ^ (value >> 31), out); + } + + /** + * @see #writeUnsignedVarLong(long, DataOutput) + */ + public static void writeUnsignedVarInt(int value, DataOutput out) throws IOException { + while ((value & 0xFFFFFF80) != 0L) { + out.writeByte((value & 0x7F) | 0x80); + value >>>= 7; + } + out.writeByte(value & 0x7F); + } + + public static byte[] writeSignedVarInt(int value) { + // Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types + return writeUnsignedVarInt((value << 1) ^ (value >> 31)); + } + + /** + * @see 
#writeUnsignedVarLong(long, DataOutput) + * <p/> + * This one does not use streams and is much faster. Makes a single object each time, and + * that object is a primitive array. + */ + public static byte[] writeUnsignedVarInt(int value) { + byte[] byteArrayList = new byte[10]; + int i = 0; + while ((value & 0xFFFFFF80) != 0L) { + byteArrayList[i++] = ((byte) ((value & 0x7F) | 0x80)); + value >>>= 7; + } + byteArrayList[i] = ((byte) (value & 0x7F)); + byte[] out = new byte[i + 1]; + for (; i >= 0; i--) { + out[i] = byteArrayList[i]; + } + return out; + } + + /** + * @param in to read bytes from + * @return decoded value + * @throws IOException if {@link DataInput} throws {@link IOException} + * @throws IllegalArgumentException if variable-length value does not terminate after 9 bytes have + * been read + * @see #writeSignedVarLong(long, DataOutput) + */ + public static long readSignedVarLong(DataInput in) throws IOException { + long raw = readUnsignedVarLong(in); + // This undoes the trick in writeSignedVarLong() + long temp = (((raw << 63) >> 63) ^ raw) >> 1; + // This extra step lets us deal with the largest signed values by treating + // negative results from read unsigned methods as unsigned values + // Must re-flip the top bit if the original read value had it set.
+ return temp ^ (raw & (1L << 63)); + } + + /** + * @param in to read bytes from + * @return decoded value + * @throws IOException if {@link DataInput} throws {@link IOException} + * @throws IllegalArgumentException if variable-length value does not terminate after 9 bytes have + * been read + * @see #writeUnsignedVarLong(long, DataOutput) + */ + public static long readUnsignedVarLong(DataInput in) throws IOException { + long value = 0L; + int i = 0; + long b; + while (((b = in.readByte()) & 0x80L) != 0) { + value |= (b & 0x7F) << i; + i += 7; + if (i > 63) { + throw new IllegalArgumentException("Variable length quantity is too long"); + } + } + return value | (b << i); + } + + /** + * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have + * been read + * @throws IOException if {@link DataInput} throws {@link IOException} + * @see #readSignedVarLong(DataInput) + */ + public static int readSignedVarInt(DataInput in) throws IOException { + int raw = readUnsignedVarInt(in); + // This undoes the trick in writeSignedVarInt() + int temp = (((raw << 31) >> 31) ^ raw) >> 1; + // This extra step lets us deal with the largest signed values by treating + // negative results from read unsigned methods as unsigned values. + // Must re-flip the top bit if the original read value had it set.
+ return temp ^ (raw & (1 << 31)); + } + + /** + * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have + * been read + * @throws IOException if {@link DataInput} throws {@link IOException} + * @see #readUnsignedVarLong(DataInput) + */ + public static int readUnsignedVarInt(DataInput in) throws IOException { + int value = 0; + int i = 0; + int b; + while (((b = in.readByte()) & 0x80) != 0) { + value |= (b & 0x7F) << i; + i += 7; + if (i > 35) { + throw new IllegalArgumentException("Variable length quantity is too long"); + } + } + return value | (b << i); + } + + public static int readSignedVarInt(byte[] bytes) { + int raw = readUnsignedVarInt(bytes); + // This undoes the trick in writeSignedVarInt() + int temp = (((raw << 31) >> 31) ^ raw) >> 1; + // This extra step lets us deal with the largest signed values by treating + // negative results from read unsigned methods as unsigned values. + // Must re-flip the top bit if the original read value had it set.
+ return temp ^ (raw & (1 << 31)); + } + + public static int readUnsignedVarInt(byte[] bytes) { + int value = 0; + int i = 0; + byte rb = Byte.MIN_VALUE; + for (byte b : bytes) { + rb = b; + if ((b & 0x80) == 0) { + break; + } + value |= (b & 0x7f) << i; + i += 7; + if (i > 35) { + throw new IllegalArgumentException("Variable length quantity is too long"); + } + } + return value | (rb << i); + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisService.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisService.java b/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisService.java index e625bb9..0b32fe7 100644 --- a/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisService.java +++ b/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisService.java @@ -20,8 +20,14 @@ import org.apache.geode.internal.cache.CacheService; * Created by ukohlmeyer on 2/9/17. */ public interface GeodeRedisService extends CacheService { + /** + * The default Redis port as specified by their protocol, {@value #DEFAULT_REDIS_SERVER_PORT} + */ + int DEFAULT_REDIS_SERVER_PORT = 6379; void start(); + boolean isRunning(); + void stop(); } http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServer.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServer.java new file mode 100644 index 0000000..0ed668e --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServer.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis; + +import org.apache.geode.GemFireCacheException; +import org.apache.geode.GemFireException; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.CacheExistsException; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.distributed.ConfigurationProperties; +import org.apache.geode.internal.cache.GemFireCacheImpl; + +import java.util.Properties; + +/** + * The GeodeRedisServer is a server that understands the Redis protocol. As commands are sent to the + * server, each command is picked up by a thread, interpreted and then executed and a response is + * sent back to the client. The default connection port is 6379 but that can be altered when run + * through GFSH or started through the provided static main class. + * <p> + * Each Redis data type instance is stored in a separate {@link Region} except for the Strings and + * HyperLogLogs which are collectively stored in one Region respectively. 
That Region along with a + * meta data region used internally are protected so the client may not store keys with the name + * {@link GeodeRedisServiceImpl#REDIS_META_DATA_REGION} or + * {@link GeodeRedisServiceImpl#STRING_REGION}. The default Region type is + * {@link RegionShortcut#PARTITION} although this can be changed by specifying the SystemProperty + * {@value GeodeRedisServiceImpl#DEFAULT_REGION_SYS_PROP_NAME} to a type defined by + * {@link RegionShortcut}. If the {@link GeodeRedisServiceImpl#NUM_THREADS_SYS_PROP_NAME} system + * property is set to 0, one thread per client will be created. Otherwise a worker thread pool of + * specified size is used or a default size of 4 * {@link Runtime#availableProcessors()} if the + * property is not set. + * <p> + * Setting the AUTH password requires setting the property "redis-password" just as "redis-port" + * would be in xml or through GFSH. + * <p> + * The supported commands are as follows: + * <p> + * Supported String commands - APPEND, BITCOUNT, BITOP, BITPOS, DECR, DECRBY, GET, GETBIT, GETRANGE, + * GETSET, INCR, INCRBY, INCRBYFLOAT, MGET, MSET, MSETNX, PSETEX, SET, SETBIT, SETEX, SETNX, STRLEN + * <p> + * Supported List commands - LINDEX, LLEN, LPOP, LPUSH, LPUSHX, LRANGE, LREM, LSET, LTRIM, RPOP, + * RPUSH, RPUSHX + * <p> + * Supported Hash commands - HDEL, HEXISTS, HGET, HGETALL, HINCRBY, HINCRBYFLOAT, HKEYS, HMGET, + * HMSET, HSETNX, HLEN, HSCAN, HSET, HVALS + * <p> + * Supported Set commands - SADD, SCARD, SDIFF, SDIFFSTORE, SINTER, SINTERSTORE, SISMEMBER, + * SMEMBERS, SMOVE, SREM, SPOP, SRANDMEMBER, SCAN, SUNION, SUNIONSTORE + * <p> + * Supported SortedSet commands - ZADD, ZCARD, ZCOUNT, ZINCRBY, ZLEXCOUNT, ZRANGE, ZRANGEBYLEX, + * ZRANGEBYSCORE, ZRANK, ZREM, ZREMRANGEBYLEX, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE, + * ZREVRANGEBYSCORE, ZREVRANK, ZSCAN, ZSCORE + * <p> + * Supported HyperLogLog commands - PFADD, PFCOUNT, PFMERGE + * <p> + * Supported Keys commands - DEL, DBSIZE, EXISTS, EXPIRE, 
EXPIREAT, FLUSHDB, FLUSHALL, KEYS, + * PERSIST, PEXPIRE, PEXPIREAT, PTTL, SCAN, TTL + * <p> + * Supported Transaction commands - DISCARD, EXEC, MULTI + * <p> + * Supported Server commands - AUTH, ECHO, PING, TIME, QUIT + * <p> + * <p> + * The command executors are not explicitly documented but the functionality can be found at + * <a href="http://redis.io/commands">Redis Commands</a> + * <p> + * Exceptions to the Redis Commands Documents: + * <p> + * <ul> + * <li>Any command that removes keys and returns a count of removed entries will not return a total + * remove count but rather a count of how many entries have been removed that existed on the local + * vm, though all entries will be removed</li> + * <li>Any command that returns a count of newly set members has an unspecified return value. The + * command will work just as the Redis protocol states but the count will not necessarily reflect the + * number set compared to overridden.</li> + * <li>Transactions work just as they would on a Redis instance, they are local transactions. + * Transactions cannot be executed on data that is not local to the executing server, that is on a + * partitioned region in a different server instance or on a persistent region that does not have + * transactions enabled.
Also, you cannot watch or unwatch keys as all keys within a GemFire + * transaction are watched by default.</li> + * </ul> + * + * @deprecated as of Geode 1.2.0 + */ +public class GeodeRedisServer { + private Properties properties; + private boolean started = false; + private GeodeRedisService redisService; + + /** + * Constructor for {@link GeodeRedisServer} that will start the server on the given port and bind + * to the first non-loopback address + * + * @param port The port the server will bind to, will use + * {@link GeodeRedisService#DEFAULT_REDIS_SERVER_PORT} by default + */ + public GeodeRedisServer(int port) { + this(null, port, null); + } + + /** + * Constructor for {@link GeodeRedisServer} that will start the server and bind to the given + * address and port + * + * @param bindAddress The address to which the server will attempt to bind to + * @param port The port the server will bind to, will use + * {@link GeodeRedisService#DEFAULT_REDIS_SERVER_PORT} by default if argument is less than + * or equal to 0 + */ + public GeodeRedisServer(String bindAddress, int port) { + this(bindAddress, port, null); + } + + + /** + * Constructor for {@link GeodeRedisServer} that will start the server and bind to the given + * address and port. Keep in mind that the log level configuration will only be set if a + * {@link Cache} does not already exist, if one already exists then setting that property will + * have no effect. 
+ * + * @param bindAddress The address to which the server will attempt to bind + * @param port The port the server will bind to, will use + * {@link GeodeRedisService#DEFAULT_REDIS_SERVER_PORT} by default if argument is less than or + * equal to 0 + * @param logLevel The logging level to be used by GemFire + */ + public GeodeRedisServer(String bindAddress, int port, String logLevel) { + this.properties = new Properties(); + if (bindAddress != null) { + this.properties.setProperty(ConfigurationProperties.REDIS_BIND_ADDRESS, bindAddress); + } + this.properties.setProperty(ConfigurationProperties.REDIS_PORT, String.valueOf(port)); + if (logLevel != null) { + this.properties.setProperty(ConfigurationProperties.LOG_LEVEL, logLevel); + } + } + + /** + * This is the function to call on a {@link GeodeRedisServer} instance to start it running + */ + public synchronized void start() { + if (!started) { + GemFireCacheImpl cache = startGemFire(properties); + redisService = cache.getService(GeodeRedisService.class); + redisService.start(); + started = true; + } + } + + /** + * Initializes the {@link Cache}, and creates the Redis necessities Region and declares that + * {@link Region} to be protected. Also, every {@link GeodeRedisServer} will check for entries + * already in the meta data Region. + */ + private synchronized GemFireCacheImpl startGemFire(final Properties properties) { + Cache cache = GemFireCacheImpl.getInstance(); + if (cache != null) { + throw new CacheExistsException(cache, + "Cache must not be running to use the deprecated GeodeRedisServer class. Consider using the GeodeRedisService as part of a cache."); + } else { + CacheFactory cacheFactory = new CacheFactory(properties); + cache = cacheFactory.create(); + } + return (GemFireCacheImpl) cache; + } + + /** + * Shutdown method for {@link GeodeRedisServer}. This closes the {@link Cache}, interrupts all + * execution and forcefully closes all connections.
+ */ + public synchronized void shutdown() { + redisService.stop(); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServiceImpl.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServiceImpl.java b/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServiceImpl.java index 1026d40..0cac27d 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServiceImpl.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServiceImpl.java @@ -54,7 +54,7 @@ import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.ExecutionHandlerContext; import org.apache.geode.redis.internal.RedisDataType; import org.apache.geode.redis.internal.RegionProvider; -import org.apache.geode.redis.internal.hll.HyperLogLogPlus; +import org.apache.geode.internal.hll.HyperLogLogPlus; import org.apache.logging.log4j.Logger; import java.io.IOException; @@ -141,12 +141,6 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class GeodeRedisServiceImpl implements GeodeRedisService { - - /** - * The default Redis port as specified by their protocol, {@value #DEFAULT_REDIS_SERVER_PORT} - */ - public static final int DEFAULT_REDIS_SERVER_PORT = 6379; - /** * The number of threads that will work on handling requests */ @@ -300,8 +294,8 @@ public class GeodeRedisServiceImpl implements GeodeRedisService { * This is function to call on a {@link GeodeRedisServiceImpl} instance to start it running */ @Override - public synchronized void start() { - if (!started) { + public void start() { + if (!started && redisPort > 0) { try { initializeRedis(); startRedisServer(); @@ -312,6 +306,11 @@ public class GeodeRedisServiceImpl implements GeodeRedisService { } } + @Override + public boolean isRunning() { + return started; + } + private void initializeRedis() { 
initializeRedisServiceInternals(); Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion; @@ -525,7 +524,9 @@ public class GeodeRedisServiceImpl implements GeodeRedisService { InternalDistributedSystem internalDistributedSystem = ((GemFireCacheImpl) cache).getSystem(); DistributionConfig internalDistributedSystemConfig = internalDistributedSystem.getConfig(); this.redisPort = internalDistributedSystemConfig.getRedisPort(); - if (redisPort <= 0) { // unset + if (redisPort < 0) { + return; + } else if (redisPort == 0) { // unset, don't start the server. this.redisPort = DEFAULT_REDIS_SERVER_PORT; } this.redisBindAddress = internalDistributedSystemConfig.getRedisBindAddress(); @@ -571,7 +572,7 @@ public class GeodeRedisServiceImpl implements GeodeRedisService { */ @Override public void stop() { - if (!shutdown) { + if (!shutdown && isRunning()) { if (logger.isInfoEnabled()) { logger.info("GeodeRedisServiceImpl shutting down"); } http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java index deacee0..9c478a1 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java @@ -34,7 +34,7 @@ import org.apache.geode.redis.GeodeRedisServiceImpl; import org.apache.geode.redis.internal.executor.ExpirationExecutor; import org.apache.geode.redis.internal.executor.ListQuery; import org.apache.geode.redis.internal.executor.SortedSetQuery; -import org.apache.geode.redis.internal.hll.HyperLogLogPlus; +import org.apache.geode.internal.hll.HyperLogLogPlus; import java.io.Closeable; import java.util.HashMap; 
http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/HllExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/HllExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/HllExecutor.java index e440c09..2114fac 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/HllExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/HllExecutor.java @@ -22,10 +22,6 @@ import org.apache.geode.redis.internal.executor.AbstractExecutor; public abstract class HllExecutor extends AbstractExecutor { - public static final Double DEFAULT_HLL_STD_DEV = 0.081; - public static final Integer DEFAULT_HLL_DENSE = 18; - public static final Integer DEFAULT_HLL_SPARSE = 32; - protected final void checkAndSetDataType(ByteArrayWrapper key, ExecutionHandlerContext context) { Object oldVal = context.getRegionProvider().metaPutIfAbsent(key, RedisDataType.REDIS_HLL); if (oldVal == RedisDataType.REDIS_PROTECTED) http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFAddExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFAddExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFAddExecutor.java index d85a4ca..3787370 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFAddExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFAddExecutor.java @@ -20,10 +20,12 @@ import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; import 
org.apache.geode.redis.internal.RedisConstants.ArityDef; -import org.apache.geode.redis.internal.hll.HyperLogLogPlus; +import org.apache.geode.internal.hll.HyperLogLogPlus; import java.util.List; +import static org.apache.geode.internal.hll.HyperLogLog.DEFAULT_HLL_DENSE; + public class PFAddExecutor extends HllExecutor { @Override http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFCountExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFCountExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFCountExecutor.java index 1639542..dcc63a0 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFCountExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFCountExecutor.java @@ -21,8 +21,8 @@ import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; import org.apache.geode.redis.internal.RedisConstants.ArityDef; import org.apache.geode.redis.internal.RedisDataType; -import org.apache.geode.redis.internal.hll.CardinalityMergeException; -import org.apache.geode.redis.internal.hll.HyperLogLogPlus; +import org.apache.geode.internal.hll.CardinalityMergeException; +import org.apache.geode.internal.hll.HyperLogLogPlus; import java.util.ArrayList; import java.util.List; http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFMergeExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFMergeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFMergeExecutor.java index ee3eacf..6ca6f57 100644 --- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFMergeExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFMergeExecutor.java @@ -21,12 +21,14 @@ import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; import org.apache.geode.redis.internal.RedisConstants.ArityDef; import org.apache.geode.redis.internal.RedisDataType; -import org.apache.geode.redis.internal.hll.CardinalityMergeException; -import org.apache.geode.redis.internal.hll.HyperLogLogPlus; +import org.apache.geode.internal.hll.CardinalityMergeException; +import org.apache.geode.internal.hll.HyperLogLogPlus; import java.util.ArrayList; import java.util.List; +import static org.apache.geode.internal.hll.HyperLogLog.DEFAULT_HLL_DENSE; + public class PFMergeExecutor extends HllExecutor { @Override http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/Varint.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/Varint.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/Varint.java deleted file mode 100644 index da370ce..0000000 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/Varint.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis.internal.executor.hll; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - - -/** - * <p> - * Encodes signed and unsigned values using a common variable-length scheme, found for example in - * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google's Protocol - * Buffers</a>. It uses fewer bytes to encode smaller values, but will use slightly more bytes to - * encode large values. - * </p> - * <p/> - * <p> - * Signed values are further encoded using so-called zig-zag encoding in order to make them - * "compatible" with variable-length encoding. 
- * </p> - */ -public final class Varint { - - private Varint() {} - - /** - * Encodes a value using the variable-length encoding from - * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol - * Buffers</a>. It uses zig-zag encoding to efficiently encode signed values. If values are known - * to be nonnegative, {@link #writeUnsignedVarLong(long, DataOutput)} should be used. - * - * @param value value to encode - * @param out to write bytes to - * @throws IOException if {@link DataOutput} throws {@link IOException} - */ - public static void writeSignedVarLong(long value, DataOutput out) throws IOException { - // Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types - writeUnsignedVarLong((value << 1) ^ (value >> 63), out); - } - - /** - * Encodes a value using the variable-length encoding from - * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol - * Buffers</a>. Zig-zag is not used, so input must not be negative. If values can be negative, use - * {@link #writeSignedVarLong(long, DataOutput)} instead. This method treats negative input as - * like a large unsigned value. 
- * - * @param value value to encode - * @param out to write bytes to - * @throws IOException if {@link DataOutput} throws {@link IOException} - */ - public static void writeUnsignedVarLong(long value, DataOutput out) throws IOException { - while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) { - out.writeByte(((int) value & 0x7F) | 0x80); - value >>>= 7; - } - out.writeByte((int) value & 0x7F); - } - - /** - * @see #writeSignedVarLong(long, DataOutput) - */ - public static void writeSignedVarInt(int value, DataOutput out) throws IOException { - // Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types - writeUnsignedVarInt((value << 1) ^ (value >> 31), out); - } - - /** - * @see #writeUnsignedVarLong(long, DataOutput) - */ - public static void writeUnsignedVarInt(int value, DataOutput out) throws IOException { - while ((value & 0xFFFFFF80) != 0L) { - out.writeByte((value & 0x7F) | 0x80); - value >>>= 7; - } - out.writeByte(value & 0x7F); - } - - public static byte[] writeSignedVarInt(int value) { - // Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types - return writeUnsignedVarInt((value << 1) ^ (value >> 31)); - } - - /** - * @see #writeUnsignedVarLong(long, DataOutput) - * <p/> - * This one does not use streams and is much faster. Makes a single object each time, and - * that object is a primitive array. 
- */ - public static byte[] writeUnsignedVarInt(int value) { - byte[] byteArrayList = new byte[10]; - int i = 0; - while ((value & 0xFFFFFF80) != 0L) { - byteArrayList[i++] = ((byte) ((value & 0x7F) | 0x80)); - value >>>= 7; - } - byteArrayList[i] = ((byte) (value & 0x7F)); - byte[] out = new byte[i + 1]; - for (; i >= 0; i--) { - out[i] = byteArrayList[i]; - } - return out; - } - - /** - * @param in to read bytes from - * @return decode value - * @throws IOException if {@link DataInput} throws {@link IOException} - * @throws IllegalArgumentException if variable-length value does not terminate after 9 bytes have - * been read - * @see #writeSignedVarLong(long, DataOutput) - */ - public static long readSignedVarLong(DataInput in) throws IOException { - long raw = readUnsignedVarLong(in); - // This undoes the trick in writeSignedVarLong() - long temp = (((raw << 63) >> 63) ^ raw) >> 1; - // This extra step lets us deal with the largest signed values by treating - // negative results from read unsigned methods as like unsigned values - // Must re-flip the top bit if the original read value had it set. 
- return temp ^ (raw & (1L << 63)); - } - - /** - * @param in to read bytes from - * @return decode value - * @throws IOException if {@link DataInput} throws {@link IOException} - * @throws IllegalArgumentException if variable-length value does not terminate after 9 bytes have - * been read - * @see #writeUnsignedVarLong(long, DataOutput) - */ - public static long readUnsignedVarLong(DataInput in) throws IOException { - long value = 0L; - int i = 0; - long b; - while (((b = in.readByte()) & 0x80L) != 0) { - value |= (b & 0x7F) << i; - i += 7; - if (i > 63) { - throw new IllegalArgumentException("Variable length quantity is too long"); - } - } - return value | (b << i); - } - - /** - * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have - * been read - * @throws IOException if {@link DataInput} throws {@link IOException} - * @see #readSignedVarLong(DataInput) - */ - public static int readSignedVarInt(DataInput in) throws IOException { - int raw = readUnsignedVarInt(in); - // This undoes the trick in writeSignedVarInt() - int temp = (((raw << 31) >> 31) ^ raw) >> 1; - // This extra step lets us deal with the largest signed values by treating - // negative results from read unsigned methods as like unsigned values. - // Must re-flip the top bit if the original read value had it set. 
- return temp ^ (raw & (1 << 31)); - } - - /** - * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have - * been read - * @throws IOException if {@link DataInput} throws {@link IOException} - * @see #readUnsignedVarLong(DataInput) - */ - public static int readUnsignedVarInt(DataInput in) throws IOException { - int value = 0; - int i = 0; - int b; - while (((b = in.readByte()) & 0x80) != 0) { - value |= (b & 0x7F) << i; - i += 7; - if (i > 35) { - throw new IllegalArgumentException("Variable length quantity is too long"); - } - } - return value | (b << i); - } - - public static int readSignedVarInt(byte[] bytes) { - int raw = readUnsignedVarInt(bytes); - // This undoes the trick in writeSignedVarInt() - int temp = (((raw << 31) >> 31) ^ raw) >> 1; - // This extra step lets us deal with the largest signed values by treating - // negative results from read unsigned methods as like unsigned values. - // Must re-flip the top bit if the original read value had it set. - return temp ^ (raw & (1 << 31)); - } - - public static int readUnsignedVarInt(byte[] bytes) { - int value = 0; - int i = 0; - byte rb = Byte.MIN_VALUE; - for (byte b : bytes) { - rb = b; - if ((b & 0x80) == 0) { - break; - } - value |= (b & 0x7f) << i; - i += 7; - if (i > 35) { - throw new IllegalArgumentException("Variable length quantity is too long"); - } - } - return value | (rb << i); - } - -} http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-redis/src/main/java/org/apache/geode/redis/internal/hll/Bits.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/hll/Bits.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/hll/Bits.java deleted file mode 100644 index 92cef47..0000000 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/hll/Bits.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (C) 2011 Clearspring Technologies, Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis.internal.hll; - -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.IOException; - -public class Bits { - - public static int[] getBits(byte[] mBytes) throws IOException { - int bitSize = mBytes.length / 4; - int[] bits = new int[bitSize]; - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(mBytes)); - for (int i = 0; i < bitSize; i++) { - bits[i] = dis.readInt(); - } - return bits; - } - - /** - * This method might be better described as "byte array to int array" or "data input to int array" - */ - public static int[] getBits(DataInput dataIn, int byteLength) throws IOException { - int bitSize = byteLength / 4; - int[] bits = new int[bitSize]; - for (int i = 0; i < bitSize; i++) { - bits[i] = dataIn.readInt(); - } - return bits; - } - -} http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-redis/src/main/java/org/apache/geode/redis/internal/hll/CardinalityMergeException.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/hll/CardinalityMergeException.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/hll/CardinalityMergeException.java deleted file mode 100644 index 561c3ef..0000000 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/hll/CardinalityMergeException.java +++ 
/dev/null @@ -1,22 +0,0 @@ -/* - * Copyright (C) 2011 Clearspring Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis.internal.hll; - -@SuppressWarnings("serial") -public abstract class CardinalityMergeException extends Exception { - - public CardinalityMergeException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/f79beb1e/geode-redis/src/main/java/org/apache/geode/redis/internal/hll/HyperLogLog.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/hll/HyperLogLog.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/hll/HyperLogLog.java deleted file mode 100644 index b89512e..0000000 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/hll/HyperLogLog.java +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Copyright (C) 2012 Clearspring Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. 
See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis.internal.hll; - -import org.apache.geode.redis.internal.executor.hll.HllExecutor; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.io.Serializable; - -/** - * Java implementation of HyperLogLog (HLL) algorithm from this paper: - * <p/> - * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf - * <p/> - * HLL is an improved version of LogLog that is capable of estimating the cardinality of a set with - * accuracy = 1.04/sqrt(m) where m = 2^b. So we can control accuracy vs space usage by increasing or - * decreasing b. - * <p/> - * The main benefit of using HLL over LL is that it only requires 64% of the space that LL does to - * get the same accuracy. - * <p/> - * This implementation implements a single counter. If a large (millions) number of counters are - * required you may want to refer to: - * <p/> - * http://dsiutils.dsi.unimi.it/ - * <p/> - * It has a more complex implementation of HLL that supports multiple counters in a single object, - * drastically reducing the java overhead from creating a large number of objects. - * <p/> - * This implementation leveraged a javascript implementation that Yammer has been working on: - * <p/> - * https://github.com/yammer/probablyjs - * <p> - * Note that this implementation does not include the long range correction function defined in the - * original paper. Empirical evidence shows that the correction function causes more harm than good. - * </p> - * <p/> - * <p> - * Users have different motivations to use different types of hashing functions. 
Rather than try to - * keep up with all available hash functions and to remove the concern of causing future binary - * incompatibilities this class allows clients to offer the value in hashed int or long form. This - * way clients are free to change their hash function on their own time line. We recommend using - * Google's Guava Murmur3_128 implementation as it provides good performance and speed when high - * precision is required. In our tests the 32bit MurmurHash function included in this project is - * faster and produces better results than the 32 bit murmur3 implementation google provides. - * </p> - */ -public class HyperLogLog implements ICardinality, Serializable { - - private static final long serialVersionUID = -4661220245111112301L; - private final RegisterSet registerSet; - private final int log2m; - private final double alphaMM; - - - /** - * Create a new HyperLogLog instance using the specified standard deviation. - * - * @param rsd - the relative standard deviation for the counter. smaller values create counters - * that require more space. - */ - public HyperLogLog(double rsd) { - this(log2m(rsd)); - } - - private static int log2m(double rsd) { - return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2)); - } - - /** - * Create a new HyperLogLog instance. The log2m parameter defines the accuracy of the counter. The - * larger the log2m the better the accuracy. - * <p/> - * accuracy = 1.04/sqrt(2^log2m) - * - * @param log2m - the number of bits to use as the basis for the HLL instance - */ - public HyperLogLog(int log2m) { - this(log2m, new RegisterSet(1 << log2m)); - } - - /** - * Creates a new HyperLogLog instance using the given registers. Used for unmarshalling a - * serialized instance and for merging multiple counters together. 
- * - * @param registerSet - the initial values for the register set - */ - @Deprecated - public HyperLogLog(int log2m, RegisterSet registerSet) { - if (log2m < 0 || log2m > 30) { - throw new IllegalArgumentException( - "log2m argument is " + log2m + " and is outside the range [0, 30]"); - } - this.registerSet = registerSet; - this.log2m = log2m; - int m = 1 << this.log2m; - - alphaMM = getAlphaMM(log2m, m); - } - - @Override - public boolean offerHashed(long hashedValue) { - // j becomes the binary address determined by the first b log2m of x - // j will be between 0 and 2^log2m - final int j = (int) (hashedValue >>> (Long.SIZE - log2m)); - final int r = - Long.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1; - return registerSet.updateIfGreater(j, r); - } - - @Override - public boolean offerHashed(int hashedValue) { - // j becomes the binary address determined by the first b log2m of x - // j will be between 0 and 2^log2m - final int j = hashedValue >>> (Integer.SIZE - log2m); - final int r = - Integer.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1; - return registerSet.updateIfGreater(j, r); - } - - @Override - public boolean offer(Object o) { - final int x = MurmurHash.hash(o); - return offerHashed(x); - } - - - @Override - public long cardinality() { - double registerSum = 0; - int count = registerSet.count; - double zeros = 0.0; - for (int j = 0; j < registerSet.count; j++) { - int val = registerSet.get(j); - registerSum += 1.0 / (1 << val); - if (val == 0) { - zeros++; - } - } - - double estimate = alphaMM * (1 / registerSum); - - if (estimate <= (5.0 / 2.0) * count) { - // Small Range Estimate - return Math.round(linearCounting(count, zeros)); - } else { - return Math.round(estimate); - } - } - - @Override - public int sizeof() { - return registerSet.size * 4; - } - - @Override - public byte[] getBytes() throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - 
DataOutput dos = new DataOutputStream(baos); - writeBytes(dos); - - return baos.toByteArray(); - } - - private void writeBytes(DataOutput serializedByteStream) throws IOException { - serializedByteStream.writeInt(log2m); - serializedByteStream.writeInt(registerSet.size * 4); - for (int x : registerSet.readOnlyBits()) { - serializedByteStream.writeInt(x); - } - } - - /** - * Add all the elements of the other set to this set. - * <p/> - * This operation does not imply a loss of precision. - * - * @param other A compatible Hyperloglog instance (same log2m) - * @throws CardinalityMergeException if other is not compatible - */ - public void addAll(HyperLogLog other) throws CardinalityMergeException { - if (this.sizeof() != other.sizeof()) { - throw new HyperLogLogMergeException("Cannot merge estimators of different sizes"); - } - - registerSet.merge(other.registerSet); - } - - @Override - public ICardinality merge(ICardinality... estimators) throws CardinalityMergeException { - HyperLogLog merged = new HyperLogLog(HllExecutor.DEFAULT_HLL_STD_DEV);// new HyperLogLog(log2m, - // new - // RegisterSet(this.registerSet.count)); - merged.addAll(this); - - if (estimators == null) { - return merged; - } - - for (ICardinality estimator : estimators) { - if (!(estimator instanceof HyperLogLog)) { - throw new HyperLogLogMergeException("Cannot merge estimators of different class"); - } - HyperLogLog hll = (HyperLogLog) estimator; - merged.addAll(hll); - } - - return merged; - } - - private Object writeReplace() { - return new SerializationHolder(this); - } - - /** - * This class exists to support Externalizable semantics for HyperLogLog objects without having to - * expose a public constructor, public write/read methods, or pretend final fields aren't final. - * - * In short, Externalizable allows you to skip some of the more verbose meta-data default - * Serializable gets you, but still includes the class name. 
In that sense, there is some cost to - * this holder object because it has a longer class name. I imagine people who care about - * optimizing for that have their own work-around for long class names in general, or just use a - * custom serialization framework. Therefore we make no attempt to optimize that here (eg. by - * raising this from an inner class and giving it an unhelpful name). - */ - private static class SerializationHolder implements Externalizable { - - HyperLogLog hyperLogLogHolder; - - public SerializationHolder(HyperLogLog hyperLogLogHolder) { - this.hyperLogLogHolder = hyperLogLogHolder; - } - - /** - * required for Externalizable - */ - @SuppressWarnings("unused") - public SerializationHolder() { - - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - hyperLogLogHolder.writeBytes(out); - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - hyperLogLogHolder = Builder.build(in); - } - - private Object readResolve() { - return hyperLogLogHolder; - } - } - - public static class Builder implements IBuilder<ICardinality>, Serializable { - - private static final long serialVersionUID = -979314356097156719L; - private double rsd; - - public Builder(double rsd) { - this.rsd = rsd; - } - - @Override - public HyperLogLog build() { - return new HyperLogLog(rsd); - } - - @Override - public int sizeof() { - int log2m = log2m(rsd); - int k = 1 << log2m; - return RegisterSet.getBits(k) * 4; - } - - public static HyperLogLog build(byte[] bytes) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - return build(new DataInputStream(bais)); - } - - public static HyperLogLog build(DataInput serializedByteStream) throws IOException { - int log2m = serializedByteStream.readInt(); - int byteArraySize = serializedByteStream.readInt(); - return new HyperLogLog(log2m, - new RegisterSet(1 << log2m, Bits.getBits(serializedByteStream, byteArraySize))); - } - 
} - - @SuppressWarnings("serial") - protected static class HyperLogLogMergeException extends CardinalityMergeException { - - public HyperLogLogMergeException(String message) { - super(message); - } - } - - protected static double getAlphaMM(final int p, final int m) { - // See the paper. - switch (p) { - case 4: - return 0.673 * m * m; - case 5: - return 0.697 * m * m; - case 6: - return 0.709 * m * m; - default: - return (0.7213 / (1 + 1.079 / m)) * m * m; - } - } - - protected static double linearCounting(int m, double V) { - return m * Math.log(m / V); - } -}