GEODE-3147: Set commBuffer for threads performing TXSynchronization calls when max-threads is set.
Added an interface that is implemented by AcceptorImpl to set and release commBuffer. Added a unit test case that failed without this fix. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/a1583458 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/a1583458 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/a1583458 Branch: refs/heads/feature/GEODE-3109 Commit: a1583458e81b0a3fb3766948dd5b03af9fabf907 Parents: ac0474d Author: eshu <e...@pivotal.io> Authored: Thu Jun 29 17:03:44 2017 -0700 Committer: eshu <e...@pivotal.io> Committed: Thu Jun 29 17:03:44 2017 -0700 ---------------------------------------------------------------------- .../cache/TXSynchronizationRunnable.java | 15 +++++- .../cache/tier/sockets/AcceptorImpl.java | 25 ++++++++-- .../cache/tier/sockets/CommBufferPool.java | 36 +++++++++++++++ .../cache/tier/sockets/ServerConnection.java | 4 +- .../command/TXSynchronizationCommand.java | 3 +- .../internal/jta/ClientServerJTADUnitTest.java | 48 ++++++++++++++++++-- 6 files changed, 118 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java index 35b0e75..884f281 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java @@ -15,7 +15,7 @@ package org.apache.geode.internal.cache; import org.apache.logging.log4j.Logger; - +import org.apache.geode.internal.cache.tier.sockets.CommBufferPool; import org.apache.geode.internal.logging.LogService; /** @@ -38,12 +38,23 @@ public class TXSynchronizationRunnable implements Runnable { private boolean secondRunnableCompleted; private boolean abort; + private final CommBufferPool commBufferPool; - public TXSynchronizationRunnable(Runnable beforeCompletion) { + public TXSynchronizationRunnable(Runnable beforeCompletion, final CommBufferPool commBufferPool) { this.firstRunnable = beforeCompletion; + this.commBufferPool = commBufferPool; } public void run() { + commBufferPool.setTLCommBuffer(); + try { + doSynchronizationOps(); + } finally { + commBufferPool.releaseTLCommBuffer(); + } + } + + private void doSynchronizationOps() { synchronized (this.firstRunnableSync) { try { this.firstRunnable.run(); http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index 472af09..3c424d3 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -98,7 +98,7 @@ import javax.net.ssl.SSLException; * @since GemFire 2.0.2 */ @SuppressWarnings("deprecation") -public class AcceptorImpl extends Acceptor implements Runnable { +public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool { private static final Logger logger = LogService.getLogger(); private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit"); @@ -1373,7 +1373,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { } } - public ByteBuffer takeCommBuffer() { + private ByteBuffer takeCommBuffer() { ByteBuffer result = (ByteBuffer) this.commBufferQueue.poll(); if (result == null) { result = ByteBuffer.allocateDirect(this.socketBufferSize); @@ -1381,7 +1381,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { return result; } - public void releaseCommBuffer(ByteBuffer bb) { + private void releaseCommBuffer(ByteBuffer bb) { if (bb == null) { // fix for bug 37107 return; } @@ -1791,4 +1791,23 @@ public class AcceptorImpl extends Acceptor implements Runnable { public ServerConnection[] getAllServerConnectionList() { return this.allSCList; } + + @Override + public void setTLCommBuffer() { + // The thread local will only be set if maxThreads has been set. + if (!isSelector()) { + return; + } + + Message.setTLCommBuffer(takeCommBuffer()); + } + + @Override + public void releaseTLCommBuffer() { + if (!isSelector()) { + return; + } + + releaseCommBuffer(Message.setTLCommBuffer(null)); + } } http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CommBufferPool.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CommBufferPool.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CommBufferPool.java new file mode 100644 index 0000000..de3189e --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CommBufferPool.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +/** + * Defines the comm buffer pool interface which can set a comm buffer in a ThreadLocal and return + * the buffer back to the comm buffer queue. + * + */ +public interface CommBufferPool { + + /** + * Set a comm buffer in a ThreadLocal. + * + */ + public void setTLCommBuffer(); + + /** + * Release the ThreadLocal comm buffer back to the queue. + * + */ + public void releaseTLCommBuffer(); +} http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index ebc9dab..8704dad 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -1118,7 +1118,7 @@ public abstract class ServerConnection implements Runnable { try { this.stats.decThreadQueueSize(); if (!isTerminated()) { - Message.setTLCommBuffer(getAcceptor().takeCommBuffer()); + getAcceptor().setTLCommBuffer(); doOneMessage(); if (this.processMessages && !(this.crHelper.isShutdown())) { registerWithSelector(); // finished msg so reregister @@ -1134,7 +1134,7 @@ public abstract class ServerConnection implements Runnable { LocalizedMessage.create(LocalizedStrings.ServerConnection_0__UNEXPECTED_EXCEPTION, ex)); setClientDisconnectedException(ex); } finally { - getAcceptor().releaseCommBuffer(Message.setTLCommBuffer(null)); + getAcceptor().releaseTLCommBuffer(); // DistributedSystem.releaseThreadsSockets(); unsetOwner(); setNotProcessingMessage(); http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java index b1b0cfb..eb70700 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java @@ -142,7 +142,8 @@ public class TXSynchronizationCommand extends BaseCommand { } } }; - TXSynchronizationRunnable sync = new TXSynchronizationRunnable(beforeCompletion); + TXSynchronizationRunnable sync = + new TXSynchronizationRunnable(beforeCompletion, serverConnection.getAcceptor()); txProxy.setSynchronizationRunnable(sync); Executor exec = InternalDistributedSystem.getConnectedInstance().getDistributionManager() .getWaitingThreadPool(); http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java index 51ef44a..ddf08d0 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java @@ -49,19 +49,19 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase { private String key = "key"; private String value = "value"; private String newValue = "newValue"; + final Host host = Host.getHost(0); + final VM server = host.getVM(0); + final VM client = host.getVM(1); @Test public void testClientTXStateStubBeforeCompletion() throws Exception { - final Host host = Host.getHost(0); - final VM server = host.getVM(0); - final VM client = host.getVM(1); final String regionName = getUniqueName(); getBlackboard().initBlackboard(); final Properties properties = getDistributedSystemProperties(); final int port = server.invoke("create cache", () -> { Cache cache = getCache(properties); - CacheServer cacheServer = createCacheServer(cache); + CacheServer cacheServer = createCacheServer(cache, 0); Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName); region.put(key, value); @@ -106,8 +106,9 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase { } } - private CacheServer createCacheServer(Cache cache) { + private CacheServer createCacheServer(Cache cache, int maxThreads) { CacheServer server = cache.addCacheServer(); + server.setMaxThreads(maxThreads); server.setPort(AvailablePortHelper.getRandomAvailableTCPPort()); try { server.start(); @@ -140,4 +141,41 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase { } txStub.afterCompletion(Status.STATUS_COMMITTED); } + + @Test + public void testJTAMaxThreads() throws TimeoutException, InterruptedException { + testJTAWithMaxThreads(1); + } + + @Test + public void testJTANoMaxThreadsSetting() throws TimeoutException, InterruptedException { + testJTAWithMaxThreads(0); + } + + private void testJTAWithMaxThreads(int maxThreads) { + final String regionName = getUniqueName(); + getBlackboard().initBlackboard(); + final Properties properties = getDistributedSystemProperties(); + + final int port = server.invoke("create cache", () -> { + Cache cache = getCache(properties); + CacheServer cacheServer = createCacheServer(cache, maxThreads); + Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName); + region.put(key, value); + + return cacheServer.getPort(); + }); + + createClientRegion(host, port, regionName); + + Region region = getCache().getRegion(regionName); + assertTrue(region.get(key).equals(value)); + + try { + commitTxWithBeforeCompletion(regionName, false, null, null); + } catch (Exception e) { + Assert.fail("got unexpected exception", e); + } + assertTrue(region.get(key).equals(newValue)); + } }