This is an automated email from the ASF dual-hosted git repository. udo pushed a commit to branch feature/GEODE-3637 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 849d2fde12aca79f13579e61ff80999b57560116 Author: kohlmu-pivotal <[email protected]> AuthorDate: Mon Nov 6 16:26:42 2017 -0800 GEODE-3637: Revert changes to client queue initialization --- .../internal/cache/tier/sockets/AcceptorImpl.java | 312 +++++++++------------ .../cache/tier/sockets/ServerConnection.java | 51 ++-- .../sockets/AcceptorImplClientQueueDUnitTest.java | 263 ----------------- .../apache/geode/test/dunit/rules/CacheRule.java | 22 +- 4 files changed, 157 insertions(+), 491 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index 21a0ac3..5b289a9 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -103,7 +103,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { private static final Logger logger = LogService.getLogger(); private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit"); - private static final int HANDSHAKER_DEFAULT_POOL_SIZE = 4; protected final CacheServerStats stats; private final int maxConnections; @@ -116,11 +115,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { private final ThreadPoolExecutor hsPool; /** - * A pool used to process client-queue-initializations. - */ - private final ThreadPoolExecutor clientQueueInitPool; - - /** * The port on which this acceptor listens for client connections */ private final int localPort; @@ -540,126 +534,103 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings, this.clientNotifier.getStats()); - pool = initializeServerConnectionThreadPool(); - hsPool = initializeHandshakerThreadPool(); - clientQueueInitPool = initializeClientQueueInitializerThreadPool(); - - isAuthenticationRequired = this.securityService.isClientSecurityRequired(); - - isIntegratedSecurity = this.securityService.isIntegratedSecurity(); - - String postAuthzFactoryName = - this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP); - - isPostAuthzCallbackPresent = - (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false; - } - - private ThreadPoolExecutor initializeHandshakerThreadPool() throws IOException { - String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort; - final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger); - - ThreadFactory socketThreadFactory = new ThreadFactory() { - AtomicInteger connNum = new AtomicInteger(-1); - - @Override - public Thread newThread(Runnable command) { - String threadName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet(); - getStats().incAcceptThreadsCreated(); - return new Thread(socketThreadGroup, command, threadName); - } - }; - try { - final BlockingQueue blockingQueue = new SynchronousQueue(); - final RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { - public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) { - try { - blockingQueue.put(r); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); // preserve the state - throw new RejectedExecutionException( - LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex); + { + ThreadPoolExecutor tmp_pool = null; + String gName = "ServerConnection " + // + serverSock.getInetAddress() + + "on port " + this.localPort; + final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger); + + ThreadFactory socketThreadFactory = new ThreadFactory() { + int connNum = -1; + + public Thread newThread(final Runnable command) { + int tnum; + synchronized (this) { + tnum = ++connNum; } + String tName = socketThreadGroup.getName() + " Thread " + tnum; + getStats().incConnectionThreadsCreated(); + Runnable r = new Runnable() { + public void run() { + try { + command.run(); + } catch (CancelException e) { // bug 39463 + // ignore + } finally { + ConnectionTable.releaseThreadsSockets(); + } + } + }; + return new Thread(socketThreadGroup, r, tName); } }; - logger.warn("Handshaker max Pool size: " + HANDSHAKE_POOL_SIZE); - return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue, - socketThreadFactory, rejectedExecutionHandler); - } catch (IllegalArgumentException poolInitException) { - this.stats.close(); - this.serverSock.close(); - this.pool.shutdownNow(); - throw poolInitException; + try { + if (isSelector()) { + tmp_pool = new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads, + getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE); + } else { + tmp_pool = new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L, + TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory); + } + } catch (IllegalArgumentException poolInitException) { + this.stats.close(); + this.serverSock.close(); + throw poolInitException; + } + this.pool = tmp_pool; } - } - - private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException { - final ThreadGroup clientQueueThreadGroup = - LoggingThreadGroup.createThreadGroup("Client Queue Initialization ", logger); + { + ThreadPoolExecutor tmp_hsPool = null; + String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort; + final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger); - ThreadFactory clientQueueThreadFactory = new ThreadFactory() { - AtomicInteger connNum = new AtomicInteger(-1); + ThreadFactory socketThreadFactory = new ThreadFactory() { + int connNum = -1; - @Override - public Thread newThread(final Runnable command) { - String threadName = - clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet(); - Runnable runnable = new Runnable() { - public void run() { - try { - command.run(); - } catch (CancelException e) { - logger.debug("Client Queue Initialization was canceled.", e); - } + public Thread newThread(Runnable command) { + int tnum; + synchronized (this) { + tnum = ++connNum; } - }; - return new Thread(clientQueueThreadGroup, runnable, threadName); - } - }; - return new PooledExecutorWithDMStats(new SynchronousQueue(), 16, getStats().getCnxPoolHelper(), - clientQueueThreadFactory, 60000); - } - - private ThreadPoolExecutor initializeServerConnectionThreadPool() throws IOException { - String gName = "ServerConnection " - // + serverSock.getInetAddress() - + "on port " + this.localPort; - final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger); - - ThreadFactory socketThreadFactory = new ThreadFactory() { - AtomicInteger connNum = new AtomicInteger(-1); - - @Override - public Thread newThread(final Runnable command) { - String tName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet(); - getStats().incConnectionThreadsCreated(); - Runnable r = new Runnable() { - public void run() { + String tName = socketThreadGroup.getName() + " Thread " + tnum; + getStats().incAcceptThreadsCreated(); + return new Thread(socketThreadGroup, command, tName); + } + }; + try { + final BlockingQueue bq = new SynchronousQueue(); + final RejectedExecutionHandler reh = new RejectedExecutionHandler() { + public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) { try { - command.run(); - } catch (CancelException e) { // bug 39463 - // ignore - } finally { - ConnectionTable.releaseThreadsSockets(); + bq.put(r); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); // preserve the state + throw new RejectedExecutionException( + LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex); } } }; - return new Thread(socketThreadGroup, r, tName); - } - }; - try { - if (isSelector()) { - return new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads, - getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE); - } else { - return new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L, - TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory); + tmp_hsPool = new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, bq, + socketThreadFactory, reh); + } catch (IllegalArgumentException poolInitException) { + this.stats.close(); + this.serverSock.close(); + this.pool.shutdownNow(); + throw poolInitException; } - } catch (IllegalArgumentException poolInitException) { - this.stats.close(); - this.serverSock.close(); - throw poolInitException; + this.hsPool = tmp_hsPool; } + + isAuthenticationRequired = this.securityService.isClientSecurityRequired(); + + isIntegratedSecurity = this.securityService.isIntegratedSecurity(); + + String postAuthzFactoryName = + this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP); + + isPostAuthzCallbackPresent = + (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false; } public long getAcceptorId() { @@ -695,8 +666,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { @Deprecated private static final int DEPRECATED_SELECTOR_POOL_SIZE = Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16).intValue(); - private static final int HANDSHAKE_POOL_SIZE = Integer - .getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", HANDSHAKER_DEFAULT_POOL_SIZE).intValue(); + private static final int HANDSHAKE_POOL_SIZE = + Integer.getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", 4).intValue(); @Override public void start() throws IOException { @@ -831,9 +802,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { * @see SystemFailure#loadEmergencyClasses() */ public static void loadEmergencyClasses() { - if (emergencyClassesLoaded) { + if (emergencyClassesLoaded) return; - } emergencyClassesLoaded = true; CachedRegionHelper.loadEmergencyClasses(); ServerConnection.loadEmergencyClasses(); @@ -900,9 +870,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { private Selector tmpSel; private void checkForStuckKeys() { - if (!WORKAROUND_SELECTOR_BUG) { + if (!WORKAROUND_SELECTOR_BUG) return; - } if (tmpSel == null) { try { tmpSel = Selector.open(); @@ -918,9 +887,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { while (it.hasNext()) { SelectionKey sk = (SelectionKey) it.next(); ServerConnection sc = (ServerConnection) sk.attachment(); - if (sc == null) { + if (sc == null) continue; - } try { sk.cancel(); this.selector.selectNow(); // clear the cancelled key @@ -1072,7 +1040,40 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { break; } if (events == 0) { + // zeroEventsCount++; + // if (zeroEventsCount > 0) { + // zeroEventsCount = 0; checkForStuckKeys(); + + // try { + // this.selector.close(); // this selector is sick! + // } catch (IOException ignore) { + // } + // this.selector = Selector.open(); + // { + // Iterator it = selectorRegistrations.iterator(); + // while (it.hasNext()) { + // ServerConnection sc = (ServerConnection)it.next(); + // sc.registerWithSelector2(this.selector); + // } + // } + // } + // ArrayList al = new ArrayList(); + // Iterator keysIt = this.selector.keys().iterator(); + // while (keysIt.hasNext()) { + // SelectionKey sk = (SelectionKey)keysIt.next(); + // al.add(sk.attachment()); + // sk.cancel(); + // } + // events = this.selector.selectNow(); + // Iterator alIt = al.iterator(); + // while (alIt.hasNext()) { + // ServerConnection sc = (ServerConnection)alIt.next(); + // sc.registerWithSelector2(this.selector); + // } + // events = this.selector.select(); + // } else { + // zeroEventsCount = 0; } while (events > 0) { int cancelCount = 0; @@ -1129,11 +1130,16 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { logger.warn( LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, rejected)); } + // } else if (key.isValid() && key.isConnectable()) { + // logger.info("DEBUG isConnectable and isValid key=" + key); + // finishCon(sc); } else { finishCon(sc); if (key.isValid()) { logger.warn(LocalizedMessage.create( LocalizedStrings.AcceptorImpl_IGNORING_EVENT_ON_SELECTOR_KEY__0, key)); + // } else { + // logger.info("DEBUG !isValid key=" + key); } } } catch (CancelledKeyException ex) { // fix for bug 37739 @@ -1399,10 +1405,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { return this.clientServerCnxCount.get(); } - public boolean isNotifyBySubscription() { - return notifyBySubscription; - } - protected void handleNewClientConnection(final Socket socket, final ServerConnectionFactory serverConnectionFactory) throws IOException { // Read the first byte. If this socket is being used for 'client to server' @@ -1425,9 +1427,12 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { throw new EOFException(); } - // GEODE-3637 - If the communicationMode is client Subscriptions, hand-off the client queue - // initialization to be done in another threadPool - if (initializeClientPools(socket, communicationMode)) { + if (communicationMode.isSubscriptionFeed()) { + boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient; + logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}", + primary ? "primary" : "secondary", socket); + AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId, + this.notifyBySubscription); return; } @@ -1493,17 +1498,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { } } - private boolean initializeClientPools(Socket socket, CommunicationMode communicationMode) { - if (communicationMode.isSubscriptionFeed()) { - boolean isPrimaryServerToClient = - communicationMode == CommunicationMode.PrimaryServerToClient; - clientQueueInitPool - .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this)); - return true; - } - return false; - } - private CommunicationMode getCommunicationModeForNonSelector(Socket socket) throws IOException { socket.setSoTimeout(this.acceptTimeout); this.socketCreator.configureServerSSLSocket(socket); @@ -1668,6 +1662,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { * then calculate it. * @return the ip address or host name this acceptor will listen on. An "" if all local addresses * will be listened to. + * * @since GemFire 5.7 */ private static String calcBindHostName(Cache cache, String bindName) { @@ -1796,6 +1791,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { * This method returns a thread safe structure which can be iterated over without worrying about * ConcurrentModificationException. JMX MBeans/Commands need to iterate over this list to get * client info. + * */ public ServerConnection[] getAllServerConnectionList() { return this.allSCList; @@ -1819,42 +1815,4 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { releaseCommBuffer(Message.setTLCommBuffer(null)); } - - private class ClientQueueInitializerTask implements Runnable { - private final Socket socket; - private final boolean isPrimaryServerToClient; - private final AcceptorImpl acceptor; - - public ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient, - AcceptorImpl acceptor) { - this.socket = socket; - this.acceptor = acceptor; - this.isPrimaryServerToClient = isPrimaryServerToClient; - } - - @Override - public void run() { - logger.info(":Bridge server: Initializing {} server-to-client communication socket: {}", - isPrimaryServerToClient ? "primary" : "secondary", socket); - try { - acceptor.getCacheClientNotifier().registerClient(socket, isPrimaryServerToClient, - acceptor.getAcceptorId(), acceptor.isNotifyBySubscription()); - } catch (IOException ex) { - closeSocket(socket); - if (isRunning()) { - if (!acceptor.loggedAcceptError) { - acceptor.loggedAcceptError = true; - if (ex instanceof SocketTimeoutException) { - logger.warn(LocalizedMessage.create( - LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION_DUE_TO_SOCKET_TIMEOUT)); - } else { - logger.warn(LocalizedMessage.create( - LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION__0, - ex), ex); - } - } - } - } - } - } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index 74451e5..0e510af 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -189,9 +189,7 @@ public abstract class ServerConnection implements Runnable { */ private volatile int requestSpecificTimeout = -1; - /** - * Tracks the id of the most recent batch to which a reply has been sent - */ + /** Tracks the id of the most recent batch to which a reply has been sent */ private int latestBatchIdReplied = -1; /* @@ -713,9 +711,8 @@ public abstract class ServerConnection implements Runnable { // can be used. initializeCommands(); // its initialized in verifyClientConnection call - if (!getCommunicationMode().isWAN()) { + if (!getCommunicationMode().isWAN()) initializeClientUserAuths(); - } } if (TEST_VERSION_AFTER_HANDSHAKE_FLAG) { Assert.assertTrue((this.handshake.getVersion().ordinal() == testVersionAfterHandshake), @@ -895,9 +892,7 @@ public abstract class ServerConnection implements Runnable { } } if (unregisterClient)// last serverconnection call all close on auth objects - { cleanClientAuths(); - } this.clientUserAuths = null; if (needsUnregister) { this.acceptor.getClientHealthMonitor().removeConnection(this.proxyId, this); @@ -922,9 +917,8 @@ public abstract class ServerConnection implements Runnable { ClientUserAuths cua = new ClientUserAuths(proxyId.hashCode()); ClientUserAuths retCua = proxyIdVsClientUserAuths.putIfAbsent(proxyId, cua); - if (retCua == null) { + if (retCua == null) return cua; - } return retCua; } @@ -960,9 +954,8 @@ public abstract class ServerConnection implements Runnable { boolean removed = this.clientUserAuths.removeSubject(aIds.getUniqueId()); // if not successfull, try the old way - if (!removed) { + if (!removed) removed = this.clientUserAuths.removeUserId(aIds.getUniqueId(), keepalive); - } return removed; } catch (NullPointerException npe) { @@ -1017,6 +1010,7 @@ public abstract class ServerConnection implements Runnable { throw new AuthenticationFailedException("Authentication failed"); } + byte[] credBytes = msg.getPart(0).getSerializedForm(); credBytes = ((HandShake) this.handshake).decryptBytes(credBytes); @@ -1130,7 +1124,6 @@ public abstract class ServerConnection implements Runnable { public void run() { setOwner(); - if (getAcceptor().isSelector()) { boolean finishedMsg = false; try { @@ -1143,7 +1136,9 @@ public abstract class ServerConnection implements Runnable { finishedMsg = true; } } - } catch (java.nio.channels.ClosedChannelException | CancelException ignore) { + } catch (java.nio.channels.ClosedChannelException ignore) { + // ok shutting down + } catch (CancelException e) { // ok shutting down } catch (IOException ex) { logger.warn( @@ -1192,7 +1187,6 @@ public abstract class ServerConnection implements Runnable { * If registered with a selector then this will be the key we are registered with. */ // private SelectionKey sKey = null; - /** * Register this connection with the given selector for read events. Note that switch the channel * to non-blocking so it can be in a selector. @@ -1208,8 +1202,7 @@ public abstract class ServerConnection implements Runnable { } public void registerWithSelector2(Selector s) throws IOException { - /* this.sKey = */ - getSelectableChannel().register(s, SelectionKey.OP_READ, this); + /* this.sKey = */getSelectableChannel().register(s, SelectionKey.OP_READ, this); } /** @@ -1232,6 +1225,7 @@ public abstract class ServerConnection implements Runnable { } /** + * * @return String representing the DistributedSystemMembership of the Client VM */ public String getMembershipID() { @@ -1271,11 +1265,10 @@ public abstract class ServerConnection implements Runnable { } protected int getClientReadTimeout() { - if (this.requestSpecificTimeout == -1) { + if (this.requestSpecificTimeout == -1) return this.handshake.getClientReadTimeout(); - } else { + else return this.requestSpecificTimeout; - } } protected boolean isProcessingMessage() { @@ -1526,9 +1519,7 @@ public abstract class ServerConnection implements Runnable { return this.name; } - /** - * returns the name of this connection - */ + /** returns the name of this connection */ public String getName() { return this.name; } @@ -1745,13 +1736,11 @@ public abstract class ServerConnection implements Runnable { // for backward client it will be store in member variable userAuthId // for other look "requestMsg" here and get unique-id from this to get the authzrequest - if (!AcceptorImpl.isAuthenticationRequired()) { + if (!AcceptorImpl.isAuthenticationRequired()) return null; - } - if (AcceptorImpl.isIntegratedSecurity()) { + if (AcceptorImpl.isIntegratedSecurity()) return null; - } long uniqueId = getUniqueId(); @@ -1779,13 +1768,11 @@ public abstract class ServerConnection implements Runnable { public AuthorizeRequestPP getPostAuthzRequest() throws AuthenticationRequiredException, IOException { - if (!AcceptorImpl.isAuthenticationRequired()) { + if (!AcceptorImpl.isAuthenticationRequired()) return null; - } - if (AcceptorImpl.isIntegratedSecurity()) { + if (AcceptorImpl.isIntegratedSecurity()) return null; - } // look client version and return authzrequest // for backward client it will be store in member variable userAuthId @@ -1812,9 +1799,7 @@ public abstract class ServerConnection implements Runnable { return postAuthReq; } - /** - * returns the member ID byte array to be used for creating EventID objects - */ + /** returns the member ID byte array to be used for creating EventID objects */ public byte[] getEventMemberIDByteArray() { return this.memberIdByteArray; } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java deleted file mode 100644 index c0b2d07..0000000 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache.tier.sockets; - -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.rmi.RemoteException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.awaitility.Awaitility; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.geode.cache.EntryEvent; -import org.apache.geode.cache.InterestPolicy; -import org.apache.geode.cache.InterestResultPolicy; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionShortcut; -import org.apache.geode.cache.SubscriptionAttributes; -import org.apache.geode.cache.client.ClientCache; -import org.apache.geode.cache.client.ClientCacheFactory; -import org.apache.geode.cache.client.ClientRegionFactory; -import org.apache.geode.cache.client.ClientRegionShortcut; -import org.apache.geode.cache.server.CacheServer; -import org.apache.geode.cache.server.ClientSubscriptionConfig; -import org.apache.geode.cache.util.CacheListenerAdapter; -import org.apache.geode.distributed.DistributedLockBlackboard; -import org.apache.geode.distributed.DistributedLockBlackboardImpl; -import org.apache.geode.internal.cache.DiskStoreAttributes; -import org.apache.geode.internal.cache.InitialImageOperation; -import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.test.dunit.AsyncInvocation; -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.rules.CacheRule; -import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; -import org.apache.geode.test.dunit.rules.DistributedTestRule; -import org.apache.geode.test.dunit.rules.SharedCountersRule; -import org.apache.geode.test.junit.categories.DistributedTest; -import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; -import org.apache.geode.test.junit.rules.serializable.SerializableTestName; - -@Category(DistributedTest.class) -public class AcceptorImplClientQueueDUnitTest implements Serializable { - private final Host host = Host.getHost(0); - private static final int numberOfEntries = 200; - private static final AtomicInteger eventCount = new AtomicInteger(0); - private static final AtomicBoolean completedClient2 = new AtomicBoolean(false); - - @ClassRule - public static DistributedTestRule distributedTestRule = new DistributedTestRule(); - - @Rule - public CacheRule cacheRule = - CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(host.getVM(1)) - .addSystemProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1").build(); - - @Rule - public SerializableTestName name = new SerializableTestName(); - - @Rule - public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder(); - - @Rule - public DistributedRestoreSystemProperties restoreSystemProperties = - new DistributedRestoreSystemProperties(); - - private DistributedLockBlackboard blackboard = null; - - @Before - public void setup() throws Exception { - blackboard = DistributedLockBlackboardImpl.getInstance(); - } - - @After - public void tearDown() throws RemoteException { - blackboard.initCount(); - host.getAllVMs().forEach((vm) -> vm.invoke(() -> { - InitialImageOperation.slowImageProcessing = 0; - System.getProperties().remove("BridgeServer.HANDSHAKE_POOL_SIZE"); - })); - } - - @Test - public void testClientSubscriptionQueueBlockingConnectionInitialization() throws Exception { - VM vm0 = host.getVM(0); - VM vm1 = host.getVM(1); - VM vm2 = host.getVM(2); - VM vm3 = host.getVM(3); - int vm0_port = vm0.invoke("Start server with subscription turned on", () -> { - try { - return createSubscriptionServer(cacheRule.getCache()); - } catch (IOException e) { - return 0; - } - }); - - vm2.invoke("Start Client1 with durable interest registration turned on", () -> { - ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); - clientCacheFactory.setPoolSubscriptionEnabled(true); - clientCacheFactory.setPoolSubscriptionRedundancy(1); - clientCacheFactory.setPoolReadTimeout(200); - clientCacheFactory.addPoolServer(host.getHostName(), vm0_port); - ClientCache cache = clientCacheFactory.set("durable-client-id", "1") - .set("durable-client-timeout", "300").set("mcast-port", "0").create(); - ClientRegionFactory<Object, Object> clientRegionFactory = - cache.createClientRegionFactory(ClientRegionShortcut.PROXY); - Region region = clientRegionFactory.create("subscriptionRegion"); - - region.registerInterestRegex(".*", InterestResultPolicy.NONE, true); - cache.readyForEvents(); - cache.close(true); - }); - vm3.invoke("Start Client2 to add entries to region", () -> { - ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); - clientCacheFactory.addPoolServer(host.getHostName(), vm0_port); - ClientCache cache = clientCacheFactory.set("mcast-port", "0").create(); - ClientRegionFactory<Object, Object> clientRegionFactory = - cache.createClientRegionFactory(ClientRegionShortcut.PROXY); - Region region = clientRegionFactory.create("subscriptionRegion"); - - for (int i = 0; i < numberOfEntries; i++) { - region.put(i, i); - } - cache.close(); - }); - - int vm1_port = vm1.invoke("Start server2 in with subscriptions turned on", () -> { - try { - int serverPort = createSubscriptionServer(cacheRule.getCache()); - InitialImageOperation.slowImageProcessing = 30; - return serverPort; - } catch (IOException e) { - return 0; - } - }); - - vm0.invoke("Turn on slow image processsing", () -> { - InitialImageOperation.slowImageProcessing = 30; - }); - - AsyncInvocation<Boolean> completedClient1 = - vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> { - - ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); - clientCacheFactory.setPoolSubscriptionEnabled(true); - clientCacheFactory.setPoolSubscriptionRedundancy(1); - clientCacheFactory.setPoolMinConnections(1); - clientCacheFactory.setPoolMaxConnections(1); - clientCacheFactory.setPoolReadTimeout(200); - clientCacheFactory.addPoolServer(host.getHostName(), vm1_port); - ClientCacheFactory cacheFactory = clientCacheFactory.set("durable-client-id", "1") - .set("durable-client-timeout", "300").set("mcast-port", "0"); - blackboard.incCount(); - ClientCache cache = cacheFactory.create(); - - ClientRegionFactory<Object, Object> clientRegionFactory = - cache.createClientRegionFactory(ClientRegionShortcut.PROXY); - Region region = clientRegionFactory.addCacheListener(new CacheListenerAdapter() { - @Override - public void afterCreate(EntryEvent event) { - eventCount.incrementAndGet(); - } - - @Override - public void afterUpdate(EntryEvent event) { - eventCount.incrementAndGet(); - } - }).create("subscriptionRegion"); - - region.registerInterestRegex(".*", InterestResultPolicy.NONE, true); - cache.readyForEvents(); - Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS) - .until(() -> eventCount.get() == numberOfEntries); - cache.close(); - return eventCount.get() == numberOfEntries; - }); - - vm3.invokeAsync("Start Client2 to add entries to region", () -> { - while (true) { - Thread.sleep(100); - if (blackboard.getCount() == 1) { - break; - } - } - ClientCache cache = null; - ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); - clientCacheFactory.setPoolRetryAttempts(0); - clientCacheFactory.setPoolMinConnections(1); - clientCacheFactory.setPoolMaxConnections(1); - clientCacheFactory.setPoolReadTimeout(200); - clientCacheFactory.setPoolSocketConnectTimeout(500); - clientCacheFactory.addPoolServer(host.getHostName(), vm1_port); - cache = clientCacheFactory.set("mcast-port", "0").create(); - ClientRegionFactory<Object, Object> clientRegionFactory = - cache.createClientRegionFactory(ClientRegionShortcut.PROXY); - Region region = clientRegionFactory.create("subscriptionRegion"); - - int returnValue = 0; - for (int i = 0; i < 100; i++) { - returnValue = (int) region.get(i); - } - cache.close(); - completedClient2.set(returnValue == 99); - }); - assertTrue(completedClient1.get()); - assertTrue(vm3.invoke(() -> completedClient2.get())); - } - - private int createSubscriptionServer(InternalCache cache) throws IOException { - initializeDiskStore(cache); - initializeReplicateRegion(cache); - return initializeCacheServerWithSubscription(host, cache); - } - - private void initializeDiskStore(InternalCache cache) throws IOException { - DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes(); - diskStoreAttributes.name = "clientQueueDS"; - diskStoreAttributes.diskDirs = new File[] {tempDir.newFolder(name + "_dir")}; - cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS"); - } - - private void initializeReplicateRegion(InternalCache cache) { - cache.createRegionFactory(RegionShortcut.REPLICATE).setStatisticsEnabled(true) - .setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL)) - .create("subscriptionRegion"); - } - - private int initializeCacheServerWithSubscription(Host host, InternalCache cache) - throws IOException { - CacheServer cacheServer1 = cache.addCacheServer(false); - ClientSubscriptionConfig clientSubscriptionConfig = cacheServer1.getClientSubscriptionConfig(); - clientSubscriptionConfig.setEvictionPolicy("entry"); - clientSubscriptionConfig.setCapacity(5); - clientSubscriptionConfig.setDiskStoreName("clientQueueDS"); - cacheServer1.setPort(0); - cacheServer1.setHostnameForClients(host.getHostName()); - cacheServer1.start(); - return cacheServer1.getPort(); - } -} diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java index b65bf86..dc42da8 100644 --- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java +++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java @@ -63,7 +63,6 @@ public class CacheRule extends DistributedExternalResource { private final boolean disconnectAfter; private final List<VM> createCacheInVMs; private final Properties config; - private final Properties systemProperties; public static Builder builder() { return new Builder(); @@ -75,19 +74,18 @@ public class CacheRule extends DistributedExternalResource { this.disconnectAfter = builder.disconnectAfter; this.createCacheInVMs = builder.createCacheInVMs; this.config = builder.config; - this.systemProperties = builder.systemProperties; } @Override protected void before() { if (createCacheInAll) { - invoker().invokeInEveryVMAndController(() -> createCache(config, systemProperties)); + invoker().invokeInEveryVMAndController(() -> createCache(config)); } else { if (createCache) { - createCache(config, systemProperties); + createCache(config); } for (VM vm : createCacheInVMs) { - vm.invoke(() -> createCache(config, systemProperties)); + vm.invoke(() -> createCache(config)); } } } @@ -110,8 +108,7 @@ public class CacheRule extends DistributedExternalResource { return cache.getInternalDistributedSystem(); } - private static void createCache(final Properties config, final Properties systemProperties) { - System.getProperties().putAll(systemProperties); + private static void createCache(final Properties config) { cache = (InternalCache) new CacheFactory(config).create(); } @@ -144,7 +141,6 @@ public class CacheRule extends DistributedExternalResource { private boolean disconnectAfter; private List<VM> createCacheInVMs = new ArrayList<>(); private Properties config = new Properties(); - private Properties systemProperties = new Properties(); public Builder() { config.setProperty(LOCATORS, getLocators()); @@ -199,16 +195,6 @@ public class CacheRule extends DistributedExternalResource { return this; } - public Builder addSystemProperty(final String key, final String value) { - this.systemProperties.put(key, value); - return this; - } - - public Builder addSystemProperties(final Properties config) { - this.systemProperties.putAll(config); - return this; - } - public CacheRule build() { return new CacheRule(this); } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
