GEODE-1246: change async event pool to use all its threads. The system property gemfire.Cache.EVENT_THREAD_LIMIT can be used to configure the maximum number of threads used for async cache listener invocation. It defaults to 16.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/1af92d89 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/1af92d89 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/1af92d89 Branch: refs/heads/feature/GEODE-835 Commit: 1af92d89108dbdb570f60fe4682edb77558f91e2 Parents: a17dc71 Author: Darrel Schneider <dschnei...@pivotal.io> Authored: Wed May 18 09:23:38 2016 -0700 Committer: Darrel Schneider <dschnei...@pivotal.io> Committed: Fri May 20 14:54:24 2016 -0700 ---------------------------------------------------------------------- .../internal/PooledExecutorWithDMStats.java | 9 +-- .../internal/locks/DLockService.java | 9 ++- .../internal/cache/GemFireCacheImpl.java | 37 +++++------ .../gemfire/internal/cache/LocalRegion.java | 2 +- .../gemfire/internal/jndi/JNDIInvoker.java | 3 +- .../internal/cache/GemFireCacheImplTest.java | 66 ++++++++++++++++++++ .../com/gemstone/gemfire/test/fake/Fakes.java | 10 +++ 7 files changed, 106 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1af92d89/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java index 0a9f9ec..3909474 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java @@ -39,7 +39,7 @@ public class PooledExecutorWithDMStats extends ThreadPoolExecutor { /** * Create a new pool **/ - public 
PooledExecutorWithDMStats(BlockingQueue<Runnable> q, int maxPoolSize, PoolStatHelper stats, ThreadFactory tf, int msTimeout, RejectedExecutionHandler reh) { + public PooledExecutorWithDMStats(SynchronousQueue<Runnable> q, int maxPoolSize, PoolStatHelper stats, ThreadFactory tf, int msTimeout, RejectedExecutionHandler reh) { super(getCorePoolSize(maxPoolSize), maxPoolSize, msTimeout, TimeUnit.MILLISECONDS, q, tf, reh); @@ -61,9 +61,9 @@ public class PooledExecutorWithDMStats extends ThreadPoolExecutor { */ private Thread bufferConsumer; - private static BlockingQueue<Runnable> initQ(BlockingQueue<Runnable> q) { + private static SynchronousQueue<Runnable> initQ(BlockingQueue<Runnable> q) { if (q instanceof SynchronousQueue) { - return q; + return (SynchronousQueue<Runnable>) q; } else { return new SynchronousQueue/*NoSpin*/<Runnable>(); } @@ -95,7 +95,8 @@ public class PooledExecutorWithDMStats extends ThreadPoolExecutor { try { for (;;) { SystemFailure.checkFailure(); - putQueue.put(takeQueue.take()); + Runnable job = takeQueue.take(); + putQueue.put(job); } } catch (InterruptedException ie) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1af92d89/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/DLockService.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/DLockService.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/DLockService.java index 610944e..844a93d 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/DLockService.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/DLockService.java @@ -133,7 +133,7 @@ public class DLockService extends DistributedLockService { private DLockRecoverGrantorProcessor.MessageProcessor recoverGrantorProcessor; /** Thread-safe reference to DistributedLockStats */ - private final 
DistributedLockStats dlockStats = getOrCreateStats(); + private final DistributedLockStats dlockStats; /** * Protects {@link #lockGrantorId}, {@link #grantor} and @@ -2145,6 +2145,7 @@ public class DLockService extends DistributedLockService { boolean destroyOnDisconnect, boolean automateFreeResources) { super(); + this.dlockStats = getOrCreateStats(ds); this.serialNumber = createSerialNumber(); this.serviceName = serviceName; this.ds = (InternalDistributedSystem) ds; @@ -3185,12 +3186,10 @@ public class DLockService extends DistributedLockService { } /** Get or create static dlock stats */ - protected static synchronized DistributedLockStats getOrCreateStats() { + protected static synchronized DistributedLockStats getOrCreateStats(DistributedSystem ds) { if (stats == DUMMY_STATS) { - InternalDistributedSystem ds = - InternalDistributedSystem.getAnyInstance(); Assert.assertTrue(ds != null, - "Cannot find any instance of InternalDistributedSystem"); + "Need an instance of InternalDistributedSystem"); StatisticsFactory statFactory = ds; long statId = OSProcess.getId(); stats = new DLockStats(statFactory, statId); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1af92d89/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java index 96b7bbc..c44f3b7 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java @@ -66,7 +66,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import 
java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -270,9 +269,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer private static final Set<CacheLifecycleListener> cacheLifecycleListeners = new HashSet<CacheLifecycleListener>(); /** - * Define LocalRegion.ASYNC_EVENT_LISTENERS=true to invoke event listeners in the background + * Define gemfire.Cache.ASYNC_EVENT_LISTENERS=true to invoke event listeners in the background */ - public static final boolean ASYNC_EVENT_LISTENERS = Boolean.getBoolean("gemfire.Cache.ASYNC_EVENT_LISTENERS"); + private static final boolean ASYNC_EVENT_LISTENERS = Boolean.getBoolean("gemfire.Cache.ASYNC_EVENT_LISTENERS"); /** * If true then when a delta is applied the size of the entry value will be recalculated. If false (the default) then @@ -283,6 +282,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer public static boolean DELTAS_RECALCULATE_SIZE = Boolean.getBoolean("gemfire.DELTAS_RECALCULATE_SIZE"); public static final int EVENT_QUEUE_LIMIT = Integer.getInteger("gemfire.Cache.EVENT_QUEUE_LIMIT", 4096).intValue(); + public static final int EVENT_THREAD_LIMIT = Integer.getInteger("gemfire.Cache.EVENT_THREAD_LIMIT", 16).intValue(); /** * System property to limit the max query-execution time. By default its turned off (-1), the time is set in MiliSecs. 
@@ -763,18 +763,22 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer // } public static GemFireCacheImpl createClient(DistributedSystem system, PoolFactory pf, CacheConfig cacheConfig) { - return basicCreate(system, true, cacheConfig, pf, true); + return basicCreate(system, true, cacheConfig, pf, true, ASYNC_EVENT_LISTENERS); } public static GemFireCacheImpl create(DistributedSystem system, CacheConfig cacheConfig) { - return basicCreate(system, true, cacheConfig, null, false); + return basicCreate(system, true, cacheConfig, null, false, ASYNC_EVENT_LISTENERS); } - public static Cache create(DistributedSystem system, boolean existingOk, CacheConfig cacheConfig) { - return basicCreate(system, existingOk, cacheConfig, null, false); + public static GemFireCacheImpl createWithAsyncEventListeners(DistributedSystem system, CacheConfig cacheConfig) { + return basicCreate(system, true, cacheConfig, null, false, true); + } + + public static Cache create(DistributedSystem system, boolean existingOk, CacheConfig cacheConfig) { + return basicCreate(system, existingOk, cacheConfig, null, false, ASYNC_EVENT_LISTENERS); } - private static GemFireCacheImpl basicCreate(DistributedSystem system, boolean existingOk, CacheConfig cacheConfig, PoolFactory pf, boolean isClient) + private static GemFireCacheImpl basicCreate(DistributedSystem system, boolean existingOk, CacheConfig cacheConfig, PoolFactory pf, boolean isClient, boolean asyncEventListeners) throws CacheExistsException, TimeoutException, CacheWriterException, GatewayException, RegionExistsException @@ -782,7 +786,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer try { GemFireCacheImpl instance = checkExistingCache(existingOk, cacheConfig); if (instance == null) { - instance = new GemFireCacheImpl(isClient, pf, system, cacheConfig); + instance = new GemFireCacheImpl(isClient, pf, system, cacheConfig, asyncEventListeners); instance.initialize(); } return 
instance; @@ -814,7 +818,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer /** * Creates a new instance of GemFireCache and populates it according to the <code>cache.xml</code>, if appropriate. */ - private GemFireCacheImpl(boolean isClient, PoolFactory pf, DistributedSystem system, CacheConfig cacheConfig) { + private GemFireCacheImpl(boolean isClient, PoolFactory pf, DistributedSystem system, CacheConfig cacheConfig, boolean asyncEventListeners) { this.isClient = isClient; this.clientpf = pf; this.cacheConfig = cacheConfig; // do early for bug 43213 @@ -825,7 +829,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer // start JTA transaction manager within this synchronized block // to prevent race with cache close. fixes bug 43987 - JNDIInvoker.mapTransactions(); + JNDIInvoker.mapTransactions(system); this.system = (InternalDistributedSystem) system; this.dm = this.system.getDistributionManager(); if (!this.isClient && PoolManager.getAll().isEmpty()) { @@ -868,7 +872,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer this.persistentMemberManager = new PersistentMemberManager(); - if (ASYNC_EVENT_LISTENERS) { + if (asyncEventListeners) { final ThreadGroup group = LoggingThreadGroup.createThreadGroup("Message Event Threads",logger); ThreadFactory tf = new ThreadFactory() { public Thread newThread(final Runnable command) { @@ -883,11 +887,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer return thread; } }; - // @todo darrel: add stats - // this.cachePerfStats.getEventQueueHelper()); ArrayBlockingQueue q = new ArrayBlockingQueue(EVENT_QUEUE_LIMIT); - this.eventThreadPool = new PooledExecutorWithDMStats(q, 16, this.cachePerfStats.getEventPoolHelper(), tf, 1000, - new CallerRunsPolicy()); + this.eventThreadPool = new PooledExecutorWithDMStats(q, EVENT_THREAD_LIMIT, this.cachePerfStats.getEventPoolHelper(), tf, 1000); } else { 
this.eventThreadPool = null; } @@ -2078,7 +2079,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer destroyGatewaySenderLockService(); - if (ASYNC_EVENT_LISTENERS) { + if (this.eventThreadPool != null) { if (isDebugEnabled) { logger.debug("{}: stopping event thread pool...", this); } @@ -3812,11 +3813,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer /** * Returns the <code>Executor</code> (thread pool) that is used to execute cache event listeners. + * Returns <code>null</code> if no pool exists. * * @since 3.5 */ Executor getEventThreadPool() { - Assert.assertTrue(this.eventThreadPool != null); return this.eventThreadPool; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1af92d89/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java index b5ff7ee..d28496c 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java @@ -7582,7 +7582,7 @@ public class LocalRegion extends AbstractRegion } } - if (!GemFireCacheImpl.ASYNC_EVENT_LISTENERS) { + if (this.cache.getEventThreadPool() == null) { dispatchEvent(this, event, op); } else { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1af92d89/geode-core/src/main/java/com/gemstone/gemfire/internal/jndi/JNDIInvoker.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/jndi/JNDIInvoker.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/jndi/JNDIInvoker.java index b3aecc4..2743537 100644 --- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/jndi/JNDIInvoker.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/jndi/JNDIInvoker.java @@ -125,9 +125,8 @@ public class JNDIInvoker { * </p> * */ - public static void mapTransactions() { + public static void mapTransactions(DistributedSystem distSystem) { try { - DistributedSystem distSystem = InternalDistributedSystem.getAnyInstance(); TransactionUtils.setLogWriter(distSystem.getLogWriter().convertToLogWriterI18n()); cleanup(); if (IGNORE_JTA) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1af92d89/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java new file mode 100644 index 0000000..3dc2f7a --- /dev/null +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache; + +import static org.junit.Assert.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.test.fake.Fakes; +import com.gemstone.gemfire.test.junit.categories.UnitTest; +import com.jayway.awaitility.Awaitility; + +@Category(UnitTest.class) +public class GemFireCacheImplTest { + + @Test + public void checkThatAsyncEventListenersUseAllThreadsInPool() { + + GemFireCacheImpl gfc = GemFireCacheImpl.createWithAsyncEventListeners(Fakes.distributedSystem(), new CacheConfig()); + ThreadPoolExecutor executor = (ThreadPoolExecutor) gfc.getEventThreadPool(); + final long initialCount = executor.getCompletedTaskCount(); + try { + int MAX_THREADS = GemFireCacheImpl.EVENT_THREAD_LIMIT; + final CountDownLatch cdl = new CountDownLatch(MAX_THREADS); + for (int i = 1; i <= MAX_THREADS; i++) { + Runnable r = new Runnable() { + @Override + public void run() { + cdl.countDown(); + try { + cdl.await(); + } catch (InterruptedException e) { + } + } + }; + executor.execute(r); + } + Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS).timeout(15, TimeUnit.SECONDS) + .until(() -> { + return executor.getCompletedTaskCount() == MAX_THREADS+initialCount; + }); + } finally { + executor.shutdown(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1af92d89/geode-core/src/test/java/com/gemstone/gemfire/test/fake/Fakes.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/test/fake/Fakes.java b/geode-core/src/test/java/com/gemstone/gemfire/test/fake/Fakes.java index 2a1fd8e..16734b5 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/test/fake/Fakes.java +++ 
b/geode-core/src/test/java/com/gemstone/gemfire/test/fake/Fakes.java @@ -18,11 +18,14 @@ package com.gemstone.gemfire.test.fake; import static org.mockito.Mockito.*; +import java.io.File; import java.net.UnknownHostException; import org.junit.Assert; import com.gemstone.gemfire.CancelCriterion; +import com.gemstone.gemfire.LogWriter; +import com.gemstone.gemfire.Statistics; import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.DataPolicy; import com.gemstone.gemfire.cache.Region; @@ -65,6 +68,8 @@ public class Fakes { DistributionManager distributionManager = mock(DistributionManager.class); CancelCriterion systemCancelCriterion = mock(CancelCriterion.class); DSClock clock = mock(DSClock.class); + LogWriter logger = mock(LogWriter.class); + Statistics stats = mock(Statistics.class); InternalDistributedMember member; try { @@ -73,6 +78,8 @@ public class Fakes { throw new RuntimeException(e); } + when(config.getCacheXmlFile()).thenReturn(new File("")); + when(config.getDeployWorkingDir()).thenReturn(new File(".")); when(cache.getDistributedSystem()).thenReturn(system); when(cache.getMyId()).thenReturn(member); @@ -84,6 +91,9 @@ public class Fakes { when(system.getDistributionManager()).thenReturn(distributionManager); when(system.getCancelCriterion()).thenReturn(systemCancelCriterion); when(system.getClock()).thenReturn(clock); + when(system.getLogWriter()).thenReturn(logger); + when(system.createAtomicStatistics(any(), any(), anyLong())).thenReturn(stats); + when(system.createAtomicStatistics(any(), any())).thenReturn(stats); when(distributionManager.getId()).thenReturn(member); when(distributionManager.getConfig()).thenReturn(config);