http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/internal/hll/IBuilder.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/IBuilder.java b/geode-core/src/main/java/org/apache/geode/internal/hll/IBuilder.java deleted file mode 100755 index 10189c8..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/hll/IBuilder.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright (C) 2011 Clearspring Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.hll; - - -public interface IBuilder<T> { - - T build(); - - int sizeof(); -}
http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/internal/hll/ICardinality.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/ICardinality.java b/geode-core/src/main/java/org/apache/geode/internal/hll/ICardinality.java deleted file mode 100755 index 125b621..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/hll/ICardinality.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (C) 2011 Clearspring Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.hll; - - -import java.io.IOException; - - -public interface ICardinality { - - /** - * @param o stream element - * @return false if the value returned by cardinality() is unaffected by the appearance of o in - * the stream. - */ - boolean offer(Object o); - - /** - * Offer the value as a hashed long value - * - * @param hashedLong - the hash of the item to offer to the estimator - * @return false if the value returned by cardinality() is unaffected by the appearance of - * hashedLong in the stream - */ - boolean offerHashed(long hashedLong); - - /** - * Offer the value as a hashed long value - * - * @param hashedInt - the hash of the item to offer to the estimator - * @return false if the value returned by cardinality() is unaffected by the appearance of - * hashedInt in the stream - */ - boolean offerHashed(int hashedInt); - - /** - * @return the number of unique elements in the stream or an estimate thereof - */ - long cardinality(); - - /** - * @return size in bytes needed for serialization - */ - int sizeof(); - - /** - * @return byte[] - * @throws IOException - */ - byte[] getBytes() throws IOException; - - /** - * Merges estimators to produce a new estimator for the combined streams of this estimator and - * those passed as arguments. - * <p/> - * Nor this estimator nor the one passed as parameters are modified. - * - * @param estimators Zero or more compatible estimators - * @throws CardinalityMergeException If at least one of the estimators is not compatible with this - * one - */ - ICardinality merge(ICardinality... estimators) throws CardinalityMergeException; -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/internal/hll/MurmurHash.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/MurmurHash.java b/geode-core/src/main/java/org/apache/geode/internal/hll/MurmurHash.java deleted file mode 100755 index be19e29..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/hll/MurmurHash.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.hll; - - -/** - * This is a very fast, non-cryptographic hash suitable for general hash-based lookup. See - * http://murmurhash.googlepages.com/ for more details. - * <p/> - * <p> - * The C version of MurmurHash 2.0 found at that site was ported to Java by Andrzej Bialecki (ab at - * getopt org). - * </p> - */ -public class MurmurHash { - public static int hash(Object o) { - if (o == null) { - return 0; - } - if (o instanceof Long) { - return hashLong((Long) o); - } - if (o instanceof Integer) { - return hashLong((Integer) o); - } - if (o instanceof Double) { - return hashLong(Double.doubleToRawLongBits((Double) o)); - } - if (o instanceof Float) { - return hashLong(Float.floatToRawIntBits((Float) o)); - } - if (o instanceof String) { - return hash(((String) o).getBytes()); - } - if (o instanceof byte[]) { - return hash((byte[]) o); - } - return hash(o.toString()); - } - - public static int hash(byte[] data) { - return hash(data, 0, data.length, -1); - } - - public static int hash(byte[] data, int seed) { - return hash(data, 0, data.length, seed); - } - - public static int hash(byte[] data, int offset, int length, int seed) { - int m = 0x5bd1e995; - int r = 24; - - int h = seed ^ length; - - int len_4 = length >> 2; - - for (int i = 0; i < len_4; i++) { - int i_4 = i << 2; - int k = data[offset + i_4 + 3]; - k = k << 8; - k = k | (data[offset + i_4 + 2] & 0xff); - k = k << 8; - k = k | (data[offset + i_4 + 1] & 0xff); - k = k << 8; - k = k | (data[offset + i_4 + 0] & 0xff); - k *= m; - k ^= k >>> r; - k *= m; - h *= m; - h ^= k; - } - - // avoid calculating modulo - int len_m = len_4 << 2; - int left = length - len_m; - - if (left != 0) { - if (left >= 3) { - h ^= (int) data[offset + length - 3] << 16; - } - if (left >= 2) { - h ^= (int) data[offset + length - 2] << 8; - } - if (left >= 1) { - h ^= (int) data[offset + length - 1]; - } - - h *= m; - } - - h ^= h >>> 13; - h *= m; - h ^= h >>> 15; - - return h; - } - - public static int hashLong(long data) { - int m = 0x5bd1e995; - int r = 24; - - int h = 0; - - int k = (int) data * m; - k ^= k >>> r; - h ^= k * m; - - k = (int) (data >> 32) * m; - k ^= k >>> r; - h *= m; - h ^= k * m; - - h ^= h >>> 13; - h *= m; - h ^= h >>> 15; - - return h; - } - - public static long hash64(Object o) { - if (o == null) { - return 0l; - } else if (o instanceof String) { - final byte[] bytes = ((String) o).getBytes(); - return hash64(bytes, bytes.length); - } else if (o instanceof byte[]) { - final byte[] bytes = (byte[]) o; - return hash64(bytes, bytes.length); - } - return hash64(o.toString()); - } - - // 64 bit implementation copied from here: https://github.com/tnm/murmurhash-java - - /** - * Generates 64 bit hash from byte array with default seed value. - * - * @param data byte array to hash - * @param length length of the array to hash - * @return 64 bit hash of the given string - */ - public static long hash64(final byte[] data, int length) { - return hash64(data, length, 0xe17a1465); - } - - - /** - * Generates 64 bit hash from byte array of the given length and seed. - * - * @param data byte array to hash - * @param length length of the array to hash - * @param seed initial seed value - * @return 64 bit hash of the given array - */ - public static long hash64(final byte[] data, int length, int seed) { - final long m = 0xc6a4a7935bd1e995L; - final int r = 47; - - long h = (seed & 0xffffffffl) ^ (length * m); - - int length8 = length / 8; - - for (int i = 0; i < length8; i++) { - final int i8 = i * 8; - long k = ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8) - + (((long) data[i8 + 2] & 0xff) << 16) + (((long) data[i8 + 3] & 0xff) << 24) - + (((long) data[i8 + 4] & 0xff) << 32) + (((long) data[i8 + 5] & 0xff) << 40) - + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 + 7] & 0xff) << 56); - - k *= m; - k ^= k >>> r; - k *= m; - - h ^= k; - h *= m; - } - - switch (length % 8) { - case 7: - h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48; - case 6: - h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40; - case 5: - h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32; - case 4: - h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24; - case 3: - h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16; - case 2: - h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8; - case 1: - h ^= (long) (data[length & ~7] & 0xff); - h *= m; - }; - - h ^= h >>> r; - h *= m; - h ^= h >>> r; - - return h; - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/internal/hll/RegisterSet.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/hll/RegisterSet.java b/geode-core/src/main/java/org/apache/geode/internal/hll/RegisterSet.java deleted file mode 100755 index cad691b..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/hll/RegisterSet.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright (C) 2012 Clearspring Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.hll; - - -public class RegisterSet { - - public final static int LOG2_BITS_PER_WORD = 6; - public final static int REGISTER_SIZE = 5; - - public final int count; - public final int size; - - private final int[] M; - - public RegisterSet(int count) { - this(count, null); - } - - public RegisterSet(int count, int[] initialValues) { - this.count = count; - - if (initialValues == null) { - this.M = new int[getSizeForCount(count)]; - } else { - this.M = initialValues; - } - this.size = this.M.length; - } - - public static int getBits(int count) { - return count / LOG2_BITS_PER_WORD; - } - - public static int getSizeForCount(int count) { - int bits = getBits(count); - if (bits == 0) { - return 1; - } else if (bits % Integer.SIZE == 0) { - return bits; - } else { - return bits + 1; - } - } - - public void set(int position, int value) { - int bucketPos = position / LOG2_BITS_PER_WORD; - int shift = REGISTER_SIZE * (position - (bucketPos * LOG2_BITS_PER_WORD)); - this.M[bucketPos] = (this.M[bucketPos] & ~(0x1f << shift)) | (value << shift); - } - - public int get(int position) { - int bucketPos = position / LOG2_BITS_PER_WORD; - int shift = REGISTER_SIZE * (position - (bucketPos * LOG2_BITS_PER_WORD)); - return (this.M[bucketPos] & (0x1f << shift)) >>> shift; - } - - public boolean updateIfGreater(int position, int value) { - int bucket = position / LOG2_BITS_PER_WORD; - int shift = REGISTER_SIZE * (position - (bucket * LOG2_BITS_PER_WORD)); - int mask = 0x1f << shift; - - // Use long to avoid sign issues with the left-most shift - long curVal = this.M[bucket] & mask; - long newVal = value << shift; - if (curVal < newVal) { - this.M[bucket] = (int) ((this.M[bucket] & ~mask) | newVal); - return true; - } else { - return false; - } - } - - public void merge(RegisterSet that) { - for (int bucket = 0; bucket < M.length; bucket++) { - int word = 0; - for (int j = 0; j < LOG2_BITS_PER_WORD; j++) { - int mask = 0x1f << (REGISTER_SIZE * j); - - int thisVal = (this.M[bucket] & mask); - int thatVal = (that.M[bucket] & mask); - word |= (thisVal < thatVal) ? thatVal : thisVal; - } - this.M[bucket] = word; - } - } - - int[] readOnlyBits() { - return M; - } - - public int[] bits() { - int[] copy = new int[size]; - System.arraycopy(M, 0, copy, 0, M.length); - return copy; - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/FixedPartitionAttributesInfo.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/FixedPartitionAttributesInfo.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/FixedPartitionAttributesInfo.java index eb0435a..673f1bc 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/FixedPartitionAttributesInfo.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/FixedPartitionAttributesInfo.java @@ -16,7 +16,6 @@ package org.apache.geode.management.internal.cli.domain; import java.io.Serializable; -import io.netty.util.internal.StringUtil; import org.apache.commons.lang.StringUtils; import org.apache.geode.cache.FixedPartitionAttributes; http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java b/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java deleted file mode 100644 index 4c97c98..0000000 --- a/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java +++ /dev/null @@ -1,729 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis; - -import static org.apache.geode.distributed.ConfigurationProperties.*; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.Collection; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.ServerChannel; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.oio.OioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.oio.OioServerSocketChannel; -import io.netty.util.concurrent.Future; -import org.apache.geode.cache.*; -import org.apache.geode.redis.internal.ByteArrayWrapper; -import org.apache.geode.redis.internal.ByteToCommandDecoder; -import org.apache.geode.redis.internal.Coder; -import org.apache.geode.redis.internal.ExecutionHandlerContext; -import org.apache.geode.redis.internal.RedisDataType; -import org.apache.geode.redis.internal.RegionProvider; - -import org.apache.geode.InternalGemFireError; -import org.apache.geode.LogWriter; -import org.apache.geode.cache.util.CacheListenerAdapter; -import org.apache.geode.distributed.internal.InternalDistributedSystem; -import org.apache.geode.internal.cache.GemFireCacheImpl; -import org.apache.geode.internal.cache.InternalRegionArguments; -import org.apache.geode.internal.hll.HyperLogLogPlus; -import org.apache.geode.internal.i18n.LocalizedStrings; -import org.apache.geode.internal.net.SocketCreator; - -/** - * The GeodeRedisServer is a server that understands the Redis protocol. As commands are sent to the - * server, each command is picked up by a thread, interpreted and then executed and a response is - * sent back to the client. The default connection port is 6379 but that can be altered when run - * through GFSH or started through the provided static main class. - * <p> - * Each Redis data type instance is stored in a separate {@link Region} except for the Strings and - * HyperLogLogs which are collectively stored in one Region respectively. That Region along with a - * meta data region used internally are protected so the client may not store keys with the name - * {@link GeodeRedisServer#REDIS_META_DATA_REGION} or {@link GeodeRedisServer#STRING_REGION}. The - * default Region type is {@link RegionShortcut#PARTITION} although this can be changed by - * specifying the SystemProperty {@value #DEFAULT_REGION_SYS_PROP_NAME} to a type defined by - * {@link RegionShortcut}. If the {@link GeodeRedisServer#NUM_THREADS_SYS_PROP_NAME} system property - * is set to 0, one thread per client will be created. Otherwise a worker thread pool of specified - * size is used or a default size of 4 * {@link Runtime#availableProcessors()} if the property is - * not set. - * <p> - * Setting the AUTH password requires setting the property "redis-password" just as "redis-port" - * would be in xml or through GFSH. - * <p> - * The supported commands are as follows: - * <p> - * Supported String commands - APPEND, BITCOUNT, BITOP, BITPOS, DECR, DECRBY, GET, GETBIT, GETRANGE, - * GETSET, INCR, INCRBY, INCRBYFLOAT, MGET, MSET, MSETNX, PSETEX, SET, SETBIT, SETEX, SETNX, STRLEN - * <p> - * Supported List commands - LINDEX, LLEN, LPOP, LPUSH, LPUSHX, LRANGE, LREM, LSET, LTRIM, RPOP, - * RPUSH, RPUSHX - * <p> - * Supported Hash commands - HDEL, HEXISTS, HGET, HGETALL, HINCRBY, HINCRBYFLOAT, HKEYS, HMGET, - * HMSET, HSETNX, HLEN, HSCAN, HSET, HVALS - * <p> - * Supported Set commands - SADD, SCARD, SDIFF, SDIFFSTORE, SINTER, SINTERSTORE, SISMEMBER, - * SMEMBERS, SMOVE, SREM, SPOP, SRANDMEMBER, SCAN, SUNION, SUNIONSTORE - * <p> - * Supported SortedSet commands - ZADD, ZCARD, ZCOUNT, ZINCRBY, ZLEXCOUNT, ZRANGE, ZRANGEBYLEX, - * ZRANGEBYSCORE, ZRANK, ZREM, ZREMRANGEBYLEX, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE, - * ZREVRANGEBYSCORE, ZREVRANK, ZSCAN, ZSCORE - * <p> - * Supported HyperLogLog commands - PFADD, PFCOUNT, PFMERGE - * <p> - * Supported Keys commands - DEL, DBSIZE, EXISTS, EXPIRE, EXPIREAT, FLUSHDB, FLUSHALL, KEYS, - * PERSIST, PEXPIRE, PEXPIREAT, PTTL, SCAN, TTL - * <p> - * Supported Transaction commands - DISCARD, EXEC, MULTI - * <P> - * Supported Server commands - AUTH, ECHO, PING, TIME, QUIT - * <p> - * <p> - * The command executors are not explicitly documented but the functionality can be found at - * <a href="http://redis.io/commands">Redis Commands</a> - * <p> - * Exceptions to the Redis Commands Documents: - * <p> - * <ul> - * <li>Any command that removes keys and returns a count of removed entries will not return a total - * remove count but rather a count of how many entries have been removed that existed on the local - * vm, though all entries will be removed</li> - * <li>Any command that returns a count of newly set members has an unspecified return value. The - * command will work just as the Redis protocol states but the count will not necessary reflect the - * number set compared to overridden.</li> - * <li>Transactions work just as they would on a Redis instance, they are local transaction. - * Transactions cannot be executed on data that is not local to the executing server, that is on a - * partitioned region in a different server instance or on a persistent region that does not have - * transactions enabled. Also, you cannot watch or unwatch keys as all keys within a GemFire - * transaction are watched by default.</li> - * </ul> - * - */ - -public class GeodeRedisServer { - - /** - * Thread used to start main method - */ - private static Thread mainThread = null; - - /** - * The default Redis port as specified by their protocol, {@value #DEFAULT_REDIS_SERVER_PORT} - */ - public static final int DEFAULT_REDIS_SERVER_PORT = 6379; - - /** - * The number of threads that will work on handling requests - */ - private final int numWorkerThreads; - - /** - * The number of threads that will work socket selectors - */ - private final int numSelectorThreads; - - /** - * The actual port being used by the server - */ - private final int serverPort; - - /** - * The address to bind to - */ - private final String bindAddress; - - /** - * Connection timeout in milliseconds - */ - private static final int connectTimeoutMillis = 1000; - - /** - * Temporary constant whether to use old single thread per connection model for worker group - */ - private boolean singleThreadPerConnection; - - /** - * Logging level - */ - private final String logLevel; - - /** - * The cache instance pointer on this vm - */ - private Cache cache; - - /** - * Channel to be closed when shutting down - */ - private Channel serverChannel; - - /** - * Gem logwriter - */ - private LogWriter logger; - - private RegionProvider regionCache; - - private final MetaCacheListener metaListener; - - private EventLoopGroup bossGroup; - private EventLoopGroup workerGroup; - private final static int numExpirationThreads = 1; - private final ScheduledExecutorService expirationExecutor; - - /** - * Map of futures to be executed for key expirations - */ - private final ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationFutures; - - - /** - * The field that defines the name of the {@link Region} which holds all of the strings. The - * current value of this field is {@value #STRING_REGION}. - */ - public static final String STRING_REGION = "ReDiS_StRiNgS"; - - /** - * The field that defines the name of the {@link Region} which holds all of the HyperLogLogs. The - * current value of this field is {@value #HLL_REGION}. - */ - public static final String HLL_REGION = "ReDiS_HlL"; - - /** - * The field that defines the name of the {@link Region} which holds all of the Redis meta data. - * The current value of this field is {@value #REDIS_META_DATA_REGION}. - */ - public static final String REDIS_META_DATA_REGION = "__ReDiS_MeTa_DaTa"; - - /** - * The system property name used to set the default {@link Region} creation type. The property - * name is {@value #DEFAULT_REGION_SYS_PROP_NAME} and the acceptable values are types defined by - * {@link RegionShortcut}, i.e. "PARTITION" would be used for {@link RegionShortcut#PARTITION}. - */ - public static final String DEFAULT_REGION_SYS_PROP_NAME = "gemfireredis.regiontype"; - - /** - * System property name that can be used to set the number of threads to be used by the - * GeodeRedisServer - */ - public static final String NUM_THREADS_SYS_PROP_NAME = "gemfireredis.numthreads"; - - /** - * The actual {@link RegionShortcut} type specified by the system property - * {@value #DEFAULT_REGION_SYS_PROP_NAME}. - */ - public final RegionShortcut DEFAULT_REGION_TYPE; - - private boolean shutdown; - private boolean started; - - /** - * Determine the {@link RegionShortcut} type from a String value. If the String value doesn't map - * to a RegionShortcut type then {@link RegionShortcut#PARTITION} will be used by default. - * - * @return {@link RegionShortcut} - */ - private static RegionShortcut setRegionType() { - String regionType = System.getProperty(DEFAULT_REGION_SYS_PROP_NAME, "PARTITION"); - RegionShortcut type; - try { - type = RegionShortcut.valueOf(regionType); - } catch (Exception e) { - type = RegionShortcut.PARTITION; - } - return type; - } - - /** - * Helper method to set the number of worker threads - * - * @return If the System property {@value #NUM_THREADS_SYS_PROP_NAME} is set then that number is - * used, otherwise 4 * # of cores - */ - private int setNumWorkerThreads() { - String prop = System.getProperty(NUM_THREADS_SYS_PROP_NAME); - int numCores = Runtime.getRuntime().availableProcessors(); - int def = 4 * numCores; - if (prop == null || prop.isEmpty()) - return def; - int threads; - try { - threads = Integer.parseInt(prop); - } catch (NumberFormatException e) { - return def; - } - return threads; - } - - /** - * Constructor for {@link GeodeRedisServer} that will start the server on the given port and bind - * to the first non-loopback address - * - * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by - * default - */ - public GeodeRedisServer(int port) { - this(null, port, null); - } - - /** - * Constructor for {@link GeodeRedisServer} that will start the server and bind to the given - * address and port - * - * @param bindAddress The address to which the server will attempt to bind to - * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by - * default if argument is less than or equal to 0 - */ - public GeodeRedisServer(String bindAddress, int port) { - this(bindAddress, port, null); - } - - - /** - * Constructor for {@link GeodeRedisServer} that will start the server and bind to the given - * address and port. Keep in mind that the log level configuration will only be set if a - * {@link Cache} does not already exist, if one already exists then setting that property will - * have no effect. - * - * @param bindAddress The address to which the server will attempt to bind to - * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by - * default if argument is less than or equal to 0 - * @param logLevel The logging level to be used by GemFire - */ - public GeodeRedisServer(String bindAddress, int port, String logLevel) { - if (port <= 0) - this.serverPort = DEFAULT_REDIS_SERVER_PORT; - else - this.serverPort = port; - this.bindAddress = bindAddress; - this.logLevel = logLevel; - this.numWorkerThreads = setNumWorkerThreads(); - if (this.numWorkerThreads == 0) - this.singleThreadPerConnection = true; - this.numSelectorThreads = 1; - this.metaListener = new MetaCacheListener(); - this.expirationFutures = new ConcurrentHashMap<ByteArrayWrapper, ScheduledFuture<?>>(); - this.expirationExecutor = - Executors.newScheduledThreadPool(numExpirationThreads, new ThreadFactory() { - private final AtomicInteger counter = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("GemFireRedis-ScheduledExecutor-" + counter.incrementAndGet()); - t.setDaemon(true); - return t; - } - - }); - this.DEFAULT_REGION_TYPE = setRegionType(); - this.shutdown = false; - this.started = false; - } - - /** - * Helper method to get the host name to bind to - * - * @return The InetAddress to bind to - * @throws UnknownHostException - */ - private InetAddress getBindAddress() throws UnknownHostException { - return this.bindAddress == null || this.bindAddress.isEmpty() ? SocketCreator.getLocalHost() - : InetAddress.getByName(this.bindAddress); - } - - /** - * This is function to call on a {@link GeodeRedisServer} instance to start it running - */ - public synchronized void start() { - if (!started) { - try { - startGemFire(); - initializeRedis(); - startRedisServer(); - } catch (IOException e) { - throw new RuntimeException("Could not start Server", e); - } catch (InterruptedException e) { - throw new RuntimeException("Could not start Server", e); - } - started = true; - } - } - - /** - * Initializes the {@link Cache}, and creates Redis necessities Region and protects declares that - * {@link Region} to be protected. Also, every {@link GeodeRedisServer} will check for entries - * already in the meta data Region. - */ - private void startGemFire() { - Cache c = GemFireCacheImpl.getInstance(); - if (c == null) { - synchronized (GeodeRedisServer.class) { - c = GemFireCacheImpl.getInstance(); - if (c == null) { - CacheFactory cacheFactory = new CacheFactory(); - if (logLevel != null) - cacheFactory.set(LOG_LEVEL, logLevel); - c = cacheFactory.create(); - } - } - } - this.cache = c; - this.logger = c.getLogger(); - } - - private void initializeRedis() { - synchronized (this.cache) { - Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion; - - Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion; - Region<String, RedisDataType> redisMetaData; - GemFireCacheImpl gemFireCache = (GemFireCacheImpl) cache; - try { - if ((stringsRegion = cache.getRegion(STRING_REGION)) == null) { - RegionFactory<ByteArrayWrapper, ByteArrayWrapper> regionFactory = - gemFireCache.createRegionFactory(this.DEFAULT_REGION_TYPE); - stringsRegion = regionFactory.create(STRING_REGION); - } - if ((hLLRegion = cache.getRegion(HLL_REGION)) == null) { - RegionFactory<ByteArrayWrapper, HyperLogLogPlus> regionFactory = - gemFireCache.createRegionFactory(this.DEFAULT_REGION_TYPE); - hLLRegion = regionFactory.create(HLL_REGION); - } - if ((redisMetaData = cache.getRegion(REDIS_META_DATA_REGION)) == null) { - AttributesFactory af = new AttributesFactory(); - af.addCacheListener(metaListener); - af.setDataPolicy(DataPolicy.REPLICATE); - InternalRegionArguments ira = - new InternalRegionArguments().setInternalRegion(true).setIsUsedForMetaRegion(true); - redisMetaData = gemFireCache.createVMRegion(REDIS_META_DATA_REGION, af.create(), ira); - } - } catch (IOException | ClassNotFoundException e) { - // only if loading snapshot, not here - InternalGemFireError assErr = new InternalGemFireError( - LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString()); - assErr.initCause(e); - throw assErr; - } - this.regionCache = new RegionProvider(stringsRegion, hLLRegion, redisMetaData, - expirationFutures, expirationExecutor, this.DEFAULT_REGION_TYPE); - redisMetaData.put(REDIS_META_DATA_REGION, RedisDataType.REDIS_PROTECTED); - redisMetaData.put(HLL_REGION, RedisDataType.REDIS_PROTECTED); - redisMetaData.put(STRING_REGION, RedisDataType.REDIS_PROTECTED); - } - checkForRegions(); - } - - private void checkForRegions() { - Collection<Entry<String, RedisDataType>> entrySet = this.regionCache.metaEntrySet(); - for (Entry<String, RedisDataType> entry : entrySet) { - String regionName = entry.getKey(); - RedisDataType type = entry.getValue(); - Region<?, ?> newRegion = cache.getRegion(regionName); - if (newRegion == null && type != RedisDataType.REDIS_STRING && type != RedisDataType.REDIS_HLL - && type != RedisDataType.REDIS_PROTECTED) { - try { - this.regionCache - .createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(regionName), type); - } catch (Exception e) { - if (logger.errorEnabled()) - logger.error(e); - } - } - } - } - - /** - * Helper method to start the server listening for connections. The server is bound to the port - * specified by {@link GeodeRedisServer#serverPort} - * - * @throws IOException - * @throws InterruptedException - */ - private void startRedisServer() throws IOException, InterruptedException { - ThreadFactory selectorThreadFactory = new ThreadFactory() { - private final AtomicInteger counter = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("GeodeRedisServer-SelectorThread-" + counter.incrementAndGet()); - t.setDaemon(true); - return t; - } - - }; - - ThreadFactory workerThreadFactory = new ThreadFactory() { - private final AtomicInteger counter = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("GeodeRedisServer-WorkerThread-" + counter.incrementAndGet()); - return t; - } - - }; - - bossGroup = null; - workerGroup = null; - Class<? extends ServerChannel> socketClass = null; - if (singleThreadPerConnection) { - bossGroup = new OioEventLoopGroup(Integer.MAX_VALUE, selectorThreadFactory); - workerGroup = new OioEventLoopGroup(Integer.MAX_VALUE, workerThreadFactory); - socketClass = OioServerSocketChannel.class; - } else { - bossGroup = new NioEventLoopGroup(this.numSelectorThreads, selectorThreadFactory); - workerGroup = new NioEventLoopGroup(this.numWorkerThreads, workerThreadFactory); - socketClass = NioServerSocketChannel.class; - } - InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem(); - String pwd = system.getConfig().getRedisPassword(); - final byte[] pwdB = Coder.stringToBytes(pwd); - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup).channel(socketClass) - .childHandler(new ChannelInitializer<SocketChannel>() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - if (logger.fineEnabled()) - logger.fine("GeodeRedisServer-Connection established with " + ch.remoteAddress()); - ChannelPipeline p = ch.pipeline(); - p.addLast(ByteToCommandDecoder.class.getSimpleName(), new ByteToCommandDecoder()); - p.addLast(ExecutionHandlerContext.class.getSimpleName(), - new ExecutionHandlerContext(ch, cache, regionCache, GeodeRedisServer.this, pwdB)); - } - }).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_RCVBUF, getBufferSize()) - .childOption(ChannelOption.SO_KEEPALIVE, true) - .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, GeodeRedisServer.connectTimeoutMillis) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); - - // Bind and start to accept incoming connections. - ChannelFuture f = b.bind(new InetSocketAddress(getBindAddress(), serverPort)).sync(); - if (this.logger.infoEnabled()) { - String logMessage = "GeodeRedisServer started {" + getBindAddress() + ":" + serverPort - + "}, Selector threads: " + this.numSelectorThreads; - if (this.singleThreadPerConnection) - logMessage += ", One worker thread per connection"; - else - logMessage += ", Worker threads: " + this.numWorkerThreads; - this.logger.info(logMessage); - } - this.serverChannel = f.channel(); - } - - /** - * Takes an entry event and processes it. If the entry denotes that a - * {@link RedisDataType#REDIS_LIST} or {@link RedisDataType#REDIS_SORTEDSET} was created then this - * function will call the necessary calls to create the parameterized queries for those keys. - * - * @param event EntryEvent from meta data region - */ - private void afterKeyCreate(EntryEvent<String, RedisDataType> event) { - if (event.isOriginRemote()) { - final String key = (String) event.getKey(); - final RedisDataType value = event.getNewValue(); - if (value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL - && value != RedisDataType.REDIS_PROTECTED) { - try { - this.regionCache.createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(key), - value); - } catch (RegionDestroyedException ignore) { // Region already destroyed, ignore - } - } - } - } - - /** - * When a key is removed then this function will make sure the associated queries with the key are - * also removed from each vm to avoid unnecessary data retention - */ - private void afterKeyDestroy(EntryEvent<String, RedisDataType> event) { - if (event.isOriginRemote()) { - final String key = (String) event.getKey(); - final RedisDataType value = event.getOldValue(); - if (value != null && value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL - && value != RedisDataType.REDIS_PROTECTED) { - ByteArrayWrapper kW = Coder.stringToByteArrayWrapper(key); - Region<?, ?> r = this.regionCache.getRegion(kW); - if (r != null) { - this.regionCache.removeRegionReferenceLocally(kW, value); - } - } - } - } - - private final class MetaCacheListener extends CacheListenerAdapter<String, RedisDataType> { - - @Override - public void afterCreate(EntryEvent<String, RedisDataType> event) { - afterKeyCreate(event); - } - - @Override - public void afterDestroy(EntryEvent<String, RedisDataType> event) { - afterKeyDestroy(event); - } - } - - /** - * Helper method to get GemFire set socket buffer size, possibly a default of 32k - * - * @return Buffer size to use for server - */ - private int getBufferSize() { - InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem(); - return system.getConfig().getSocketBufferSize(); - } - - /** - * Shutdown method for {@link GeodeRedisServer}. This closes the {@link Cache}, interrupts all - * execution and forcefully closes all connections. - */ - public synchronized void shutdown() { - if (!shutdown) { - if (logger.infoEnabled()) - logger.info("GeodeRedisServer shutting down"); - ChannelFuture closeFuture = this.serverChannel.closeFuture(); - Future<?> c = workerGroup.shutdownGracefully(); - Future<?> c2 = bossGroup.shutdownGracefully(); - this.serverChannel.close(); - c.syncUninterruptibly(); - c2.syncUninterruptibly(); - this.regionCache.close(); - if (mainThread != null) - mainThread.interrupt(); - for (ScheduledFuture<?> f : this.expirationFutures.values()) - f.cancel(true); - this.expirationFutures.clear(); - this.expirationExecutor.shutdownNow(); - closeFuture.syncUninterruptibly(); - shutdown = true; - } - } - - /** - * Static main method that allows the {@link GeodeRedisServer} to be started from the command - * line. The supported command line arguments are - * <p> - * -port= <br> - * -bind-address= <br> - * -log-level= - * - * @param args Command line args - */ - public static void main(String[] args) { - int port = DEFAULT_REDIS_SERVER_PORT; - String bindAddress = null; - String logLevel = null; - for (String arg : args) { - if (arg.startsWith("-port")) - port = getPort(arg); - else if (arg.startsWith("-bind-address")) - bindAddress = getBindAddress(arg); - else if (arg.startsWith("-log-level")) - logLevel = getLogLevel(arg); - } - mainThread = Thread.currentThread(); - GeodeRedisServer server = new GeodeRedisServer(bindAddress, port, logLevel); - server.start(); - while (true) { - try { - Thread.sleep(Long.MAX_VALUE); - } catch (InterruptedException e1) { - break; - } catch (Exception e) { - } - } - } - - /** - * Helper method to parse the port to a number - * - * @param arg String where the argument is - * @return The port number when the correct syntax was used, otherwise will return - * {@link #DEFAULT_REDIS_SERVER_PORT} - */ - private static int getPort(String arg) { - int port = DEFAULT_REDIS_SERVER_PORT; - if (arg != null && arg.length() > 6) { - if (arg.startsWith("-port")) { - String p = arg.substring(arg.indexOf('=') + 1); - p = p.trim(); - try { - port = Integer.parseInt(p); - } catch (NumberFormatException e) { - System.out.println("Unable to parse port, using default port"); - } - } - } - return port; - } - - /** - * Helper method to parse bind address - * - * @param arg String holding bind address - * @return Bind address - */ - private static String getBindAddress(String arg) { - String address = null; - if (arg != null && arg.length() > 14) { - if (arg.startsWith("-bind-address")) { - String p = arg.substring(arg.indexOf('=') + 1); - address = p.trim(); - } - } - return address; - } - - /** - * Helper method to parse log level - * - * @param arg String holding log level - * @return Log level - */ - private static String getLogLevel(String arg) { - String logLevel = null; - if (arg != null && arg.length() > 11) { - if (arg.startsWith("-log-level")) { - String p = arg.substring(arg.indexOf('=') + 1); - logLevel = p.trim(); - } - } - return logLevel; - } - -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisService.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisService.java b/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisService.java new file mode 100644 index 0000000..e625bb9 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisService.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis; + +import org.apache.geode.internal.cache.CacheService; + +/** + * Created by ukohlmeyer on 2/9/17. + */ +public interface GeodeRedisService extends CacheService { + + void start(); + + void stop(); +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/redis/internal/ByteArrayWrapper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/redis/internal/ByteArrayWrapper.java b/geode-core/src/main/java/org/apache/geode/redis/internal/ByteArrayWrapper.java deleted file mode 100755 index 4a0ef59..0000000 --- a/geode-core/src/main/java/org/apache/geode/redis/internal/ByteArrayWrapper.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis.internal; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Arrays; - -import org.apache.geode.DataSerializable; -import org.apache.geode.DataSerializer; - -/** - * This class is a wrapper for the any Regions that need to store a byte[]. The only data this an - * instance will store is a byte[] for the data but it is also serializable and comparable so it is - * able to be used in querying - * - * - */ -public class ByteArrayWrapper implements DataSerializable, Comparable<ByteArrayWrapper> { - /** - * Generated serialVerionUID - */ - private static final long serialVersionUID = 9066391742266642992L; - - /** - * The data portion of ValueWrapper - */ - private byte[] value; - - /** - * Hash of {@link #value}, this value is cached for performance - */ - private transient int hashCode; - - private transient String toString; - - /** - * Empty constructor for serialization - */ - public ByteArrayWrapper() {} - - /** - * Default constructor constructs a ValueWrapper and initialize the {@link #value} - * - * @param value - */ - public ByteArrayWrapper(byte[] value) { - this.value = value; - this.hashCode = Arrays.hashCode(value); - } - - @Override - public void toData(DataOutput out) throws IOException { - DataSerializer.writeByteArray(value, out); - } - - @Override - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - this.value = DataSerializer.readByteArray(in);; - this.hashCode = Arrays.hashCode(this.value); - } - - @Override - public String toString() { - if (toString == null) - toString = Coder.bytesToString(this.value); - return toString; - } - - public byte[] toBytes() { - return this.value; - } - - public void setBytes(byte[] bytes) { - this.value = bytes; - this.toString = null; - this.hashCode = Arrays.hashCode(bytes); - } - - /** - * Getter for the length of the {@link #value} array - * - * @return The length of the value array - */ - public int length() { - return value.length; - } - - /** - * Hash code for byte[] wrapped by this object, the actual hashcode is determined by - * Arrays.hashCode(byte[]) - */ - @Override - public int hashCode() { - return this.hashCode; - } - - - /** - * This equals is neither symmetric and therefore not transitive, because a String with the same - * underlying bytes is considered equal. Clearly calling {@link String#equals(Object)} would not - * yield the same result - */ - @Override - public boolean equals(Object other) { - if (other instanceof ByteArrayWrapper) - return Arrays.equals(value, ((ByteArrayWrapper) other).value); - else if (other instanceof String) { - return Arrays.equals(value, Coder.stringToBytes((String) other)); - } - return false; - } - - /** - * This is a byte to byte comparator, it is not lexicographical but purely compares byte by byte - * values - */ - @Override - public int compareTo(ByteArrayWrapper other) { - return arrayCmp(value, other.value); - - } - - /** - * Private helper method to compare two byte arrays, A.compareTo(B). The comparison is basically - * numerical, for each byte index, the byte representing the greater value will be the greater - * - * @param A byte[] - * @param B byte[] - * @return 1 if A > B, -1 if B > A, 0 if A == B - */ - private int arrayCmp(byte[] A, byte[] B) { - if (A == B) - return 0; - if (A == null) { - return -1; - } else if (B == null) { - return 1; - } - - int len = Math.min(A.length, B.length); - - for (int i = 0; i < len; i++) { - byte a = A[i]; - byte b = B[i]; - int diff = a - b; - if (diff > 0) - return 1; - else if (diff < 0) - return -1; - } - - if (A.length > B.length) - return 1; - else if (B.length > A.length) - return -1; - - return 0; - } - -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/redis/internal/ByteToCommandDecoder.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/redis/internal/ByteToCommandDecoder.java b/geode-core/src/main/java/org/apache/geode/redis/internal/ByteToCommandDecoder.java deleted file mode 100644 index 124bf75..0000000 --- a/geode-core/src/main/java/org/apache/geode/redis/internal/ByteToCommandDecoder.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis.internal; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; - -import java.util.ArrayList; -import java.util.List; - -/** - * This is the first part of the channel pipeline for Netty. Here incoming bytes are read and a - * created {@link Command} is sent down the pipeline. It is unfortunate that this class is not - * {@link io.netty.channel.ChannelHandler.Sharable} because no state is kept in this class. State is - * kept by {@link ByteToMessageDecoder}, it may be worthwhile to look at a different decoder setup - * as to avoid allocating a decoder for every new connection. - * <p> - * The code flow of the protocol parsing may not be exactly Java like, but this is done very - * intentionally. It was found that in cases where large Redis requests are sent that end up being - * fragmented, throwing exceptions when the command could not be fully parsed took up an enormous - * amount of cpu time. The simplicity of the Redis protocol allows us to just back out and wait for - * more data, while exceptions are left to malformed requests which should never happen if using a - * proper Redis client. - * - * - */ -public class ByteToCommandDecoder extends ByteToMessageDecoder { - - /** - * Important note - * - * Do not use '' <-- java primitive chars. Redis uses {@link Coder#CHARSET} encoding so we should - * not risk java handling char to byte conversions, rather just hard code {@link Coder#CHARSET} - * chars as bytes - */ - - private static final byte rID = 13; // '\r'; - private static final byte nID = 10; // '\n'; - private static final byte bulkStringID = 36; // '$'; - private static final byte arrayID = 42; // '*'; - private static final int MAX_BULK_STRING_LENGTH = 512 * 1024 * 1024; // 512 MB - - public ByteToCommandDecoder() {} - - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { - Command c = null; - do { - in.markReaderIndex(); - c = parse(in); - if (c == null) { - in.resetReaderIndex(); - return; - } - out.add(c); - } while (in.isReadable()); // Try to take advantage of pipelining if it is being used - } - - private Command parse(ByteBuf buffer) throws RedisCommandParserException { - if (buffer == null) - throw new NullPointerException(); - if (!buffer.isReadable()) - return null; - - byte firstB = buffer.readByte(); - if (firstB != arrayID) - throw new RedisCommandParserException( - "Expected: " + (char) arrayID + " Actual: " + (char) firstB); - ArrayList<byte[]> commandElems = new ArrayList<byte[]>(); - - if (!parseArray(commandElems, buffer)) - return null; - - return new Command(commandElems); - } - - private boolean parseArray(ArrayList<byte[]> commandElems, ByteBuf buffer) - throws RedisCommandParserException { - byte currentChar; - int arrayLength = parseCurrentNumber(buffer); - if (arrayLength == Integer.MIN_VALUE || !parseRN(buffer)) - return false; - if (arrayLength < 0 || arrayLength > 1000000000) - throw new RedisCommandParserException("invalid multibulk length"); - - for (int i = 0; i < arrayLength; i++) { - if (!buffer.isReadable()) - return false; - currentChar = buffer.readByte(); - if (currentChar == bulkStringID) { - byte[] newBulkString = parseBulkString(buffer); - if (newBulkString == null) - return false; - commandElems.add(newBulkString); - } else - throw new RedisCommandParserException( - "expected: \'$\', got \'" + (char) currentChar + "\'"); - } - return true; - } - - /** - * Helper method to parse a bulk string when one is seen - * - * @param buffer Buffer to read from - * @return byte[] representation of the Bulk String read - * @throws RedisCommandParserException Thrown when there is illegal syntax - */ - private byte[] parseBulkString(ByteBuf buffer) throws RedisCommandParserException { - int bulkStringLength = parseCurrentNumber(buffer); - if (bulkStringLength == Integer.MIN_VALUE) - return null; - if (bulkStringLength > MAX_BULK_STRING_LENGTH) - throw new RedisCommandParserException( - "invalid bulk length, cannot exceed max length of " + MAX_BULK_STRING_LENGTH); - if (!parseRN(buffer)) - return null; - - if (!buffer.isReadable(bulkStringLength)) - return null; - byte[] bulkString = new byte[bulkStringLength]; - buffer.readBytes(bulkString); - - if (!parseRN(buffer)) - return null; - - return bulkString; - } - - /** - * Helper method to parse the number at the beginning of the buffer - * - * @param buffer Buffer to read - * @return The number found at the beginning of the buffer - */ - private int parseCurrentNumber(ByteBuf buffer) { - int number = 0; - int readerIndex = buffer.readerIndex(); - byte b = 0; - while (true) { - if (!buffer.isReadable()) - return Integer.MIN_VALUE; - b = buffer.readByte(); - if (Character.isDigit(b)) { - number = number * 10 + (int) (b - '0'); - readerIndex++; - } else { - buffer.readerIndex(readerIndex); - break; - } - } - return number; - } - - /** - * Helper method that is called when the next characters are supposed to be "\r\n" - * - * @param buffer Buffer to read from - * @throws RedisCommandParserException Thrown when the next two characters are not "\r\n" - */ - private boolean parseRN(ByteBuf buffer) throws RedisCommandParserException { - if (!buffer.isReadable(2)) - return false; - byte b = buffer.readByte(); - if (b != rID) - throw new RedisCommandParserException( - "expected \'" + (char) rID + "\', got \'" + (char) b + "\'"); - b = buffer.readByte(); - if (b != nID) - throw new RedisCommandParserException( - "expected: \'" + (char) nID + "\', got \'" + (char) b + "\'"); - return true; - } - -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/redis/internal/Command.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/redis/internal/Command.java b/geode-core/src/main/java/org/apache/geode/redis/internal/Command.java deleted file mode 100755 index 2f91218..0000000 --- a/geode-core/src/main/java/org/apache/geode/redis/internal/Command.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis.internal; - -import io.netty.buffer.ByteBuf; - -import java.nio.channels.SocketChannel; -import java.util.List; - -/** - * The command class is used in holding a received Redis command. Each sent command resides in an - * instance of this class. This class is designed to be used strictly by getter and setter methods. - * - * - */ -public class Command { - - private final List<byte[]> commandElems; - private final RedisCommandType commandType; - private ByteBuf response; - private String key; - private ByteArrayWrapper bytes; - - /** - * Constructor for {@link Command}. Must initialize Command with a {@link SocketChannel} and a - * {@link List} of command elements - * - * @param commandElems List of elements in command - */ - public Command(List<byte[]> commandElems) { - if (commandElems == null || commandElems.isEmpty()) - throw new IllegalArgumentException( - "List of command elements cannot be empty -> List:" + commandElems); - this.commandElems = commandElems; - this.response = null; - - RedisCommandType type; - - try { - byte[] charCommand = commandElems.get(0); - String commandName = Coder.bytesToString(charCommand).toUpperCase(); - type = RedisCommandType.valueOf(commandName); - } catch (Exception e) { - type = RedisCommandType.UNKNOWN; - } - this.commandType = type; - - } - - /** - * Used to get the command element list - * - * @return List of command elements in form of {@link List} - */ - public List<byte[]> getProcessedCommand() { - return this.commandElems; - } - - /** - * Getter method for the command type - * - * @return The command type - */ - public RedisCommandType getCommandType() { - return this.commandType; - } - - /** - * Getter method to get the response to be sent - * - * @return The response - */ - public ByteBuf getResponse() { - return response; - } - - /** - * Setter method to set the response to be sent - * - * @param response The response to be sent - */ - public void setResponse(ByteBuf response) { - this.response = response; - } - - public boolean hasError() { - if (response == null) - return false; - - if (response.getByte(0) == Coder.ERROR_ID) - return true; - - return false; - } - - /** - * Convenience method to get a String representation of the key in a Redis command, always at the - * second position in the sent command array - * - * @return Returns the second element in the parsed command list, which is always the key for - * commands indicating a key - */ - public String getStringKey() { - if (this.commandElems.size() > 1) { - if (this.bytes == null) { - this.bytes = new ByteArrayWrapper(this.commandElems.get(1)); - this.key = this.bytes.toString(); - } else if (this.key == null) - this.key = this.bytes.toString(); - return this.key; - } else - return null; - } - - public ByteArrayWrapper getKey() { - if (this.commandElems.size() > 1) { - if (this.bytes == null) - this.bytes = new ByteArrayWrapper(this.commandElems.get(1)); - return this.bytes; - } else - return null; - } - - @Override - public String toString() { - StringBuilder b = new StringBuilder(); - for (byte[] bs : this.commandElems) { - b.append(Coder.bytesToString(bs)); - b.append(' '); - } - return b.toString(); - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/redis/internal/DoubleWrapper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/redis/internal/DoubleWrapper.java b/geode-core/src/main/java/org/apache/geode/redis/internal/DoubleWrapper.java deleted file mode 100755 index 60cd130..0000000 --- a/geode-core/src/main/java/org/apache/geode/redis/internal/DoubleWrapper.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis.internal; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.geode.DataSerializable; -import org.apache.geode.DataSerializer; - -/** - * This is a wrapper class for doubles, similar to {@link ByteArrayWrapper} - * - * - */ -public class DoubleWrapper implements DataSerializable, Comparable<Object> { - - private static final long serialVersionUID = 6946858357297398633L; - - public Double score; - private String toString; - - public DoubleWrapper() {} - - public DoubleWrapper(Double dubs) { - this.score = dubs; - } - - @Override - public void toData(DataOutput out) throws IOException { - DataSerializer.writeDouble(score, out); - } - - @Override - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - this.score = DataSerializer.readDouble(in); - } - - @Override - public int compareTo(Object arg0) { - Double other; - if (arg0 instanceof DoubleWrapper) { - other = ((DoubleWrapper) arg0).score; - } else if (arg0 instanceof Double) { - other = (Double) arg0; - } else - return 0; - Double diff = this.score - other; - if (diff > 0) - return 1; - else if (diff < 0) - return -1; - else - return 0; - } - - public String toString() { - if (this.toString == null) - this.toString = Coder.doubleToString(score); - return this.toString; - } - -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java b/geode-core/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java deleted file mode 100644 index e2b49be..0000000 --- a/geode-core/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java +++ /dev/null @@ -1,392 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis.internal; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.codec.DecoderException; -import io.netty.util.concurrent.EventExecutor; - -import java.io.IOException; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.geode.LogWriter; -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheClosedException; -import org.apache.geode.cache.CacheTransactionManager; -import org.apache.geode.cache.RegionDestroyedException; -import org.apache.geode.cache.TransactionException; -import org.apache.geode.cache.TransactionId; -import org.apache.geode.cache.UnsupportedOperationInTransactionException; -import org.apache.geode.cache.query.QueryInvocationTargetException; -import org.apache.geode.cache.query.RegionNotFoundException; -import org.apache.geode.redis.internal.executor.transactions.TransactionExecutor; -import org.apache.geode.redis.GeodeRedisServer; - -/** - * This class extends {@link ChannelInboundHandlerAdapter} from Netty and it is the last part of the - * channel pipeline. The {@link ByteToCommandDecoder} forwards a {@link Command} to this class which - * executes it and sends the result back to the client. Additionally, all exception handling is done - * by this class. - * <p> - * Besides being part of Netty's pipeline, this class also serves as a context to the execution of a - * command. It abstracts transactions, provides access to the {@link RegionProvider} and anything - * else an executing {@link Command} may need. - * - * - */ -public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { - - private static final int WAIT_REGION_DSTRYD_MILLIS = 100; - private static final int MAXIMUM_NUM_RETRIES = (1000 * 60) / WAIT_REGION_DSTRYD_MILLIS; // 60 - // seconds - // total - - private final Cache cache; - private final GeodeRedisServer server; - private final LogWriter logger; - private final Channel channel; - private final AtomicBoolean needChannelFlush; - private final Runnable flusher; - private final EventExecutor lastExecutor; - private final ByteBufAllocator byteBufAllocator; - /** - * TransactionId for any transactions started by this client - */ - private TransactionId transactionID; - - /** - * Queue of commands for a given transaction - */ - private Queue<Command> transactionQueue; - private final RegionProvider regionProvider; - private final byte[] authPwd; - - private boolean isAuthenticated; - - /** - * Default constructor for execution contexts. - * - * @param ch Channel used by this context, should be one to one - * @param cache The Geode cache instance of this vm - * @param regionProvider The region provider of this context - * @param server Instance of the server it is attached to, only used so that any execution can - * initiate a shutdwon - * @param pwd Authentication password for each context, can be null - */ - public ExecutionHandlerContext(Channel ch, Cache cache, RegionProvider regionProvider, - GeodeRedisServer server, byte[] pwd) { - if (ch == null || cache == null || regionProvider == null || server == null) - throw new IllegalArgumentException("Only the authentication password may be null"); - this.cache = cache; - this.server = server; - this.logger = cache.getLogger(); - this.channel = ch; - this.needChannelFlush = new AtomicBoolean(false); - this.flusher = new Runnable() { - - @Override - public void run() { - flushChannel(); - } - - }; - this.lastExecutor = channel.pipeline().lastContext().executor(); - this.byteBufAllocator = channel.alloc(); - this.transactionID = null; - this.transactionQueue = null; // Lazy - this.regionProvider = regionProvider; - this.authPwd = pwd; - this.isAuthenticated = pwd != null ? false : true; - } - - private void flushChannel() { - while (needChannelFlush.getAndSet(false)) { - channel.flush(); - } - } - - private void writeToChannel(ByteBuf message) { - channel.write(message, channel.voidPromise()); - if (!needChannelFlush.getAndSet(true)) { - this.lastExecutor.execute(flusher); - } - } - - /** - * This will handle the execution of received commands - */ - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - Command command = (Command) msg; - executeCommand(ctx, command); - } - - /** - * Exception handler for the entire pipeline - */ - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - if (cause instanceof IOException) { - channelInactive(ctx); - return; - } - ByteBuf response = getExceptionResponse(ctx, cause); - writeToChannel(response); - } - - private ByteBuf getExceptionResponse(ChannelHandlerContext ctx, Throwable cause) { - ByteBuf response; - if (cause instanceof RedisDataTypeMismatchException) - response = Coder.getWrongTypeResponse(this.byteBufAllocator, cause.getMessage()); - else if (cause instanceof DecoderException - && cause.getCause() instanceof RedisCommandParserException) - response = - Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.PARSING_EXCEPTION_MESSAGE); - else if (cause instanceof RegionCreationException) { - this.logger.error(cause); - response = - Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.ERROR_REGION_CREATION); - } else if (cause instanceof InterruptedException || cause instanceof CacheClosedException) - response = - Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.SERVER_ERROR_SHUTDOWN); - else if (cause instanceof IllegalStateException) { - response = Coder.getErrorResponse(this.byteBufAllocator, cause.getMessage()); - } else { - if (this.logger.errorEnabled()) - this.logger.error("GeodeRedisServer-Unexpected error handler for " + ctx.channel(), cause); - response = Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.SERVER_ERROR_MESSAGE); - } - return response; - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) { - if (logger.fineEnabled()) - logger.fine("GeodeRedisServer-Connection closing with " + ctx.channel().remoteAddress()); - ctx.channel().close(); - ctx.close(); - } - - private void executeCommand(ChannelHandlerContext ctx, Command command) throws Exception { - RedisCommandType type = command.getCommandType(); - Executor exec = type.getExecutor(); - if (isAuthenticated) { - if (type == RedisCommandType.SHUTDOWN) { - this.server.shutdown(); - return; - } - if (hasTransaction() && !(exec instanceof TransactionExecutor)) - executeWithTransaction(ctx, exec, command); - else - executeWithoutTransaction(exec, command); - - if (hasTransaction() && command.getCommandType() != RedisCommandType.MULTI) { - writeToChannel( - Coder.getSimpleStringResponse(this.byteBufAllocator, RedisConstants.COMMAND_QUEUED)); - } else { - ByteBuf response = command.getResponse(); - writeToChannel(response); - } - } else if (type == RedisCommandType.QUIT) { - exec.executeCommand(command, this); - ByteBuf response = command.getResponse(); - writeToChannel(response); - channelInactive(ctx); - } else if (type == RedisCommandType.AUTH) { - exec.executeCommand(command, this); - ByteBuf response = command.getResponse(); - writeToChannel(response); - } else { - ByteBuf r = Coder.getNoAuthResponse(this.byteBufAllocator, RedisConstants.ERROR_NOT_AUTH); - writeToChannel(r); - } - } - - /** - * Private helper method to execute a command without a transaction, done for special exception - * handling neatness - * - * @param exec Executor to use - * @param command Command to execute - * @throws Exception Throws exception if exception is from within execution and not to be handled - */ - private void executeWithoutTransaction(final Executor exec, Command command) throws Exception { - Exception cause = null; - for (int i = 0; i < MAXIMUM_NUM_RETRIES; i++) { - try { - exec.executeCommand(command, this); - return; - } catch (Exception e) { - cause = e; - if (e instanceof RegionDestroyedException || e instanceof RegionNotFoundException - || e.getCause() instanceof QueryInvocationTargetException) - Thread.sleep(WAIT_REGION_DSTRYD_MILLIS); - } - } - throw cause; - } - - private void executeWithTransaction(ChannelHandlerContext ctx, final Executor exec, - Command command) throws Exception { - CacheTransactionManager txm = cache.getCacheTransactionManager(); - TransactionId transactionId = getTransactionID(); - txm.resume(transactionId); - try { - exec.executeCommand(command, this); - } catch (UnsupportedOperationInTransactionException e) { - command.setResponse(Coder.getErrorResponse(this.byteBufAllocator, - RedisConstants.ERROR_UNSUPPORTED_OPERATION_IN_TRANSACTION)); - } catch (TransactionException e) { - command.setResponse(Coder.getErrorResponse(this.byteBufAllocator, - RedisConstants.ERROR_TRANSACTION_EXCEPTION)); - } catch (Exception e) { - ByteBuf response = getExceptionResponse(ctx, e); - command.setResponse(response); - } - getTransactionQueue().add(command); - transactionId = txm.suspend(); - setTransactionID(transactionId); - } - - /** - * Get the current transacationId - * - * @return The current transactionId, null if one doesn't exist - */ - public TransactionId getTransactionID() { - return this.transactionID; - } - - /** - * Check if client has transaction - * - * @return True if client has transaction, false otherwise - */ - public boolean hasTransaction() { - return transactionID != null; - } - - /** - * Setter method for transaction - * - * @param id TransactionId of current transaction for client - */ - public void setTransactionID(TransactionId id) { - this.transactionID = id; - } - - /** - * Reset the transaction of client - */ - public void clearTransaction() { - this.transactionID = null; - if (this.transactionQueue != null) { - for (Command c : this.transactionQueue) { - ByteBuf r = c.getResponse(); - if (r != null) - r.release(); - } - this.transactionQueue.clear(); - } - } - - /** - * Getter for transaction command queue - * - * @return Command queue - */ - public Queue<Command> getTransactionQueue() { - if (this.transactionQueue == null) - this.transactionQueue = new ConcurrentLinkedQueue<Command>(); - return this.transactionQueue; - } - - /** - * {@link ByteBuf} allocator for this context. All executors must use this pooled allocator as - * opposed to having unpooled buffers for maximum performance - * - * @return allocator instance - */ - public ByteBufAllocator getByteBufAllocator() { - return this.byteBufAllocator; - } - - /** - * Gets the provider of Regions - * - * @return Provider - */ - public RegionProvider getRegionProvider() { - return this.regionProvider; - } - - /** - * Getter for manager to allow pausing and resuming transactions - * - * @return Instance - */ - public CacheTransactionManager getCacheTransactionManager() { - return this.cache.getCacheTransactionManager(); - } - - /** - * Getter for logger - * - * @return instance - */ - public LogWriter getLogger() { - return this.cache.getLogger(); - } - - /** - * Get the channel for this context - * - * @return instance - * - * public Channel getChannel() { return this.channel; } - */ - - /** - * Get the authentication password, this will be same server wide. It is exposed here as opposed - * to {@link GeodeRedisServer}. - * - * @return password - */ - public byte[] getAuthPwd() { - return this.authPwd; - } - - /** - * Checker if user has authenticated themselves - * - * @return True if no authentication required or authentication complete, false otherwise - */ - public boolean isAuthenticated() { - return this.isAuthenticated; - } - - /** - * Lets this context know the authentication is complete - */ - public void setAuthenticationVerified() { - this.isAuthenticated = true; - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/redis/internal/Executor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/redis/internal/Executor.java b/geode-core/src/main/java/org/apache/geode/redis/internal/Executor.java deleted file mode 100755 index 7698535..0000000 --- a/geode-core/src/main/java/org/apache/geode/redis/internal/Executor.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis.internal; - - -/** - * Interface for executors of a {@link Command} - * - * - */ -public interface Executor { - - /** - * This method executes the command and sets the response. Any runtime errors from this execution - * should be handled by caller to ensure the client gets a response - * - * @param command The command to be executed - * @param context The execution context by which this command is to be executed - */ - public void executeCommand(Command command, ExecutionHandlerContext context); - -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/redis/internal/Extendable.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/redis/internal/Extendable.java b/geode-core/src/main/java/org/apache/geode/redis/internal/Extendable.java deleted file mode 100644 index ed36030..0000000 --- a/geode-core/src/main/java/org/apache/geode/redis/internal/Extendable.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis.internal; - -/** - * This defines a command that can be extended, and there may need some level of abstraction - * - * - */ -public interface Extendable { - - /** - * Getter for error message in case of argument arity mismatch - * - * @return Error string - */ - public String getArgsError(); - -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/main/java/org/apache/geode/redis/internal/RedisCommandParserException.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/redis/internal/RedisCommandParserException.java b/geode-core/src/main/java/org/apache/geode/redis/internal/RedisCommandParserException.java deleted file mode 100755 index 4ec154a..0000000 --- a/geode-core/src/main/java/org/apache/geode/redis/internal/RedisCommandParserException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis.internal; - -/** - * Exception thrown by CommandParser (non-existent class) when a command has illegal syntax - * - * - */ -public class RedisCommandParserException extends Exception { - - private static final long serialVersionUID = 4707944288714910949L; - - public RedisCommandParserException() { - super(); - } - - public RedisCommandParserException(String message) { - super(message); - } - - public RedisCommandParserException(Throwable cause) { - super(cause); - } - - public RedisCommandParserException(String message, Throwable cause) { - super(message, cause); - } - -}
