Repository: incubator-geode Updated Branches: refs/heads/develop 066a9d57a -> 49e3f523d
GEODE-920: lazily create HaContainer for cache server The CacheClientNotifier instance might have been created by a gateway receiver. In that case, do not create the HaContainer until a cache server starts, then amend the CacheClientNotifier instance by initializing the HaContainer. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/49e3f523 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/49e3f523 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/49e3f523 Branch: refs/heads/develop Commit: 49e3f523d16389f5e84847c6dcfd6ab45427f8c2 Parents: 066a9d5 Author: zhouxh <gz...@pivotal.io> Authored: Fri Mar 25 10:04:35 2016 -0700 Committer: zhouxh <gz...@pivotal.io> Committed: Mon Apr 4 15:48:57 2016 -0700 ---------------------------------------------------------------------- .../internal/cache/InitialImageOperation.java | 15 +- .../gemfire/internal/cache/LocalRegion.java | 3 + .../internal/cache/ha/HAContainerRegion.java | 5 + .../cache/tier/sockets/CacheClientNotifier.java | 58 +++-- .../cache/wan/CacheClientNotifierDUnitTest.java | 225 +++++++++++++++++++ 5 files changed, 279 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java index 9bd3faf..a72ca8e 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java @@ -2179,9 +2179,11 @@ public class InitialImageOperation { if 
(logger.isDebugEnabled()) { try { CacheClientNotifier ccn = CacheClientNotifier.getInstance(); - CacheClientProxy proxy = ((HAContainerWrapper)ccn.getHaContainer()).getProxy( - region.getName()); - logger.debug("Processing FilterInfo for proxy: {} : {}", proxy, msg); + if (ccn != null && ccn.getHaContainer() != null) { + CacheClientProxy proxy = ((HAContainerWrapper)ccn.getHaContainer()).getProxy( + region.getName()); + logger.debug("Processing FilterInfo for proxy: {} : {}", proxy, msg); + } } catch (Exception ex) { // Ignore. } @@ -3705,8 +3707,13 @@ public class InitialImageOperation { */ public void registerFilters(LocalRegion region) { CacheClientNotifier ccn = CacheClientNotifier.getInstance(); + CacheClientProxy proxy; try { + if (ccn == null || ccn.getHaContainer() == null) { + logger.info("Found null cache client notifier. Failed to register Filters during HARegion GII. Region :{}", region.getName()); + return; + } proxy = ((HAContainerWrapper)ccn.getHaContainer()).getProxy( region.getName()); } catch (Exception ex) { @@ -3716,7 +3723,7 @@ public class InitialImageOperation { } if (proxy == null) { - logger.info("Found null client proxy. Failed to register Filters during HARegion GII. Region :{}", region.getName()); + logger.info("Found null client proxy. Failed to register Filters during HARegion GII. 
Region :{}, HaContainer :{}", region.getName(), ccn.getHaContainer()); return; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java index 8e30a7a..3ff48bb 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java @@ -4576,6 +4576,9 @@ public class LocalRegion extends AbstractRegion public void refreshEntriesFromServerKeys(Connection con, List serverKeys, InterestResultPolicy pol) { + if (serverKeys == null) { + return; + } ServerRegionProxy proxy = getServerProxy(); if (logger.isDebugEnabled()) { logKeys(serverKeys, pol); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java index 3bed769..fe0c112 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java @@ -55,6 +55,11 @@ public class HAContainerRegion implements HAContainerWrapper { } } + public Region getMapForTest() { + Region region = (Region)map; + return region; + } + public Object putProxy(String haName, CacheClientProxy proxy) { return haRegionNameToProxy.put(haName, proxy); } 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java index 6ac4690..3178b8d 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java @@ -162,6 +162,12 @@ public class CacheClientNotifier { messageTimeToLive, transactionTimeToLive, listener, overflowAttributesList, isGatewayReceiver); } + + if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) { + // Gateway receiver might have create CCN instance without HaContainer + // In this case, the HaContainer should be lazily created here + ccnSingleton.initHaContainer(overflowAttributesList); + } // else { // ccnSingleton.acceptorStats = acceptorStats; // ccnSingleton.maximumMessageCount = maximumMessageCount; @@ -1671,9 +1677,11 @@ public class CacheClientNotifier { if (noActiveServer() && ccnSingleton != null){ ccnSingleton = null; - haContainer.cleanUp(); - if (isDebugEnabled) { - logger.debug("haContainer ({}) is now cleaned up.", haContainer.getName()); + if (haContainer != null) { + haContainer.cleanUp(); + if (isDebugEnabled) { + logger.debug("haContainer ({}) is now cleaned up.", haContainer.getName()); + } } this.clearCompiledQueries(); blackListedClients.clear(); @@ -2147,25 +2155,6 @@ public class CacheClientNotifier { // Set the security LogWriter this.securityLogWriter = (InternalLogWriter)cache.getSecurityLogger(); - // Create the overflow artifacts - if (overflowAttributesList != null - && 
!HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList - .get(0))) { - haContainer = new HAContainerRegion(cache.getRegion(Region.SEPARATOR - + CacheServerImpl.clientMessagesRegion((GemFireCacheImpl)cache, - (String)overflowAttributesList.get(0), - ((Integer)overflowAttributesList.get(1)).intValue(), - ((Integer)overflowAttributesList.get(2)).intValue(), - (String)overflowAttributesList.get(3), - (Boolean)overflowAttributesList.get(4)))); - } - else { - haContainer = new HAContainerMap(new HashMap()); - } - if (logger.isDebugEnabled()) { - logger.debug("ha container ({}) has been created.", haContainer.getName()); - } - this.maximumMessageCount = maximumMessageCount; this.messageTimeToLive = messageTimeToLive; this.transactionTimeToLive = transactionTimeToLive; @@ -2608,7 +2597,7 @@ public class CacheClientNotifier { * (in case of eviction policy "none"). In both the cases, it'll store * HAEventWrapper as its key and ClientUpdateMessage as its value. */ - private final HAContainerWrapper haContainer; + private HAContainerWrapper haContainer; // /** // * The singleton <code>CacheClientNotifier</code> instance @@ -2691,6 +2680,29 @@ public class CacheClientNotifier { public Map getHaContainer() { return haContainer; } + + public void initHaContainer(List overflowAttributesList) { + // lazily initialize haContainer in case this CCN instance was created by a gateway receiver + if (overflowAttributesList != null + && !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList + .get(0))) { + haContainer = new HAContainerRegion(_cache.getRegion(Region.SEPARATOR + + CacheServerImpl.clientMessagesRegion((GemFireCacheImpl)_cache, + (String)overflowAttributesList.get(0), + ((Integer)overflowAttributesList.get(1)).intValue(), + ((Integer)overflowAttributesList.get(2)).intValue(), + (String)overflowAttributesList.get(3), + (Boolean)overflowAttributesList.get(4)))); + } + else { + haContainer = new HAContainerMap(new HashMap()); + } + assert haContainer 
!= null; + + if (logger.isDebugEnabled()) { + logger.debug("ha container ({}) has been created.", haContainer.getName()); + } + } private final Set blackListedClients = new CopyOnWriteArraySet(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java new file mode 100755 index 0000000..9557f0d --- /dev/null +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache.wan; + +import java.io.IOException; +import java.util.List; + +import com.gemstone.gemfire.cache.DiskStore; +import com.gemstone.gemfire.cache.EvictionAction; +import com.gemstone.gemfire.cache.EvictionAttributes; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionAttributes; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig; +import com.gemstone.gemfire.internal.AvailablePort; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; +import com.gemstone.gemfire.internal.cache.UserSpecifiedRegionAttributes; +import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion; +import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; +import com.gemstone.gemfire.internal.cache.xmlcache.RegionAttributesCreation; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.SerializableRunnable; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.dunit.WaitCriterion; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; + +public class CacheClientNotifierDUnitTest extends WANTestBase { + private static final int NUM_KEYS = 10; + + public CacheClientNotifierDUnitTest(String name) { + super(name); + // TODO Auto-generated constructor stub + } + + private int createCacheServerWithCSC(VM vm, final boolean withCSC, final int capacity, + final String policy, final String diskStoreName) { + final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + + SerializableRunnable createCacheServer = new SerializableRunnable() { + @Override + public void run() throws Exception { + CacheServerImpl server = (CacheServerImpl)cache.addCacheServer(); + 
server.setPort(serverPort); + if (withCSC) { + if (diskStoreName != null) { + DiskStore ds = cache.findDiskStore(diskStoreName); + if(ds == null) { + ds = cache.createDiskStoreFactory().create(diskStoreName); + } + } + ClientSubscriptionConfig csc = server.getClientSubscriptionConfig(); + csc.setCapacity(capacity); + csc.setEvictionPolicy(policy); + csc.setDiskStoreName(diskStoreName); + server.setHostnameForClients("localhost"); + //server.setGroups(new String[]{"serv"}); + } + try { + server.start(); + } catch (IOException e) { + com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e); + } + } + }; + vm.invoke(createCacheServer); + return serverPort; + } + + private void checkCacheServer(VM vm, final int serverPort, final boolean withCSC, final int capacity) { + SerializableRunnable checkCacheServer = new SerializableRunnable() { + + @Override + public void run() throws Exception { + List<CacheServer> cacheServers = ((GemFireCacheImpl)cache).getCacheServersAndGatewayReceiver(); + CacheServerImpl server = null; + for (CacheServer cs:cacheServers) { + if (cs.getPort() == serverPort) { + server = (CacheServerImpl)cs; + break; + } + } + assertNotNull(server); + CacheClientNotifier ccn = server.getAcceptor().getCacheClientNotifier(); + HAContainerRegion haContainer = (HAContainerRegion)ccn.getHaContainer(); + if (server.getAcceptor().isGatewayReceiver()) { + assertNull(haContainer); + return; + } + Region internalRegion = haContainer.getMapForTest(); + RegionAttributes ra = internalRegion.getAttributes(); + EvictionAttributes ea = ra.getEvictionAttributes(); + if (withCSC) { + assertNotNull(ea); + assertEquals(capacity, ea.getMaximum()); + assertEquals(EvictionAction.OVERFLOW_TO_DISK, ea.getAction()); + } else { + assertNull(ea); + } + } + }; + vm.invoke(checkCacheServer); + } + + private void closeCacheServer(VM vm, final int serverPort) { + SerializableRunnable stopCacheServer = new SerializableRunnable() { + + @Override + public void run() throws 
Exception { + List<CacheServer> cacheServers = cache.getCacheServers(); + CacheServerImpl server = null; + for (CacheServer cs:cacheServers) { + if (cs.getPort() == serverPort) { + server = (CacheServerImpl)cs; + break; + } + } + assertNotNull(server); + server.stop(); + } + }; + vm.invoke(stopCacheServer); + } + + private void verifyRegionSize(VM vm, final int expect) { + SerializableRunnable verifyRegionSize = new SerializableRunnable() { + @Override + public void run() throws Exception { + final Region region = cache.getRegion(getTestMethodName() + "_PR"); + + Wait.waitForCriterion(new WaitCriterion() { + public boolean done() { + return region.size() == expect; + } + public String description() { + return null; + } + }, 60000, 100, false); + assertEquals(expect, region.size()); + } + }; + vm.invoke(verifyRegionSize); + } + + /* + * The test will start several cache servers, including gateway receivers. + * Shutdown them and verify the CacheClientNofifier for each server is correct + */ + public void testMultipleCacheServer() throws Exception { + /* test senario: */ + /* create 1 GatewaySender on vm0 */ + /* create 1 GatewayReceiver on vm1 */ + /* create 2 cache servers on vm1, one with overflow. 
*/ + /* verify if the cache server2 still has the overflow attributes */ + /* create 1 cache client1 on vm2 to register interest on cache server1 */ + /* create 1 cache client2 on vm3 to register interest on cache server1 */ + /* do some puts to GatewaySender on vm0 */ + + // create sender at ln + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + // create recever and cache servers will be at ny + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + vm1.invoke(() -> WANTestBase.createCache( nyPort )); + int receiverPort = vm1.invoke(() -> WANTestBase.createReceiver( nyPort )); + checkCacheServer(vm1, receiverPort, false, 0); + + // create PR for receiver + vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + // create cache server1 with overflow + int serverPort = createCacheServerWithCSC(vm1, true, 3, "entry", "DEFAULT"); + checkCacheServer(vm1, serverPort, true, 3); + + // create cache server 2 + final int serverPort2 = createCacheServerWithCSC(vm1, false, 0, null, null); + // Currently, only the first cache server's overflow attributes will take effect + // It will be enhanced in GEODE-1102 + checkCacheServer(vm1, serverPort2, true, 3); + LogService.getLogger().info("receiverPort="+receiverPort+",serverPort="+serverPort+",serverPort2="+serverPort2); + + vm2.invoke(() -> WANTestBase.createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR" )); + vm3.invoke(() -> WANTestBase.createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR" )); + + vm0.invoke(() -> WANTestBase.createCache( lnPort )); + vm0.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 400, false, false, null, true )); + vm0.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm0.invoke(() -> WANTestBase.startSender( "ln" )); + 
vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS )); + + /* verify */ + verifyRegionSize(vm0, NUM_KEYS); + verifyRegionSize(vm1, NUM_KEYS); + verifyRegionSize(vm2, NUM_KEYS); + verifyRegionSize(vm3, NUM_KEYS); + + // close a cache server, then re-test + closeCacheServer(vm1, serverPort2); + + vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS*2 )); + + /* verify */ + verifyRegionSize(vm0, NUM_KEYS*2); + verifyRegionSize(vm1, NUM_KEYS*2); + verifyRegionSize(vm2, NUM_KEYS*2); + verifyRegionSize(vm3, NUM_KEYS*2); + } + +}