http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/test/java/org/apache/geode/redis/RedisDistDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/redis/RedisDistDUnitTest.java b/geode-core/src/test/java/org/apache/geode/redis/RedisDistDUnitTest.java deleted file mode 100644 index d6030a9..0000000 --- a/geode-core/src/test/java/org/apache/geode/redis/RedisDistDUnitTest.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis; - -import static org.apache.geode.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; - -import java.util.Random; - -import org.junit.Test; -import org.junit.experimental.categories.Category; -import redis.clients.jedis.Jedis; - -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.distributed.ConfigurationProperties; -import org.apache.geode.internal.AvailablePortHelper; -import org.apache.geode.internal.net.SocketCreator; -import org.apache.geode.test.dunit.AsyncInvocation; -import org.apache.geode.test.dunit.DistributedTestUtils; -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.IgnoredException; -import org.apache.geode.test.dunit.LogWriterUtils; -import org.apache.geode.test.dunit.SerializableCallable; -import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; -import org.apache.geode.test.junit.categories.DistributedTest; -import org.apache.geode.test.junit.categories.FlakyTest; - -@Category(DistributedTest.class) -public class RedisDistDUnitTest extends JUnit4DistributedTestCase { - - public static final String TEST_KEY = "key"; - public static int pushes = 200; - int redisPort = 6379; - private Host host; - private VM server1; - private VM server2; - private VM client1; - private VM client2; - - private int server1Port; - private int server2Port; - - private String localHost; - - private static final int JEDIS_TIMEOUT = 20 * 1000; - - private abstract class ClientTestBase extends SerializableCallable { - - int port; - - protected ClientTestBase(int port) { - this.port = port; - } - } - - @Override - public final void postSetUp() throws Exception { - disconnectAllFromDS(); - - localHost = SocketCreator.getLocalHost().getHostName(); - - host = Host.getHost(0); - server1 = host.getVM(0); - server2 = host.getVM(1); - client1 = host.getVM(2); - client2 = host.getVM(3); - final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2); - final int locatorPort = DistributedTestUtils.getDUnitLocatorPort(); - final SerializableCallable<Object> startRedisAdapter = new SerializableCallable<Object>() { - - @Override - public Object call() throws Exception { - int port = ports[VM.getCurrentVMNum()]; - CacheFactory cF = new CacheFactory(); - String locator = SocketCreator.getLocalHost().getHostName() + "[" + locatorPort + "]"; - cF.set(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); - cF.set(ConfigurationProperties.REDIS_BIND_ADDRESS, localHost); - cF.set(ConfigurationProperties.REDIS_PORT, "" + port); - cF.set(MCAST_PORT, "0"); - cF.set(LOCATORS, locator); - cF.create(); - return Integer.valueOf(port); - } - }; - AsyncInvocation i = server1.invokeAsync(startRedisAdapter); - server2Port = (Integer) server2.invoke(startRedisAdapter); - server1Port = (Integer) i.getResult(); - } - - @Override - public final void preTearDown() throws Exception { - disconnectAllFromDS(); - } - - @Category(FlakyTest.class) // GEODE-1092: random ports, failure stack involves TCPTransport - // ConnectionHandler (are we eating BindExceptions somewhere?), uses - // Random, async actions - @Test - public void testConcListOps() throws Exception { - final Jedis jedis1 = new Jedis(localHost, server1Port, JEDIS_TIMEOUT); - final Jedis jedis2 = new Jedis(localHost, server2Port, JEDIS_TIMEOUT); - final int pushes = 20; - class ConcListOps extends ClientTestBase { - protected ConcListOps(int port) { - super(port); - } - - @Override - public Object call() throws Exception { - Jedis jedis = new Jedis(localHost, port, JEDIS_TIMEOUT); - Random r = new Random(); - for (int i = 0; i < pushes; i++) { - if (r.nextBoolean()) { - jedis.lpush(TEST_KEY, randString()); - } else { - jedis.rpush(TEST_KEY, randString()); - } - } - return null; - } - }; - - AsyncInvocation i = client1.invokeAsync(new ConcListOps(server1Port)); - client2.invoke(new ConcListOps(server2Port)); - i.getResult(); - long expected = 2 * pushes; - long result1 = jedis1.llen(TEST_KEY); - long result2 = jedis2.llen(TEST_KEY); - assertEquals(expected, result1); - assertEquals(result1, result2); - } - - @Category(FlakyTest.class) // GEODE-717: random ports, BindException in failure stack, async - // actions - @Test - public void testConcCreateDestroy() throws Exception { - IgnoredException.addIgnoredException("RegionDestroyedException"); - IgnoredException.addIgnoredException("IndexInvalidException"); - final int ops = 40; - final String hKey = TEST_KEY + "hash"; - final String lKey = TEST_KEY + "list"; - final String zKey = TEST_KEY + "zset"; - final String sKey = TEST_KEY + "set"; - - class ConcCreateDestroy extends ClientTestBase { - protected ConcCreateDestroy(int port) { - super(port); - } - - @Override - public Object call() throws Exception { - Jedis jedis = new Jedis(localHost, port, JEDIS_TIMEOUT); - Random r = new Random(); - for (int i = 0; i < ops; i++) { - int n = r.nextInt(4); - if (n == 0) { - if (r.nextBoolean()) { - jedis.hset(hKey, randString(), randString()); - } else { - jedis.del(hKey); - } - } else if (n == 1) { - if (r.nextBoolean()) { - jedis.lpush(lKey, randString()); - } else { - jedis.del(lKey); - } - } else if (n == 2) { - if (r.nextBoolean()) { - jedis.zadd(zKey, r.nextDouble(), randString()); - } else { - jedis.del(zKey); - } - } else { - if (r.nextBoolean()) { - jedis.sadd(sKey, randString()); - } else { - jedis.del(sKey); - } - } - } - return null; - } - } - - // Expect to run with no exception - AsyncInvocation i = client1.invokeAsync(new ConcCreateDestroy(server1Port)); - client2.invoke(new ConcCreateDestroy(server2Port)); - i.getResult(); - } - - /** - * Just make sure there are no unexpected server crashes - */ - @Category(FlakyTest.class) // GEODE-1697 - @Test - public void testConcOps() throws Exception { - - final int ops = 100; - final String hKey = TEST_KEY + "hash"; - final String lKey = TEST_KEY + "list"; - final String zKey = TEST_KEY + "zset"; - final String sKey = TEST_KEY + "set"; - - class ConcOps extends ClientTestBase { - - protected ConcOps(int port) { - super(port); - } - - @Override - public Object call() throws Exception { - Jedis jedis = new Jedis(localHost, port, JEDIS_TIMEOUT); - Random r = new Random(); - for (int i = 0; i < ops; i++) { - int n = r.nextInt(4); - if (n == 0) { - jedis.hset(hKey, randString(), randString()); - jedis.hgetAll(hKey); - jedis.hvals(hKey); - } else if (n == 1) { - jedis.lpush(lKey, randString()); - jedis.rpush(lKey, randString()); - jedis.ltrim(lKey, 0, 100); - jedis.lrange(lKey, 0, -1); - } else if (n == 2) { - jedis.zadd(zKey, r.nextDouble(), randString()); - jedis.zrangeByLex(zKey, "(a", "[z"); - jedis.zrangeByScoreWithScores(zKey, 0, 1, 0, 100); - jedis.zremrangeByScore(zKey, r.nextDouble(), r.nextDouble()); - } else { - jedis.sadd(sKey, randString()); - jedis.smembers(sKey); - jedis.sdiff(sKey, "afd"); - jedis.sunionstore("dst", sKey, "afds"); - } - } - return null; - } - } - - // Expect to run with no exception - AsyncInvocation i = client1.invokeAsync(new ConcOps(server1Port)); - client2.invoke(new ConcOps(server2Port)); - i.getResult(); - } - - private String randString() { - return Long.toHexString(Double.doubleToLongBits(Math.random())); - } - -}
http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/test/java/org/apache/geode/redis/RedisServerTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/redis/RedisServerTest.java b/geode-core/src/test/java/org/apache/geode/redis/RedisServerTest.java deleted file mode 100644 index 81e639d..0000000 --- a/geode-core/src/test/java/org/apache/geode/redis/RedisServerTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis; - -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.DataPolicy; -import org.apache.geode.cache.Region; -import org.apache.geode.test.junit.categories.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.util.Properties; - -@Category(IntegrationTest.class) -public class RedisServerTest { - - Cache cache; - GeodeRedisServer redisServer; - - @Before - public void createCache() { - Properties props = new Properties(); - props.setProperty("mcast-port", "0"); - CacheFactory cacheFactory = new CacheFactory(props); - cache = cacheFactory.create(); - } - - @After - public void teardown() { - if (redisServer != null) { - redisServer.shutdown(); - cache.close(); - } - } - - @Test - public void initializeRedisCreatesThreeRegions() { - createCache(); - assert (cache.rootRegions().size() == 0); - redisServer = new GeodeRedisServer(0); - redisServer.start(); - assert cache.rootRegions().size() == 2 : cache.rootRegions().size(); - assert cache.getRegion(GeodeRedisServer.REDIS_META_DATA_REGION) != null; - } - - @Test - public void initializeRedisCreatesPartitionedRegionByDefault() { - createCache(); - assert (cache.rootRegions().size() == 0); - redisServer = new GeodeRedisServer(0); - redisServer.start(); - Region r = cache.getRegion(GeodeRedisServer.STRING_REGION); - assert r.getAttributes().getDataPolicy() == DataPolicy.PARTITION : r.getAttributes() - .getDataPolicy(); - } - - @Test - public void initializeRedisCreatesRegionsUsingSystemProperty() { - createCache(); - assert (cache.rootRegions().size() == 0); - System.setProperty("gemfireredis.regiontype", "REPLICATE"); - redisServer = new GeodeRedisServer(0); - redisServer.start(); - Region r = cache.getRegion(GeodeRedisServer.STRING_REGION); - assert r.getAttributes().getDataPolicy() == DataPolicy.REPLICATE : r.getAttributes() - .getDataPolicy(); - System.setProperty("gemfireredis.regiontype", ""); - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/test/java/org/apache/geode/redis/SetsJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/redis/SetsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/redis/SetsJUnitTest.java deleted file mode 100755 index aaa7724..0000000 --- a/geode-core/src/test/java/org/apache/geode/redis/SetsJUnitTest.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis; - -import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; -import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; -import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.GemFireCache; -import org.apache.geode.internal.AvailablePortHelper; -import org.apache.geode.test.junit.categories.IntegrationTest; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; -import redis.clients.jedis.Jedis; - -@Category(IntegrationTest.class) -public class SetsJUnitTest { - - private static Jedis jedis; - private static GeodeRedisServer server; - private static GemFireCache cache; - private static Random rand; - private static int port = 6379; - - @BeforeClass - public static void setUp() throws IOException { - rand = new Random(); - CacheFactory cf = new CacheFactory(); - // cf.set("log-file", "redis.log"); - cf.set(LOG_LEVEL, "error"); - cf.set(MCAST_PORT, "0"); - cf.set(LOCATORS, ""); - cache = cf.create(); - port = AvailablePortHelper.getRandomAvailableTCPPort(); - server = new GeodeRedisServer("localhost", port); - - server.start(); - jedis = new Jedis("localhost", port, 10000000); - } - - @Test - public void testSAddScard() { - int elements = 10; - Set<String> strings = new HashSet<String>(); - String key = randString(); - for (int i = 0; i < elements; i++) { - String elem = randString(); - strings.add(elem); - } - String[] stringArray = strings.toArray(new String[strings.size()]); - Long response = jedis.sadd(key, stringArray); - assertEquals(response, new Long(strings.size())); - - assertEquals(jedis.scard(key), new Long(strings.size())); - } - - @Test - public void testSMembersIsMember() { - int elements = 10; - Set<String> strings = new HashSet<String>(); - String key = randString(); - for (int i = 0; i < elements; i++) { - String elem = randString(); - strings.add(elem); - } - String[] stringArray = strings.toArray(new String[strings.size()]); - jedis.sadd(key, stringArray); - - Set<String> returnedSet = jedis.smembers(key); - - assertEquals(returnedSet, new HashSet<String>(strings)); - - for (String entry : strings) { - boolean exists = jedis.sismember(key, entry); - assertTrue(exists); - } - } - - @Test - public void testSMove() { - String source = randString(); - String dest = randString(); - String test = randString(); - int elements = 10; - Set<String> strings = new HashSet<String>(); - for (int i = 0; i < elements; i++) { - String elem = randString(); - strings.add(elem); - } - String[] stringArray = strings.toArray(new String[strings.size()]); - jedis.sadd(source, stringArray); - - long i = 1; - for (String entry : strings) { - assertTrue(jedis.smove(source, dest, entry) == 1); - assertTrue(jedis.sismember(dest, entry)); - assertTrue(jedis.scard(source) == strings.size() - i); - assertTrue(jedis.scard(dest) == i); - i++; - } - - assertTrue(jedis.smove(test, dest, randString()) == 0); - } - - @Test - public void testSDiffAndStore() { - int numSets = 3; - int elements = 10; - String[] keys = new String[numSets]; - ArrayList<Set<String>> sets = new ArrayList<Set<String>>(); - for (int j = 0; j < numSets; j++) { - keys[j] = randString(); - Set<String> newSet = new HashSet<String>(); - for (int i = 0; i < elements; i++) - newSet.add(randString()); - sets.add(newSet); - } - - for (int i = 0; i < numSets; i++) { - Set<String> s = sets.get(i); - String[] stringArray = s.toArray(new String[s.size()]); - jedis.sadd(keys[i], stringArray); - } - - Set<String> result = sets.get(0); - for (int i = 1; i < numSets; i++) - result.removeAll(sets.get(i)); - - assertEquals(result, jedis.sdiff(keys)); - - String destination = randString(); - - jedis.sdiffstore(destination, keys); - - Set<String> destResult = jedis.smembers(destination); - - assertEquals(result, destResult); - - } - - @Test - public void testSUnionAndStore() { - int numSets = 3; - int elements = 10; - String[] keys = new String[numSets]; - ArrayList<Set<String>> sets = new ArrayList<Set<String>>(); - for (int j = 0; j < numSets; j++) { - keys[j] = randString(); - Set<String> newSet = new HashSet<String>(); - for (int i = 0; i < elements; i++) - newSet.add(randString()); - sets.add(newSet); - } - - for (int i = 0; i < numSets; i++) { - Set<String> s = sets.get(i); - String[] stringArray = s.toArray(new String[s.size()]); - jedis.sadd(keys[i], stringArray); - } - - Set<String> result = sets.get(0); - for (int i = 1; i < numSets; i++) - result.addAll(sets.get(i)); - - assertEquals(result, jedis.sunion(keys)); - - String destination = randString(); - - jedis.sunionstore(destination, keys); - - Set<String> destResult = jedis.smembers(destination); - - assertEquals(result, destResult); - - } - - @Test - public void testSInterAndStore() { - int numSets = 3; - int elements = 10; - String[] keys = new String[numSets]; - ArrayList<Set<String>> sets = new ArrayList<Set<String>>(); - for (int j = 0; j < numSets; j++) { - keys[j] = randString(); - Set<String> newSet = new HashSet<String>(); - for (int i = 0; i < elements; i++) - newSet.add(randString()); - sets.add(newSet); - } - - for (int i = 0; i < numSets; i++) { - Set<String> s = sets.get(i); - String[] stringArray = s.toArray(new String[s.size()]); - jedis.sadd(keys[i], stringArray); - } - - Set<String> result = sets.get(0); - for (int i = 1; i < numSets; i++) - result.retainAll(sets.get(i)); - - assertEquals(result, jedis.sinter(keys)); - - String destination = randString(); - - jedis.sinterstore(destination, keys); - - Set<String> destResult = jedis.smembers(destination); - - assertEquals(result, destResult); - - } - - private String randString() { - int length = rand.nextInt(8) + 5; - StringBuilder rString = new StringBuilder(); - for (int i = 0; i < length; i++) - rString.append((char) (rand.nextInt(57) + 65)); - return rString.toString(); - // return Long.toHexString(Double.doubleToLongBits(Math.random())); - } - - @After - public void flushAll() { - jedis.flushAll(); - } - - @AfterClass - public static void tearDown() { - jedis.close(); - cache.close(); - server.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/test/java/org/apache/geode/redis/SortedSetsJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/redis/SortedSetsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/redis/SortedSetsJUnitTest.java deleted file mode 100755 index 633cb3d..0000000 --- a/geode-core/src/test/java/org/apache/geode/redis/SortedSetsJUnitTest.java +++ /dev/null @@ -1,420 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis; - -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.GemFireCache; -import org.apache.geode.internal.AvailablePortHelper; -import org.apache.geode.test.junit.categories.IntegrationTest; - -import org.apache.geode.redis.GeodeRedisServer; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.Tuple; - -import java.io.IOException; -import java.util.*; -import java.util.Map.Entry; - -import static org.apache.geode.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; - -@Category(IntegrationTest.class) -public class SortedSetsJUnitTest { - private static Jedis jedis; - private static GeodeRedisServer server; - private static GemFireCache cache; - private static Random rand; - private static int port = 6379; - - @BeforeClass - public static void setUp() throws IOException { - rand = new Random(); - CacheFactory cf = new CacheFactory(); - // cf.set("log-file", "redis.log"); - cf.set(LOG_LEVEL, "error"); - cf.set(MCAST_PORT, "0"); - cf.set(LOCATORS, ""); - cache = cf.create(); - port = AvailablePortHelper.getRandomAvailableTCPPort(); - server = new GeodeRedisServer("localhost", port); - - server.start(); - jedis = new Jedis("localhost", port, 10000000); - } - - @Test - public void testZAddZRange() { - int numMembers = 10; - String key = randString(); - Map<String, Double> scoreMembers = new HashMap<String, Double>(); - - for (int i = 0; i < numMembers; i++) - scoreMembers.put(randString(), rand.nextDouble()); - - jedis.zadd(key, scoreMembers); - int k = 0; - for (String entry : scoreMembers.keySet()) - assertNotNull(jedis.zscore(key, entry)); - - Set<Tuple> results = jedis.zrangeWithScores(key, 0, -1); - Map<String, Double> resultMap = new HashMap<String, Double>(); - for (Tuple t : results) { - resultMap.put(t.getElement(), t.getScore()); - } - - assertEquals(scoreMembers, resultMap); - - for (int i = 0; i < 10; i++) { - int start; - int stop; - do { - start = rand.nextInt(numMembers); - stop = rand.nextInt(numMembers); - } while (start > stop); - results = jedis.zrangeWithScores(key, start, stop); - List<Entry<String, Double>> resultList = new ArrayList<Entry<String, Double>>(); - for (Tuple t : results) - resultList.add(new AbstractMap.SimpleEntry<String, Double>(t.getElement(), t.getScore())); - List<Entry<String, Double>> list = - new ArrayList<Entry<String, Double>>(scoreMembers.entrySet()); - Collections.sort(list, new EntryCmp()); - list = list.subList(start, stop + 1); - assertEquals(list, resultList); - } - } - - @Test - public void testZRevRange() { - int numMembers = 10; - String key = randString(); - - Map<String, Double> scoreMembers = new HashMap<String, Double>(); - - for (int i = 0; i < numMembers; i++) - scoreMembers.put(randString(), rand.nextDouble()); - - jedis.zadd(key, scoreMembers); - - Set<Tuple> results; - - for (int i = 0; i < 10; i++) { - int start; - int stop; - do { - start = rand.nextInt(numMembers); - stop = rand.nextInt(numMembers); - } while (start > stop); - results = jedis.zrevrangeWithScores(key, start, stop); - List<Entry<String, Double>> resultList = new ArrayList<Entry<String, Double>>(); - for (Tuple t : results) - resultList.add(new AbstractMap.SimpleEntry<String, Double>(t.getElement(), t.getScore())); - List<Entry<String, Double>> list = - new ArrayList<Entry<String, Double>>(scoreMembers.entrySet()); - Collections.sort(list, new EntryRevCmp()); - list = list.subList(start, stop + 1); - assertEquals(list, resultList); - } - } - - @Test - public void testZCount() { - int num = 10; - int runs = 2; - for (int i = 0; i < runs; i++) { - Double min; - Double max; - do { - min = rand.nextDouble(); - max = rand.nextDouble(); - } while (min > max); - - - int count = 0; - - String key = randString(); - Map<String, Double> scoreMembers = new HashMap<String, Double>(); - - for (int j = 0; j < num; j++) { - Double nextDouble = rand.nextDouble(); - if (nextDouble >= min && nextDouble <= max) - count++; - scoreMembers.put(randString(), nextDouble); - } - - jedis.zadd(key, scoreMembers); - Long countResult = jedis.zcount(key, min, max); - assertTrue(count == countResult); - } - - } - - @Test - public void testZIncrBy() { - String key = randString(); - String member = randString(); - Double score = 0.0; - for (int i = 0; i < 20; i++) { - Double incr = rand.nextDouble(); - Double result = jedis.zincrby(key, incr, member); - score += incr; - assertEquals(score, result, 1.0 / 100000000.0); - } - - - jedis.zincrby(key, Double.MAX_VALUE, member); - Double infResult = jedis.zincrby(key, Double.MAX_VALUE, member); - - - assertEquals(infResult, Double.valueOf(Double.POSITIVE_INFINITY)); - } - - public void testZRangeByScore() { - Double min; - Double max; - for (int j = 0; j < 2; j++) { - do { - min = rand.nextDouble(); - max = rand.nextDouble(); - } while (min > max); - int numMembers = 500; - String key = randString(); - Map<String, Double> scoreMembers = new HashMap<String, Double>(); - List<Entry<String, Double>> expected = new ArrayList<Entry<String, Double>>(); - for (int i = 0; i < numMembers; i++) { - String s = randString(); - Double d = rand.nextDouble(); - scoreMembers.put(s, d); - if (d > min && d < max) - expected.add(new AbstractMap.SimpleEntry<String, Double>(s, d)); - } - jedis.zadd(key, scoreMembers); - Set<Tuple> results = jedis.zrangeByScoreWithScores(key, min, max); - List<Entry<String, Double>> resultList = new ArrayList<Entry<String, Double>>(); - for (Tuple t : results) - resultList.add(new AbstractMap.SimpleEntry<String, Double>(t.getElement(), t.getScore())); - Collections.sort(expected, new EntryCmp()); - - assertEquals(expected, resultList); - jedis.del(key); - } - } - - public void testZRevRangeByScore() { - Double min; - Double max; - for (int j = 0; j < 2; j++) { - do { - min = rand.nextDouble(); - max = rand.nextDouble(); - } while (min > max); - int numMembers = 500; - String key = randString(); - Map<String, Double> scoreMembers = new HashMap<String, Double>(); - List<Entry<String, Double>> expected = new ArrayList<Entry<String, Double>>(); - for (int i = 0; i < numMembers; i++) { - String s = randString(); - Double d = rand.nextDouble(); - scoreMembers.put(s, d); - if (d > min && d < max) - expected.add(new AbstractMap.SimpleEntry<String, Double>(s, d)); - } - jedis.zadd(key, scoreMembers); - Set<Tuple> results = jedis.zrevrangeByScoreWithScores(key, max, min); - List<Entry<String, Double>> resultList = new ArrayList<Entry<String, Double>>(); - for (Tuple t : results) - resultList.add(new AbstractMap.SimpleEntry<String, Double>(t.getElement(), t.getScore())); - Collections.sort(expected, new EntryRevCmp()); - - assertEquals(expected, resultList); - jedis.del(key); - } - } - - @Test - public void testZRemZScore() { - Double min; - Double max; - for (int j = 0; j < 2; j++) { - do { - min = rand.nextDouble(); - max = rand.nextDouble(); - } while (min > max); - int numMembers = 5000; - String key = randString(); - Map<String, Double> scoreMembers = new HashMap<String, Double>(); - List<Entry<String, Double>> expected = new ArrayList<Entry<String, Double>>(); - for (int i = 0; i < numMembers; i++) { - String s = randString(); - Double d = rand.nextDouble(); - scoreMembers.put(s, d); - if (d > min && d < max) - expected.add(new AbstractMap.SimpleEntry<String, Double>(s, d)); - } - jedis.zadd(key, scoreMembers); - Collections.sort(expected, new EntryCmp()); - for (int i = expected.size(); i > 0; i--) { - Entry<String, Double> remEntry = expected.remove(i - 1); - String rem = remEntry.getKey(); - Double val = remEntry.getValue(); - assertEquals(val, jedis.zscore(key, rem)); - - assertTrue(jedis.zrem(key, rem) == 1); - } - String s = randString(); - if (!expected.contains(s)) - assertTrue(jedis.zrem(key, s) == 0); - jedis.del(key); - } - } - - @Test - public void testZRank() { - for (int j = 0; j < 2; j++) { - int numMembers = 10; - String key = randString(); - Map<String, Double> scoreMembers = new HashMap<String, Double>(); - List<Entry<String, Double>> expected = new ArrayList<Entry<String, Double>>(); - for (int i = 0; i < numMembers; i++) { - String s = randString(); - Double d = rand.nextDouble(); - scoreMembers.put(s, d); - expected.add(new AbstractMap.SimpleEntry<String, Double>(s, d)); - } - Collections.sort(expected, new EntryCmp()); - jedis.zadd(key, scoreMembers); - for (int i = 0; i < expected.size(); i++) { - Entry<String, Double> en = expected.get(i); - String field = en.getKey(); - Long rank = jedis.zrank(key, field); - assertEquals(new Long(i), rank); - } - String field = randString(); - if (!expected.contains(field)) - assertNull(jedis.zrank(key, field)); - jedis.del(key); - } - } - - @Test - public void testZRevRank() { - for (int j = 0; j < 2; j++) { - int numMembers = 10; - String key = randString(); - Map<String, Double> scoreMembers = new HashMap<String, Double>(); - List<Entry<String, Double>> expected = new ArrayList<Entry<String, Double>>(); - for (int i = 0; i < numMembers; i++) { - String s = randString(); - Double d = rand.nextDouble(); - scoreMembers.put(s, d); - expected.add(new AbstractMap.SimpleEntry<String, Double>(s, d)); - } - Collections.sort(expected, new EntryRevCmp()); - jedis.zadd(key, scoreMembers); - for (int i = 0; i < expected.size(); i++) { - Entry<String, Double> en = expected.get(i); - String field = en.getKey(); - Long rank = jedis.zrevrank(key, field); - assertEquals(new Long(i), rank); - } - String field = randString(); - if (!expected.contains(field)) - assertNull(jedis.zrank(key, field)); - jedis.del(key); - } - } - - private class EntryCmp implements Comparator<Entry<String, Double>> { - - @Override - public int compare(Entry<String, Double> o1, Entry<String, Double> o2) { - Double diff = o1.getValue() - o2.getValue(); - if (diff == 0) - return o2.getKey().compareTo(o1.getKey()); - else - return diff > 0 ? 1 : -1; - } - - } - - private class EntryRevCmp implements Comparator<Entry<String, Double>> { - - @Override - public int compare(Entry<String, Double> o1, Entry<String, Double> o2) { - Double diff = o2.getValue() - o1.getValue(); - if (diff == 0) - return o1.getKey().compareTo(o2.getKey()); - else - return diff > 0 ? 1 : -1; - } - - } - - @Test - public void testZRemRangeByScore() { - Double min; - Double max; - for (int j = 0; j < 3; j++) { - do { - min = rand.nextDouble(); - max = rand.nextDouble(); - } while (min > max); - int numMembers = 10; - String key = randString(); - Map<String, Double> scoreMembers = new HashMap<String, Double>(); - List<Entry<String, Double>> fullList = new ArrayList<Entry<String, Double>>(); - List<Entry<String, Double>> toRemoveList = new ArrayList<Entry<String, Double>>(); - for (int i = 0; i < numMembers; i++) { - String s = randString(); - Double d = rand.nextDouble(); - scoreMembers.put(s, d); - fullList.add(new AbstractMap.SimpleEntry<String, Double>(s, d)); - if (d > min && d < max) - toRemoveList.add(new AbstractMap.SimpleEntry<String, Double>(s, d)); - } - jedis.zadd(key, scoreMembers); - Long numRemoved = jedis.zremrangeByScore(key, min, max); - List<Entry<String, Double>> expectedList = new ArrayList<Entry<String, Double>>(fullList); - expectedList.removeAll(toRemoveList); - Collections.sort(expectedList, new EntryCmp()); - Set<Tuple> result = jedis.zrangeWithScores(key, 0, -1); - List<Entry<String, Double>> resultList = new ArrayList<Entry<String, Double>>(); - for (Tuple t : result) - resultList.add(new AbstractMap.SimpleEntry<String, Double>(t.getElement(), t.getScore())); - assertEquals(expectedList, resultList); - jedis.del(key); - } - } - - private String randString() { - return Long.toHexString(Double.doubleToLongBits(Math.random())); - } - - @After - public void flushAll() { - jedis.flushAll(); - } - - @AfterClass - public static void tearDown() { - jedis.close(); - cache.close(); - server.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-core/src/test/java/org/apache/geode/redis/StringsJunitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/redis/StringsJunitTest.java b/geode-core/src/test/java/org/apache/geode/redis/StringsJunitTest.java deleted file mode 100755 index 6d11a71..0000000 --- a/geode-core/src/test/java/org/apache/geode/redis/StringsJunitTest.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.redis; - -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.GemFireCache; -import org.apache.geode.internal.AvailablePortHelper; -import org.apache.geode.test.junit.categories.IntegrationTest; - -import org.apache.geode.redis.GeodeRedisServer; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import redis.clients.jedis.Jedis; - -import java.io.IOException; -import java.util.*; - -import static org.apache.geode.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; - -@Category(IntegrationTest.class) -public class StringsJunitTest { - - private static Jedis jedis; - private static GeodeRedisServer server; - private static GemFireCache cache; - private static Random rand; - private static int port = 6379; - - @BeforeClass - public static void setUp() throws IOException { - rand = new Random(); - CacheFactory cf = new CacheFactory(); - // cf.set("log-file", "redis.log"); - cf.set(LOG_LEVEL, "error"); - cf.set(MCAST_PORT, "0"); - cf.set(LOCATORS, ""); - cache = cf.create(); - port = AvailablePortHelper.getRandomAvailableTCPPort(); - server = new GeodeRedisServer("localhost", port); - - server.start(); - jedis = new Jedis("localhost", port, 10000000); - } - - @Test - public void testAppendAndStrlen() { - String key = randString(); - int len = key.length(); - String full = key; - jedis.set(key, key); - for (int i = 0; i < 15; i++) { - String rand = randString(); - jedis.append(key, rand); - len += rand.length(); - full += rand; - } - String ret = jedis.get(key); - assertTrue(ret.length() == len); - assertTrue(full.equals(ret)); - assertTrue(full.length() == jedis.strlen(key)); - } - - @Test - public void testDecr() { - String key1 = randString(); - String key2 = randString(); - String key3 = randString(); - int num1 = 100; - int num2 = -100; - jedis.set(key1, "" + num1); - // jedis.set(key3, "-100"); - jedis.set(key2, "" + num2); - - jedis.decr(key1); - jedis.decr(key3); - jedis.decr(key2); - assertTrue(jedis.get(key1).equals("" + (num1 - 1))); - assertTrue(jedis.get(key2).equals("" + (num2 - 1))); - assertTrue(jedis.get(key3).equals("" + (-1))); - } - - @Test - public void testIncr() { - String key1 = randString(); - String key2 = randString(); - String key3 = randString(); - int num1 = 100; - int num2 = -100; - jedis.set(key1, "" + num1); - // jedis.set(key3, "-100"); - jedis.set(key2, "" + num2); - - jedis.incr(key1); - jedis.incr(key3); - jedis.incr(key2); - - assertTrue(jedis.get(key1).equals("" + (num1 + 1))); - assertTrue(jedis.get(key2).equals("" + (num2 + 1))); - assertTrue(jedis.get(key3).equals("" + (+1))); - } - - @Test - public void testDecrBy() { - String key1 = randString(); - String key2 = randString(); - String key3 = randString(); - int decr1 = rand.nextInt(100); - int decr2 = rand.nextInt(100); - Long decr3 = Long.MAX_VALUE / 2; - int num1 = 100; - int num2 = -100; - jedis.set(key1, "" + num1); - jedis.set(key2, "" + num2); - jedis.set(key3, "" + Long.MIN_VALUE); - - jedis.decrBy(key1, decr1); - jedis.decrBy(key2, decr2); - - assertTrue(jedis.get(key1).equals("" + (num1 - decr1 * 1))); - assertTrue(jedis.get(key2).equals("" + (num2 - decr2 * 1))); - - Exception ex = null; - try { - jedis.decrBy(key3, decr3); - } catch (Exception e) { - ex = e; - } - assertNotNull(ex); - - } - - @Test - public void testIncrBy() { - String key1 = randString(); - String key2 = randString(); - String key3 = randString(); - int incr1 = rand.nextInt(100); - int incr2 = rand.nextInt(100); - Long incr3 = Long.MAX_VALUE / 2; - int num1 = 100; - int num2 = -100; - jedis.set(key1, "" + num1); - jedis.set(key2, "" + num2); - jedis.set(key3, "" + Long.MAX_VALUE); - - jedis.incrBy(key1, incr1); - jedis.incrBy(key2, incr2); - assertTrue(jedis.get(key1).equals("" + (num1 + incr1 * 1))); - assertTrue(jedis.get(key2).equals("" + (num2 + incr2 * 1))); - - Exception ex = null; - try { - jedis.incrBy(key3, incr3); - } catch (Exception e) { - ex = e; - } - assertNotNull(ex); - } - - @Test - public void testGetRange() { - String sent = randString(); - String contents = randString(); - jedis.set(sent, contents); - for (int i = 0; i < sent.length(); i++) { - String range = jedis.getrange(sent, i, -1); - assertTrue(contents.substring(i).equals(range)); - } - assertNull(jedis.getrange(sent, 2, 0)); - } - - @Test - public void testGetSet() { - String key = randString(); - String contents = randString(); - jedis.set(key, contents); - String newContents = randString(); - String oldContents = jedis.getSet(key, newContents); - assertTrue(oldContents.equals(contents)); - contents = newContents; - } - - @Test - public void testMSetAndGet() { - int r = 5; - String[] keyvals = new String[(r * 2)]; - String[] keys = new String[r]; - String[] vals = new String[r]; - for (int i = 0; i < r; i++) { - String key = randString(); - String val = randString(); - keyvals[2 * i] = key; - keyvals[2 * i + 1] = val; - keys[i] = key; - vals[i] = val; - } - - jedis.mset(keyvals); - - List<String> ret = jedis.mget(keys); - Object[] retArray = ret.toArray(); - - assertTrue(Arrays.equals(vals, retArray)); - } - - @Test - public void testMSetNX() { - Set<String> strings = new HashSet<String>(); - for (int i = 0; i < 2 * 5; i++) - strings.add(randString()); - String[] array = strings.toArray(new String[0]); - long response = jedis.msetnx(array); - - assertTrue(response == 1); - - long response2 = jedis.msetnx(array[0], randString()); - - assertTrue(response2 == 0); - assertEquals(array[1], jedis.get(array[0])); - } - - @Test - public void testSetNX() { - String key1 = randString(); - String key2; - do { - key2 = randString(); - } while (key2.equals(key1)); - - long response1 = jedis.setnx(key1, key1); - long response2 = jedis.setnx(key2, key2); - long response3 = jedis.setnx(key1, key2); - - assertTrue(response1 == 1); - assertTrue(response2 == 1); - assertTrue(response3 == 0); - } - - @Test - public void testPAndSetex() { - Random r = new Random(); - int setex = r.nextInt(5); - if (setex == 0) - setex = 1; - String key = randString(); - jedis.setex(key, setex, randString()); - try { - Thread.sleep((setex + 5) * 1000); - } catch (InterruptedException e) { - return; - } - String result = jedis.get(key); - // System.out.println(result); - assertNull(result); - - int psetex = r.nextInt(5000); - if (psetex == 0) - psetex = 1; - key = randString(); - jedis.psetex(key, psetex, randString()); - long start = System.currentTimeMillis(); - try { - Thread.sleep(psetex + 5000); - } catch (InterruptedException e) { - return; - } - long stop = System.currentTimeMillis(); - result = jedis.get(key); - assertTrue(stop - start >= psetex); - assertNull(result); - } - - private String randString() { - return Long.toHexString(Double.doubleToLongBits(Math.random())); - } - - @After - public void flushAll() { - jedis.flushAll(); - } - - @AfterClass - public static void tearDown() { - jedis.close(); - cache.close(); - server.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/build.gradle ---------------------------------------------------------------------- diff --git a/geode-redis/build.gradle b/geode-redis/build.gradle new file mode 100644 index 0000000..b769007 --- /dev/null +++ b/geode-redis/build.gradle @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +dependencies { + compile project(':geode-core') + compile project(':geode-common') + compile('io.netty:netty-all:' + project.'netty-all.version') { + ext.optional = true + } + + testCompile project(':geode-junit') + testCompile 'redis.clients:jedis:' + project.'jedis.version' + + testCompile files(project(':geode-core').sourceSets.test.output) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServiceImpl.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServiceImpl.java b/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServiceImpl.java new file mode 100644 index 0000000..6503766 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServiceImpl.java @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.oio.OioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.oio.OioServerSocketChannel; +import io.netty.util.concurrent.Future; +import org.apache.geode.InternalGemFireError; +import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionDestroyedException; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.internal.cache.CacheService; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalRegionArguments; +import org.apache.geode.internal.i18n.LocalizedStrings; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.internal.net.SocketCreator; +import org.apache.geode.management.internal.beans.CacheServiceMBeanBase; +import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.ByteToCommandDecoder; +import org.apache.geode.redis.internal.Coder; +import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.RedisDataType; +import org.apache.geode.redis.internal.RegionProvider; +import org.apache.geode.redis.internal.hll.HyperLogLogPlus; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The GeodeRedisServiceImpl is a server that understands the Redis protocol. As commands are sent + * to the server, each command is picked up by a thread, interpreted and then executed and a + * response is sent back to the client. The default connection port is 6379 but that can be altered + * when run through GFSH or started through the provided static main class. + * <p> + * Each Redis data type instance is stored in a separate {@link Region} except for the Strings and + * HyperLogLogs which are collectively stored in one Region respectively. That Region along with a + * meta data region used internally are protected so the client may not store keys with the name + * {@link GeodeRedisServiceImpl#REDIS_META_DATA_REGION} or + * {@link GeodeRedisServiceImpl#STRING_REGION}. The default Region type is + * {@link RegionShortcut#PARTITION} although this can be changed by specifying the SystemProperty + * {@value #DEFAULT_REGION_SYS_PROP_NAME} to a type defined by {@link RegionShortcut}. If the + * {@link GeodeRedisServiceImpl#NUM_THREADS_SYS_PROP_NAME} system property is set to 0, one thread + * per client will be created. Otherwise a worker thread pool of specified size is used or a default + * size of 4 * {@link Runtime#availableProcessors()} if the property is not set. + * <p> + * Setting the AUTH password requires setting the property "redis-password" just as "redis-port" + * would be in xml or through GFSH. + * <p> + * The supported commands are as follows: + * <p> + * Supported String commands - APPEND, BITCOUNT, BITOP, BITPOS, DECR, DECRBY, GET, GETBIT, GETRANGE, + * GETSET, INCR, INCRBY, INCRBYFLOAT, MGET, MSET, MSETNX, PSETEX, SET, SETBIT, SETEX, SETNX, STRLEN + * <p> + * Supported List commands - LINDEX, LLEN, LPOP, LPUSH, LPUSHX, LRANGE, LREM, LSET, LTRIM, RPOP, + * RPUSH, RPUSHX + * <p> + * Supported Hash commands - HDEL, HEXISTS, HGET, HGETALL, HINCRBY, HINCRBYFLOAT, HKEYS, HMGET, + * HMSET, HSETNX, HLEN, HSCAN, HSET, HVALS + * <p> + * Supported Set commands - SADD, SCARD, SDIFF, SDIFFSTORE, SINTER, SINTERSTORE, SISMEMBER, + * SMEMBERS, SMOVE, SREM, SPOP, SRANDMEMBER, SCAN, SUNION, SUNIONSTORE + * <p> + * Supported SortedSet commands - ZADD, ZCARD, ZCOUNT, ZINCRBY, ZLEXCOUNT, ZRANGE, ZRANGEBYLEX, + * ZRANGEBYSCORE, ZRANK, ZREM, ZREMRANGEBYLEX, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE, + * ZREVRANGEBYSCORE, ZREVRANK, ZSCAN, ZSCORE + * <p> + * Supported HyperLogLog commands - PFADD, PFCOUNT, PFMERGE + * <p> + * Supported Keys commands - DEL, DBSIZE, EXISTS, EXPIRE, EXPIREAT, FLUSHDB, FLUSHALL, KEYS, + * PERSIST, PEXPIRE, PEXPIREAT, PTTL, SCAN, TTL + * <p> + * Supported Transaction commands - DISCARD, EXEC, MULTI + * <P> + * Supported Server commands - AUTH, ECHO, PING, TIME, QUIT + * <p> + * <p> + * The command executors are not explicitly documented but the functionality can be found at + * <a href="http://redis.io/commands">Redis Commands</a> + * <p> + * Exceptions to the Redis Commands Documents: + * <p> + * <ul> + * <li>Any command that removes keys and returns a count of removed entries will not return a total + * remove count but rather a count of how many entries have been removed that existed on the local + * vm, though all entries will be removed</li> + * <li>Any command that returns a count of newly set members has an unspecified return value. The + * command will work just as the Redis protocol states but the count will not necessary reflect the + * number set compared to overridden.</li> + * <li>Transactions work just as they would on a Redis instance, they are local transaction. + * Transactions cannot be executed on data that is not local to the executing server, that is on a + * partitioned region in a different server instance or on a persistent region that does not have + * transactions enabled. Also, you cannot watch or unwatch keys as all keys within a GemFire + * transaction are watched by default.</li> + * </ul> + */ + +public class GeodeRedisServiceImpl implements GeodeRedisService { + + /** + * The default Redis port as specified by their protocol, {@value #DEFAULT_REDIS_SERVER_PORT} + */ + public static final int DEFAULT_REDIS_SERVER_PORT = 6379; + + /** + * The number of threads that will work on handling requests + */ + private int numWorkerThreads; + + /** + * The number of threads that will work socket selectors + */ + private final int numSelectorThreads = 1; + private final int numExpirationThreads = 1; + + /** + * The actual port being used by the server + */ + private int redisPort; + + /** + * The address to bind to + */ + private String redisBindAddress; + + /** + * Connection timeout in milliseconds + */ + private static final int connectTimeoutMillis = 1000; + + /** + * Temporary constant whether to use old single thread per connection model for worker group + */ + private boolean singleThreadPerConnection; + + /** + * The cache instance pointer on this vm + */ + private Cache cache; + + /** + * Channel to be closed when shutting down + */ + private Channel serverChannel; + + /** + * Gem logwriter + */ + private Logger logger = null; + + private RegionProvider regionProvider; + private MetaCacheListener metaListener; + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private ScheduledExecutorService expirationExecutor; + + /** + * Map of futures to be executed for key expirations + */ + private ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationFutures; + + + /** + * The field that defines the name of the {@link Region} which holds all of the strings. The + * current value of this field is {@value #STRING_REGION}. + */ + public static final String STRING_REGION = "ReDiS_StRiNgS"; + + /** + * The field that defines the name of the {@link Region} which holds all of the HyperLogLogs. The + * current value of this field is {@value #HLL_REGION}. + */ + public static final String HLL_REGION = "ReDiS_HlL"; + + /** + * The field that defines the name of the {@link Region} which holds all of the Redis meta data. + * The current value of this field is {@value #REDIS_META_DATA_REGION}. + */ + public static final String REDIS_META_DATA_REGION = "__ReDiS_MeTa_DaTa"; + + /** + * The system property name used to set the default {@link Region} creation type. The property + * name is {@value #DEFAULT_REGION_SYS_PROP_NAME} and the acceptable values are types defined by + * {@link RegionShortcut}, i.e. "PARTITION" would be used for {@link RegionShortcut#PARTITION}. + */ + public static final String DEFAULT_REGION_SYS_PROP_NAME = "gemfireredis.regiontype"; + + /** + * System property name that can be used to set the number of threads to be used by the + * GeodeRedisServiceImpl + */ + public static final String NUM_THREADS_SYS_PROP_NAME = "gemfireredis.numthreads"; + + /** + * The actual {@link RegionShortcut} type specified by the system property + * {@value #DEFAULT_REGION_SYS_PROP_NAME}. + */ + private RegionShortcut DEFAULT_REGION_TYPE; + + private boolean shutdown = false; + private boolean started = false; + + /** + * Determine the {@link RegionShortcut} type from a String value. If the String value doesn't map + * to a RegionShortcut type then {@link RegionShortcut#PARTITION} will be used by default. + * + * @return {@link RegionShortcut} + */ + private RegionShortcut setRegionType() { + String regionType = System.getProperty(DEFAULT_REGION_SYS_PROP_NAME, "PARTITION"); + RegionShortcut type; + try { + type = RegionShortcut.valueOf(regionType); + } catch (Exception e) { + type = RegionShortcut.PARTITION; + } + return type; + } + + /** + * Helper method to set the number of worker threads + * + * @return If the System property {@value #NUM_THREADS_SYS_PROP_NAME} is set then that number is + * used, otherwise 4 * # of cores + */ + private int setNumWorkerThreads() { + String prop = System.getProperty(NUM_THREADS_SYS_PROP_NAME); + int numCores = Runtime.getRuntime().availableProcessors(); + int def = 4 * numCores; + if (prop == null || prop.isEmpty()) { + return def; + } + int threads; + try { + threads = Integer.parseInt(prop); + } catch (NumberFormatException e) { + return def; + } + return threads; + } + + public GeodeRedisServiceImpl() {} + + /** + * Helper method to get the host name to bind to + * + * @return The InetAddress to bind to + */ + private InetAddress getRedisBindAddress() throws UnknownHostException { + return this.redisBindAddress == null || this.redisBindAddress.isEmpty() + ? SocketCreator.getLocalHost() : InetAddress.getByName(this.redisBindAddress); + } + + /** + * This is function to call on a {@link GeodeRedisServiceImpl} instance to start it running + */ + @Override + public synchronized void start() { + if (!started) { + try { + initializeRedis(); + startRedisServer(); + } catch (IOException | InterruptedException e) { + throw new RuntimeException("Could not start Server", e); + } + started = true; + } + } + + private void initializeRedis() { + initializeRedisServiceInternals(); + Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion; + + Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion; + Region<String, RedisDataType> redisMetaData; + GemFireCacheImpl gemFireCache = (GemFireCacheImpl) cache; + try { + if ((stringsRegion = cache.getRegion(STRING_REGION)) == null) { + RegionFactory<ByteArrayWrapper, ByteArrayWrapper> regionFactory = + gemFireCache.createRegionFactory(this.DEFAULT_REGION_TYPE); + stringsRegion = regionFactory.create(STRING_REGION); + } + if ((hLLRegion = cache.getRegion(HLL_REGION)) == null) { + RegionFactory<ByteArrayWrapper, HyperLogLogPlus> regionFactory = + gemFireCache.createRegionFactory(this.DEFAULT_REGION_TYPE); + hLLRegion = regionFactory.create(HLL_REGION); + } + if ((redisMetaData = cache.getRegion(REDIS_META_DATA_REGION)) == null) { + AttributesFactory af = new AttributesFactory(); + af.addCacheListener(metaListener); + af.setDataPolicy(DataPolicy.REPLICATE); + InternalRegionArguments ira = + new InternalRegionArguments().setInternalRegion(true).setIsUsedForMetaRegion(true); + redisMetaData = gemFireCache.createVMRegion(REDIS_META_DATA_REGION, af.create(), ira); + } + } catch (IOException | ClassNotFoundException e) { + // only if loading snapshot, not here + InternalGemFireError assErr = new InternalGemFireError( + LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString()); + assErr.initCause(e); + throw assErr; + } + this.regionProvider = new RegionProvider(stringsRegion, hLLRegion, redisMetaData, + expirationFutures, expirationExecutor, this.DEFAULT_REGION_TYPE); + redisMetaData.put(REDIS_META_DATA_REGION, RedisDataType.REDIS_PROTECTED); + redisMetaData.put(HLL_REGION, RedisDataType.REDIS_PROTECTED); + redisMetaData.put(STRING_REGION, RedisDataType.REDIS_PROTECTED); + checkForRegions(); + } + + private void initializeRedisServiceInternals() { + this.DEFAULT_REGION_TYPE = setRegionType(); + this.numWorkerThreads = setNumWorkerThreads(); + if (this.numWorkerThreads == 0) { + this.singleThreadPerConnection = true; + } + this.metaListener = new MetaCacheListener(); + this.expirationFutures = new ConcurrentHashMap<>(); + this.expirationExecutor = + Executors.newScheduledThreadPool(numExpirationThreads, new ThreadFactory() { + private final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("GemFireRedis-ScheduledExecutor-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + + }); + } + + private void checkForRegions() { + Collection<Entry<String, RedisDataType>> entrySet = this.regionProvider.metaEntrySet(); + for (Entry<String, RedisDataType> entry : entrySet) { + String regionName = entry.getKey(); + RedisDataType type = entry.getValue(); + Region<?, ?> newRegion = cache.getRegion(regionName); + if (newRegion == null && type != RedisDataType.REDIS_STRING && type != RedisDataType.REDIS_HLL + && type != RedisDataType.REDIS_PROTECTED) { + try { + this.regionProvider + .createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(regionName), type); + } catch (Exception e) { + if (logger.isErrorEnabled()) { + logger.error(e); + } + } + } + } + } + + /** + * Helper method to start the server listening for connections. The server is bound to the port + * specified by {@link GeodeRedisServiceImpl#redisPort} + */ + private void startRedisServer() throws IOException, InterruptedException { + ThreadFactory selectorThreadFactory = new ThreadFactory() { + private final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("GeodeRedisServiceImpl-SelectorThread-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + + }; + + ThreadFactory workerThreadFactory = new ThreadFactory() { + private final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("GeodeRedisServiceImpl-WorkerThread-" + counter.incrementAndGet()); + return t; + } + + }; + + bossGroup = null; + workerGroup = null; + Class<? extends ServerChannel> socketClass = null; + if (singleThreadPerConnection) { + bossGroup = new OioEventLoopGroup(Integer.MAX_VALUE, selectorThreadFactory); + workerGroup = new OioEventLoopGroup(Integer.MAX_VALUE, workerThreadFactory); + socketClass = OioServerSocketChannel.class; + } else { + bossGroup = new NioEventLoopGroup(this.numSelectorThreads, selectorThreadFactory); + workerGroup = new NioEventLoopGroup(this.numWorkerThreads, workerThreadFactory); + socketClass = NioServerSocketChannel.class; + } + InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem(); + String pwd = system.getConfig().getRedisPassword(); + final byte[] pwdB = Coder.stringToBytes(pwd); + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup).channel(socketClass) + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + if (logger.isDebugEnabled()) { + logger + .debug("GeodeRedisServiceImpl-Connection established with " + ch.remoteAddress()); + } + ChannelPipeline p = ch.pipeline(); + p.addLast(ByteToCommandDecoder.class.getSimpleName(), new ByteToCommandDecoder()); + p.addLast(ExecutionHandlerContext.class.getSimpleName(), new ExecutionHandlerContext(ch, + cache, regionProvider, GeodeRedisServiceImpl.this, pwdB)); + } + }).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_RCVBUF, getBufferSize()) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, + GeodeRedisServiceImpl.connectTimeoutMillis) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + + // Bind and start to accept incoming connections. + ChannelFuture f = b.bind(new InetSocketAddress(getRedisBindAddress(), redisPort)).sync(); + if (this.logger.isInfoEnabled()) { + String logMessage = "GeodeRedisServiceImpl started {" + getRedisBindAddress() + ":" + + redisPort + "}, Selector threads: " + this.numSelectorThreads; + if (this.singleThreadPerConnection) { + logMessage += ", One worker thread per connection"; + } else { + logMessage += ", Worker threads: " + this.numWorkerThreads; + } + this.logger.info(logMessage); + } + this.serverChannel = f.channel(); + } + + /** + * Takes an entry event and processes it. If the entry denotes that a + * {@link RedisDataType#REDIS_LIST} or {@link RedisDataType#REDIS_SORTEDSET} was created then this + * function will call the necessary calls to create the parameterized queries for those keys. + * + * @param event EntryEvent from meta data region + */ + private void afterKeyCreate(EntryEvent<String, RedisDataType> event) { + if (event.isOriginRemote()) { + final String key = (String) event.getKey(); + final RedisDataType value = event.getNewValue(); + if (value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL + && value != RedisDataType.REDIS_PROTECTED) { + try { + this.regionProvider + .createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(key), value); + } catch (RegionDestroyedException ignore) { // Region already destroyed, ignore + } + } + } + } + + /** + * When a key is removed then this function will make sure the associated queries with the key are + * also removed from each vm to avoid unnecessary data retention + */ + private void afterKeyDestroy(EntryEvent<String, RedisDataType> event) { + if (event.isOriginRemote()) { + final String key = (String) event.getKey(); + final RedisDataType value = event.getOldValue(); + if (value != null && value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL + && value != RedisDataType.REDIS_PROTECTED) { + ByteArrayWrapper kW = Coder.stringToByteArrayWrapper(key); + Region<?, ?> r = this.regionProvider.getRegion(kW); + if (r != null) { + this.regionProvider.removeRegionReferenceLocally(kW, value); + } + } + } + } + + @Override + public void init(Cache cache) { + this.cache = cache; + logger = LogService.getLogger(); + + InternalDistributedSystem internalDistributedSystem = ((GemFireCacheImpl) cache).getSystem(); + DistributionConfig internalDistributedSystemConfig = internalDistributedSystem.getConfig(); + this.redisPort = internalDistributedSystemConfig.getRedisPort(); + if (redisPort <= 0) { // unset + this.redisPort = DEFAULT_REDIS_SERVER_PORT; + } + this.redisBindAddress = internalDistributedSystemConfig.getRedisBindAddress(); + } + + @Override + public Class<? extends CacheService> getInterface() { + return GeodeRedisService.class; + } + + @Override + public CacheServiceMBeanBase getMBean() { + // TODO This needs to be implemented in the GEODE-2449 + throw new RuntimeException("This still needs to be implemented"); + } + + private final class MetaCacheListener extends CacheListenerAdapter<String, RedisDataType> { + + @Override + public void afterCreate(EntryEvent<String, RedisDataType> event) { + afterKeyCreate(event); + } + + @Override + public void afterDestroy(EntryEvent<String, RedisDataType> event) { + afterKeyDestroy(event); + } + } + + /** + * Helper method to get GemFire set socket buffer size, possibly a default of 32k + * + * @return Buffer size to use for server + */ + private int getBufferSize() { + InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem(); + return system.getConfig().getSocketBufferSize(); + } + + /** + * Shutdown method for {@link GeodeRedisServiceImpl}. This closes the {@link Cache}, interrupts + * all execution and forcefully closes all connections. + */ + @Override + public void stop() { + if (!shutdown) { + if (logger.isInfoEnabled()) { + logger.info("GeodeRedisServiceImpl shutting down"); + } + + Future<?> workerGroupFuture = workerGroup.shutdownGracefully(0,1, TimeUnit.SECONDS); + Future<?> bossGroupFuture = bossGroup.shutdownGracefully(0,1,TimeUnit.SECONDS); + ChannelFuture closeFuture = this.serverChannel.close(); + + // We are likely brought here by a channel read reading a shutdown message, in which case calling + // await or sync can cause a deadlock. +// workerGroupFuture.syncUninterruptibly(); +// bossGroupFuture.syncUninterruptibly(); + this.regionProvider.close(); + for (ScheduledFuture<?> f : this.expirationFutures.values()) { + f.cancel(true); + } + this.expirationFutures.clear(); + this.expirationExecutor.shutdownNow(); +// closeFuture.syncUninterruptibly(); + shutdown = true; + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/ByteArrayWrapper.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/ByteArrayWrapper.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/ByteArrayWrapper.java new file mode 100644 index 0000000..5b895e8 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/ByteArrayWrapper.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal; + +import org.apache.geode.DataSerializable; +import org.apache.geode.DataSerializer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; + +/** + * This class is a wrapper for the any Regions that need to store a byte[]. The only data this an + * instance will store is a byte[] for the data but it is also serializable and comparable so it is + * able to be used in querying + * + * + */ +public class ByteArrayWrapper implements DataSerializable, Comparable<ByteArrayWrapper> { + /** + * Generated serialVerionUID + */ + private static final long serialVersionUID = 9066391742266642992L; + + /** + * The data portion of ValueWrapper + */ + private byte[] value; + + /** + * Hash of {@link #value}, this value is cached for performance + */ + private transient int hashCode; + + private transient String toString; + + /** + * Empty constructor for serialization + */ + public ByteArrayWrapper() {} + + /** + * Default constructor constructs a ValueWrapper and initialize the {@link #value} + * + * @param value + */ + public ByteArrayWrapper(byte[] value) { + this.value = value; + this.hashCode = Arrays.hashCode(value); + } + + @Override + public void toData(DataOutput out) throws IOException { + DataSerializer.writeByteArray(value, out); + } + + @Override + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + this.value = DataSerializer.readByteArray(in);; + this.hashCode = Arrays.hashCode(this.value); + } + + @Override + public String toString() { + if (toString == null) + toString = Coder.bytesToString(this.value); + return toString; + } + + public byte[] toBytes() { + return this.value; + } + + public void setBytes(byte[] bytes) { + this.value = bytes; + this.toString = null; + this.hashCode = Arrays.hashCode(bytes); + } + + /** + * Getter for the length of the {@link #value} array + * + * @return The length of the value array + */ + public int length() { + return value.length; + } + + /** + * Hash code for byte[] wrapped by this object, the actual hashcode is determined by + * Arrays.hashCode(byte[]) + */ + @Override + public int hashCode() { + return this.hashCode; + } + + + /** + * This equals is neither symmetric and therefore not transitive, because a String with the same + * underlying bytes is considered equal. Clearly calling {@link String#equals(Object)} would not + * yield the same result + */ + @Override + public boolean equals(Object other) { + if (other instanceof ByteArrayWrapper) + return Arrays.equals(value, ((ByteArrayWrapper) other).value); + else if (other instanceof String) { + return Arrays.equals(value, Coder.stringToBytes((String) other)); + } + return false; + } + + /** + * This is a byte to byte comparator, it is not lexicographical but purely compares byte by byte + * values + */ + @Override + public int compareTo(ByteArrayWrapper other) { + return arrayCmp(value, other.value); + + } + + /** + * Private helper method to compare two byte arrays, A.compareTo(B). The comparison is basically + * numerical, for each byte index, the byte representing the greater value will be the greater + * + * @param A byte[] + * @param B byte[] + * @return 1 if A > B, -1 if B > A, 0 if A == B + */ + private int arrayCmp(byte[] A, byte[] B) { + if (A == B) + return 0; + if (A == null) { + return -1; + } else if (B == null) { + return 1; + } + + int len = Math.min(A.length, B.length); + + for (int i = 0; i < len; i++) { + byte a = A[i]; + byte b = B[i]; + int diff = a - b; + if (diff > 0) + return 1; + else if (diff < 0) + return -1; + } + + if (A.length > B.length) + return 1; + else if (B.length > A.length) + return -1; + + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/ByteToCommandDecoder.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/ByteToCommandDecoder.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/ByteToCommandDecoder.java new file mode 100644 index 0000000..4f66a47 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/ByteToCommandDecoder.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.ArrayList; +import java.util.List; + +/** + * This is the first part of the channel pipeline for Netty. Here incoming bytes are read and a + * created {@link Command} is sent down the pipeline. It is unfortunate that this class is not + * {@link Sharable} because no state is kept in this class. State is kept by + * {@link ByteToMessageDecoder}, it may be worthwhile to look at a different decoder setup as to + * avoid allocating a decoder for every new connection. + * <p> + * The code flow of the protocol parsing may not be exactly Java like, but this is done very + * intentionally. It was found that in cases where large Redis requests are sent that end up being + * fragmented, throwing exceptions when the command could not be fully parsed took up an enormous + * amount of cpu time. The simplicity of the Redis protocol allows us to just back out and wait for + * more data, while exceptions are left to malformed requests which should never happen if using a + * proper Redis client. + * + * + */ +public class ByteToCommandDecoder extends ByteToMessageDecoder { + + /** + * Important note + * + * Do not use '' <-- java primitive chars. Redis uses {@link Coder#CHARSET} encoding so we should + * not risk java handling char to byte conversions, rather just hard code {@link Coder#CHARSET} + * chars as bytes + */ + + private static final byte rID = 13; // '\r'; + private static final byte nID = 10; // '\n'; + private static final byte bulkStringID = 36; // '$'; + private static final byte arrayID = 42; // '*'; + private static final int MAX_BULK_STRING_LENGTH = 512 * 1024 * 1024; // 512 MB + + public ByteToCommandDecoder() {} + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { + Command c = null; + do { + in.markReaderIndex(); + c = parse(in); + if (c == null) { + in.resetReaderIndex(); + return; + } + out.add(c); + } while (in.isReadable()); // Try to take advantage of pipelining if it is being used + } + + private Command parse(ByteBuf buffer) throws RedisCommandParserException { + if (buffer == null) + throw new NullPointerException(); + if (!buffer.isReadable()) + return null; + + byte firstB = buffer.readByte(); + if (firstB != arrayID) + throw new RedisCommandParserException( + "Expected: " + (char) arrayID + " Actual: " + (char) firstB); + ArrayList<byte[]> commandElems = new ArrayList<byte[]>(); + + if (!parseArray(commandElems, buffer)) + return null; + + return new Command(commandElems); + } + + private boolean parseArray(ArrayList<byte[]> commandElems, ByteBuf buffer) + throws RedisCommandParserException { + byte currentChar; + int arrayLength = parseCurrentNumber(buffer); + if (arrayLength == Integer.MIN_VALUE || !parseRN(buffer)) + return false; + if (arrayLength < 0 || arrayLength > 1000000000) + throw new RedisCommandParserException("invalid multibulk length"); + + for (int i = 0; i < arrayLength; i++) { + if (!buffer.isReadable()) + return false; + currentChar = buffer.readByte(); + if (currentChar == bulkStringID) { + byte[] newBulkString = parseBulkString(buffer); + if (newBulkString == null) + return false; + commandElems.add(newBulkString); + } else + throw new RedisCommandParserException( + "expected: \'$\', got \'" + (char) currentChar + "\'"); + } + return true; + } + + /** + * Helper method to parse a bulk string when one is seen + * + * @param buffer Buffer to read from + * @return byte[] representation of the Bulk String read + * @throws RedisCommandParserException Thrown when there is illegal syntax + */ + private byte[] parseBulkString(ByteBuf buffer) throws RedisCommandParserException { + int bulkStringLength = parseCurrentNumber(buffer); + if (bulkStringLength == Integer.MIN_VALUE) + return null; + if (bulkStringLength > MAX_BULK_STRING_LENGTH) + throw new RedisCommandParserException( + "invalid bulk length, cannot exceed max length of " + MAX_BULK_STRING_LENGTH); + if (!parseRN(buffer)) + return null; + + if (!buffer.isReadable(bulkStringLength)) + return null; + byte[] bulkString = new byte[bulkStringLength]; + buffer.readBytes(bulkString); + + if (!parseRN(buffer)) + return null; + + return bulkString; + } + + /** + * Helper method to parse the number at the beginning of the buffer + * + * @param buffer Buffer to read + * @return The number found at the beginning of the buffer + */ + private int parseCurrentNumber(ByteBuf buffer) { + int number = 0; + int readerIndex = buffer.readerIndex(); + byte b = 0; + while (true) { + if (!buffer.isReadable()) + return Integer.MIN_VALUE; + b = buffer.readByte(); + if (Character.isDigit(b)) { + number = number * 10 + (int) (b - '0'); + readerIndex++; + } else { + buffer.readerIndex(readerIndex); + break; + } + } + return number; + } + + /** + * Helper method that is called when the next characters are supposed to be "\r\n" + * + * @param buffer Buffer to read from + * @throws RedisCommandParserException Thrown when the next two characters are not "\r\n" + */ + private boolean parseRN(ByteBuf buffer) throws RedisCommandParserException { + if (!buffer.isReadable(2)) + return false; + byte b = buffer.readByte(); + if (b != rID) + throw new RedisCommandParserException( + "expected \'" + (char) rID + "\', got \'" + (char) b + "\'"); + b = buffer.readByte(); + if (b != nID) + throw new RedisCommandParserException( + "expected: \'" + (char) nID + "\', got \'" + (char) b + "\'"); + return true; + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/Command.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/Command.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/Command.java new file mode 100644 index 0000000..2f91218 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/Command.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal; + +import io.netty.buffer.ByteBuf; + +import java.nio.channels.SocketChannel; +import java.util.List; + +/** + * The command class is used in holding a received Redis command. Each sent command resides in an + * instance of this class. This class is designed to be used strictly by getter and setter methods. + * + * + */ +public class Command { + + private final List<byte[]> commandElems; + private final RedisCommandType commandType; + private ByteBuf response; + private String key; + private ByteArrayWrapper bytes; + + /** + * Constructor for {@link Command}. Must initialize Command with a {@link SocketChannel} and a + * {@link List} of command elements + * + * @param commandElems List of elements in command + */ + public Command(List<byte[]> commandElems) { + if (commandElems == null || commandElems.isEmpty()) + throw new IllegalArgumentException( + "List of command elements cannot be empty -> List:" + commandElems); + this.commandElems = commandElems; + this.response = null; + + RedisCommandType type; + + try { + byte[] charCommand = commandElems.get(0); + String commandName = Coder.bytesToString(charCommand).toUpperCase(); + type = RedisCommandType.valueOf(commandName); + } catch (Exception e) { + type = RedisCommandType.UNKNOWN; + } + this.commandType = type; + + } + + /** + * Used to get the command element list + * + * @return List of command elements in form of {@link List} + */ + public List<byte[]> getProcessedCommand() { + return this.commandElems; + } + + /** + * Getter method for the command type + * + * @return The command type + */ + public RedisCommandType getCommandType() { + return this.commandType; + } + + /** + * Getter method to get the response to be sent + * + * @return The response + */ + public ByteBuf getResponse() { + return response; + } + + /** + * Setter method to set the response to be sent + * + * @param response The response to be sent + */ + public void setResponse(ByteBuf response) { + this.response = response; + } + + public boolean hasError() { + if (response == null) + return false; + + if (response.getByte(0) == Coder.ERROR_ID) + return true; + + return false; + } + + /** + * Convenience method to get a String representation of the key in a Redis command, always at the + * second position in the sent command array + * + * @return Returns the second element in the parsed command list, which is always the key for + * commands indicating a key + */ + public String getStringKey() { + if (this.commandElems.size() > 1) { + if (this.bytes == null) { + this.bytes = new ByteArrayWrapper(this.commandElems.get(1)); + this.key = this.bytes.toString(); + } else if (this.key == null) + this.key = this.bytes.toString(); + return this.key; + } else + return null; + } + + public ByteArrayWrapper getKey() { + if (this.commandElems.size() > 1) { + if (this.bytes == null) + this.bytes = new ByteArrayWrapper(this.commandElems.get(1)); + return this.bytes; + } else + return null; + } + + @Override + public String toString() { + StringBuilder b = new StringBuilder(); + for (byte[] bs : this.commandElems) { + b.append(Coder.bytesToString(bs)); + b.append(' '); + } + return b.toString(); + } +}
