http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestTestCase.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestTestCase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestTestCase.java new file mode 100755 index 0000000..481863c --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestTestCase.java @@ -0,0 +1,1018 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.tier.sockets; + +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.InterestResultPolicy; +import com.gemstone.gemfire.cache.MirrorType; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionAttributes; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.client.PoolManager; +import com.gemstone.gemfire.cache.client.internal.Connection; +import com.gemstone.gemfire.cache.client.internal.PoolImpl; +import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker; +import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.distributed.DistributedSystem; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.ServerLocation; +import com.gemstone.gemfire.internal.AvailablePort; +import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter; +import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.tier.InterestType; +import com.gemstone.gemfire.test.junit.categories.IntegrationTest; + +import dunit.DistributedTestCase; +import dunit.Host; +import dunit.VM; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import org.junit.experimental.categories.Category; + +/** + * Tests Interest Registration Functionality + */ +@SuppressWarnings({"deprecation", "rawtypes", "serial", "unchecked"}) +public class HAInterestTestCase extends DistributedTestCase { + + protected static final int TIMEOUT_MILLIS = 60 * 1000; + protected static final int INTERVAL_MILLIS = 10; + + protected static final String REGION_NAME = "HAInterestBaseTest_region"; + + protected static final String k1 = "k1"; + protected static final String k2 = "k2"; + protected static final String client_k1 = "client-k1"; + protected static final String client_k2 = "client-k2"; + protected static final String server_k1 = "server-k1"; + protected static final String server_k2 = "server-k2"; + protected static final String server_k1_updated = "server_k1_updated"; + + protected static Cache cache = null; + protected static PoolImpl pool = null; + protected static Connection conn = null; + + protected static int PORT1; + protected static int PORT2; + protected static int PORT3; + + protected static boolean isBeforeRegistrationCallbackCalled = false; + protected static boolean isBeforeInterestRecoveryCallbackCalled = false; + protected static boolean isAfterRegistrationCallbackCalled = false; + + protected static Host host = null; + protected static VM server1 = null; + protected static VM server2 = null; + protected static VM server3 = null; + + protected volatile static boolean exceptionOccured = false; + + public HAInterestTestCase(String name) { + super(name); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + host = Host.getHost(0); + server1 = host.getVM(0); + server2 = host.getVM(1); + server3 = host.getVM(2); + CacheServerTestUtil.disableShufflingOfEndpoints(); + // start servers first + PORT1 = ((Integer) server1.invoke(HAInterestTestCase.class, "createServerCache")).intValue(); + PORT2 = ((Integer) server2.invoke(HAInterestTestCase.class, "createServerCache")).intValue(); + PORT3 = ((Integer) server3.invoke(HAInterestTestCase.class, "createServerCache")).intValue(); + exceptionOccured = false; + addExpectedException("java.net.ConnectException: Connection refused: connect"); + } + + @Override + public void tearDown2() throws Exception { + // close the clients first + closeCache(); + + // then close the servers + server1.invoke(HAInterestTestCase.class, "closeCache"); + server2.invoke(HAInterestTestCase.class, "closeCache"); + server3.invoke(HAInterestTestCase.class, "closeCache"); + CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag(); + } + + public static void closeCache() { + PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false; + PoolImpl.BEFORE_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = false; + PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false; + PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false; + HAInterestTestCase.isAfterRegistrationCallbackCalled = false; + HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = false; + HAInterestTestCase.isBeforeRegistrationCallbackCalled = false; + if (cache != null && !cache.isClosed()) { + cache.close(); + cache.getDistributedSystem().disconnect(); + } + cache = null; + pool = null; + conn = null; + } + + /** + * Return the current primary waiting for a primary to exist. + * + * @since 5.7 + */ + public static VM getPrimaryVM() { + return getPrimaryVM(null); + } + + /** + * Return the current primary waiting for a primary to exist and for it not to + * be the oldPrimary (if oldPrimary is NOT null). + * + * @since 5.7 + */ + public static VM getPrimaryVM(final VM oldPrimary) { + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + int primaryPort = pool.getPrimaryPort(); + if (primaryPort == -1) { + return false; + } + // we have a primary + VM currentPrimary = getServerVM(primaryPort); + if (currentPrimary != oldPrimary) { + return true; + } + return false; + } + @Override + public String description() { + return "waiting for primary"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + int primaryPort = pool.getPrimaryPort(); + assertTrue(primaryPort != -1); + VM currentPrimary = getServerVM(primaryPort); + assertTrue(currentPrimary != oldPrimary); + return currentPrimary; + } + + public static VM getBackupVM() { + return getBackupVM(null); + } + + public static VM getBackupVM(VM stoppedBackup) { + VM currentPrimary = getPrimaryVM(null); + if (currentPrimary != server2 && server2 != stoppedBackup) { + return server2; + } else if (currentPrimary != server3 && server3 != stoppedBackup) { + return server3; + } else if (currentPrimary != server1 && server1 != stoppedBackup) { + return server1; + } else { + fail("expected currentPrimary " + currentPrimary + " to be " + server1 + ", or " + server2 + ", or " + server3); + return null; + } + } + + /** + * Given a server vm (server1, server2, or server3) return its port. + * + * @since 5.7 + */ + public static int getServerPort(VM vm) { + if (vm == server1) { + return PORT1; + } else if (vm == server2) { + return PORT2; + } else if (vm == server3) { + return PORT3; + } else { + fail("expected vm " + vm + " to be " + server1 + ", or " + server2 + ", or " + server3); + return -1; + } + } + + /** + * Given a server port (PORT1, PORT2, or PORT3) return its vm. + * + * @since 5.7 + */ + public static VM getServerVM(int port) { + if (port == PORT1) { + return server1; + } else if (port == PORT2) { + return server2; + } else if (port == PORT3) { + return server3; + } else { + fail("expected port " + port + " to be " + PORT1 + ", or " + PORT2 + ", or " + PORT3); + return null; + } + } + + public static void verifyRefreshedEntriesFromServer() { + final Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r1); + + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + Region.Entry re = r1.getEntry(k1); + if (re == null) + return false; + Object val = re.getValue(); + return client_k1.equals(val); + } + @Override + public String description() { + return "waiting for client_k1 refresh from server"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + wc = new WaitCriterion() { + @Override + public boolean done() { + Region.Entry re = r1.getEntry(k2); + if (re == null) + return false; + Object val = re.getValue(); + return client_k2.equals(val); + } + @Override + public String description() { + return "waiting for client_k2 refresh from server"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + } + + public static void verifyDeadAndLiveServers(final int expectedDeadServers, final int expectedLiveServers) { + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return pool.getConnectedServerCount() == expectedLiveServers; + } + @Override + public String description() { + return "waiting for pool.getConnectedServerCount() == expectedLiveServer"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + } + + public static void putK1andK2() { + Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r1); + r1.put(k1, server_k1); + r1.put(k2, server_k2); + } + + public static void setClientServerObserverForBeforeInterestRecoveryFailure() { + PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true; + ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() { + public void beforeInterestRecovery() { + synchronized (HAInterestTestCase.class) { + Thread t = new Thread() { + public void run() { + getBackupVM().invoke(HAInterestTestCase.class, "startServer"); + getPrimaryVM().invoke(HAInterestTestCase.class, "stopServer"); + } + }; + t.start(); + try { + DistributedTestCase.join(t, 30 * 1000, getLogWriter()); + } catch (Exception ignore) { + exceptionOccured = true; + } + HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = true; + HAInterestTestCase.class.notify(); + PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false; + } + } + }); + } + + public static void setClientServerObserverForBeforeInterestRecovery() { + PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true; + ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() { + public void beforeInterestRecovery() { + synchronized (HAInterestTestCase.class) { + Thread t = new Thread() { + public void run() { + Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r1); + r1.put(k1, server_k1_updated); + } + }; + t.start(); + + HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = true; + HAInterestTestCase.class.notify(); + PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false; + } + } + }); + } + + public static void waitForBeforeInterestRecoveryCallBack() throws InterruptedException { + assertNotNull(cache); + synchronized (HAInterestTestCase.class) { + while (!isBeforeInterestRecoveryCallbackCalled) { + HAInterestTestCase.class.wait(); + } + } + } + + public static void setClientServerObserverForBeforeRegistration(final VM vm) { + PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = true; + ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() { + public void beforeInterestRegistration() { + synchronized (HAInterestTestCase.class) { + vm.invoke(HAInterestTestCase.class, "startServer"); + HAInterestTestCase.isBeforeRegistrationCallbackCalled = true; + HAInterestTestCase.class.notify(); + PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false; + } + } + }); + } + + public static void waitForBeforeRegistrationCallback() throws InterruptedException { + assertNotNull(cache); + synchronized (HAInterestTestCase.class) { + while (!isBeforeRegistrationCallbackCalled) { + HAInterestTestCase.class.wait(); + } + } + } + + public static void setClientServerObserverForAfterRegistration(final VM vm) { + PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = true; + ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() { + public void afterInterestRegistration() { + synchronized (HAInterestTestCase.class) { + vm.invoke(HAInterestTestCase.class, "startServer"); + HAInterestTestCase.isAfterRegistrationCallbackCalled = true; + HAInterestTestCase.class.notify(); + PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false; + } + } + }); + } + + public static void waitForAfterRegistrationCallback() throws InterruptedException { + assertNotNull(cache); + if (!isAfterRegistrationCallbackCalled) { + synchronized (HAInterestTestCase.class) { + while (!isAfterRegistrationCallbackCalled) { + HAInterestTestCase.class.wait(); + } + } + } + } + + public static void unSetClientServerObserverForRegistrationCallback() { + synchronized (HAInterestTestCase.class) { + PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false; + PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false; + HAInterestTestCase.isBeforeRegistrationCallbackCalled = false; + HAInterestTestCase.isAfterRegistrationCallbackCalled = false; + } + } + + public static void verifyDispatcherIsAlive() { + assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size()); + + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return cache.getCacheServers().size() == 1; + } + @Override + public String description() { + return "waiting for cache.getCacheServers().size() == 1"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next(); + assertNotNull(bs); + assertNotNull(bs.getAcceptor()); + assertNotNull(bs.getAcceptor().getCacheClientNotifier()); + final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier(); + + wc = new WaitCriterion() { + @Override + public boolean done() { + return ccn.getClientProxies().size() > 0; + } + @Override + public String description() { + return "waiting for ccn.getClientProxies().size() > 0"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + wc = new WaitCriterion() { + Iterator iter_prox; + CacheClientProxy proxy; + + @Override + public boolean done() { + iter_prox = ccn.getClientProxies().iterator(); + if (iter_prox.hasNext()) { + proxy = (CacheClientProxy) iter_prox.next(); + return proxy._messageDispatcher.isAlive(); + } else { + return false; + } + } + + @Override + public String description() { + return "waiting for CacheClientProxy _messageDispatcher to be alive"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + } + + public static void verifyDispatcherIsNotAlive() { + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return cache.getCacheServers().size() == 1; + } + @Override + public String description() { + return "cache.getCacheServers().size() == 1"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next(); + assertNotNull(bs); + assertNotNull(bs.getAcceptor()); + assertNotNull(bs.getAcceptor().getCacheClientNotifier()); + final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier(); + + wc = new WaitCriterion() { + @Override + public boolean done() { + return ccn.getClientProxies().size() > 0; + } + @Override + public String description() { + return "waiting for ccn.getClientProxies().size() > 0"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + Iterator iter_prox = ccn.getClientProxies().iterator(); + if (iter_prox.hasNext()) { + CacheClientProxy proxy = (CacheClientProxy) iter_prox.next(); + assertFalse("Dispatcher on secondary should not be alive", proxy._messageDispatcher.isAlive()); + } + } + + public static void createEntriesK1andK2OnServer() { + Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r1); + if (!r1.containsKey(k1)) { + r1.create(k1, server_k1); + } + if (!r1.containsKey(k2)) { + r1.create(k2, server_k2); + } + assertEquals(r1.getEntry(k1).getValue(), server_k1); + assertEquals(r1.getEntry(k2).getValue(), server_k2); + } + + public static void createEntriesK1andK2() { + Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r1); + if (!r1.containsKey(k1)) { + r1.create(k1, client_k1); + } + if (!r1.containsKey(k2)) { + r1.create(k2, client_k2); + } + assertEquals(r1.getEntry(k1).getValue(), client_k1); + assertEquals(r1.getEntry(k2).getValue(), client_k2); + } + + public static void createServerEntriesK1andK2() { + Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r1); + if (!r1.containsKey(k1)) { + r1.create(k1, server_k1); + } + if (!r1.containsKey(k2)) { + r1.create(k2, server_k2); + } + assertEquals(r1.getEntry(k1).getValue(), server_k1); + assertEquals(r1.getEntry(k2).getValue(), server_k2); + } + + public static void registerK1AndK2() { + Region r = cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r); + List list = new ArrayList(); + list.add(k1); + list.add(k2); + r.registerInterest(list, InterestResultPolicy.KEYS_VALUES); + } + + public static void reRegisterK1AndK2() { + Region r = cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r); + List list = new ArrayList(); + list.add(k1); + list.add(k2); + r.registerInterest(list); + } + + public static void startServer() throws IOException { + Cache c = CacheFactory.getAnyInstance(); + assertEquals("More than one BridgeServer", 1, c.getCacheServers().size()); + CacheServerImpl bs = (CacheServerImpl) c.getCacheServers().iterator().next(); + assertNotNull(bs); + bs.start(); + } + + public static void stopServer() { + assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size()); + CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next(); + assertNotNull(bs); + bs.stop(); + } + + public static void stopPrimaryAndRegisterK1AndK2AndVerifyResponse() { + LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r); + ServerRegionProxy srp = new ServerRegionProxy(r); + + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return pool.getConnectedServerCount() == 3; + } + @Override + public String description() { + return "connected server count never became 3"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + // close primaryEP + getPrimaryVM().invoke(HAInterestTestCase.class, "stopServer"); + List list = new ArrayList(); + list.add(k1); + list.add(k2); + List serverKeys = srp.registerInterest(list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal); + assertNotNull(serverKeys); + List resultKeys = (List) serverKeys.get(0); + assertEquals(2, resultKeys.size()); + assertTrue(resultKeys.contains(k1)); + assertTrue(resultKeys.contains(k2)); + } + + public static void stopPrimaryAndUnregisterRegisterK1() { + LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r); + ServerRegionProxy srp = new ServerRegionProxy(r); + + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return pool.getConnectedServerCount() == 3; + } + @Override + public String description() { + return "connected server count never became 3"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + // close primaryEP + getPrimaryVM().invoke(HAInterestTestCase.class, "stopServer"); + List list = new ArrayList(); + list.add(k1); + srp.unregisterInterest(list, InterestType.KEY, false, false); + } + + public static void stopBothPrimaryAndSecondaryAndRegisterK1AndK2AndVerifyResponse() { + LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r); + ServerRegionProxy srp = new ServerRegionProxy(r); + + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return pool.getConnectedServerCount() == 3; + } + @Override + public String description() { + return "connected server count never became 3"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + // close primaryEP + VM backup = getBackupVM(); + getPrimaryVM().invoke(HAInterestTestCase.class, "stopServer"); + // close secondary + backup.invoke(HAInterestTestCase.class, "stopServer"); + List list = new ArrayList(); + list.add(k1); + list.add(k2); + List serverKeys = srp.registerInterest(list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal); + + assertNotNull(serverKeys); + List resultKeys = (List) serverKeys.get(0); + assertEquals(2, resultKeys.size()); + assertTrue(resultKeys.contains(k1)); + assertTrue(resultKeys.contains(k2)); + } + + /** + * returns the secondary that was stopped + */ + public static VM stopSecondaryAndRegisterK1AndK2AndVerifyResponse() { + LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r); + ServerRegionProxy srp = new ServerRegionProxy(r); + + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return pool.getConnectedServerCount() == 3; + } + @Override + public String description() { + return "Never got three connected servers"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + // close secondary EP + VM result = getBackupVM(); + result.invoke(HAInterestTestCase.class, "stopServer"); + List list = new ArrayList(); + list.add(k1); + list.add(k2); + List serverKeys = srp.registerInterest(list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal); + + assertNotNull(serverKeys); + List resultKeys = (List) serverKeys.get(0); + assertEquals(2, resultKeys.size()); + assertTrue(resultKeys.contains(k1)); + assertTrue(resultKeys.contains(k2)); + return result; + } + + /** + * returns the secondary that was stopped + */ + public static VM stopSecondaryAndUNregisterK1() { + LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r); + ServerRegionProxy srp = new ServerRegionProxy(r); + + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return pool.getConnectedServerCount() == 3; + } + @Override + public String description() { + return "connected server count never became 3"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + // close secondary EP + VM result = getBackupVM(); + result.invoke(HAInterestTestCase.class, "stopServer"); + List list = new ArrayList(); + list.add(k1); + srp.unregisterInterest(list, InterestType.KEY, false, false); + return result; + } + + public static void registerK1AndK2OnPrimaryAndSecondaryAndVerifyResponse() { + ServerLocation primary = pool.getPrimary(); + ServerLocation secondary = (ServerLocation) pool.getRedundants().get(0); + LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(r); + ServerRegionProxy srp = new ServerRegionProxy(r); + List list = new ArrayList(); + list.add(k1); + list.add(k2); + + // Primary server + List serverKeys1 = srp.registerInterestOn(primary, list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal); + assertNotNull(serverKeys1); + // expect serverKeys in response from primary + List resultKeys = (List) serverKeys1.get(0); + assertEquals(2, resultKeys.size()); + assertTrue(resultKeys.contains(k1)); + assertTrue(resultKeys.contains(k2)); + + // Secondary server + List serverKeys2 = srp.registerInterestOn(secondary, list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal); + // if the list is null then it is empty + if (serverKeys2 != null) { + // no serverKeys in response from secondary + assertTrue(serverKeys2.isEmpty()); + } + } + + public static void verifyInterestRegistration() { + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return cache.getCacheServers().size() == 1; + } + @Override + public String description() { + return "waiting for cache.getCacheServers().size() == 1"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next(); + assertNotNull(bs); + assertNotNull(bs.getAcceptor()); + assertNotNull(bs.getAcceptor().getCacheClientNotifier()); + final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier(); + + wc = new WaitCriterion() { + @Override + public boolean done() { + return ccn.getClientProxies().size() > 0; + } + @Override + public String description() { + return "waiting for ccn.getClientProxies().size() > 0"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + Iterator iter_prox = ccn.getClientProxies().iterator(); + + if (iter_prox.hasNext()) { + final CacheClientProxy ccp = (CacheClientProxy) iter_prox.next(); + + wc = new WaitCriterion() { + @Override + public boolean done() { + Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex] + .getProfile(Region.SEPARATOR + REGION_NAME) + .getKeysOfInterestFor(ccp.getProxyID()); + return keysMap != null && keysMap.size() == 2; + } + @Override + public String description() { + return "waiting for keys of interest to include 2 keys"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex].getProfile(Region.SEPARATOR + REGION_NAME) + .getKeysOfInterestFor(ccp.getProxyID()); + assertNotNull(keysMap); + assertEquals(2, keysMap.size()); + assertTrue(keysMap.contains(k1)); + assertTrue(keysMap.contains(k2)); + } + } + + public static void verifyInterestUNRegistration() { + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return cache.getCacheServers().size() == 1; + } + @Override + public String description() { + return "waiting for cache.getCacheServers().size() == 1"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next(); + assertNotNull(bs); + assertNotNull(bs.getAcceptor()); + assertNotNull(bs.getAcceptor().getCacheClientNotifier()); + final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier(); + + wc = new WaitCriterion() { + @Override + public boolean done() { + return ccn.getClientProxies().size() > 0; + } + @Override + public String description() { + return "waiting for ccn.getClientProxies().size() > 0"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + Iterator iter_prox = ccn.getClientProxies().iterator(); + if (iter_prox.hasNext()) { + final CacheClientProxy ccp = (CacheClientProxy) iter_prox.next(); + + wc = new WaitCriterion() { + @Override + public boolean done() { + Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex] + .getProfile(Region.SEPARATOR + REGION_NAME) + .getKeysOfInterestFor(ccp.getProxyID()); + return keysMap != null; + } + @Override + public String description() { + return "waiting for keys of interest to not be null"; + } + }; + DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); + + Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex] + .getProfile(Region.SEPARATOR + REGION_NAME) + .getKeysOfInterestFor(ccp.getProxyID()); + assertNotNull(keysMap); + assertEquals(1, keysMap.size()); + assertFalse(keysMap.contains(k1)); + assertTrue(keysMap.contains(k2)); + } + } + + private void createCache(Properties props) throws Exception { + DistributedSystem ds = getSystem(props); + assertNotNull(ds); + ds.disconnect(); + ds = getSystem(props); + cache = CacheFactory.create(ds); + assertNotNull(cache); + } + + public static void createClientPoolCache(String testName, String host) throws Exception { + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + props.setProperty(DistributionConfig.LOCATORS_NAME, ""); + new HAInterestTestCase("temp").createCache(props); + CacheServerTestUtil.disableShufflingOfEndpoints(); + PoolImpl p; + try { + p = (PoolImpl) PoolManager.createFactory() + .addServer(host, PORT1) + .addServer(host, PORT2) + .addServer(host, PORT3) + .setSubscriptionEnabled(true) + .setSubscriptionRedundancy(-1) + .setReadTimeout(1000) + .setPingInterval(1000) + // retryInterval should be more so that only registerInterste thread + // will initiate failover + // .setRetryInterval(20000) + .create("HAInterestBaseTestPool"); + } finally { + CacheServerTestUtil.enableShufflingOfEndpoints(); + } + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(true); + factory.setPoolName(p.getName()); + + cache.createRegion(REGION_NAME, factory.create()); + pool = p; + conn = pool.acquireConnection(); + assertNotNull(conn); + } + + public static void createClientPoolCacheWithSmallRetryInterval(String testName, String host) throws Exception { + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + props.setProperty(DistributionConfig.LOCATORS_NAME, ""); + new HAInterestTestCase("temp").createCache(props); + CacheServerTestUtil.disableShufflingOfEndpoints(); + PoolImpl p; + try { + p = (PoolImpl) PoolManager.createFactory() + .addServer(host, PORT1) + .addServer(host, PORT2) + .setSubscriptionEnabled(true) + .setSubscriptionRedundancy(-1) + .setReadTimeout(1000) + .setSocketBufferSize(32768) + .setMinConnections(6) + .setPingInterval(200) + // .setRetryInterval(200) + // retryAttempts 3 + .create("HAInterestBaseTestPool"); + } finally { + CacheServerTestUtil.enableShufflingOfEndpoints(); + } + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(true); + factory.setPoolName(p.getName()); + + cache.createRegion(REGION_NAME, factory.create()); + + pool = p; + conn = pool.acquireConnection(); + assertNotNull(conn); + } + + public static void createClientPoolCacheConnectionToSingleServer(String testName, String hostName) throws Exception { + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + props.setProperty(DistributionConfig.LOCATORS_NAME, ""); + new HAInterestTestCase("temp").createCache(props); + PoolImpl p = (PoolImpl) PoolManager.createFactory() + .addServer(hostName, PORT1) + .setSubscriptionEnabled(true) + .setSubscriptionRedundancy(-1) + .setReadTimeout(1000) + // .setRetryInterval(20) + .create("HAInterestBaseTestPool"); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(true); + factory.setPoolName(p.getName()); + + cache.createRegion(REGION_NAME, factory.create()); + + pool = p; + conn = pool.acquireConnection(); + assertNotNull(conn); + } + + public static Integer createServerCache() throws Exception { + new HAInterestTestCase("temp").createCache(new Properties()); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setEnableBridgeConflation(true); + factory.setMirrorType(MirrorType.KEYS_VALUES); + factory.setConcurrencyChecksEnabled(true); + cache.createRegion(REGION_NAME, factory.create()); + + CacheServer server = cache.addCacheServer(); + int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + server.setPort(port); + server.setMaximumTimeBetweenPings(180000); + // ensures updates to be sent instead of invalidations + server.setNotifyBySubscription(true); + server.start(); + return new Integer(server.getPort()); + } + + public static Integer createServerCacheWithLocalRegion() throws Exception { + new HAInterestTestCase("temp").createCache(new Properties()); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(true); + RegionAttributes attrs = factory.create(); + cache.createRegion(REGION_NAME, attrs); + + CacheServer server = cache.addCacheServer(); + int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + server.setPort(port); + // ensures updates to be sent instead of invalidations + server.setNotifyBySubscription(true); + server.setMaximumTimeBetweenPings(180000); + server.start(); + return new Integer(server.getPort()); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java index b12f55b..b6bfe22 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java @@ -22,12 +22,18 @@ import static org.mockito.Mockito.when; import java.io.IOException; import org.junit.Test; +import org.junit.experimental.categories.Category; import com.gemstone.gemfire.CancelCriterion; import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.internal.cache.tier.sockets.Message; import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection; +import com.gemstone.gemfire.test.junit.categories.UnitTest; +/** + * Exposes GEODE-537: NPE in JTA AFTER_COMPLETION command processing + */ +@Category(UnitTest.class) public class CommitCommandTest { /** http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java index 77d7995..1f72a6b 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java @@ -21,16 +21,22 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.Properties; +import org.junit.Ignore; +import org.junit.experimental.categories.Category; + import com.gemstone.gemfire.LogWriter; import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl; import com.gemstone.gemfire.internal.util.IOUtils; +import com.gemstone.gemfire.test.junit.categories.PerformanceTest; /** * Tests performance of logging when level is OFF. * * @author Kirk Lund */ +@Category(PerformanceTest.class) +@Ignore("Tests have no assertions") public class LogWriterPerformanceTest extends LoggingPerformanceTestCase { public LogWriterPerformanceTest(String name) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java index f98868b..caedadc 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java @@ -20,7 +20,13 @@ import java.io.IOException; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; +import org.junit.Ignore; +import org.junit.experimental.categories.Category; +import com.gemstone.gemfire.test.junit.categories.PerformanceTest; + +@Category(PerformanceTest.class) +@Ignore("Tests have no assertions") public class Log4J2DisabledPerformanceTest extends Log4J2PerformanceTest { public Log4J2DisabledPerformanceTest(String name) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java index 6d75f80..1ad30bd 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java @@ -28,11 +28,16 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.config.ConfigurationFactory; +import org.junit.Ignore; +import org.junit.experimental.categories.Category; import com.gemstone.gemfire.internal.FileUtil; import com.gemstone.gemfire.internal.logging.LoggingPerformanceTestCase; import com.gemstone.gemfire.internal.util.IOUtils; +import com.gemstone.gemfire.test.junit.categories.PerformanceTest; +@Category(PerformanceTest.class) +@Ignore("Tests have no assertions") public class Log4J2PerformanceTest extends LoggingPerformanceTestCase { protected static final int DEFAULT_LOG_FILE_SIZE_LIMIT = Integer.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java index f964208..4be34c7 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java @@ -20,7 +20,13 @@ import java.io.IOException; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; +import org.junit.Ignore; +import org.junit.experimental.categories.Category; +import com.gemstone.gemfire.test.junit.categories.PerformanceTest; + +@Category(PerformanceTest.class) +@Ignore("Tests have no assertions") public class LogWriterLoggerDisabledPerformanceTest extends LogWriterLoggerPerformanceTest { public LogWriterLoggerDisabledPerformanceTest(String name) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java index 506e487..3d8ee46 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java @@ -27,11 +27,16 @@ import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.config.ConfigurationFactory; +import org.junit.Ignore; +import org.junit.experimental.categories.Category; import com.gemstone.gemfire.internal.FileUtil; import com.gemstone.gemfire.internal.logging.LoggingPerformanceTestCase; import com.gemstone.gemfire.internal.util.IOUtils; +import com.gemstone.gemfire.test.junit.categories.PerformanceTest; +@Category(PerformanceTest.class) +@Ignore("Tests have no assertions") public class LogWriterLoggerPerformanceTest extends LoggingPerformanceTestCase { protected static final int DEFAULT_LOG_FILE_SIZE_LIMIT = Integer.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/dunit/DistributedTestCase.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/dunit/DistributedTestCase.java b/gemfire-core/src/test/java/dunit/DistributedTestCase.java index a3d4785..6fa560f 100755 --- a/gemfire-core/src/test/java/dunit/DistributedTestCase.java +++ b/gemfire-core/src/test/java/dunit/DistributedTestCase.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.junit.experimental.categories.Category; import org.springframework.data.gemfire.support.GemfireCache; import junit.framework.TestCase; @@ -86,6 +87,7 @@ import com.gemstone.gemfire.internal.logging.LogWriterImpl; import com.gemstone.gemfire.internal.logging.ManagerLogWriter; import com.gemstone.gemfire.internal.logging.log4j.LogWriterLogger; import com.gemstone.gemfire.management.internal.cli.LogWrapper; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; import dunit.standalone.DUnitLauncher; @@ -101,6 +103,7 @@ import dunit.standalone.DUnitLauncher; * * @author David Whitlock */ +@Category(DistributedTest.class) @SuppressWarnings("serial") public abstract class DistributedTestCase extends TestCase implements java.io.Serializable { private static final Logger logger = LogService.getLogger(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java b/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java index 7fc762f..6366853 100644 --- a/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java +++ b/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java @@ -38,6 +38,7 @@ import com.gemstone.gemfire.internal.FileUtil; import com.gemstone.gemfire.internal.logging.LogService; import dunit.RemoteDUnitVMIF; +import dunit.standalone.ChildVM; /** * @author dsmith @@ -183,7 +184,7 @@ public class ProcessManager { "-Dgemfire.disallowMcastDefaults=true", "-ea", agent, - "dunit.standalone.ChildVM" + ChildVM.class.getName() }; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java ---------------------------------------------------------------------- diff --git a/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java new file mode 100755 index 0000000..8eec738 --- /dev/null +++ b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.test.junit.categories; +/** + * JUnit Test Category that specifies a test executes within a container + * environment such as an OSGi server. + * + * @author Kirk Lund + */ +public interface ContainerTest { +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java ---------------------------------------------------------------------- diff --git a/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java new file mode 100755 index 0000000..4fe535b --- /dev/null +++ b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.test.junit.categories; +/** + * JUnit Test Category that specifies a hydra test. + * + * @author Kirk Lund + */ +public interface HydraTest { +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitPerformanceTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitPerformanceTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitPerformanceTest.java deleted file mode 100644 index ab2db78..0000000 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitPerformanceTest.java +++ /dev/null @@ -1,437 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.gemstone.gemfire.cache.lucene.internal.repository; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -import org.apache.lucene.analysis.standard.StandardAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field.Store; -import org.apache.lucene.document.TextField; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.Term; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.SearcherManager; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.store.RAMDirectory; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.DataSerializable; -import com.gemstone.gemfire.DataSerializer; -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.PartitionAttributesFactory; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.RegionShortcut; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; -import com.gemstone.gemfire.cache.lucene.LuceneIndex; -import com.gemstone.gemfire.cache.lucene.LuceneQuery; -import com.gemstone.gemfire.cache.lucene.LuceneQueryProvider; -import com.gemstone.gemfire.cache.lucene.LuceneQueryResults; -import com.gemstone.gemfire.cache.lucene.LuceneService; -import com.gemstone.gemfire.cache.lucene.LuceneServiceProvider; -import com.gemstone.gemfire.cache.lucene.internal.LuceneServiceImpl; -import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory; -import com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntriesCollector; -import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey; -import com.gemstone.gemfire.cache.lucene.internal.filesystem.File; -import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer; -import com.gemstone.gemfire.cache.query.QueryException; -import com.gemstone.gemfire.test.junit.categories.PerformanceTest; - -/** - * Microbenchmark of the IndexRepository to compare an - * IndexRepository built on top of cache with a - * stock lucene IndexWriter with a RAMDirectory. - */ -@Category(PerformanceTest.class) -public class IndexRepositoryImplJUnitPerformanceTest { - - private static final int NUM_WORDS = 1000; - private static int[] COMMIT_INTERVAL = new int[] {100, 1000, 5000}; - private static int NUM_ENTRIES = 500_000; - private static int NUM_QUERIES = 500_000; - - private StandardAnalyzer analyzer = new StandardAnalyzer(); - - @Test - public void testIndexRepository() throws Exception { - - - doTest("IndexRepository", new TestCallbacks() { - - private Cache cache; - private IndexRepositoryImpl repo; - private IndexWriter writer; - - @Override - public void addObject(String key, String text) throws Exception { - repo.create(key, new TestObject(text)); - } - - @Override - public void commit() throws Exception { - repo.commit(); - } - - @Override - public void init() throws Exception { - cache = new CacheFactory().set("mcast-port", "0") - .set("log-level", "error") - .create(); - Region<String, File> fileRegion = cache.<String, File>createRegionFactory(RegionShortcut.REPLICATE).create("files"); - Region<ChunkKey, byte[]> chunkRegion = cache.<ChunkKey, byte[]>createRegionFactory(RegionShortcut.REPLICATE).create("chunks"); - - RegionDirectory dir = new RegionDirectory(fileRegion, chunkRegion); - - - IndexWriterConfig config = new IndexWriterConfig(analyzer); - writer = new IndexWriter(dir, config); - String[] indexedFields= new String[] {"text"}; - HeterogenousLuceneSerializer mapper = new HeterogenousLuceneSerializer(indexedFields); - repo = new IndexRepositoryImpl(fileRegion, writer, mapper); - } - - @Override - public void cleanup() throws IOException { - writer.close(); - cache.close(); - } - - @Override - public void waitForAsync() throws Exception { - //do nothing - } - - @Override - public int query(Query query) throws IOException { - TopEntriesCollector collector = new TopEntriesCollector(); - repo.query(query, 100, collector); - return collector.size(); - } - }); - } - - /** - * Test our full lucene index implementation - * @throws Exception - */ - @Test - public void testLuceneIndex() throws Exception { - - - doTest("LuceneIndex", new TestCallbacks() { - - private Cache cache; - private Region<String, TestObject> region; - private LuceneService service; - - @Override - public void addObject(String key, String text) throws Exception { - region.create(key, new TestObject(text)); - } - - @Override - public void commit() throws Exception { - //NA - } - - @Override - public void init() throws Exception { - cache = new CacheFactory().set("mcast-port", "0") - .set("log-level", "warning") - .create(); - service = LuceneServiceProvider.get(cache); - service.createIndex("index", "/region", "text"); - region = cache.<String, TestObject>createRegionFactory(RegionShortcut.PARTITION) - .setPartitionAttributes(new PartitionAttributesFactory<>().setTotalNumBuckets(1).create()) - .create("region"); - } - - @Override - public void cleanup() throws IOException { - cache.close(); - } - - @Override - public void waitForAsync() throws Exception { - AsyncEventQueue aeq = cache.getAsyncEventQueue(LuceneServiceImpl.getUniqueIndexName("index", "/region")); - - //We will be at most 10 ms off - while(aeq.size() > 0) { - Thread.sleep(10); - } - } - - @Override - public int query(final Query query) throws Exception { - LuceneQuery<Object, Object> luceneQuery = service.createLuceneQueryFactory().create("index", "/region", new LuceneQueryProvider() { - - @Override - public Query getQuery(LuceneIndex index) throws QueryException { - return query; - } - }); - - LuceneQueryResults<Object, Object> results = luceneQuery.search(); - return results.size(); - } - }); - } - - @Test - public void testLuceneWithRegionDirectory() throws Exception { - doTest("RegionDirectory", new TestCallbacks() { - - private IndexWriter writer; - private SearcherManager searcherManager; - - @Override - public void init() throws Exception { - RegionDirectory dir = new RegionDirectory(new ConcurrentHashMap<String, File>(), new ConcurrentHashMap<ChunkKey, byte[]>()); - IndexWriterConfig config = new IndexWriterConfig(analyzer); - writer = new IndexWriter(dir, config); - searcherManager = new SearcherManager(writer, true, null); - } - - @Override - public void addObject(String key, String text) throws Exception { - Document doc = new Document(); - doc.add(new TextField("key", key, Store.YES)); - doc.add(new TextField("text", text, Store.NO)); - writer.addDocument(doc); - } - - @Override - public void commit() throws Exception { - writer.commit(); - searcherManager.maybeRefresh(); - } - - @Override - public void cleanup() throws Exception { - writer.close(); - } - - @Override - public void waitForAsync() throws Exception { - //do nothing - } - - @Override - public int query(Query query) throws Exception { - IndexSearcher searcher = searcherManager.acquire(); - try { - return searcher.count(query); - } finally { - searcherManager.release(searcher); - } - } - - }); - - } - - @Test - public void testLucene() throws Exception { - doTest("Lucene", new TestCallbacks() { - - private IndexWriter writer; - private SearcherManager searcherManager; - - @Override - public void init() throws Exception { - RAMDirectory dir = new RAMDirectory(); - IndexWriterConfig config = new IndexWriterConfig(analyzer); - writer = new IndexWriter(dir, config); - searcherManager = new SearcherManager(writer, true, null); - } - - @Override - public void addObject(String key, String text) throws Exception { - Document doc = new Document(); - doc.add(new TextField("key", key, Store.YES)); - doc.add(new TextField("text", text, Store.NO)); - writer.addDocument(doc); - } - - @Override - public void commit() throws Exception { - writer.commit(); - searcherManager.maybeRefresh(); - } - - @Override - public void cleanup() throws Exception { - writer.close(); - } - - @Override - public void waitForAsync() throws Exception { - //do nothing - } - - @Override - public int query(Query query) throws Exception { - IndexSearcher searcher = searcherManager.acquire(); - try { - return searcher.count(query); - } finally { - searcherManager.release(searcher); - } - } - - }); - - } - - private void doTest(String testName, TestCallbacks callbacks) throws Exception { - - //Create some random words. We need to be careful - //to make sure we get NUM_WORDS distinct words here - Set<String> wordSet = new HashSet<String>(); - Random rand = new Random(); - while(wordSet.size() < NUM_WORDS) { - int length = rand.nextInt(12) + 3; - char[] text = new char[length]; - for(int i = 0; i < length; i++) { - text[i] = (char) (rand.nextInt(26) + 97); - } - wordSet.add(new String(text)); - } - List<String> words = new ArrayList<String>(wordSet.size()); - words.addAll(wordSet); - - - - //warm up - writeRandomWords(callbacks, words, rand, NUM_ENTRIES / 10, NUM_QUERIES / 10, COMMIT_INTERVAL[0]); - - //Do the actual test - - for(int i = 0; i < COMMIT_INTERVAL.length; i++) { - Results results = writeRandomWords(callbacks, words, rand, NUM_ENTRIES, NUM_QUERIES / 10, COMMIT_INTERVAL[i]); - - System.out.println(testName + " writes(entries=" + NUM_ENTRIES + ", commit=" + COMMIT_INTERVAL[i] + "): " + TimeUnit.NANOSECONDS.toMillis(results.writeTime)); - System.out.println(testName + " queries(entries=" + NUM_ENTRIES + ", commit=" + COMMIT_INTERVAL[i] + "): " + TimeUnit.NANOSECONDS.toMillis(results.queryTime)); - } - } - - private Results writeRandomWords(TestCallbacks callbacks, List<String> words, - Random rand, int numEntries, int numQueries, int commitInterval) throws Exception { - Results results = new Results(); - callbacks.init(); - int[] counts = new int[words.size()]; - long start = System.nanoTime(); - try { - for(int i =0; i < numEntries; i++) { - int word1 = rand.nextInt(words.size()); - int word2 = rand.nextInt(words.size()); - counts[word1]++; - counts[word2]++; - String value = words.get(word1) + " " + words.get(word2); - callbacks.addObject("key" + i, value); - - if(i % commitInterval == 0 && i != 0) { - callbacks.commit(); - } - } - callbacks.commit(); - callbacks.waitForAsync(); - long end = System.nanoTime(); - results.writeTime = end - start; - - - start = System.nanoTime(); - for(int i=0; i < numQueries; i++) { - int wordIndex = rand.nextInt(words.size()); - String word = words.get(wordIndex); - Query query = new TermQuery(new Term("text", word)); - int size = callbacks.query(query); -// int size = callbacks.query(parser.parse(word)); - //All of my tests sometimes seem to be missing a couple of words, including the stock lucene -// assertEquals("Error on query " + i + " word=" + word, counts[wordIndex], size); - } - end = System.nanoTime(); - results.queryTime = end - start; - - return results; - } finally { - callbacks.cleanup(); - } - } - - private static class TestObject implements DataSerializable { - private String text; - - public TestObject() { - - } - - public TestObject(String text) { - super(); - this.text = text; - } - - @Override - public void toData(DataOutput out) throws IOException { - DataSerializer.writeString(text, out); - } - - @Override - public void fromData(DataInput in) - throws IOException, ClassNotFoundException { - text = DataSerializer.readString(in); - } - - @Override - public String toString() { - return text; - } - - - } - - private interface TestCallbacks { - public void init() throws Exception; - public int query(Query query) throws Exception; - public void addObject(String key, String text) throws Exception; - public void commit() throws Exception; - public void waitForAsync() throws Exception; - public void cleanup() throws Exception; - } - - private static class Results { - long writeTime; - long queryTime; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java new file mode 100644 index 0000000..74f3742 --- /dev/null +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.gemstone.gemfire.cache.lucene.internal.repository; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.store.RAMDirectory; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.DataSerializable; +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.PartitionAttributesFactory; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionShortcut; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; +import com.gemstone.gemfire.cache.lucene.LuceneIndex; +import com.gemstone.gemfire.cache.lucene.LuceneQuery; +import com.gemstone.gemfire.cache.lucene.LuceneQueryProvider; +import com.gemstone.gemfire.cache.lucene.LuceneQueryResults; +import com.gemstone.gemfire.cache.lucene.LuceneService; +import com.gemstone.gemfire.cache.lucene.LuceneServiceProvider; +import com.gemstone.gemfire.cache.lucene.internal.LuceneServiceImpl; +import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory; +import com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntriesCollector; +import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey; +import com.gemstone.gemfire.cache.lucene.internal.filesystem.File; +import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer; +import com.gemstone.gemfire.cache.query.QueryException; +import com.gemstone.gemfire.test.junit.categories.PerformanceTest; + +/** + * Microbenchmark of the IndexRepository to compare an + * IndexRepository built on top of cache with a + * stock lucene IndexWriter with a RAMDirectory. + */ +@Category(PerformanceTest.class) +@Ignore("Tests have no assertions") +public class IndexRepositoryImplPerformanceTest { + + private static final int NUM_WORDS = 1000; + private static int[] COMMIT_INTERVAL = new int[] {100, 1000, 5000}; + private static int NUM_ENTRIES = 500_000; + private static int NUM_QUERIES = 500_000; + + private StandardAnalyzer analyzer = new StandardAnalyzer(); + + @Test + public void testIndexRepository() throws Exception { + + + doTest("IndexRepository", new TestCallbacks() { + + private Cache cache; + private IndexRepositoryImpl repo; + private IndexWriter writer; + + @Override + public void addObject(String key, String text) throws Exception { + repo.create(key, new TestObject(text)); + } + + @Override + public void commit() throws Exception { + repo.commit(); + } + + @Override + public void init() throws Exception { + cache = new CacheFactory().set("mcast-port", "0") + .set("log-level", "error") + .create(); + Region<String, File> fileRegion = cache.<String, File>createRegionFactory(RegionShortcut.REPLICATE).create("files"); + Region<ChunkKey, byte[]> chunkRegion = cache.<ChunkKey, byte[]>createRegionFactory(RegionShortcut.REPLICATE).create("chunks"); + + RegionDirectory dir = new RegionDirectory(fileRegion, chunkRegion); + + + IndexWriterConfig config = new IndexWriterConfig(analyzer); + writer = new IndexWriter(dir, config); + String[] indexedFields= new String[] {"text"}; + HeterogenousLuceneSerializer mapper = new HeterogenousLuceneSerializer(indexedFields); + repo = new IndexRepositoryImpl(fileRegion, writer, mapper); + } + + @Override + public void cleanup() throws IOException { + writer.close(); + cache.close(); + } + + @Override + public void waitForAsync() throws Exception { + //do nothing + } + + @Override + public int query(Query query) throws IOException { + TopEntriesCollector collector = new TopEntriesCollector(); + repo.query(query, 100, collector); + return collector.size(); + } + }); + } + + /** + * Test our full lucene index implementation + * @throws Exception + */ + @Test + public void testLuceneIndex() throws Exception { + + + doTest("LuceneIndex", new TestCallbacks() { + + private Cache cache; + private Region<String, TestObject> region; + private LuceneService service; + + @Override + public void addObject(String key, String text) throws Exception { + region.create(key, new TestObject(text)); + } + + @Override + public void commit() throws Exception { + //NA + } + + @Override + public void init() throws Exception { + cache = new CacheFactory().set("mcast-port", "0") + .set("log-level", "warning") + .create(); + service = LuceneServiceProvider.get(cache); + service.createIndex("index", "/region", "text"); + region = cache.<String, TestObject>createRegionFactory(RegionShortcut.PARTITION) + .setPartitionAttributes(new PartitionAttributesFactory<>().setTotalNumBuckets(1).create()) + .create("region"); + } + + @Override + public void cleanup() throws IOException { + cache.close(); + } + + @Override + public void waitForAsync() throws Exception { + AsyncEventQueue aeq = cache.getAsyncEventQueue(LuceneServiceImpl.getUniqueIndexName("index", "/region")); + + //We will be at most 10 ms off + while(aeq.size() > 0) { + Thread.sleep(10); + } + } + + @Override + public int query(final Query query) throws Exception { + LuceneQuery<Object, Object> luceneQuery = service.createLuceneQueryFactory().create("index", "/region", new LuceneQueryProvider() { + + @Override + public Query getQuery(LuceneIndex index) throws QueryException { + return query; + } + }); + + LuceneQueryResults<Object, Object> results = luceneQuery.search(); + return results.size(); + } + }); + } + + @Test + public void testLuceneWithRegionDirectory() throws Exception { + doTest("RegionDirectory", new TestCallbacks() { + + private IndexWriter writer; + private SearcherManager searcherManager; + + @Override + public void init() throws Exception { + RegionDirectory dir = new RegionDirectory(new ConcurrentHashMap<String, File>(), new ConcurrentHashMap<ChunkKey, byte[]>()); + IndexWriterConfig config = new IndexWriterConfig(analyzer); + writer = new IndexWriter(dir, config); + searcherManager = new SearcherManager(writer, true, null); + } + + @Override + public void addObject(String key, String text) throws Exception { + Document doc = new Document(); + doc.add(new TextField("key", key, Store.YES)); + doc.add(new TextField("text", text, Store.NO)); + writer.addDocument(doc); + } + + @Override + public void commit() throws Exception { + writer.commit(); + searcherManager.maybeRefresh(); + } + + @Override + public void cleanup() throws Exception { + writer.close(); + } + + @Override + public void waitForAsync() throws Exception { + //do nothing + } + + @Override + public int query(Query query) throws Exception { + IndexSearcher searcher = searcherManager.acquire(); + try { + return searcher.count(query); + } finally { + searcherManager.release(searcher); + } + } + + }); + + } + + @Test + public void testLucene() throws Exception { + doTest("Lucene", new TestCallbacks() { + + private IndexWriter writer; + private SearcherManager searcherManager; + + @Override + public void init() throws Exception { + RAMDirectory dir = new RAMDirectory(); + IndexWriterConfig config = new IndexWriterConfig(analyzer); + writer = new IndexWriter(dir, config); + searcherManager = new SearcherManager(writer, true, null); + } + + @Override + public void addObject(String key, String text) throws Exception { + Document doc = new Document(); + doc.add(new TextField("key", key, Store.YES)); + doc.add(new TextField("text", text, Store.NO)); + writer.addDocument(doc); + } + + @Override + public void commit() throws Exception { + writer.commit(); + searcherManager.maybeRefresh(); + } + + @Override + public void cleanup() throws Exception { + writer.close(); + } + + @Override + public void waitForAsync() throws Exception { + //do nothing + } + + @Override + public int query(Query query) throws Exception { + IndexSearcher searcher = searcherManager.acquire(); + try { + return searcher.count(query); + } finally { + searcherManager.release(searcher); + } + } + + }); + + } + + private void doTest(String testName, TestCallbacks callbacks) throws Exception { + + //Create some random words. We need to be careful + //to make sure we get NUM_WORDS distinct words here + Set<String> wordSet = new HashSet<String>(); + Random rand = new Random(); + while(wordSet.size() < NUM_WORDS) { + int length = rand.nextInt(12) + 3; + char[] text = new char[length]; + for(int i = 0; i < length; i++) { + text[i] = (char) (rand.nextInt(26) + 97); + } + wordSet.add(new String(text)); + } + List<String> words = new ArrayList<String>(wordSet.size()); + words.addAll(wordSet); + + + + //warm up + writeRandomWords(callbacks, words, rand, NUM_ENTRIES / 10, NUM_QUERIES / 10, COMMIT_INTERVAL[0]); + + //Do the actual test + + for(int i = 0; i < COMMIT_INTERVAL.length; i++) { + Results results = writeRandomWords(callbacks, words, rand, NUM_ENTRIES, NUM_QUERIES / 10, COMMIT_INTERVAL[i]); + + System.out.println(testName + " writes(entries=" + NUM_ENTRIES + ", commit=" + COMMIT_INTERVAL[i] + "): " + TimeUnit.NANOSECONDS.toMillis(results.writeTime)); + System.out.println(testName + " queries(entries=" + NUM_ENTRIES + ", commit=" + COMMIT_INTERVAL[i] + "): " + TimeUnit.NANOSECONDS.toMillis(results.queryTime)); + } + } + + private Results writeRandomWords(TestCallbacks callbacks, List<String> words, + Random rand, int numEntries, int numQueries, int commitInterval) throws Exception { + Results results = new Results(); + callbacks.init(); + int[] counts = new int[words.size()]; + long start = System.nanoTime(); + try { + for(int i =0; i < numEntries; i++) { + int word1 = rand.nextInt(words.size()); + int word2 = rand.nextInt(words.size()); + counts[word1]++; + counts[word2]++; + String value = words.get(word1) + " " + words.get(word2); + callbacks.addObject("key" + i, value); + + if(i % commitInterval == 0 && i != 0) { + callbacks.commit(); + } + } + callbacks.commit(); + callbacks.waitForAsync(); + long end = System.nanoTime(); + results.writeTime = end - start; + + + start = System.nanoTime(); + for(int i=0; i < numQueries; i++) { + int wordIndex = rand.nextInt(words.size()); + String word = words.get(wordIndex); + Query query = new TermQuery(new Term("text", word)); + int size = callbacks.query(query); +// int size = callbacks.query(parser.parse(word)); + //All of my tests sometimes seem to be missing a couple of words, including the stock lucene +// assertEquals("Error on query " + i + " word=" + word, counts[wordIndex], size); + } + end = System.nanoTime(); + results.queryTime = end - start; + + return results; + } finally { + callbacks.cleanup(); + } + } + + private static class TestObject implements DataSerializable { + private String text; + + public TestObject() { + + } + + public TestObject(String text) { + super(); + this.text = text; + } + + @Override + public void toData(DataOutput out) throws IOException { + DataSerializer.writeString(text, out); + } + + @Override + public void fromData(DataInput in) + throws IOException, ClassNotFoundException { + text = DataSerializer.readString(in); + } + + @Override + public String toString() { + return text; + } + + + } + + private interface TestCallbacks { + public void init() throws Exception; + public int query(Query query) throws Exception; + public void addObject(String key, String text) throws Exception; + public void commit() throws Exception; + public void waitForAsync() throws Exception; + public void cleanup() throws Exception; + } + + private static class Results { + long writeTime; + long queryTime; + } +}