Repository: incubator-geode
Updated Branches:
  refs/heads/develop ac3d3b4c5 -> 4ed2fd374
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4ed2fd37/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java new file mode 100755 index 0000000..41d48aa --- /dev/null +++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java @@ -0,0 +1,5871 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache; + +import static org.junit.runners.MethodSorters.NAME_ASCENDING; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import junit.framework.AssertionFailedError; + +import org.junit.FixMethodOrder; + +import com.gemstone.gemfire.CancelException; +import com.gemstone.gemfire.LogWriter; +import com.gemstone.gemfire.cache.client.NoAvailableServersException; +import com.gemstone.gemfire.cache.client.Pool; +import com.gemstone.gemfire.cache.client.PoolManager; +import com.gemstone.gemfire.cache.client.internal.Endpoint; +import com.gemstone.gemfire.cache.client.internal.PoolImpl; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.cache.util.CacheListenerAdapter; +import com.gemstone.gemfire.cache30.ClientServerTestCase; +import com.gemstone.gemfire.cache30.CacheSerializableRunnable; +import com.gemstone.gemfire.cache30.CacheTestCase; +import com.gemstone.gemfire.cache30.CertifiableTestCacheListener; +import com.gemstone.gemfire.cache30.TestCacheLoader; +import com.gemstone.gemfire.cache30.TestCacheWriter; +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.distributed.internal.DistributionManager; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.Assert; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; +import com.gemstone.gemfire.internal.cache.EntryExpiryTask; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.PoolStats; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifierStats; +import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; +import com.gemstone.gemfire.internal.logging.InternalLogWriter; +import com.gemstone.gemfire.internal.logging.LocalLogWriter; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.DistributedTestCase; +import com.gemstone.gemfire.test.dunit.Host; +import com.gemstone.gemfire.test.dunit.Invoke; +import com.gemstone.gemfire.test.dunit.NetworkUtils; +import com.gemstone.gemfire.test.dunit.SerializableRunnable; +import com.gemstone.gemfire.test.dunit.ThreadUtils; +import 
com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.dunit.WaitCriterion; + +/** + * This class tests the client connection pool in GemFire. + * It does so by creating a cache server with a cache and a pre-defined region and + * a data loader. The client creates the same region with a pool + * (this happens in the controller VM). the client then spins up + * 10 different threads and issues gets on keys. The server data loader returns the + * data to the client. + * Test uses Groboutils TestRunnable objects to achieve multi threading behavior + * in the test. + * + */ +@FixMethodOrder(NAME_ASCENDING) +public class ConnectionPoolDUnitTest extends CacheTestCase { + + private static final long serialVersionUID = 1L; + + /** The port on which the bridge server was started in this VM */ + private static int bridgeServerPort; + + protected static int port = 0; + protected static int port2 = 0; + + protected static int numberOfAfterInvalidates; + protected static int numberOfAfterCreates; + protected static int numberOfAfterUpdates; + + protected final static int TYPE_CREATE = 0; + protected final static int TYPE_UPDATE = 1; + protected final static int TYPE_INVALIDATE = 2; + protected final static int TYPE_DESTROY = 3; + + public ConnectionPoolDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + // avoid IllegalStateException from HandShake by connecting all vms to + // system before creating pool + getSystem(); + Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") { + public void run() { + getSystem(); + } + }); + } + + @Override + protected final void postTearDownCacheTestCase() throws Exception { + Invoke.invokeInEveryVM(new SerializableRunnable() { + public void run() { + Map pools = PoolManager.getAll(); + if (!pools.isEmpty()) { + com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().warning("found pools remaining after teardown: " + pools); + assertEquals(0, pools.size()); + } + } + }); + postTearDownConnectionPoolDUnitTest(); + } + + protected void postTearDownConnectionPoolDUnitTest() throws Exception { + } + + protected/*GemStoneAddition*/ static PoolImpl getPool(Region r) { + PoolImpl result = null; + String poolName = r.getAttributes().getPoolName(); + if (poolName != null) { + result = (PoolImpl)PoolManager.find(poolName); + } + return result; + } + protected static TestCacheWriter getTestWriter(Region r) { + return (TestCacheWriter)r.getAttributes().getCacheWriter(); + } + /** + * Create a bridge server on the given port without starting it. + * + * @since 5.0.2 + */ + protected void createBridgeServer(int port) throws IOException { + CacheServer bridge = getCache().addCacheServer(); + bridge.setPort(port); + bridge.setMaxThreads(getMaxThreads()); + bridgeServerPort = bridge.getPort(); + } + + /** + * Starts a bridge server on the given port, using the given + * deserializeValues and notifyBySubscription to serve up the + * given region. 
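 * For reference, the public CacheServer API that these start/stop helpers wrap
 * looks roughly like the sketch below (port 0 asks the server to pick an ephemeral
 * port; the variable names are illustrative and not part of this test):
 * <pre>
 *   CacheServer server = cache.addCacheServer();
 *   server.setPort(0);
 *   server.setMaxThreads(0);           // 0 = thread per connection, no selector
 *   server.start();
 *   int boundPort = server.getPort();  // the port actually chosen
 * </pre>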
+ * + * @since 4.0 + */ + protected void startBridgeServer(int port) + throws IOException { + startBridgeServer(port, -1); + } + + protected void startBridgeServer(int port, int socketBufferSize) throws IOException { + startBridgeServer(port, socketBufferSize, CacheServer.DEFAULT_LOAD_POLL_INTERVAL); + } + + protected void startBridgeServer(int port, int socketBufferSize, long loadPollInterval) + throws IOException { + + Cache cache = getCache(); + CacheServer bridge = cache.addCacheServer(); + bridge.setPort(port); + if (socketBufferSize != -1) { + bridge.setSocketBufferSize(socketBufferSize); + } + bridge.setMaxThreads(getMaxThreads()); + bridge.setLoadPollInterval(loadPollInterval); + bridge.start(); + bridgeServerPort = bridge.getPort(); + } + + /** + * By default return 0 which turns off selector and gives thread per cnx. + * Test subclasses can override to run with selector. + * @since 5.1 + */ + protected int getMaxThreads() { + return 0; + } + + /** + * Stops the bridge server that serves up the given cache. + * + * @since 4.0 + */ + void stopBridgeServer(Cache cache) { + CacheServer bridge = + cache.getCacheServers().iterator().next(); + bridge.stop(); + assertFalse(bridge.isRunning()); + } + + void stopBridgeServers(Cache cache) { + CacheServer bridge = null; + for (Iterator bsI = cache.getCacheServers().iterator();bsI.hasNext(); ) { + bridge = (CacheServer) bsI.next(); + bridge.stop(); + assertFalse(bridge.isRunning()); + } + } + + private void restartBridgeServers(Cache cache) throws IOException + { + CacheServer bridge = null; + for (Iterator bsI = cache.getCacheServers().iterator();bsI.hasNext(); ) { + bridge = (CacheServer) bsI.next(); + bridge.start(); + assertTrue(bridge.isRunning()); + } + } + + protected InternalDistributedSystem createLonerDS() { + disconnectFromDS(); + InternalDistributedSystem ds = getLonerSystem(); + assertEquals(0, ds.getDistributionManager().getOtherDistributionManagerIds().size()); + return ds; + } + + + + /** + * Returns region attributes for a <code>LOCAL</code> region + */ + protected RegionAttributes getRegionAttributes() { + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior + return factory.create(); + } + + private static String createBridgeClientConnection(String host, int[] ports) { + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < ports.length; i++) { + if (i > 0) sb.append(","); + sb.append("name" + i + "="); + sb.append(host + ":" + ports[i]); + } + return sb.toString(); + } + + private class EventWrapper { + public final EntryEvent event; + public final Object key; + public final Object val; + public final Object arg; + public final int type; + public EventWrapper(EntryEvent ee, int type) { + this.event = ee; + this.key = ee.getKey(); + this.val = ee.getNewValue(); + this.arg = ee.getCallbackArgument(); + this.type = type; + } + public String toString() { + return "EventWrapper: event=" + event + ", type=" + type; + } + } + + protected class ControlListener extends CacheListenerAdapter { + public final LinkedList events = new LinkedList(); + public final Object CONTROL_LOCK = new Object(); + + public boolean waitWhileNotEnoughEvents(long sleepMs, int eventCount) { + long maxMillis = System.currentTimeMillis() + sleepMs; + synchronized (this.CONTROL_LOCK) { + try { + while (this.events.size() < eventCount) { + long waitMillis = maxMillis - System.currentTimeMillis(); + if (waitMillis < 10) { + break; + } 
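          // wait out the remaining time; the enclosing while loop re-checks the event
          // count on wakeup, which also guards against spurious wakeups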
+ this.CONTROL_LOCK.wait(waitMillis); + } + } catch (InterruptedException abort) { + fail("interrupted"); + } + return !this.events.isEmpty(); + } + } + + public void afterCreate(EntryEvent e) { + //System.out.println("afterCreate: " + e); + synchronized(this.CONTROL_LOCK) { + this.events.add(new EventWrapper(e, TYPE_CREATE)); + this.CONTROL_LOCK.notifyAll(); + } + } + + public void afterUpdate(EntryEvent e) { + //System.out.println("afterUpdate: " + e); + synchronized(this.CONTROL_LOCK) { + this.events.add(new EventWrapper(e, TYPE_UPDATE)); + this.CONTROL_LOCK.notifyAll(); + } + } + + public void afterInvalidate(EntryEvent e) { + //System.out.println("afterInvalidate: " + e); + synchronized(this.CONTROL_LOCK) { + this.events.add(new EventWrapper(e, TYPE_INVALIDATE)); + this.CONTROL_LOCK.notifyAll(); + } + } + + public void afterDestroy(EntryEvent e) { + //System.out.println("afterDestroy: " + e); + synchronized(this.CONTROL_LOCK) { + this.events.add(new EventWrapper(e, TYPE_DESTROY)); + this.CONTROL_LOCK.notifyAll(); + } + } + } + + + + + /** + * Create a fake EntryEvent that returns the provided region for {@link CacheEvent#getRegion()} + * and returns {@link com.gemstone.gemfire.cache.Operation#LOCAL_LOAD_CREATE} for {@link CacheEvent#getOperation()} + * @param r + * @return fake entry event + */ + protected static EntryEvent createFakeyEntryEvent(final Region r) { + return new EntryEvent() { + public Operation getOperation() + { + return Operation.LOCAL_LOAD_CREATE; // fake out pool to exit early + } + public Region getRegion() + { + return r; + } + public Object getKey() { return null; } + public Object getOldValue() { return null;} + public boolean isOldValueAvailable() {return true;} + public Object getNewValue() { return null;} + public boolean isLocalLoad() { return false;} + public boolean isNetLoad() {return false;} + public boolean isLoad() {return true; } + public boolean isNetSearch() {return false;} + public TransactionId getTransactionId() {return null;} + public Object getCallbackArgument() {return null;} + public boolean isCallbackArgumentAvailable() {return true;} + public boolean isOriginRemote() {return false;} + public DistributedMember getDistributedMember() {return null;} + public boolean isExpiration() { return false;} + public boolean isDistributed() { return false;} + public boolean isBridgeEvent() { + return hasClientOrigin(); + } + public boolean hasClientOrigin() { + return false; + } + public ClientProxyMembershipID getContext() { + return null; + } + public SerializedCacheValue getSerializedOldValue() {return null;} + public SerializedCacheValue getSerializedNewValue() {return null;} + }; + } + + public void verifyBalanced(final PoolImpl pool, int expectedServer, + final int expectedConsPerServer) { + verifyServerCount(pool, expectedServer); + WaitCriterion ev = new WaitCriterion() { + public boolean done() { + return balanced(pool, expectedConsPerServer); + } + public String description() { + return "expected " + expectedConsPerServer + + " but endpoints=" + outOfBalanceReport(pool); + } + }; + Wait.waitForCriterion(ev, 2 * 60 * 1000, 200, true); + assertEquals("expected " + expectedConsPerServer + + " but endpoints=" + outOfBalanceReport(pool), + true, balanced(pool, expectedConsPerServer)); + } + protected boolean balanced(PoolImpl pool, int expectedConsPerServer) { + Iterator it = pool.getEndpointMap().values().iterator(); + while (it.hasNext()) { + Endpoint ep = (Endpoint)it.next(); + if (ep.getStats().getConnections() != expectedConsPerServer) { + 
return false; + } + } + return true; + } + + protected String outOfBalanceReport(PoolImpl pool) { + StringBuffer result = new StringBuffer(); + Iterator it = pool.getEndpointMap().values().iterator(); + result.append("<"); + while (it.hasNext()) { + Endpoint ep = (Endpoint)it.next(); + result.append("ep=" + ep); + result.append(" conCount=" + ep.getStats().getConnections()); + if (it.hasNext()) { + result.append(", "); + } + } + result.append(">"); + return result.toString(); + } + + public void waitForBlacklistToClear(final PoolImpl pool) { + WaitCriterion ev = new WaitCriterion() { + public boolean done() { + return pool.getBlacklistedServers().size() == 0; + } + public String description() { + return null; + } + }; + Wait.waitForCriterion(ev, 30 * 1000, 200, true); + assertEquals("unexpected blacklistedServers=" + pool.getBlacklistedServers(), + 0, pool.getBlacklistedServers().size()); + } + + public void verifyServerCount(final PoolImpl pool, final int expectedCount) { + getCache().getLogger().info("verifyServerCount expects=" + expectedCount); + WaitCriterion ev = new WaitCriterion() { + String excuse; + public boolean done() { + int actual = pool.getConnectedServerCount(); + if (actual == expectedCount) { + return true; + } + excuse = "Found only " + actual + " servers, expected " + expectedCount; + return false; + } + public String description() { + return excuse; + } + }; + Wait.waitForCriterion(ev, 5 * 60 * 1000, 200, true); + } + + /** + * Tests that the callback argument is sent to the server + */ + public void test001CallbackArg() throws CacheException { + final String name = this.getName(); + final Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + + final Object createCallbackArg = "CREATE CALLBACK ARG"; + final Object updateCallbackArg = "PUT CALLBACK ARG"; + + vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { + public void run2() throws CacheException { + + CacheWriter cw = new TestCacheWriter() { + public final void beforeUpdate2(EntryEvent event) + throws CacheWriterException { + Object beca = event.getCallbackArgument(); + assertEquals(updateCallbackArg, beca); + } + + public void beforeCreate2(EntryEvent event) + throws CacheWriterException { + Object beca = event.getCallbackArgument(); + assertEquals(createCallbackArg, beca); + } + }; + AttributesFactory factory = getBridgeServerRegionAttributes(null, cw); + createRegion(name, factory.create()); + //pause(1000); + try { + startBridgeServer(0); + + } catch (Exception ex) { + com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex); + } + + } + }); + final int port = + vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); + final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); + + + SerializableRunnable create = + new CacheSerializableRunnable("Create region") { + public void run2() throws CacheException { + getLonerSystem(); + getCache(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(false); + + ClientServerTestCase.configureConnectionPool(factory,NetworkUtils.getServerHostName(host),port,-1,true,-1,-1, null); + createRegion(name, factory.create()); + } + }; + + vm1.invoke(create); + vm1.invoke(new CacheSerializableRunnable("Add entries") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + region.create(new Integer(i), "old" + i, createCallbackArg); + } + for (int i = 
0; i < 10; i++) { + region.put(new Integer(i), "new" + i, updateCallbackArg); + } + } + }); + + vm0.invoke(new CacheSerializableRunnable("Check cache writer") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + TestCacheWriter writer = getTestWriter(region); + assertTrue(writer.wasInvoked()); + } + }); + + SerializableRunnable close = + new CacheSerializableRunnable("Close Pool") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + region.localDestroyRegion(); + } + }; + + vm1.invoke(close); + + vm0.invoke(new SerializableRunnable("Stop CacheServer") { + public void run() { + stopBridgeServer(getCache()); + } + }); + + } + + /** + * Tests that consecutive puts have the callback assigned + * appropriately. + */ + public void test002CallbackArg2() throws CacheException { + final String name = this.getName(); + final Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + + final Object createCallbackArg = "CREATE CALLBACK ARG"; +// final Object updateCallbackArg = "PUT CALLBACK ARG"; + + vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { + public void run2() throws CacheException { + CacheWriter cw = new TestCacheWriter() { + public void beforeCreate2(EntryEvent event) + throws CacheWriterException { + Integer key = (Integer) event.getKey(); + if (key.intValue() % 2 == 0) { + Object beca = event.getCallbackArgument(); + assertEquals(createCallbackArg, beca); + } else { + Object beca = event.getCallbackArgument(); + assertNull(beca); + } + } + }; + AttributesFactory factory = getBridgeServerRegionAttributes(null, cw); + createRegion(name, factory.create()); + //pause(1000); + try { + startBridgeServer(0); + + } catch (Exception ex) { + com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex); + } + + } + }); + final int port = + vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); + final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); + + SerializableRunnable create = + new CacheSerializableRunnable("Create region") { + public void run2() throws CacheException { + getLonerSystem(); + getCache(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(false); + ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null); + createRegion(name, factory.create()); + } + }; + + vm1.invoke(create); + vm1.invoke(new CacheSerializableRunnable("Add entries") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + if (i % 2 == 0) { + region.create(new Integer(i), "old" + i, createCallbackArg); + + } else { + region.create(new Integer(i), "old" + i); + } + } + } + }); + + SerializableRunnable close = + new CacheSerializableRunnable("Close Pool") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + region.localDestroyRegion(); + } + }; + + vm1.invoke(close); + + vm0.invoke(new CacheSerializableRunnable("Check cache writer") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + TestCacheWriter writer = getTestWriter(region); + assertTrue(writer.wasInvoked()); + } + }); + + vm0.invoke(new SerializableRunnable("Stop CacheServer") { + public void run() { + stopBridgeServer(getCache()); + } + }); + } + + + /** + * Tests for bug 36684 by having two bridge 
servers with cacheloaders that should always return + * a value and one client connected to each server reading values. If the bug exists, the + * clients will get null sometimes. + * @throws InterruptedException + */ + public void test003Bug36684() throws CacheException, InterruptedException { + final String name = this.getName(); + final Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + VM vm3 = host.getVM(3); + + // Create the cache servers with distributed, mirrored region + SerializableRunnable createServer = + new CacheSerializableRunnable("Create Cache Server") { + public void run2() throws CacheException { + CacheLoader cl = new CacheLoader() { + public Object load(LoaderHelper helper) { + return helper.getKey(); + } + public void close() { + + } + }; + AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null); + createRegion(name, factory.create()); + //pause(1000); + try { + startBridgeServer(0); + } catch (Exception ex) { + com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex); + } + + } + }; + getSystem().getLogWriter().info("before create server"); + vm0.invoke(createServer); + vm1.invoke(createServer); + + // Create cache server clients + final int numberOfKeys = 1000; + final String host0 = NetworkUtils.getServerHostName(host); + final int vm0Port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); + final int vm1Port = vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); + SerializableRunnable createClient = + new CacheSerializableRunnable("Create Cache Server Client") { + public void run2() throws CacheException { + // reset all static listener variables in case this is being rerun in a subclass + numberOfAfterInvalidates = 0; + numberOfAfterCreates = 0; + numberOfAfterUpdates = 0; + // create the region + getLonerSystem(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior + // create bridge writer + ClientServerTestCase.configureConnectionPool(factory,host0,vm0Port,vm1Port,true,-1,-1, null); + createRegion(name, factory.create()); + } + }; + getSystem().getLogWriter().info("before create client"); + vm2.invoke(createClient); + vm3.invoke(createClient); + + // Initialize each client with entries (so that afterInvalidate is called) + SerializableRunnable initializeClient = + new CacheSerializableRunnable("Initialize Client") { + public void run2() throws CacheException { +// StringBuffer errors = new StringBuffer(); + numberOfAfterInvalidates = 0; + numberOfAfterCreates = 0; + numberOfAfterUpdates = 0; + LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); + for (int i=0; i<numberOfKeys; i++) { + String expected = "key-"+i; + String actual = (String) region.get("key-"+i); + assertEquals(expected, actual); + } + } + }; + + getSystem().getLogWriter().info("before initialize client"); + AsyncInvocation inv2 = vm2.invokeAsync(initializeClient); + AsyncInvocation inv3 = vm3.invokeAsync(initializeClient); + + ThreadUtils.join(inv2, 30 * 1000); + ThreadUtils.join(inv3, 30 * 1000); + + if (inv2.exceptionOccurred()) { + com.gemstone.gemfire.test.dunit.Assert.fail("Error occured in vm2", inv2.getException()); + } + if(inv3.exceptionOccurred()) { + com.gemstone.gemfire.test.dunit.Assert.fail("Error occured in vm3", inv3.getException()); + } + } + + + /** + * Test for client connection loss with CacheLoader Exception 
on the server. + */ + public void test004ForCacheLoaderException() throws CacheException, InterruptedException { + final String name = this.getName(); + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + // Create the cache servers with distributed, mirrored region + SerializableRunnable createServer = + new CacheSerializableRunnable("Create Cache Server") { + public void run2() throws CacheException { + CacheLoader cl = new CacheLoader() { + public Object load(LoaderHelper helper) { + System.out.println("### CALLING CACHE LOADER...."); + throw new CacheLoaderException("Test for CahceLoaderException causing Client connection to disconnect."); + } + public void close() { + } + }; + AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null); + createRegion(name, factory.create()); + //pause(1000); + try { + startBridgeServer(0); + } catch (Exception ex) { + com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex); + } + + } + }; + getSystem().getLogWriter().info("before create server"); + + server.invoke(createServer); + + // Create cache server clients + final int numberOfKeys = 10; + final String host0 = NetworkUtils.getServerHostName(host); + final int[] port = new int[] {server.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort())}; + final String poolName = "myPool"; + + SerializableRunnable createClient = + new CacheSerializableRunnable("Create Cache Server Client") { + public void run2() throws CacheException { + getLonerSystem(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(false); + // create bridge writer + ClientServerTestCase.configureConnectionPoolWithName(factory,host0,port,true,-1, -1, null, poolName); + createRegion(name, factory.create()); + } + }; + getSystem().getLogWriter().info("before create client"); + client.invoke(createClient); + + // Initialize each client with entries (so that afterInvalidate is called) + SerializableRunnable invokeServerCacheLaoder = + new CacheSerializableRunnable("Initialize Client") { + public void run2() throws CacheException { + LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); + PoolStats stats = ((PoolImpl)PoolManager.find(poolName)).getStats(); + int oldConnects = stats.getConnects(); + int oldDisConnects = stats.getDisConnects(); + try { + for (int i=0; i<numberOfKeys; i++) { + String actual = (String) region.get("key-"+i); + } + } catch (Exception ex){ + if (!(ex.getCause() instanceof CacheLoaderException)) { + fail ("UnExpected Exception, expected to receive CacheLoaderException from server, instead found: " + ex.getCause().getClass()); + } + } + int newConnects = stats.getConnects(); + int newDisConnects = stats.getDisConnects(); + //System.out.println("#### new connects/disconnects :" + newConnects + ":" + newDisConnects); + if (newConnects != oldConnects && newDisConnects != oldDisConnects) { + fail ("New connection has created for Server side CacheLoaderException."); + } + } + }; + + getSystem().getLogWriter().info("before initialize client"); + AsyncInvocation inv2 = client.invokeAsync(invokeServerCacheLaoder); + + ThreadUtils.join(inv2, 30 * 1000); + SerializableRunnable stopServer = new SerializableRunnable("stop CacheServer") { + public void run() { + stopBridgeServer(getCache()); + } + }; + server.invoke(stopServer); + + } + + + protected void validateDS() { + List l = InternalDistributedSystem.getExistingSystems(); + if (l.size() > 
1) { + getSystem().getLogWriter().info("validateDS: size=" + + l.size() + + " isDedicatedAdminVM=" + + DistributionManager.isDedicatedAdminVM + + " l=" + l); + } + assertFalse(DistributionManager.isDedicatedAdminVM); + assertEquals(1, l.size()); + } + + + /** + * Tests the basic operations of the {@link Pool} + * + * @since 3.5 + */ + public void test006Pool() throws CacheException { + final String name = this.getName(); + final Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = Host.getHost(0).getVM(2); + + vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { + public void run2() throws CacheException { + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setConcurrencyChecksEnabled(false); + factory.setCacheLoader(new CacheLoader() { + public Object load(LoaderHelper helper) { + //System.err.println("CacheServer data loader called"); + return helper.getKey().toString(); + } + public void close() { + + } + }); + createRegion(name, factory.create()); + //pause(1000); + try { + startBridgeServer(0); + } catch (Exception ex) { + com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex); + } + + } + }); + final int port = + vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); + final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); + SerializableRunnable create = + new CacheSerializableRunnable("Create region") { + public void run2() throws CacheException { + getLonerSystem(); + getCache(); + validateDS(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(false); + ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null); + createRegion(name, factory.create()); + } + }; + vm1.invoke(create); + + vm1.invoke(new CacheSerializableRunnable("Get values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + Object value = region.get(new Integer(i)); + assertEquals(String.valueOf(i), value); + } + } + }); + + vm1.invoke(new CacheSerializableRunnable("Update values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + + for (int i = 0; i < 10; i++) { + region.put(new Integer(i), new Integer(i)); + } + } + }); + + vm2.invoke(create); + vm2.invoke(new CacheSerializableRunnable("Validate values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + Object value = region.get(new Integer(i)); + assertNotNull(value); + assertTrue(value instanceof Integer); + assertEquals(i, ((Integer) value).intValue()); + } + } + }); + + vm1.invoke(new CacheSerializableRunnable("Close Pool") { + // do some special close validation here + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + String pName = region.getAttributes().getPoolName(); + PoolImpl p = (PoolImpl)PoolManager.find(pName); + assertEquals(false, p.isDestroyed()); + assertEquals(1, p.getAttachCount()); + try { + p.destroy(); + fail("expected IllegalStateException"); + } catch (IllegalStateException expected) { + } + region.localDestroyRegion(); + assertEquals(false, p.isDestroyed()); + assertEquals(0, p.getAttachCount()); + } + }); + + vm0.invoke(new SerializableRunnable("Stop CacheServer") { + public void run() { + stopBridgeServer(getCache()); 
+ } + }); + } + + + + + /** + * Tests the BridgeServer failover (bug 31832). + */ + public void test007BridgeServerFailoverCnx1() throws CacheException { + disconnectAllFromDS(); + basicTestBridgeServerFailover(1); + } + /** + * Test BridgeServer failover with connectionsPerServer set to 0 + */ + public void test008BridgeServerFailoverCnx0() throws CacheException { + basicTestBridgeServerFailover(0); + } + private void basicTestBridgeServerFailover(final int cnxCount) throws CacheException { + final String name = this.getName(); + final Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + + // Create two bridge servers + SerializableRunnable createCacheServer = + new CacheSerializableRunnable("Create Cache Server") { + public void run2() throws CacheException { + AttributesFactory factory = getBridgeServerRegionAttributes(null, null); + createRegion(name, factory.create()); + //pause(1000); + try { + startBridgeServer(0); + + } catch (Exception ex) { + com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex); + } + + } + }; + + vm0.invoke(createCacheServer); + vm1.invoke(createCacheServer); + + final int port0 = + vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); + final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); + + final int port1 = + vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); +// final String host1 = getServerHostName(vm1.getHost()); + + // Create one bridge client in this VM + SerializableRunnable create = + new CacheSerializableRunnable("Create region") { + public void run2() throws CacheException { + getLonerSystem(); + getCache(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(false); + ClientServerTestCase.configureConnectionPool(factory,host0,port0,port1,true,-1,cnxCount, null, 100); + + Region region = createRegion(name, factory.create()); + + // force connections to form + region.put("keyInit", new Integer(0)); + region.put("keyInit2", new Integer(0)); + } + }; + + vm2.invoke(create); + + // Launch async thread that puts objects into cache. This thread will execute until + // the test has ended (which is why the RegionDestroyedException and CacheClosedException + // are caught and ignored. If any other exception occurs, the test will fail. See + // the putAI.exceptionOccurred() assertion below. 
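  // (note: the ThreadUtils.join / exceptionOccurred check on putAI is currently
  // commented out further below because of the "this thread does not terminate" FIXME)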
+ AsyncInvocation putAI = vm2.invokeAsync(new CacheSerializableRunnable("Put objects") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + try { + for (int i=0; i<100000; i++) { + region.put("keyAI", new Integer(i)); + try {Thread.sleep(100);} catch (InterruptedException ie) { + fail("interrupted"); + } + } + } catch (NoAvailableServersException ignore) { + /*ignore*/ + } catch (RegionDestroyedException e) { //will be thrown when the test ends + /*ignore*/ + } + catch (CancelException e) { //will be thrown when the test ends + /*ignore*/ + } + } + }); + + + SerializableRunnable verify1Server = + new CacheSerializableRunnable("verify1Server") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + PoolImpl pool = getPool(region); + verifyServerCount(pool, 1); + } + }; + SerializableRunnable verify2Servers = + new CacheSerializableRunnable("verify2Servers") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + PoolImpl pool = getPool(region); + verifyServerCount(pool, 2); + } + }; + + vm2.invoke(verify2Servers); + + SerializableRunnable stopCacheServer = + new SerializableRunnable("Stop CacheServer") { + public void run() { + stopBridgeServer(getCache()); + } + }; + + final String expected = "java.io.IOException"; + final String addExpected = + "<ExpectedException action=add>" + expected + "</ExpectedException>"; + final String removeExpected = + "<ExpectedException action=remove>" + expected + "</ExpectedException>"; + + vm2.invoke(new SerializableRunnable() { + public void run() { + LogWriter bgexecLogger = + new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out); + bgexecLogger.info(addExpected); + } + }); + try { // make sure we removeExpected + + // Bounce the non-current server (I know that VM1 contains the non-current server + // because ... + vm1.invoke(stopCacheServer); + + vm2.invoke(verify1Server); + + final int restartPort = port1; + vm1.invoke( + new SerializableRunnable("Restart CacheServer") { + public void run() { + try { + Region region = getRootRegion().getSubregion(name); + assertNotNull(region); + startBridgeServer(restartPort); + } + catch(Exception e) { + getSystem().getLogWriter().fine(new Exception(e)); + com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start CacheServer", e); + } + } + }); + + // Pause long enough for the monitor to realize the server has been bounced + // and reconnect to it. 
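    // (verify2Servers does not sleep for a fixed interval; it polls the pool's
    // connected-server count with a WaitCriterion until the restarted server is seen)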
+ vm2.invoke(verify2Servers); + + } finally { + vm2.invoke(new SerializableRunnable() { + public void run() { + LogWriter bgexecLogger = + new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out); + bgexecLogger.info(removeExpected); + } + }); + } + + // Stop the other cache server + vm0.invoke(stopCacheServer); + + // Run awhile + vm2.invoke(verify1Server); + + com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("FIXME: this thread does not terminate"); // FIXME +// // Verify that no exception has occurred in the putter thread +// DistributedTestCase.join(putAI, 5 * 60 * 1000, getLogWriter()); +// //assertTrue("Exception occurred while invoking " + putAI, !putAI.exceptionOccurred()); +// if (putAI.exceptionOccurred()) { +// fail("While putting entries: ", putAI.getException()); +// } + + // Close Pool + vm2.invoke(new CacheSerializableRunnable("Close Pool") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + region.localDestroyRegion(); + } + }); + + // Stop the last cache server + vm1.invoke(stopCacheServer); + } + + + /** + * Make sure cnx lifetime expiration working on thread local cnxs. + * @author darrel + */ + public void test009LifetimeExpireOnTL() throws CacheException { + basicTestLifetimeExpire(true); + } + + /** + * Make sure cnx lifetime expiration working on thread local cnxs. + * @author darrel + */ + public void test010LifetimeExpireOnPoolCnx() throws CacheException { + basicTestLifetimeExpire(false); + } + + protected static volatile boolean stopTestLifetimeExpire = false; + + protected static volatile int baselineLifetimeCheck; + protected static volatile int baselineLifetimeExtensions; + protected static volatile int baselineLifetimeConnect; + protected static volatile int baselineLifetimeDisconnect; + + private void basicTestLifetimeExpire(final boolean threadLocal) throws CacheException { + final String name = this.getName(); + final Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + + AsyncInvocation putAI = null; + AsyncInvocation putAI2 = null; + + try { + + // Create two bridge servers + SerializableRunnable createCacheServer = + new CacheSerializableRunnable("Create Cache Server") { + public void run2() throws CacheException { + AttributesFactory factory = getBridgeServerRegionAttributes(null, null); + factory.setCacheListener(new DelayListener(25)); + createRegion(name, factory.create()); + try { + startBridgeServer(0); + + } catch (Exception ex) { + com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex); + } + + } + }; + + vm0.invoke(createCacheServer); + + final int port0 = + vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); + final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); + vm1.invoke(createCacheServer); + final int port1 = + vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); + SerializableRunnable stopCacheServer = + new SerializableRunnable("Stop CacheServer") { + public void run() { + stopBridgeServer(getCache()); + } + }; + // we only had to stop it to reserve a port + vm1.invoke(stopCacheServer); + + + // Create one bridge client in this VM + SerializableRunnable create = + new CacheSerializableRunnable("Create region") { + public void run2() throws CacheException { + getLonerSystem(); + getCache(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(false); + 
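          // the pool below is configured with a short connection lifetime so that load
          // conditioning (lifetime expiration) kicks in during the test; the putters
          // capture baseline LoadConditioning* stats that the verify steps compare against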
ClientServerTestCase.configureConnectionPool(factory,host0,port0,port1,false/*queue*/,-1,0, null, 100, 500, threadLocal, 500); + + Region region = createRegion(name, factory.create()); + + // force connections to form + region.put("keyInit", new Integer(0)); + region.put("keyInit2", new Integer(0)); + } + }; + + vm2.invoke(create); + + // Launch async thread that puts objects into cache. This thread will execute until + // the test has ended. + SerializableRunnable putter1 = + new CacheSerializableRunnable("Put objects") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + PoolImpl pool = getPool(region); + PoolStats stats = pool.getStats(); + baselineLifetimeCheck = stats.getLoadConditioningCheck(); + baselineLifetimeExtensions = stats.getLoadConditioningExtensions(); + baselineLifetimeConnect = stats.getLoadConditioningConnect(); + baselineLifetimeDisconnect = stats.getLoadConditioningDisconnect(); + try { + int count = 0; + while (!stopTestLifetimeExpire) { + count++; + region.put("keyAI1", new Integer(count)); + } + } catch (NoAvailableServersException ex) { + if (stopTestLifetimeExpire) { + return; + } else { + throw ex; + } + // } catch (RegionDestroyedException e) { //will be thrown when the test ends + // /*ignore*/ + // } catch (CancelException e) { //will be thrown when the test ends + // /*ignore*/ + } + } + }; + SerializableRunnable putter2 = + new CacheSerializableRunnable("Put objects") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + try { + int count = 0; + while (!stopTestLifetimeExpire) { + count++; + region.put("keyAI2", new Integer(count)); + } + } catch (NoAvailableServersException ex) { + if (stopTestLifetimeExpire) { + return; + } else { + throw ex; + } + // } catch (RegionDestroyedException e) { //will be thrown when the test ends + // /*ignore*/ + // } catch (CancelException e) { //will be thrown when the test ends + // /*ignore*/ + } + } + }; + putAI = vm2.invokeAsync(putter1); + putAI2 = vm2.invokeAsync(putter2); + + SerializableRunnable verify1Server = + new CacheSerializableRunnable("verify1Server") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + PoolImpl pool = getPool(region); + final PoolStats stats = pool.getStats(); + verifyServerCount(pool, 1); + WaitCriterion ev = new WaitCriterion() { + public boolean done() { + return stats.getLoadConditioningCheck() >= (10 + baselineLifetimeCheck); + } + public String description() { + return null; + } + }; + Wait.waitForCriterion(ev, 30 * 1000, 200, true); + + // make sure no replacements are happening. 
+ // since we have 2 threads and 2 cnxs and 1 server + // when lifetimes are up we should only want to connect back to the + // server we are already connected to and thus just extend our lifetime + assertTrue("baselineLifetimeCheck=" + baselineLifetimeCheck + + " but stats.getLoadConditioningCheck()=" + stats.getLoadConditioningCheck(), + stats.getLoadConditioningCheck() >= (10+baselineLifetimeCheck)); + baselineLifetimeCheck = stats.getLoadConditioningCheck(); + assertTrue(stats.getLoadConditioningExtensions() > baselineLifetimeExtensions); + assertTrue(stats.getLoadConditioningConnect() == baselineLifetimeConnect); + assertTrue(stats.getLoadConditioningDisconnect() == baselineLifetimeDisconnect); + } + }; + SerializableRunnable verify2Servers = + new CacheSerializableRunnable("verify2Servers") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + PoolImpl pool = getPool(region); + final PoolStats stats = pool.getStats(); + verifyServerCount(pool, 2); + // make sure some replacements are happening. + // since we have 2 threads and 2 cnxs and 2 servers + // when lifetimes are up we should connect to the other server sometimes. +// int retry = 300; +// while ((retry-- > 0) +// && (stats.getLoadConditioningCheck() < (10+baselineLifetimeCheck))) { +// pause(100); +// } +// assertTrue("Bug 39209 expected " +// + stats.getLoadConditioningCheck() +// + " to be >= " +// + (10+baselineLifetimeCheck), +// stats.getLoadConditioningCheck() >= (10+baselineLifetimeCheck)); + + // TODO: does this WaitCriterion actually help? + WaitCriterion wc = new WaitCriterion() { + String excuse; + public boolean done() { + int actual = stats.getLoadConditioningCheck(); + int expected = 10 + baselineLifetimeCheck; + if (actual >= expected) { + return true; + } + excuse = "Bug 39209 expected " + actual + " to be >= " + expected; + return false; + } + public String description() { + return excuse; + } + }; + try { + Wait.waitForCriterion(wc, 60 * 1000, 1000, true); + } catch (AssertionFailedError e) { +// dumpStack(); + throw e; + } + + assertTrue(stats.getLoadConditioningConnect() > baselineLifetimeConnect); + assertTrue(stats.getLoadConditioningDisconnect() > baselineLifetimeDisconnect); + } + }; + + vm2.invoke(verify1Server); + assertEquals(true, putAI.isAlive()); + assertEquals(true, putAI2.isAlive()); + + { + final int restartPort = port1; + vm1.invoke(new SerializableRunnable("Restart CacheServer") { + public void run() { + try { + Region region = getRootRegion().getSubregion(name); + assertNotNull(region); + startBridgeServer(restartPort); + } + catch(Exception e) { + getSystem().getLogWriter().fine(new Exception(e)); + com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start CacheServer", e); + } + } + }); + } + + vm2.invoke(verify2Servers); + assertEquals(true, putAI.isAlive()); + assertEquals(true, putAI2.isAlive()); + } finally { + vm2.invoke(new SerializableRunnable("Stop Putters") { + public void run() { + stopTestLifetimeExpire = true; + } + }); + + try { + if (putAI != null) { + // Verify that no exception has occurred in the putter thread + ThreadUtils.join(putAI, 30 * 1000); + if (putAI.exceptionOccurred()) { + com.gemstone.gemfire.test.dunit.Assert.fail("While putting entries: ", putAI.getException()); + } + } + + if (putAI2 != null) { + // Verify that no exception has occurred in the putter thread + ThreadUtils.join(putAI, 30 * 1000); + // FIXME this thread does not terminate +// if (putAI2.exceptionOccurred()) { +// fail("While putting entries: 
", putAI.getException()); +// } + } + + } finally { + vm2.invoke(new SerializableRunnable("Stop Putters") { + public void run() { + stopTestLifetimeExpire = false; + } + }); + // Close Pool + vm2.invoke(new CacheSerializableRunnable("Close Pool") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + String poolName = region.getAttributes().getPoolName(); + region.localDestroyRegion(); + PoolManager.find(poolName).destroy(); + } + }); + + SerializableRunnable stopCacheServer = + new SerializableRunnable("Stop CacheServer") { + public void run() { + stopBridgeServer(getCache()); + } + }; + vm1.invoke(stopCacheServer); + vm0.invoke(stopCacheServer); + } + } + } + + /** + * Tests the create operation of the {@link Pool} + * + * @since 3.5 + */ + public void test011PoolCreate() throws CacheException { + final String name = this.getName(); + final Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = Host.getHost(0).getVM(2); + + vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { + public void run2() throws CacheException { + AttributesFactory factory = getBridgeServerRegionAttributes(null, null); + createRegion(name, factory.create()); + //pause(1000); + try { + startBridgeServer(0); + } catch (Exception ex) { + com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex); + } + + } + }); + final int port = + vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); + final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); + SerializableRunnable create = + new CacheSerializableRunnable("Create region") { + public void run2() throws CacheException { + getLonerSystem(); + getCache(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(false); + ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,false,-1,-1, null); + createRegion(name, factory.create()); + } + }; + + vm1.invoke(create); + vm1.invoke(new CacheSerializableRunnable("Create values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + region.create(new Integer(i), new Integer(i)); + } + } + }); + + vm2.invoke(create); + vm2.invoke(new CacheSerializableRunnable("Validate values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + Object value = region.get(new Integer(i)); + assertNotNull(value); + assertTrue(value instanceof Integer); + assertEquals(i, ((Integer) value).intValue()); + } + } + }); + + SerializableRunnable close = + new CacheSerializableRunnable("Close Pool") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + region.localDestroyRegion(); + } + }; + + vm1.invoke(close); + vm2.invoke(close); + + vm0.invoke(new SerializableRunnable("Stop CacheServer") { + public void run() { + stopBridgeServer(getCache()); + } + }); + } + + /** + * Tests the put operation of the {@link Pool} + * + * @since 3.5 + */ + public void test012PoolPut() throws CacheException { + final String name = this.getName(); + final Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = Host.getHost(0).getVM(2); + + vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { + public void run2() throws CacheException { + AttributesFactory factory = 
getBridgeServerRegionAttributes(null, null); + createRegion(name, factory.create()); + //pause(1000); + try { + startBridgeServer(0); + + } catch (Exception ex) { + com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex); + } + + } + }); + + final int port = + vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); + final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); + SerializableRunnable createPool = + new CacheSerializableRunnable("Create region") { + public void run2() throws CacheException { + getLonerSystem(); + getCache(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(false); + // create bridge writer + ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,false,-1,-1, null); + createRegion(name, factory.create()); + } + }; + + vm1.invoke(createPool); + + vm1.invoke(new CacheSerializableRunnable("Put values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + // put string values + region.put("key-string-"+i, "value-"+i); + + // put object values + Order order = new Order(); + order.init(i); + region.put("key-object-"+i, order); + + // put byte[] values + region.put("key-bytes-"+i, ("value-"+i).getBytes()); + } + } + }); + + vm2.invoke(createPool); + + vm2.invoke(new CacheSerializableRunnable("Get / validate string values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + Object value = region.get("key-string-"+i); + assertNotNull(value); + assertTrue(value instanceof String); + assertEquals("value-"+i, value); + } + } + }); + + vm2.invoke(new CacheSerializableRunnable("Get / validate object values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + Object value = region.get("key-object-"+i); + assertNotNull(value); + assertTrue(value instanceof Order); + assertEquals(i, ((Order) value).getIndex()); + } + } + }); + + vm2.invoke(new CacheSerializableRunnable("Get / validate byte[] values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + Object value = region.get("key-bytes-"+i); + assertNotNull(value); + assertTrue(value instanceof byte[]); + assertEquals("value-"+i, new String((byte[]) value)); + } + } + }); + + SerializableRunnable closePool = + new CacheSerializableRunnable("Close Pool") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + region.localDestroyRegion(); + } + }; + + vm1.invoke(closePool); + vm2.invoke(closePool); + + vm0.invoke(new SerializableRunnable("Stop CacheServer") { + public void run() { + stopBridgeServer(getCache()); + } + }); + } + /** + * Tests the put operation of the {@link Pool} + * + * @since 3.5 + */ + public void test013PoolPutNoDeserialize() throws CacheException { + final String name = this.getName(); + final Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = Host.getHost(0).getVM(2); + + vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { + public void run2() throws CacheException { + AttributesFactory factory = getBridgeServerRegionAttributes(null,null); + createRegion(name, factory.create()); + //pause(1000); + try { + startBridgeServer(0); + } catch (Exception 
ex) { + com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex); + } + + } + }); + + final int port = + vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); + final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); + + SerializableRunnable createPool = + new CacheSerializableRunnable("Create region") { + public void run2() throws CacheException { + getLonerSystem(); + getCache(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(false); + ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,false,-1,-1, null); + createRegion(name, factory.create()); + } + }; + + vm1.invoke(createPool); + + vm1.invoke(new CacheSerializableRunnable("Put values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + // put string values + region.put("key-string-"+i, "value-"+i); + + // put object values + Order order = new Order(); + order.init(i); + region.put("key-object-"+i, order); + + // put byte[] values + region.put("key-bytes-"+i, ("value-"+i).getBytes()); + } + } + }); + + vm2.invoke(createPool); + + vm2.invoke(new CacheSerializableRunnable("Get / validate string values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + Object value = region.get("key-string-"+i); + assertNotNull(value); + assertTrue(value instanceof String); + assertEquals("value-"+i, value); + } + } + }); + + vm2.invoke(new CacheSerializableRunnable("Get / validate object values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + Object value = region.get("key-object-"+i); + assertNotNull(value); + assertTrue(value instanceof Order); + assertEquals(i, ((Order) value).getIndex()); + } + } + }); + + vm2.invoke(new CacheSerializableRunnable("Get / validate byte[] values") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + Object value = region.get("key-bytes-"+i); + assertNotNull(value); + assertTrue(value instanceof byte[]); + assertEquals("value-"+i, new String((byte[]) value)); + } + } + }); + + SerializableRunnable closePool = + new CacheSerializableRunnable("Close Pool") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + region.localDestroyRegion(); + } + }; + + vm1.invoke(closePool); + vm2.invoke(closePool); + + vm0.invoke(new SerializableRunnable("Stop CacheServer") { + public void run() { + stopBridgeServer(getCache()); + } + }); + Wait.pause(5 * 1000); + } + + /** + * Tests that invalidates and destroys are propagated to {@link Pool}s. 
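 * The client-side subscription setup exercised here boils down to roughly the
 * following sketch (the pool and region names are illustrative, not part of the test):
 * <pre>
 *   PoolFactory pf = PoolManager.createFactory();
 *   pf.addServer(serverHost, serverPort);
 *   pf.setSubscriptionEnabled(true);   // required for server-pushed invalidates/destroys
 *   pf.create("examplePool");
 *
 *   AttributesFactory af = new AttributesFactory();
 *   af.setScope(Scope.LOCAL);
 *   af.setPoolName("examplePool");
 *   Region r = cache.createRegion("exampleRegion", af.create());
 *   r.registerInterestRegex(".*");     // subscribe to events for every key
 * </pre>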
+ * + * @since 3.5 + */ + public void test014InvalidateAndDestroyPropagation() throws CacheException { + final String name = this.getName(); + final Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + + vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { + public void run2() throws CacheException { + AttributesFactory factory = getBridgeServerRegionAttributes(null, null); + createRegion(name, factory.create()); + //pause(1000); + try { + startBridgeServer(0); + + } catch (Exception ex) { + com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex); + } + + } + + }); + final int port = + vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); + final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); + + SerializableRunnable create = + new CacheSerializableRunnable("Create region") { + public void run2() throws CacheException { + getLonerSystem(); + getCache(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + factory.setConcurrencyChecksEnabled(false); + ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null); + CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter()); + factory.setCacheListener(l); + Region rgn = createRegion(name, factory.create()); + rgn.registerInterestRegex(".*", false, false); + } + }; + + vm1.invoke(create); + vm1.invoke(new CacheSerializableRunnable("Populate region") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + region.put(new Integer(i), "old" + i); + } + } + }); + vm2.invoke(create); + Wait.pause(5 * 1000); + + vm1.invoke(new CacheSerializableRunnable("Turn on history") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); + ctl.enableEventHistory(); + } + }); + vm2.invoke(new CacheSerializableRunnable("Update region") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + region.put(new Integer(i), "new" + i, "callbackArg" + i); + } + } + }); + Wait.pause(5 * 1000); + + vm1.invoke(new CacheSerializableRunnable("Verify invalidates") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); + for (int i = 0; i < 10; i++) { + Object key = new Integer(i); + ctl.waitForInvalidated(key); + Region.Entry entry = region.getEntry(key); + assertNotNull(entry); + assertNull(entry.getValue()); + } + { + List l = ctl.getEventHistory(); + assertEquals(10, l.size()); + for (int i = 0; i < 10; i++) { + Object key = new Integer(i); + EntryEvent ee = (EntryEvent)l.get(i); + assertEquals(key, ee.getKey()); + assertEquals("old" + i, ee.getOldValue()); + assertEquals(Operation.INVALIDATE, ee.getOperation()); + assertEquals("callbackArg" + i, ee.getCallbackArgument()); + assertEquals(true, ee.isOriginRemote()); + } + } + } + }); + + + vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(name); + for (int i = 0; i < 10; i++) { + Object key = new 
+          assertEquals("new" + i, region.getEntry(key).getValue());
+          region.destroy(key, "destroyCB"+i);
+        }
+      }
+    });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          ctl.waitForDestroyed(key);
+          Region.Entry entry = region.getEntry(key);
+          assertNull(entry);
+        }
+        {
+          List l = ctl.getEventHistory();
+          assertEquals(10, l.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            EntryEvent ee = (EntryEvent)l.get(i);
+            assertEquals(key, ee.getKey());
+            assertEquals(null, ee.getOldValue());
+            assertEquals(Operation.DESTROY, ee.getOperation());
+            assertEquals("destroyCB"+i, ee.getCallbackArgument());
+            assertEquals(true, ee.isOriginRemote());
+          }
+        }
+      }
+    });
+    vm2.invoke(new CacheSerializableRunnable("recreate") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          region.create(key, "create" + i);
+        }
+      }
+    });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify creates") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+        List l = ctl.getEventHistory();
+        com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("history (should be empty): " + l);
+        assertEquals(0, l.size());
+        // now see if we can get it from the server
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          assertEquals("create"+i, region.get(key, "loadCB"+i));
+        }
+        l = ctl.getEventHistory();
+        assertEquals(10, l.size());
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          EntryEvent ee = (EntryEvent)l.get(i);
+          com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("processing " + ee);
+          assertEquals(key, ee.getKey());
+          assertEquals(null, ee.getOldValue());
+          assertEquals("create"+i, ee.getNewValue());
+          assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation());
+          assertEquals("loadCB"+i, ee.getCallbackArgument());
+          assertEquals(false, ee.isOriginRemote());
+        }
+      }
+    });
+
+    SerializableRunnable close =
+      new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+      };
+
+    vm1.invoke(close);
+    vm2.invoke(close);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+      public void run() {
+        stopBridgeServer(getCache());
+      }
+    });
+  }
+  /**
+   * Tests that invalidates and destroys are propagated to {@link Pool}s
+   * correctly to DataPolicy.EMPTY + InterestPolicy.ALL
+   *
+   * @since 5.0
+   */
+  public void test015InvalidateAndDestroyToEmptyAllPropagation() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+      public void run2() throws CacheException {
+        AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+        createRegion(name, factory.create());
+        //pause(1000);
+        try {
+          startBridgeServer(0);
+
+        } catch (Exception ex) {
+          com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+        }
+
+      }
+    });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    SerializableRunnable createEmpty =
+      new CacheSerializableRunnable("Create region") {
+        public void run2() throws CacheException {
+          getLonerSystem();
+          getCache();
+          AttributesFactory factory = new AttributesFactory();
+          factory.setScope(Scope.LOCAL);
+          factory.setConcurrencyChecksEnabled(false);
+          ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+          CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+          factory.setCacheListener(l);
+          factory.setDataPolicy(DataPolicy.EMPTY);
+          factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
+          Region rgn = createRegion(name, factory.create());
+          rgn.registerInterestRegex(".*", false, false);
+        }
+      };
+    SerializableRunnable createNormal =
+      new CacheSerializableRunnable("Create region") {
+        public void run2() throws CacheException {
+          getLonerSystem();
+          getCache();
+          AttributesFactory factory = new AttributesFactory();
+          factory.setScope(Scope.LOCAL);
+          factory.setConcurrencyChecksEnabled(false);
+          ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+          CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+          factory.setCacheListener(l);
+          Region rgn = createRegion(name, factory.create());
+          rgn.registerInterestRegex(".*", false, false);
+        }
+      };
+
+    vm1.invoke(createEmpty);
+    vm1.invoke(new CacheSerializableRunnable("Populate region") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        for (int i = 0; i < 10; i++) {
+          region.put(new Integer(i), "old" + i);
+        }
+      }
+    });
+
+    vm2.invoke(createNormal);
+    vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+        ctl.enableEventHistory();
+      }
+    });
+    vm2.invoke(new CacheSerializableRunnable("Update region") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        for (int i = 0; i < 10; i++) {
+          region.put(new Integer(i), "new" + i, "callbackArg" + i);
+        }
+      }
+    });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          ctl.waitForInvalidated(key);
+          Region.Entry entry = region.getEntry(key);
+          assertNull(entry); // we are empty!
+        }
+        {
+          List l = ctl.getEventHistory();
+          assertEquals(10, l.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            EntryEvent ee = (EntryEvent)l.get(i);
+            assertEquals(key, ee.getKey());
+            assertEquals(null, ee.getOldValue());
+            assertEquals(false, ee.isOldValueAvailable()); // failure
+            assertEquals(Operation.INVALIDATE, ee.getOperation());
+            assertEquals("callbackArg" + i, ee.getCallbackArgument());
+            assertEquals(true, ee.isOriginRemote());
+          }
+        }
+
+      }
+    });
+
+
+    vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          assertEquals("new" + i, region.getEntry(key).getValue());
+          region.destroy(key, "destroyCB"+i);
+        }
+      }
+    });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          ctl.waitForDestroyed(key);
+          Region.Entry entry = region.getEntry(key);
+          assertNull(entry);
+        }
+        {
+          List l = ctl.getEventHistory();
+          assertEquals(10, l.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            EntryEvent ee = (EntryEvent)l.get(i);
+            assertEquals(key, ee.getKey());
+            assertEquals(null, ee.getOldValue());
+            assertEquals(false, ee.isOldValueAvailable());
+            assertEquals(Operation.DESTROY, ee.getOperation());
+            assertEquals("destroyCB"+i, ee.getCallbackArgument());
+            assertEquals(true, ee.isOriginRemote());
+          }
+        }
+      }
+    });
+    vm2.invoke(new CacheSerializableRunnable("recreate") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          region.create(key, "create" + i, "createCB"+i);
+        }
+      }
+    });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify creates") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          ctl.waitForInvalidated(key);
+          Region.Entry entry = region.getEntry(key);
+          assertNull(entry);
+        }
+        List l = ctl.getEventHistory();
+        assertEquals(10, l.size());
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          EntryEvent ee = (EntryEvent)l.get(i);
+          assertEquals(key, ee.getKey());
+          assertEquals(null, ee.getOldValue());
+          assertEquals(false, ee.isOldValueAvailable());
+          assertEquals(Operation.INVALIDATE, ee.getOperation());
+          assertEquals("createCB"+i, ee.getCallbackArgument());
+          assertEquals(true, ee.isOriginRemote());
+        }
+        // now see if we can get it from the server
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          assertEquals("create"+i, region.get(key, "loadCB"+i));
+        }
+        l = ctl.getEventHistory();
+        assertEquals(10, l.size());
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          EntryEvent ee = (EntryEvent)l.get(i);
+          assertEquals(key, ee.getKey());
+          assertEquals(null, ee.getOldValue());
+          assertEquals("create"+i, ee.getNewValue());
+          assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation());
+          assertEquals("loadCB"+i, ee.getCallbackArgument());
+          assertEquals(false, ee.isOriginRemote());
+        }
+      }
+    });
+
+    SerializableRunnable close =
+      new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+      };
+
+    vm1.invoke(close);
+    vm2.invoke(close);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+      public void run() {
+        stopBridgeServer(getCache());
+      }
+    });
+  }
+
+  /**
+   * Tests that invalidates and destroys are propagated to {@link Pool}s
+   * correctly to DataPolicy.EMPTY + InterestPolicy.CACHE_CONTENT
+   *
+   * @since 5.0
+   */
+  public void test016InvalidateAndDestroyToEmptyCCPropagation() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+      public void run2() throws CacheException {
+        AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+        createRegion(name, factory.create());
+        try {
+          startBridgeServer(0);
+
+        } catch (Exception ex) {
+          com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+        }
+
+      }
+    });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    SerializableRunnable createEmpty =
+      new CacheSerializableRunnable("Create region") {
+        public void run2() throws CacheException {
+          getLonerSystem();
+          getCache();
+          AttributesFactory factory = new AttributesFactory();
+          factory.setScope(Scope.LOCAL);
+          factory.setConcurrencyChecksEnabled(false);
+          ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+          CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+          factory.setCacheListener(l);
+          factory.setDataPolicy(DataPolicy.EMPTY);
+          factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT));
+          Region rgn = createRegion(name, factory.create());
+          rgn.registerInterestRegex(".*", false, false);
+        }
+      };
+    SerializableRunnable createNormal =
+      new CacheSerializableRunnable("Create region") {
+        public void run2() throws CacheException {
+          getLonerSystem();
+          getCache();
+          AttributesFactory factory = new AttributesFactory();
+          factory.setScope(Scope.LOCAL);
+          factory.setConcurrencyChecksEnabled(false);
+          ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+          CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+          factory.setCacheListener(l);
+          Region rgn = createRegion(name, factory.create());
+          rgn.registerInterestRegex(".*", false, false);
+        }
+      };
+
+    vm1.invoke(createEmpty);
+    vm1.invoke(new CacheSerializableRunnable("Populate region") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        for (int i = 0; i < 10; i++) {
+          region.put(new Integer(i), "old" + i);
+        }
+      }
+    });
+
+    vm2.invoke(createNormal);
+    vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+        ctl.enableEventHistory();
+      }
+    });
+    vm2.invoke(new CacheSerializableRunnable("Update region") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        for (int i = 0; i < 10; i++) {
+          region.put(new Integer(i), "new" + i, "callbackArg" + i);
+        }
+      }
+    });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+        List l = ctl.getEventHistory();
+        assertEquals(0, l.size());
+      }
+    });
+
+
+    vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          assertEquals("new" + i, region.getEntry(key).getValue());
+          region.destroy(key, "destroyCB"+i);
+        }
+      }
+    });
+
+    vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+        List l = ctl.getEventHistory();
+        assertEquals(0, l.size());
+      }
+    });
+    vm2.invoke(new CacheSerializableRunnable("recreate") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        for (int i = 0; i < 10; i++) {
+          Object key = new Integer(i);
+          region.create(key, "create" + i, "createCB"+i);
+        }
+      }
+
<TRUNCATED>
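For reference, the client-side setup these tests build through the ClientServerTestCase.configureConnectionPool(...) helper plus the DataPolicy.EMPTY / InterestPolicy attributes corresponds roughly to the sketch below, written against the public GemFire client API. It is an illustrative approximation, not code from this patch: the host name, port, pool name, region name, and the cache bootstrap are placeholders, and the helper's exact behavior may differ in detail.

    // Minimal sketch (assumed equivalent of the test's pool + EMPTY/ALL region setup).
    // Placeholders: "server-host", 40404, "examplePool", "example".
    // Imports come from java.util, com.gemstone.gemfire.cache and
    // com.gemstone.gemfire.cache.client, as already used in this file.
    Properties props = new Properties();
    props.setProperty("mcast-port", "0");   // stand-alone ("loner") member
    props.setProperty("locators", "");
    Cache cache = new CacheFactory(props).create();

    Pool pool = PoolManager.createFactory()
        .addServer("server-host", 40404)    // the cache server started in vm0
        .setSubscriptionEnabled(true)       // required to receive server-side events
        .create("examplePool");

    AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.LOCAL);
    factory.setDataPolicy(DataPolicy.EMPTY);                 // store nothing locally
    factory.setSubscriptionAttributes(
        new SubscriptionAttributes(InterestPolicy.ALL));     // but deliver every event
    factory.setPoolName("examplePool");

    Region region = cache.createRegion("example", factory.create());
    region.registerInterestRegex(".*");     // subscribe to all keys on the server

With InterestPolicy.ALL the empty region still receives invalidate and destroy events for every key, which is what test015 asserts through the listener's event history; test016 switches to InterestPolicy.CACHE_CONTENT, under which the empty region holds no content and therefore records no events at all.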