Github user FlorianHockmann commented on a diff in the pull request:
https://github.com/apache/tinkerpop/pull/903#discussion_r210924567
--- Diff: gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs ---
@@ -33,91 +34,134 @@ internal class ConnectionPool : IDisposable
{
private readonly ConnectionFactory _connectionFactory;
private readonly ConcurrentBag<Connection> _connections = new
ConcurrentBag<Connection>();
- private readonly object _connectionsLock = new object();
+ private readonly AutoResetEvent _newConnectionAvailable = new
AutoResetEvent(false);
+ private readonly int _minPoolSize;
+ private readonly int _maxPoolSize;
+ private readonly TimeSpan _waitForConnectionTimeout;
+ private int _nrConnections;
- public ConnectionPool(ConnectionFactory connectionFactory)
+ public ConnectionPool(ConnectionFactory connectionFactory,
ConnectionPoolSettings settings)
{
_connectionFactory = connectionFactory;
+ _minPoolSize = settings.MinSize;
+ _maxPoolSize = settings.MaxSize;
+ _waitForConnectionTimeout = settings.WaitForConnectionTimeout;
+ PopulatePoolAsync().WaitUnwrap();
}
- public int NrConnections { get; private set; }
+ public int NrConnections => Interlocked.CompareExchange(ref
_nrConnections, 0, 0);
+
+ private async Task PopulatePoolAsync()
+ {
+ var connectionCreationTasks = new
List<Task<Connection>>(_minPoolSize);
+ for (var i = 0; i < _minPoolSize; i++)
+ {
+ connectionCreationTasks.Add(CreateNewConnectionAsync());
+ }
+
+ var createdConnections = await
Task.WhenAll(connectionCreationTasks).ConfigureAwait(false);
+ foreach (var c in createdConnections)
+ {
+ _connections.Add(c);
+ }
+
+ Interlocked.CompareExchange(ref _nrConnections, _minPoolSize,
0);
+ }
public async Task<IConnection> GetAvailableConnectionAsync()
{
- if (!TryGetConnectionFromPool(out var connection))
- connection = await
CreateNewConnectionAsync().ConfigureAwait(false);
+ if (TryGetConnectionFromPool(out var connection))
+ return ProxiedConnection(connection);
+ connection = await
AddConnectionIfUnderMaximumAsync().ConfigureAwait(false) ?? WaitForConnection();
+ return ProxiedConnection(connection);
+ }
- return new ProxyConnection(connection, AddConnectionIfOpen);
+ private IConnection ProxiedConnection(Connection connection)
+ {
+ return new ProxyConnection(connection, ReturnConnectionIfOpen);
}
- private bool TryGetConnectionFromPool(out Connection connection)
+ private void ReturnConnectionIfOpen(Connection connection)
+ {
+ if (!connection.IsOpen)
+ {
+ ConsiderUnavailable();
+ DefinitelyDestroyConnection(connection);
+ return;
+ }
+
+ _connections.Add(connection);
+ _newConnectionAvailable.Set();
+ }
+
+ private async Task<Connection> AddConnectionIfUnderMaximumAsync()
{
while (true)
{
- connection = null;
- lock (_connectionsLock)
- {
- if (_connections.IsEmpty) return false;
- _connections.TryTake(out connection);
- }
+ var nrOpened = Interlocked.CompareExchange(ref
_nrConnections, 0, 0);
+ if (nrOpened >= _maxPoolSize) return null;
- if (connection.IsOpen) return true;
- connection.Dispose();
+ if (Interlocked.CompareExchange(ref _nrConnections,
nrOpened + 1, nrOpened) == nrOpened)
+ break;
}
+
+ return await CreateNewConnectionAsync().ConfigureAwait(false);
}
private async Task<Connection> CreateNewConnectionAsync()
{
- NrConnections++;
var newConnection = _connectionFactory.CreateConnection();
await newConnection.ConnectAsync().ConfigureAwait(false);
return newConnection;
}
- private void AddConnectionIfOpen(Connection connection)
+ private Connection WaitForConnection()
{
- if (!connection.IsOpen)
+ var start = DateTimeOffset.Now;
+ var remaining = _waitForConnectionTimeout;
+ do
{
- ConsiderUnavailable();
- connection.Dispose();
- return;
- }
- AddConnection(connection);
+ if (_newConnectionAvailable.WaitOne(remaining))
--- End diff --
Thanks for pointing this out, @jorgebay.
I actually experimented with something like an `AutoResetEvent` for the
very first version of Gremlin.Net, but didn't get it to work and therefore
ended up with the plain `lock`. I guess what I was missing back then was simply
the knowledge about the [CAS
operation](https://en.wikipedia.org/wiki/Compare-and-swap) and how it can be
used to avoid synchronous locks.
I'll give this another try and aim for a version that hopefully removes
synchronous blocking completely. I guess this will result in our own
version of an `AsyncAutoResetEvent` that allows us to wait with a timeout, as
that seems to be missing in both existing implementations (from the blog and
the library you linked). (I'm also not sure whether it's worth adding a
dependency on `AsyncEx` just to get one class.)
---