This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new b021b12 Feature/geode 6536 2: Added retry in borrowConnection/single hop (#4833) b021b12 is described below commit b021b127867d1e01e09fac26de10d29f896ed7dc Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Mon Mar 30 21:54:35 2020 +0200 Feature/geode 6536 2: Added retry in borrowConnection/single hop (#4833) * GEODE-6536: Added retry in borrowConnection/single hop * GEODE-6536: bug fix * GEODE-6536: update after comments * GEODE-6536: modify borrowConnection singleHop solution * GEODE-6536: test update * GEODE-6536: updated tests, and added parameter to desable timeout * GEODE-6536: update of cachexml impacts * GEODE-6536: remove cachexml restriction * GEODE-6536: update default value and documentation * GEODE-6536_2: change exception type * GEODE-6536_2: seize new connection only in case onlyUseExistingCnx=false --- .../apache/geode/cache30/CacheXml66DUnitTest.java | 5 +- .../cache/tier/sockets/AcceptorImplDUnitTest.java | 1 + .../cache/ConnectionPoolFactoryJUnitTest.java | 16 +++ .../AutoConnectionSourceImplJUnitTest.java | 5 + .../internal/ConnectionPoolImplJUnitTest.java | 24 ++++ .../client/internal/QueueManagerJUnitTest.java | 5 + .../pooling/ConnectionManagerImplTest.java | 45 ++++--- .../pooling/ConnectionManagerJUnitTest.java | 129 ++++++++++++++++++++- .../codeAnalysis/sanctionedDataSerializables.txt | 6 +- .../geode/cache/client/ClientCacheFactory.java | 20 ++++ .../java/org/apache/geode/cache/client/Pool.java | 7 ++ .../org/apache/geode/cache/client/PoolFactory.java | 27 +++++ .../cache/client/internal/OpExecutorImpl.java | 7 +- .../geode/cache/client/internal/PoolImpl.java | 22 +++- .../client/internal/pooling/ConnectionManager.java | 5 +- .../internal/pooling/ConnectionManagerImpl.java | 61 ++++++---- .../apache/geode/cache/configuration/PoolType.java | 25 ++++ .../geode/internal/cache/PoolFactoryImpl.java | 23 +++- 
.../geode/internal/cache/xmlcache/CacheXml.java | 1 + .../internal/cache/xmlcache/CacheXmlGenerator.java | 4 + .../internal/cache/xmlcache/CacheXmlParser.java | 4 + .../geode.apache.org/schema/cache/cache-1.0.xsd | 1 + .../schema.pivotal.io/gemfire/cache/cache-8.1.xsd | 1 + .../org/apache/geode/cache/doc-files/cache7_0.dtd | 1 + .../org/apache/geode/cache/doc-files/cache8_0.dtd | 1 + .../sanctioned-geode-core-serializables.txt | 2 +- .../client/internal/OpExecutorImplJUnitTest.java | 25 ++-- .../cache/client/internal/TXFailoverOpTest.java | 2 + geode-docs/reference/topics/cache_xml.html.md.erb | 7 +- .../reference/topics/client-cache.html.md.erb | 7 +- .../cache/tier/sockets/CacheServerTestUtil.java | 1 + 31 files changed, 423 insertions(+), 67 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/CacheXml66DUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/CacheXml66DUnitTest.java index f6b5d31..489bc36 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/CacheXml66DUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/CacheXml66DUnitTest.java @@ -197,6 +197,7 @@ public abstract class CacheXml66DUnitTest extends CacheXmlTestCase { assertEquals(0, cp.getServers().size()); assertEquals(createINSA(ALIAS2, 3777), cp.getLocators().get(0)); assertEquals(PoolFactory.DEFAULT_FREE_CONNECTION_TIMEOUT, cp.getFreeConnectionTimeout()); + assertEquals(PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, cp.getServerConnectionTimeout()); assertEquals(PoolFactory.DEFAULT_LOAD_CONDITIONING_INTERVAL, cp.getLoadConditioningInterval()); assertEquals(PoolFactory.DEFAULT_SOCKET_BUFFER_SIZE, cp.getSocketBufferSize()); assertEquals(PoolFactory.DEFAULT_THREAD_LOCAL_CONNECTIONS, cp.getThreadLocalConnections()); @@ -265,7 +266,8 @@ public abstract class CacheXml66DUnitTest extends CacheXmlTestCase { CacheCreation cache = new CacheCreation(); PoolFactory f = cache.createPoolFactory(); 
f.addServer(ALIAS2, 3777).addServer(ALIAS1, 3888); - f.setFreeConnectionTimeout(12345).setLoadConditioningInterval(12345).setSocketBufferSize(12345) + f.setFreeConnectionTimeout(12345).setServerConnectionTimeout(111) + .setLoadConditioningInterval(12345).setSocketBufferSize(12345) .setThreadLocalConnections(true).setPRSingleHopEnabled(true).setReadTimeout(12345) .setMinConnections(12346).setMaxConnections(12347).setRetryAttempts(12348) .setIdleTimeout(12349).setPingInterval(12350).setStatisticInterval(12351) @@ -293,6 +295,7 @@ public abstract class CacheXml66DUnitTest extends CacheXmlTestCase { assertEquals(createINSA(ALIAS2, 3777), cp.getServers().get(0)); assertEquals(createINSA(ALIAS1, 3888), cp.getServers().get(1)); assertEquals(12345, cp.getFreeConnectionTimeout()); + assertEquals(111, cp.getServerConnectionTimeout()); assertEquals(12345, cp.getLoadConditioningInterval()); assertEquals(12345, cp.getSocketBufferSize()); assertEquals(true, cp.getThreadLocalConnections()); diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java index 8ba6d25..efd44f4 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java @@ -172,6 +172,7 @@ public class AcceptorImplDUnitTest extends JUnit4DistributedTestCase { clientCacheFactory.setPoolRetryAttempts(1); clientCacheFactory.setPoolMaxConnections(1); clientCacheFactory.setPoolFreeConnectionTimeout(1000); + clientCacheFactory.setPoolServerConnectionTimeout(1000); ClientCache clientCache = clientCacheFactory.create(); Region<Object, Object> clientRegion1 = clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("region1"); diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/ConnectionPoolFactoryJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/ConnectionPoolFactoryJUnitTest.java index 980ea91..3d0720a 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/ConnectionPoolFactoryJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/ConnectionPoolFactoryJUnitTest.java @@ -129,6 +129,8 @@ public class ConnectionPoolFactoryJUnitTest { // now add a source and try defaults again assertEquals(PoolFactory.DEFAULT_FREE_CONNECTION_TIMEOUT, defaultAttr.getFreeConnectionTimeout()); + assertEquals(PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, + defaultAttr.getServerConnectionTimeout()); assertEquals(PoolFactory.DEFAULT_THREAD_LOCAL_CONNECTIONS, defaultAttr.getThreadLocalConnections()); assertEquals(PoolFactory.DEFAULT_READ_TIMEOUT, defaultAttr.getReadTimeout()); @@ -147,6 +149,7 @@ public class ConnectionPoolFactoryJUnitTest { */ int connectionTimeout = -1; + int serverConnectionTimeout = -2; int connectionLifetime = -2; boolean threadLocalConnections = false; int readTimeout = -1; @@ -167,6 +170,14 @@ public class ConnectionPoolFactoryJUnitTest { } catch (IllegalArgumentException iae) { // this is what we want } + + try { + cpf.setServerConnectionTimeout(serverConnectionTimeout); + assertTrue("This should have failed with IllegalArgumentException", false); + } catch (IllegalArgumentException iae) { + // this is what we want + } + try { cpf.setLoadConditioningInterval(connectionLifetime); assertTrue("This should have failed with IllegalArgumentException", false); @@ -254,6 +265,8 @@ public class ConnectionPoolFactoryJUnitTest { assertEquals("Attribute should match default, but doesn't", defaultAttr.getFreeConnectionTimeout(), PoolFactory.DEFAULT_FREE_CONNECTION_TIMEOUT); assertEquals("Attribute should match default, but doesn't", + defaultAttr.getServerConnectionTimeout(), PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT); 
+ assertEquals("Attribute should match default, but doesn't", defaultAttr.getLoadConditioningInterval(), PoolFactory.DEFAULT_LOAD_CONDITIONING_INTERVAL); assertEquals("Attribute should match default, but doesn't", defaultAttr.getThreadLocalConnections(), PoolFactory.DEFAULT_THREAD_LOCAL_CONNECTIONS); @@ -275,6 +288,7 @@ public class ConnectionPoolFactoryJUnitTest { /* Lets do a legitimate one now */ connectionTimeout = 30; + serverConnectionTimeout = 0; connectionLifetime = -1; threadLocalConnections = true; readTimeout = 3; @@ -290,6 +304,7 @@ public class ConnectionPoolFactoryJUnitTest { cpf.setFreeConnectionTimeout(connectionTimeout); + cpf.setServerConnectionTimeout(serverConnectionTimeout); cpf.setLoadConditioningInterval(connectionLifetime); cpf.setThreadLocalConnections(threadLocalConnections); cpf.setReadTimeout(readTimeout); @@ -308,6 +323,7 @@ public class ConnectionPoolFactoryJUnitTest { try { assertEquals(connectionTimeout, cpa.getFreeConnectionTimeout()); + assertEquals(serverConnectionTimeout, cpa.getServerConnectionTimeout()); assertEquals(connectionLifetime, cpa.getLoadConditioningInterval()); assertEquals(threadLocalConnections, cpa.getThreadLocalConnections()); assertEquals(true, cpa.getSubscriptionEnabled()); diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java index 91218a6..5e783cb 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java @@ -496,6 +496,11 @@ public class AutoConnectionSourceImplJUnitTest { } @Override + public int getServerConnectionTimeout() { + return 0; + } + + @Override public int getLoadConditioningInterval() { return 0; } diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/ConnectionPoolImplJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/ConnectionPoolImplJUnitTest.java index 491c049..f08206d 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/ConnectionPoolImplJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/ConnectionPoolImplJUnitTest.java @@ -102,6 +102,7 @@ public class ConnectionPoolImplJUnitTest { // check defaults assertEquals(PoolFactory.DEFAULT_SOCKET_CONNECT_TIMEOUT, pool.getSocketConnectTimeout()); assertEquals(PoolFactory.DEFAULT_FREE_CONNECTION_TIMEOUT, pool.getFreeConnectionTimeout()); + assertEquals(PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, pool.getServerConnectionTimeout()); assertEquals(PoolFactory.DEFAULT_SOCKET_BUFFER_SIZE, pool.getSocketBufferSize()); assertEquals(PoolFactory.DEFAULT_READ_TIMEOUT, pool.getReadTimeout()); assertEquals(PoolFactory.DEFAULT_MIN_CONNECTIONS, pool.getMinConnections()); @@ -252,4 +253,27 @@ public class ConnectionPoolImplJUnitTest { assertEquals(location1, pool.executeOnPrimary(testOp)); assertEquals(location1, pool.executeOnQueuesAndReturnPrimaryResult(testOp)); } + + @Test + public void testCalculateRetryFromThrownException() throws Exception { + int readTimeout = 234234; + int socketTimeout = 123123; + int port1 = 10000; + int retryAttempts = 0; + + PoolFactory cpf = PoolManager.createFactory(); + cpf.addServer("localhost", port).setSocketConnectTimeout(socketTimeout) + .setReadTimeout(readTimeout).setThreadLocalConnections(true); + + ServerLocation location1 = new ServerLocation("fakehost", port1); + + PoolImpl pool = (PoolImpl) cpf.create("testpool"); + try { + pool.acquireConnection(location1); + fail("expected a ServerConnectivityException"); + } catch (Exception e) { + retryAttempts = pool.calculateRetryAttempts(e); + } + assertEquals(1, retryAttempts); + } } diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java index 2134642..1653081 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java @@ -399,6 +399,11 @@ public class QueueManagerJUnitTest { } @Override + public int getServerConnectionTimeout() { + return 0; + } + + @Override public int getLoadConditioningInterval() { return 0; } diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java index 542a8fe..a72a5b5 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java @@ -90,11 +90,20 @@ public class ConnectionManagerImplTest { } @Test - public void borrowConnectionThrowsWhenUsingExistingConnectionsAndNoConnectionsExist() { + public void borrowConnectionThrowsWhenUsingExistingConnectionsAndNoFreeConnectionsExist() { ServerLocation serverLocation = mock(ServerLocation.class); + Connection connection = mock(Connection.class); - connectionManager = createDefaultConnectionManager(); - assertThatThrownBy(() -> connectionManager.borrowConnection(serverLocation, true)) + when(connectionFactory.createClientToServerConnection(any())).thenReturn(connection); + + connectionManager = new ConnectionManagerImpl(poolName, connectionFactory, endpointManager, 1, + 0, idleTimeout, lifetimeTimeout, securityLogger, pingInterval, cancelCriterion, + poolStats); + 
connectionManager.start(backgroundProcessor); + + assertThat(connectionManager.borrowConnection(timeout)).isInstanceOf(PooledConnection.class); + + assertThatThrownBy(() -> connectionManager.borrowConnection(serverLocation, timeout, true)) .isInstanceOf(AllConnectionsInUseException.class); connectionManager.close(false); @@ -110,7 +119,7 @@ public class ConnectionManagerImplTest { connectionManager = createDefaultConnectionManager(); connectionManager.start(backgroundProcessor); - assertThat(connectionManager.borrowConnection(serverLocation, false)) + assertThat(connectionManager.borrowConnection(serverLocation, timeout, false)) .isInstanceOf(PooledConnection.class); assertThat(connectionManager.getConnectionCount()).isEqualTo(1); @@ -266,9 +275,9 @@ public class ConnectionManagerImplTest { cancelCriterion, poolStats); connectionManager.start(backgroundProcessor); - connectionManager.borrowConnection(serverLocation1, false); - connectionManager.borrowConnection(serverLocation2, false); - connectionManager.borrowConnection(serverLocation3, false); + connectionManager.borrowConnection(serverLocation1, timeout, false); + connectionManager.borrowConnection(serverLocation2, timeout, false); + connectionManager.borrowConnection(serverLocation3, timeout, false); assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections); @@ -295,9 +304,9 @@ public class ConnectionManagerImplTest { connectionManager = createDefaultConnectionManager(); connectionManager.start(backgroundProcessor); Connection heldConnection1 = - connectionManager.borrowConnection(serverLocation1, false); + connectionManager.borrowConnection(serverLocation1, timeout, false); Connection heldConnection2 = - connectionManager.borrowConnection(serverLocation2, false); + connectionManager.borrowConnection(serverLocation2, timeout, false); assertThat(connectionManager.getConnectionCount()).isEqualTo(2); connectionManager.returnConnection(heldConnection1, true); @@ -352,11 +361,11 @@ public 
class ConnectionManagerImplTest { connectionManager.start(backgroundProcessor); Connection heldConnection1 = - connectionManager.borrowConnection(serverLocation1, false); + connectionManager.borrowConnection(serverLocation1, timeout, false); Connection heldConnection2 = - connectionManager.borrowConnection(serverLocation2, false); + connectionManager.borrowConnection(serverLocation2, timeout, false); Connection heldConnection3 = - connectionManager.borrowConnection(serverLocation3, false); + connectionManager.borrowConnection(serverLocation3, timeout, false); assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections); @@ -391,7 +400,7 @@ public class ConnectionManagerImplTest { connectionManager = createDefaultConnectionManager(); connectionManager.start(backgroundProcessor); - Connection heldConnection = connectionManager.borrowConnection(serverLocation1, false); + Connection heldConnection = connectionManager.borrowConnection(serverLocation1, timeout, false); heldConnection = connectionManager.exchangeConnection(heldConnection, excluded); assertThat(heldConnection.getServer()).isEqualTo(connection2.getServer()); @@ -435,9 +444,9 @@ public class ConnectionManagerImplTest { cancelCriterion, poolStats); connectionManager.start(backgroundProcessor); - Connection heldConnection = connectionManager.borrowConnection(serverLocation1, false); - connectionManager.borrowConnection(serverLocation2, false); - connectionManager.borrowConnection(serverLocation3, false); + Connection heldConnection = connectionManager.borrowConnection(serverLocation1, timeout, false); + connectionManager.borrowConnection(serverLocation2, timeout, false); + connectionManager.borrowConnection(serverLocation3, timeout, false); assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections); heldConnection = connectionManager.exchangeConnection(heldConnection, excluded); @@ -470,9 +479,9 @@ public class ConnectionManagerImplTest { 
connectionManager.start(backgroundProcessor); Connection heldConnection1 = - connectionManager.borrowConnection(serverLocation1, false); + connectionManager.borrowConnection(serverLocation1, timeout, false); Connection heldConnection2 = - connectionManager.borrowConnection(serverLocation2, false); + connectionManager.borrowConnection(serverLocation2, timeout, false); connectionManager.returnConnection(heldConnection2); heldConnection2 = connectionManager.exchangeConnection(heldConnection1, excluded); diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java index 4f20d91..4e00589 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java @@ -91,6 +91,8 @@ public class ConnectionManagerJUnitTest { Properties properties = new Properties(); properties.put(MCAST_PORT, "0"); properties.put(LOCATORS, ""); + + ds = DistributedSystem.connect(properties); background = Executors.newSingleThreadScheduledExecutor(); poolStats = new PoolStats(ds, "connectionManagerJUnitTest"); @@ -341,9 +343,9 @@ public class ConnectionManagerJUnitTest { // Ok, now get some connections that fill our queue Connection ping1 = - manager.borrowConnection(new ServerLocation("localhost", 5), false); + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, false); Connection ping2 = - manager.borrowConnection(new ServerLocation("localhost", 5), false); + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, false); manager.returnConnection(ping1); manager.returnConnection(ping2); @@ -361,6 +363,127 @@ public class ConnectionManagerJUnitTest { elapsedMillis >= 
BORROW_TIMEOUT_MILLIS - ALLOWABLE_ERROR_IN_MILLIS); } + /* + * Test borrow connection toward specific server. Max connection is 5, and there are free + * connections in pool. + */ + @Test + public void test_borrow_connection_toward_specific_server_freeConnections() + throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException { + final long idleTimeoutMillis = 300; + final long BORROW_TIMEOUT_MILLIS = 500; + manager = + new ConnectionManagerImpl("pool", factory, endpointManager, 5, 1, idleTimeoutMillis, -1, + logger, 60 * 1000, cancelCriterion, poolStats); + manager.start(background); + + await().until(() -> manager.getConnectionCount() == 1); + + // seize connection toward any server + Connection conn1 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS); + Connection conn2 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS); + + long startNanos = nowNanos(); + try { + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, false); + } catch (AllConnectionsInUseException e) { + fail("Didn't get connection"); + } + + } + + + /* + * Test borrow connection toward specific server. Max connection is 5, and there is no free + * connections in pool. + * After connection is returned to pool, since is not toward this specific server, wait for + * idleTimeoutMillis, + * so after expire (and pool size is reduced to 4) it can be seized. 
+ */ + @Test + public void test_borrow_connection_toward_specific_server_no_freeConnection_wait_for_timeout() + throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException { + final long idleTimeoutMillis = 300; + final long BORROW_TIMEOUT_MILLIS = 500; + manager = + new ConnectionManagerImpl("pool", factory, endpointManager, 5, 1, idleTimeoutMillis, -1, + logger, 60 * 1000, cancelCriterion, poolStats); + manager.start(background); + + await().until(() -> manager.getConnectionCount() == 1); + + // seize connection toward any server + Connection conn1 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS); + Connection conn2 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS); + + // Seize connection toward this specific server + Connection ping1 = + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, false); + Connection ping2 = + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, false); + Connection ping3 = + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, false); + + // Return some connections, let them idle expire + manager.returnConnection(conn1); + manager.returnConnection(conn2); + + long startNanos = nowNanos(); + try { + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, true); + fail("We should not get connection"); + } catch (AllConnectionsInUseException e) { + } + + long elapsedMillis = elapsedMillis(startNanos); + Assert.assertTrue("Elapsed = " + elapsedMillis, + elapsedMillis >= BORROW_TIMEOUT_MILLIS - ALLOWABLE_ERROR_IN_MILLIS); + } + + /* + * Test borrow connection toward specific server (use). Max connection is 5, and there is no free + * connections in pool. + * We are waiting for returnConnection for connection toward this specific server. + * After it is returned, we will reuse it. 
+ */ + @Test + public void test_borrow_connection_toward_specific_server_no_freeConnection_wait_returnConnection_toward_this_server() + throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException { + final long idleTimeoutMillis = 800; + final long BORROW_TIMEOUT_MILLIS = 500; + manager = + new ConnectionManagerImpl("pool", factory, endpointManager, 5, 1, idleTimeoutMillis, -1, + logger, 60 * 1000, cancelCriterion, poolStats); + manager.start(background); + + await().until(() -> manager.getConnectionCount() == 1); + + // seize connection toward any server + Connection conn1 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS); + Connection conn2 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS); + + // Seize connection toward this specific server + Connection ping1 = + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, false); + Connection ping2 = + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, false); + Connection ping3 = + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, false); + + // Return some connections, let them idle expire + manager.returnConnection(ping1); + manager.returnConnection(ping2); + + long startNanos = nowNanos(); + try { + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, true); + } catch (AllConnectionsInUseException e) { + fail("Didn't get connection"); + } + + } + + @Test public void testLifetimeExpiration() throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException, Throwable { @@ -692,7 +815,7 @@ public class ConnectionManagerJUnitTest { // do nothing } - Connection conn3 = manager.borrowConnection(new ServerLocation("localhost", -2), false); + Connection conn3 = manager.borrowConnection(new ServerLocation("localhost", -2), 10, false); Assert.assertEquals(2, factory.creates); Assert.assertEquals(0, factory.destroys); Assert.assertEquals(0, 
factory.closes); diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index 3076db7..f305269 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -1151,8 +1151,8 @@ fromData,207 toData,178 org/apache/geode/internal/cache/PoolFactoryImpl$PoolAttributes,2 -fromData,153 -toData,153 +fromData,161 +toData,161 org/apache/geode/internal/cache/PreferBytesCachedDeserializable,2 fromData,9 @@ -2093,4 +2093,4 @@ toData,105 org/apache/geode/pdx/internal/PdxType,2 fromData,109 -toData,124 \ No newline at end of file +toData,124 diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/ClientCacheFactory.java b/geode-core/src/main/java/org/apache/geode/cache/client/ClientCacheFactory.java index dc20edf..ffaced9 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/ClientCacheFactory.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/ClientCacheFactory.java @@ -323,6 +323,25 @@ public class ClientCacheFactory { } /** + * Sets the server connection timeout for this pool. If the pool has a max connections setting, + * operations will block if there are no free connections toward the designated server. The server + * connection timeout + * specifies how long those operations will block waiting for a connection toward the server before + * receiving an {@link AllConnectionsInUseException}. If max connections is not set this setting + * has no effect. + * + * @param connectionTimeout the connection timeout in milliseconds + * @return a reference to <code>this</code> + * @throws IllegalArgumentException if <code>connectionTimeout</code> is less than + * <code>0</code>. 
+ * @see #setPoolMaxConnections(int) + */ + public ClientCacheFactory setPoolServerConnectionTimeout(int connectionTimeout) { + getPoolFactory().setServerConnectionTimeout(connectionTimeout); + return this; + } + + /** * Sets the load conditioning interval for this pool. This interval controls how frequently the * pool will check to see if a connection to a given server should be moved to a different server * to improve the load balance. @@ -415,6 +434,7 @@ public class ClientCacheFactory { * @throws IllegalArgumentException if <code>maxConnections</code> is less than * <code>minConnections</code>. * @see #setPoolFreeConnectionTimeout(int) + * @see #setPoolServerConnectionTimeout(int) */ public ClientCacheFactory setPoolMaxConnections(int maxConnections) { getPoolFactory().setMaxConnections(maxConnections); diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/Pool.java b/geode-core/src/main/java/org/apache/geode/cache/client/Pool.java index 01ec32d..de95d15 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/Pool.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/Pool.java @@ -75,6 +75,13 @@ public interface Pool { int getFreeConnectionTimeout(); /** + * Returns the server connection timeout of this pool. + * + * @see PoolFactory#setServerConnectionTimeout + */ + int getServerConnectionTimeout(); + + /** * Returns the load conditioning interval of this pool. 
* * @see PoolFactory#setLoadConditioningInterval diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/PoolFactory.java b/geode-core/src/main/java/org/apache/geode/cache/client/PoolFactory.java index a1e0e77..20d913e 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/PoolFactory.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/PoolFactory.java @@ -76,6 +76,14 @@ public interface PoolFactory { int DEFAULT_FREE_CONNECTION_TIMEOUT = 10000; /** + * The default amount of time, in milliseconds, which we will wait for a server connection if max + * connections is set and there are no free connections towards the designated server. + * <p> + * Current value: <code>0</code>. + */ + int DEFAULT_SERVER_CONNECTION_TIMEOUT = 0; + + /** * The default interval in which the pool will check to see if a connection to a given server * should be moved to a different server to improve the load balance. * <p> @@ -248,6 +256,25 @@ public interface PoolFactory { */ PoolFactory setFreeConnectionTimeout(int connectionTimeout); + + /** + * Sets the server connection timeout for this pool. If the pool has a max connections setting, + * operations will block if there is no free connection towards the specific server. The server + * connection timeout specifies how long those operations will block waiting for a free connection + * towards the specific server before receiving an {@link AllConnectionsInUseException}. + * If max connections is not set this setting has no effect. + * It differs from "setFreeConnectionTimeout" which sets wait time for any server connection in + * the pool, + * whereas this sets wait time for a free connection to a specific server. + * + * @see #setMaxConnections(int) + * @param serverConnectionTimeout the connection timeout in milliseconds + * @return a reference to <code>this</code> + * @throws IllegalArgumentException if <code>serverConnectionTimeout</code> is less than + * <code>0</code>. 
+ */ + PoolFactory setServerConnectionTimeout(int serverConnectionTimeout); + /** * Sets the load conditioning interval for this pool. This interval controls how frequently the * pool will check to see if a connection to a given server should be moved to a different server diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java index a1e377e..c73bee0 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java @@ -78,6 +78,8 @@ public class OpExecutorImpl implements ExecutablePool { private final ConnectionManager connectionManager; private final int retryAttempts; private final long serverTimeout; + private final long singleServerTimeout; + private final EndpointManager endpointManager; private final RegisterInterestTracker riTracker; private final QueueManager queueManager; @@ -91,7 +93,7 @@ public class OpExecutorImpl implements ExecutablePool { public OpExecutorImpl(ConnectionManager connectionManager, QueueManager queueManager, EndpointManager endpointManager, RegisterInterestTracker riTracker, int retryAttempts, - long serverTimeout, CancelCriterion cancelCriterion, + long serverTimeout, long singleServerTimeout, CancelCriterion cancelCriterion, PoolImpl pool) { this.connectionManager = connectionManager; this.queueManager = queueManager; @@ -99,6 +101,7 @@ public class OpExecutorImpl implements ExecutablePool { this.riTracker = riTracker; this.retryAttempts = retryAttempts; this.serverTimeout = serverTimeout; + this.singleServerTimeout = singleServerTimeout; this.cancelCriterion = cancelCriterion; this.pool = pool; } @@ -323,7 +326,7 @@ public class OpExecutorImpl implements ExecutablePool { } } if (conn == null) { - conn = connectionManager.borrowConnection(p_server, onlyUseExistingCnx); + conn = 
connectionManager.borrowConnection(p_server, singleServerTimeout, onlyUseExistingCnx); } try { return executeWithPossibleReAuthentication(conn, op); diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java index 3743657..5695f4e 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java @@ -102,6 +102,7 @@ public class PoolImpl implements InternalPool { private final String name; private final int socketConnectTimeout; private final int freeConnectionTimeout; + private final int serverConnectionTimeout; private final int loadConditioningInterval; private final int socketBufferSize; @Deprecated @@ -205,6 +206,7 @@ public class PoolImpl implements InternalPool { socketConnectTimeout = attributes.getSocketConnectTimeout(); freeConnectionTimeout = attributes.getFreeConnectionTimeout(); + serverConnectionTimeout = attributes.getServerConnectionTimeout(); loadConditioningInterval = attributes.getLoadConditioningInterval(); socketBufferSize = attributes.getSocketBufferSize(); threadLocalConnections = attributes.getThreadLocalConnections(); @@ -279,7 +281,7 @@ public class PoolImpl implements InternalPool { // Fix for 43468 - make sure we check the cache cancel criterion if we get // an exception, by passing in the poolOrCache stopper executor = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, retryAttempts, - freeConnectionTimeout, new PoolOrCacheStopper(), this); + freeConnectionTimeout, serverConnectionTimeout, new PoolOrCacheStopper(), this); if (multiuserSecureModeEnabled) { proxyCacheList = new ArrayList<>(); } else { @@ -297,6 +299,7 @@ public class PoolImpl implements InternalPool { if (p == null) return false; return getFreeConnectionTimeout() == p.getFreeConnectionTimeout() + && getServerConnectionTimeout() == 
p.getServerConnectionTimeout() && getSocketConnectTimeout() == p.getSocketConnectTimeout() && getLoadConditioningInterval() == p.getLoadConditioningInterval() && getSocketBufferSize() == p.getSocketBufferSize() @@ -400,6 +403,11 @@ public class PoolImpl implements InternalPool { } @Override + public int getServerConnectionTimeout() { + return serverConnectionTimeout; + } + + @Override public int getLoadConditioningInterval() { return loadConditioningInterval; } @@ -686,6 +694,10 @@ public class PoolImpl implements InternalPool { throw new RuntimeException( String.format("Pool %s is different", "connectionTimeout")); } + if (getServerConnectionTimeout() != other.getServerConnectionTimeout()) { + throw new RuntimeException( + String.format("Pool %s is different", "serverConnectionTimeout")); + } if (getLoadConditioningInterval() != other.getLoadConditioningInterval()) { throw new RuntimeException( String.format("Pool %s is different", "connectionLifetime")); @@ -927,10 +939,14 @@ public class PoolImpl implements InternalPool { } /** - * Test hook that acquires and returns a connection from the pool with a given ServerLocation. + * Borrows a connection to a specific server from the pool.. Used by gateway and tests. Any + * connection + * that is acquired using this method must be returned using returnConnection, even if it is + * destroyed. 
+ * */ public Connection acquireConnection(ServerLocation loc) { - return manager.borrowConnection(loc, false); + return manager.borrowConnection(loc, serverConnectionTimeout, false); } /** diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java index 7021cdd..8861821 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java @@ -56,17 +56,18 @@ public interface ConnectionManager { * no connection is available. * * @param server The server the connection needs to be to. + * @param aquireTimeout The amount of time to wait for a connection to become available, if + * onlyUseExistingCnx is set to true. * @param onlyUseExistingCnx if true, will not create a new connection if none are available. * @return A connection to use. * @throws AllConnectionsInUseException If there is no available connection on the desired server, * and onlyUseExistingCnx is set. 
- * @throws ServerOperationException If there is an issue creating the connection due to security * @throws NoAvailableServersException If we can't connect to any server * @throws ServerConnectivityException If finding a connection and creating a connection both fail * to return a connection * */ - Connection borrowConnection(ServerLocation server, boolean onlyUseExistingCnx) + Connection borrowConnection(ServerLocation server, long aquireTimeout, boolean onlyUseExistingCnx) throws AllConnectionsInUseException, NoAvailableServersException; /** diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java index b3bffc9..d7295f6 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java @@ -238,7 +238,7 @@ public class ConnectionManagerImpl implements ConnectionManager { return true; } - return timeout < System.nanoTime(); + return timeout <= System.nanoTime(); } @@ -301,31 +301,50 @@ public class ConnectionManagerImpl implements ConnectionManager { throw new AllConnectionsInUseException(); } - /** - * Borrow a connection to a specific server. This task currently allows us to break the connection - * limit, because it is used by tasks from the background thread that shouldn't be constrained by - * the limit. They will only violate the limit by 1 connection, and that connection will be - * destroyed when returned to the pool. 
- */ @Override - public PooledConnection borrowConnection(ServerLocation server, - boolean onlyUseExistingCnx) throws AllConnectionsInUseException, NoAvailableServersException { - PooledConnection connection = - availableConnectionManager.useFirst((c) -> c.getServer().equals(server)); - if (null != connection) { - return connection; - } + public PooledConnection borrowConnection(ServerLocation server, long acquireTimeout, + boolean onlyUseExistingCnx) + throws AllConnectionsInUseException, NoAvailableServersException, + ServerConnectivityException { - if (onlyUseExistingCnx) { - throw new AllConnectionsInUseException(); - } + PooledConnection connection; + logger.trace("Connection borrowConnection single hop connection"); + + long waitStart = NOT_WAITING; + try { + long timeout = System.nanoTime() + MILLISECONDS.toNanos(acquireTimeout); + while (true) { + + connection = + availableConnectionManager.useFirst((c) -> c.getServer().equals(server)); + + if (null != connection) { + return connection; + } + + if (!onlyUseExistingCnx) { + connection = forceCreateConnection(server); + if (null != connection) { + return connection; + } + throw new ServerConnectivityException(BORROW_CONN_ERROR_MSG + server); + } + + if (checkShutdownInterruptedOrTimeout(timeout)) { + break; + } + + waitStart = beginConnectionWaitStatIfNotStarted(waitStart); - connection = forceCreateConnection(server); - if (null != connection) { - return connection; + Thread.yield(); + } + } finally { + endConnectionWaitStatIfStarted(waitStart); } - throw new ServerConnectivityException(BORROW_CONN_ERROR_MSG + server); + cancelCriterion.checkCancelInProgress(null); + + throw new AllConnectionsInUseException(); } @Override diff --git a/geode-core/src/main/java/org/apache/geode/cache/configuration/PoolType.java b/geode-core/src/main/java/org/apache/geode/cache/configuration/PoolType.java index aeb8a47..50b464d 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/configuration/PoolType.java +++ 
b/geode-core/src/main/java/org/apache/geode/cache/configuration/PoolType.java @@ -67,6 +67,7 @@ import org.apache.geode.annotations.Experimental; * <attribute name="subscription-timeout-multiplier" type="{http://www.w3.org/2001/XMLSchema}string" /> * <attribute name="socket-connect-timeout" type="{http://www.w3.org/2001/XMLSchema}string" /> * <attribute name="free-connection-timeout" type="{http://www.w3.org/2001/XMLSchema}string" /> + * <attribute name="server-connection-timeout" type="{http://www.w3.org/2001/XMLSchema}string" /> * <attribute name="load-conditioning-interval" type="{http://www.w3.org/2001/XMLSchema}string" /> * <attribute name="min-connections" type="{http://www.w3.org/2001/XMLSchema}string" /> * <attribute name="max-connections" type="{http://www.w3.org/2001/XMLSchema}string" /> @@ -108,6 +109,8 @@ public class PoolType { private String socketConnectTimeout; @XmlAttribute(name = "free-connection-timeout") protected String freeConnectionTimeout; + @XmlAttribute(name = "server-connection-timeout") + protected String serverConnectionTimeout; @XmlAttribute(name = "load-conditioning-interval") protected String loadConditioningInterval; @XmlAttribute(name = "min-connections") @@ -273,6 +276,28 @@ public class PoolType { } /** + * Gets the value of the serverConnectionTimeout property. + * + * possible object is + * {@link String } + * + */ + public String getServerConnectionTimeout() { + return serverConnectionTimeout; + } + + /** + * Sets the value of the serverConnectionTimeout property. + * + * allowed object is + * {@link String } + * + */ + public void setServerConnectionTimeout(String value) { + serverConnectionTimeout = value; + } + + /** * Gets the value of the loadConditioningInterval property. 
* * possible object is diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java index 5cc4a18..4cbe6e8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java @@ -89,6 +89,16 @@ public class PoolFactoryImpl implements InternalPoolFactory { } @Override + public PoolFactory setServerConnectionTimeout(int serverConnectionTimeout) { + if (serverConnectionTimeout < 0) { + throw new IllegalArgumentException( + "serverConnectionTimeout must be greater than or equal to 0"); + } + attributes.serverConnectionTimeout = serverConnectionTimeout; + return this; + } + + @Override public PoolFactory setLoadConditioningInterval(int connectionLifetime) { if (connectionLifetime < -1) { throw new IllegalArgumentException("connectionLifetime must be greater than or equal to -1"); @@ -308,6 +318,7 @@ public class PoolFactoryImpl implements InternalPoolFactory { public void init(Pool cp) { setSocketConnectTimeout(cp.getSocketConnectTimeout()); setFreeConnectionTimeout(cp.getFreeConnectionTimeout()); + setServerConnectionTimeout(cp.getServerConnectionTimeout()); setLoadConditioningInterval(cp.getLoadConditioningInterval()); setSocketBufferSize(cp.getSocketBufferSize()); setReadTimeout(cp.getReadTimeout()); @@ -407,6 +418,7 @@ public class PoolFactoryImpl implements InternalPoolFactory { int socketConnectTimeout = DEFAULT_SOCKET_CONNECT_TIMEOUT; SocketFactory socketFactory = DEFAULT_SOCKET_FACTORY; int connectionTimeout = DEFAULT_FREE_CONNECTION_TIMEOUT; + int serverConnectionTimeout = DEFAULT_SERVER_CONNECTION_TIMEOUT; int connectionLifetime = DEFAULT_LOAD_CONDITIONING_INTERVAL; public int socketBufferSize = DEFAULT_SOCKET_BUFFER_SIZE; @Deprecated @@ -447,6 +459,11 @@ public class PoolFactoryImpl implements InternalPoolFactory { } @Override + public int 
getServerConnectionTimeout() { + return serverConnectionTimeout; + } + + @Override public int getLoadConditioningInterval() { return connectionLifetime; } @@ -619,6 +636,7 @@ public class PoolFactoryImpl implements InternalPoolFactory { public void toData(DataOutput out) throws IOException { DataSerializer.writePrimitiveInt(connectionTimeout, out); + DataSerializer.writePrimitiveInt(serverConnectionTimeout, out); DataSerializer.writePrimitiveInt(connectionLifetime, out); DataSerializer.writePrimitiveInt(socketBufferSize, out); DataSerializer.writePrimitiveInt(readTimeout, out); @@ -641,6 +659,7 @@ public class PoolFactoryImpl implements InternalPoolFactory { public void fromData(DataInput in) throws IOException, ClassNotFoundException { connectionTimeout = DataSerializer.readPrimitiveInt(in); + serverConnectionTimeout = DataSerializer.readPrimitiveInt(in); connectionLifetime = DataSerializer.readPrimitiveInt(in); socketBufferSize = DataSerializer.readPrimitiveInt(in); readTimeout = DataSerializer.readPrimitiveInt(in); @@ -664,7 +683,8 @@ public class PoolFactoryImpl implements InternalPoolFactory { @Override public int hashCode() { return Objects - .hash(socketConnectTimeout, connectionTimeout, connectionLifetime, socketBufferSize, + .hash(socketConnectTimeout, connectionTimeout, serverConnectionTimeout, + connectionLifetime, socketBufferSize, threadLocalConnections, readTimeout, minConnections, maxConnections, idleTimeout, retryAttempts, pingInterval, statisticInterval, queueEnabled, prSingleHopEnabled, queueRedundancyLevel, queueMessageTrackingTimeout, queueAckInterval, @@ -683,6 +703,7 @@ public class PoolFactoryImpl implements InternalPoolFactory { PoolAttributes that = (PoolAttributes) o; return socketConnectTimeout == that.socketConnectTimeout && connectionTimeout == that.connectionTimeout + && serverConnectionTimeout == that.serverConnectionTimeout && connectionLifetime == that.connectionLifetime && socketBufferSize == that.socketBufferSize && 
threadLocalConnections == that.threadLocalConnections diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java index 71c67ab..be6e810 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java @@ -618,6 +618,7 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler { public static final String SUBSCRIPTION_TIMEOUT_MULTIPLIER = "subscription-timeout-multiplier"; public static final String SOCKET_CONNECT_TIMEOUT = "socket-connect-timeout"; public static final String FREE_CONNECTION_TIMEOUT = "free-connection-timeout"; + public static final String SERVER_CONNECTION_TIMEOUT = "server-connection-timeout"; public static final String LOAD_CONDITIONING_INTERVAL = "load-conditioning-interval"; public static final String MIN_CONNECTIONS = "min-connections"; public static final String RETRY_ATTEMPTS = "retry-attempts"; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java index e1babce..68434d8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java @@ -1153,6 +1153,10 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader { atts.addAttribute("", "", FREE_CONNECTION_TIMEOUT, "", String.valueOf(cp.getFreeConnectionTimeout())); if (generateDefaults() + || cp.getServerConnectionTimeout() != PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT) + atts.addAttribute("", "", SERVER_CONNECTION_TIMEOUT, "", + String.valueOf(cp.getServerConnectionTimeout())); + if (generateDefaults() || cp.getLoadConditioningInterval() != 
PoolFactory.DEFAULT_LOAD_CONDITIONING_INTERVAL) atts.addAttribute("", "", LOAD_CONDITIONING_INTERVAL, "", String.valueOf(cp.getLoadConditioningInterval())); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java index 7b94562..471e99b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java @@ -413,6 +413,10 @@ public class CacheXmlParser extends CacheXml implements ContentHandler { if (v != null) { f.setFreeConnectionTimeout(parseInt(v)); } + v = atts.getValue(SERVER_CONNECTION_TIMEOUT); + if (v != null) { + f.setServerConnectionTimeout(parseInt(v)); + } v = atts.getValue(LOAD_CONDITIONING_INTERVAL); if (v != null) { f.setLoadConditioningInterval(parseInt(v)); diff --git a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd index d8d45e4..5d892d5 100755 --- a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd +++ b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd @@ -1248,6 +1248,7 @@ As of 6.5 disk-dirs is deprecated on region-attributes. 
Use disk-store-name inst <xsd:attribute name="subscription-timeout-multiplier" type="xsd:string" use="optional" /> <xsd:attribute name="socket-connect-timeout" type="xsd:string" use="optional" /> <xsd:attribute name="free-connection-timeout" type="xsd:string" use="optional" /> + <xsd:attribute name="server-connection-timeout" type="xsd:string" use="optional" /> <xsd:attribute name="load-conditioning-interval" type="xsd:string" use="optional" /> <xsd:attribute name="min-connections" type="xsd:string" use="optional" /> <xsd:attribute name="max-connections" type="xsd:string" use="optional" /> diff --git a/geode-core/src/main/resources/META-INF/schemas/schema.pivotal.io/gemfire/cache/cache-8.1.xsd b/geode-core/src/main/resources/META-INF/schemas/schema.pivotal.io/gemfire/cache/cache-8.1.xsd index a6add2c..910b095 100644 --- a/geode-core/src/main/resources/META-INF/schemas/schema.pivotal.io/gemfire/cache/cache-8.1.xsd +++ b/geode-core/src/main/resources/META-INF/schemas/schema.pivotal.io/gemfire/cache/cache-8.1.xsd @@ -1240,6 +1240,7 @@ As of 6.5 disk-dirs is deprecated on region-attributes. 
Use disk-store-name inst </xsd:element> </xsd:choice> <xsd:attribute name="free-connection-timeout" type="xsd:string" use="optional" /> + <xsd:attribute name="server-connection-timeout" type="xsd:string" use="optional" /> <xsd:attribute name="load-conditioning-interval" type="xsd:string" use="optional" /> <xsd:attribute name="min-connections" type="xsd:string" use="optional" /> <xsd:attribute name="max-connections" type="xsd:string" use="optional" /> diff --git a/geode-core/src/main/resources/org/apache/geode/cache/doc-files/cache7_0.dtd b/geode-core/src/main/resources/org/apache/geode/cache/doc-files/cache7_0.dtd index 5dce436..9358358 100644 --- a/geode-core/src/main/resources/org/apache/geode/cache/doc-files/cache7_0.dtd +++ b/geode-core/src/main/resources/org/apache/geode/cache/doc-files/cache7_0.dtd @@ -379,6 +379,7 @@ A "pool" element specifies a client to server connection pool. <!ELEMENT pool (locator+|server+)> <!ATTLIST pool free-connection-timeout CDATA #IMPLIED + server-connection-timeout CDATA #IMPLIED load-conditioning-interval CDATA #IMPLIED min-connections CDATA #IMPLIED max-connections CDATA #IMPLIED diff --git a/geode-core/src/main/resources/org/apache/geode/cache/doc-files/cache8_0.dtd b/geode-core/src/main/resources/org/apache/geode/cache/doc-files/cache8_0.dtd index a28a17f..ce90823 100644 --- a/geode-core/src/main/resources/org/apache/geode/cache/doc-files/cache8_0.dtd +++ b/geode-core/src/main/resources/org/apache/geode/cache/doc-files/cache8_0.dtd @@ -390,6 +390,7 @@ A "pool" element specifies a client to server connection pool. 
<!ELEMENT pool (locator+|server+)> <!ATTLIST pool free-connection-timeout CDATA #IMPLIED + server-connection-timeout CDATA #IMPLIED load-conditioning-interval CDATA #IMPLIED min-connections CDATA #IMPLIED max-connections CDATA #IMPLIED diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt index 4c6b8c8..8887f3a 100644 --- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt +++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt @@ -305,7 +305,7 @@ org/apache/geode/internal/cache/PartitionedRegionDataStore$CreateBucketResult,fa org/apache/geode/internal/cache/PartitionedRegionException,true,5113786059279106007 org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator$MemberResultsList,false,isLastChunkReceived:boolean org/apache/geode/internal/cache/PartitionedRegionStatus,true,-6755318987122602065,numberOfLocalEntries:int -org/apache/geode/internal/cache/PoolFactoryImpl$PoolAttributes,true,1,connectionLifetime:int,connectionTimeout:int,gateway:boolean,gatewaySender:org/apache/geode/cache/wan/GatewaySender,idleTimeout:long,locators:java/util/ArrayList,maxConnections:int,minConnections:int,multiuserSecureModeEnabled:boolean,pingInterval:long,prSingleHopEnabled:boolean,queueAckInterval:int,queueEnabled:boolean,queueMessageTrackingTimeout:int,queueRedundancyLevel:int,readTimeout:int,retryAttempts:int,serverGro [...] 
+org/apache/geode/internal/cache/PoolFactoryImpl$PoolAttributes,true,1,connectionLifetime:int,connectionTimeout:int,gateway:boolean,gatewaySender:org/apache/geode/cache/wan/GatewaySender,idleTimeout:long,locators:java/util/ArrayList,maxConnections:int,minConnections:int,multiuserSecureModeEnabled:boolean,pingInterval:long,prSingleHopEnabled:boolean,queueAckInterval:int,queueEnabled:boolean,queueMessageTrackingTimeout:int,queueRedundancyLevel:int,readTimeout:int,retryAttempts:int,serverCon [...] org/apache/geode/internal/cache/PrimaryBucketException,true,1 org/apache/geode/internal/cache/PutAllPartialResultException,true,2411654400733621071,result:org/apache/geode/internal/cache/PutAllPartialResultException$PutAllPartialResult org/apache/geode/internal/cache/PutAllPartialResultException$PutAllPartialResult,true,-2168767259323206954,firstCauseOfFailure:java/lang/Exception,firstFailedKey:java/lang/Object,succeededKeys:org/apache/geode/internal/cache/tier/sockets/VersionedObjectList,totalMapSize:int diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java index 752ba93..0fa4c37 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java @@ -45,6 +45,7 @@ import org.junit.experimental.categories.Category; import org.apache.geode.CancelCriterion; import org.apache.geode.LogWriter; import org.apache.geode.cache.client.NoAvailableServersException; +import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.cache.client.ServerOperationException; import org.apache.geode.cache.client.internal.pooling.ConnectionManager; @@ -99,7 +100,7 @@ public class OpExecutorImplJUnitTest { @Test public void testExecute() 
throws Exception { OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 3, - 10, cancelCriterion, null); + 10, PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, cancelCriterion, null); Object result = exec.execute(new Op() { @Override public Object attempt(Connection cnx) throws Exception { @@ -182,7 +183,7 @@ public class OpExecutorImplJUnitTest { @Test public void testExecuteOncePerServer() throws Exception { OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, - 10, cancelCriterion, null); + 10, PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, cancelCriterion, null); manager.numServers = 5; try { @@ -206,7 +207,7 @@ public class OpExecutorImplJUnitTest { @Test public void testRetryFailedServers() throws Exception { OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 10, - 10, cancelCriterion, null); + 10, PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, cancelCriterion, null); manager.numServers = 5; try { @@ -230,7 +231,7 @@ public class OpExecutorImplJUnitTest { @Test public void testExecuteOn() throws Exception { OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 3, - 10, cancelCriterion, null); + 10, PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, cancelCriterion, null); ServerLocation server = new ServerLocation("localhost", -1); Object result = exec.executeOn(server, new Op() { @Override @@ -312,7 +313,7 @@ public class OpExecutorImplJUnitTest { @Test public void testExecuteOnAllQueueServers() { OpExecutorImpl exec = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 3, - 10, cancelCriterion, null); + 10, PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, cancelCriterion, null); exec.executeOnAllQueueServers(new Op() { @Override public Object attempt(Connection cnx) throws Exception { @@ -367,7 +368,8 @@ public class OpExecutorImplJUnitTest { public void 
executeWithServerAffinityDoesNotChangeInitialRetryCountOfZero() { OpExecutorImpl opExecutor = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, - 10, cancelCriterion, mock(PoolImpl.class)); + 10, PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, cancelCriterion, + mock(PoolImpl.class)); Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class); ServerLocation serverLocation = mock(ServerLocation.class); opExecutor.setAffinityRetryCount(0); @@ -381,7 +383,8 @@ public class OpExecutorImplJUnitTest { public void executeWithServerAffinityWithNonZeroAffinityRetryCountWillNotSetToZero() { OpExecutorImpl opExecutor = new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, - 10, cancelCriterion, mock(PoolImpl.class)); + 10, PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, cancelCriterion, + mock(PoolImpl.class)); Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class); ServerLocation serverLocation = mock(ServerLocation.class); @@ -396,7 +399,8 @@ public class OpExecutorImplJUnitTest { public void executeWithServerAffinityWithServerConnectivityExceptionIncrementsRetryCountAndResetsToZero() { OpExecutorImpl opExecutor = spy(new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, - 10, cancelCriterion, mock(PoolImpl.class))); + 10, PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, cancelCriterion, + mock(PoolImpl.class))); Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class); ServerLocation serverLocation = mock(ServerLocation.class); @@ -418,7 +422,8 @@ public class OpExecutorImplJUnitTest { public void executeWithServerAffinityAndRetryCountGreaterThansTxRetryAttemptThrowsServerConnectivityException() { OpExecutorImpl opExecutor = spy(new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, - 10, cancelCriterion, mock(PoolImpl.class))); + 10, PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, cancelCriterion, + mock(PoolImpl.class))); Op txSynchronizationOp = 
mock(TXSynchronizationOp.Impl.class); ServerLocation serverLocation = mock(ServerLocation.class); @@ -453,7 +458,7 @@ public class OpExecutorImplJUnitTest { } @Override - public Connection borrowConnection(ServerLocation server, + public Connection borrowConnection(ServerLocation server, long aquireTimeout, boolean onlyUseExistingCnx) { borrows++; return new DummyConnection(server); diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/TXFailoverOpTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/TXFailoverOpTest.java index 9644cc3..0766640 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/TXFailoverOpTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/TXFailoverOpTest.java @@ -26,6 +26,7 @@ import org.junit.rules.ExpectedException; import org.apache.geode.CancelCriterion; import org.apache.geode.cache.TransactionException; +import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.cache.client.internal.pooling.ConnectionManager; import org.apache.geode.distributed.internal.ServerLocation; @@ -55,6 +56,7 @@ public class TXFailoverOpTest { private OpExecutorImpl getTestableOpExecutorImpl() { return new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 3, 10, + PoolFactory.DEFAULT_SERVER_CONNECTION_TIMEOUT, cancelCriterion, mockPool) { @Override diff --git a/geode-docs/reference/topics/cache_xml.html.md.erb b/geode-docs/reference/topics/cache_xml.html.md.erb index f5c74de..5f55680 100644 --- a/geode-docs/reference/topics/cache_xml.html.md.erb +++ b/geode-docs/reference/topics/cache_xml.html.md.erb @@ -839,7 +839,7 @@ Use for client caches. Defines a client's server pool used to communicate with s </tr> <tr> <td>max-connections</td> -<td>Maximum number of pool connections the pool can create. 
If the maximum connections are in use, an operation requiring a client-to-server connection blocks until a connection becomes available or the <code class="ph codeph">free-connection-timeout</code> is reached. If set to -1, there is no maximum. The setting must indicate a cap greater than <code class="ph codeph">min-connections</code>. +<td>Maximum number of pool connections the pool can create. If the maximum connections are in use, an operation requiring a client-to-server connection blocks until a connection becomes available or the <code class="ph codeph">free-connection-timeout</code> or <code class="ph codeph">server-connection-timeout</code> is reached. If set to -1, there is no maximum. The setting must indicate a cap greater than <code class="ph codeph">min-connections</code>. <p> <b>Note:</b> If you need to use this to cap your pool connections, you should disable the pool attribute <code class="ph codeph">pr-single-hop-enabled</code>. Leaving single hop enabled can increase thrashing and lower performance.</p></td> @@ -887,6 +887,11 @@ Set this lower than the server’s <code class="ph codeph">maximum-time-between- <td>-1</td> </tr> <tr> +<td>server-connection-timeout</td> +<td>Amount of time a thread will wait to get a pool connection towards specific server, before timing out with an exception. This timeout keeps threads from waiting indefinitely when the pool’s <code class="ph codeph">max-connections</code> has been reached and all connections in the pool are in use by other threads.</td> +<td>0</td> +</tr> +<tr> <td>server-group</td> <td>Logical named server group to use from the pool. A null value uses the global server group to which all servers belong. 
<p> diff --git a/geode-docs/reference/topics/client-cache.html.md.erb b/geode-docs/reference/topics/client-cache.html.md.erb index 65650f9..0db531b 100644 --- a/geode-docs/reference/topics/client-cache.html.md.erb +++ b/geode-docs/reference/topics/client-cache.html.md.erb @@ -152,7 +152,7 @@ Use for client caches. Defines a client's server pool used to communicate with s </tr> <tr> <td>max-connections</td> -<td>Maximum number of pool connections the pool can create. If the maximum connections are in use, an operation requiring a client-to-server connection blocks until a connection becomes available or the <code class="ph codeph">free-connection-timeout</code> is reached. If set to -1, there is no maximum. The setting must indicate a cap greater than <code class="ph codeph">min-connections</code>. +<td>Maximum number of pool connections the pool can create. If the maximum connections are in use, an operation requiring a client-to-server connection blocks until a connection becomes available or the <code class="ph codeph">free-connection-timeout</code> or <code class="ph codeph">server-connection-timeout</code> is reached. If set to -1, there is no maximum. The setting must indicate a cap greater than <code class="ph codeph">min-connections</code>. <div class="note note"> **Note:** <p>If you need to use this to cap your pool connections, you should disable the pool attribute <code class="ph codeph">pr-single-hop-enabled</code>. Leaving single hop enabled can increase thrashing and lower performance.</p> @@ -203,6 +203,11 @@ Use for client caches. Defines a client's server pool used to communicate with s <td>-1</td> </tr> <tr> +<td>server-connection-timeout</td> +<td>Amount of time a thread will wait to get a pool connection towards specific server, before timing out with an exception. 
This timeout keeps threads from waiting indefinitely when the pool’s <code class="ph codeph">max-connections</code> has been reached and all connections in the pool are in use by other threads.</td> +<td>0</td> +</tr> +<tr> <td>server-group</td> <td>Logical named server group to use from the pool. A null value uses the global server group to which all servers belong. <div class="note note"> diff --git a/geode-dunit/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerTestUtil.java b/geode-dunit/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerTestUtil.java index e0ff300..3b24d91 100755 --- a/geode-dunit/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerTestUtil.java +++ b/geode-dunit/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerTestUtil.java @@ -99,6 +99,7 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase { ClientCacheFactory ccf = new ClientCacheFactory(dsProperties); if (poolAttr != null) { ccf.setPoolFreeConnectionTimeout(poolAttr.getFreeConnectionTimeout()) + .setPoolServerConnectionTimeout(poolAttr.getServerConnectionTimeout()) .setPoolLoadConditioningInterval(poolAttr.getLoadConditioningInterval()) .setPoolSocketBufferSize(poolAttr.getSocketBufferSize()) .setPoolMinConnections(poolAttr.getMinConnections())