kenhuuu commented on code in PR #3315:
URL: https://github.com/apache/tinkerpop/pull/3315#discussion_r2933360284
##########
gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs:
##########
@@ -22,306 +22,151 @@
#endregion
using System;
-using System.Collections.Concurrent;
using System.Collections.Generic;
-using System.Text;
+using System.IO;
+using System.IO.Compression;
+using System.Linq;
+using System.Net.Http;
+using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using Gremlin.Net.Driver.Messages;
using Gremlin.Net.Process;
+using Gremlin.Net.Structure.IO;
namespace Gremlin.Net.Driver
{
- internal interface IResponseHandlerForSingleRequestMessage
+ /// <summary>
+ /// HTTP-based connection that sends requests via HTTP POST to Gremlin
Server.
+ /// </summary>
+ internal class Connection : IDisposable
{
- void HandleReceived(ResponseMessage<List<object>> received);
- void Finalize(Dictionary<string, object> statusAttributes);
- void HandleFailure(Exception objException);
- void Cancel();
- }
+ private const string GraphBinaryMimeType =
SerializationTokens.GraphBinary4MimeType;
- internal class Connection : IConnection
- {
- private readonly IMessageSerializer _messageSerializer;
+ private readonly HttpClient _httpClient;
private readonly Uri _uri;
- private readonly IWebSocketConnection _webSocketConnection;
- private readonly string? _username;
- private readonly string? _password;
- private readonly string? _sessionId;
- private readonly bool _sessionEnabled;
-
- private readonly ConcurrentQueue<(RequestMessage msg,
CancellationToken cancellationToken)> _writeQueue = new();
-
- private readonly ConcurrentDictionary<Guid,
IResponseHandlerForSingleRequestMessage> _callbackByRequestId =
- new();
+ private readonly IMessageSerializer _serializer;
+ private readonly bool _enableCompression;
+ private readonly bool _enableUserAgentOnConnect;
+ private readonly bool _bulkResults;
+ // Interceptor slot reserved for future spec
+ // private readonly IReadOnlyList<Func<HttpRequestMessage, Task>>
_interceptors;
- private readonly ConcurrentDictionary<Guid,
CancellationTokenRegistration> _cancellationByRequestId = new();
- private int _connectionState = 0;
- private int _writeInProgress = 0;
- private const int Closed = 1;
-
- public Connection(IWebSocketConnection webSocketConnection, Uri uri,
string? username, string? password,
- IMessageSerializer messageSerializer, string? sessionId)
+ public Connection(Uri uri, IMessageSerializer serializer,
+ ConnectionSettings settings)
{
_uri = uri;
- _username = username;
- _password = password;
- _sessionId = sessionId;
- if (!string.IsNullOrEmpty(sessionId))
- {
- _sessionEnabled = true;
- }
- _messageSerializer = messageSerializer;
- _webSocketConnection = webSocketConnection;
- }
-
- public async Task ConnectAsync(CancellationToken cancellationToken)
- {
- await _webSocketConnection.ConnectAsync(_uri,
cancellationToken).ConfigureAwait(false);
- BeginReceiving();
- }
-
- public int NrRequestsInFlight => _callbackByRequestId.Count;
-
- public bool IsOpen => _webSocketConnection.IsOpen && Volatile.Read(ref
_connectionState) != Closed;
+ _serializer = serializer;
+ _enableCompression = settings.EnableCompression;
+ _enableUserAgentOnConnect = settings.EnableUserAgentOnConnect;
+ _bulkResults = settings.BulkResults;
- public Task<ResultSet<T>> SubmitAsync<T>(RequestMessage
requestMessage, CancellationToken cancellationToken)
- {
- var receiver = new ResponseHandlerForSingleRequestMessage<T>();
- _callbackByRequestId.GetOrAdd(requestMessage.RequestId, receiver);
-
- _cancellationByRequestId.GetOrAdd(requestMessage.RequestId,
cancellationToken.Register(() =>
- {
- if (_callbackByRequestId.TryRemove(requestMessage.RequestId,
out var responseHandler))
- {
- responseHandler.Cancel();
- }
- }));
- _writeQueue.Enqueue((requestMessage, cancellationToken));
- BeginSendingMessages();
- return receiver.Result;
- }
-
- private void BeginReceiving()
- {
- var state = Volatile.Read(ref _connectionState);
- if (state == Closed) return;
- ReceiveMessagesAsync().Forget();
- }
-
- private async Task ReceiveMessagesAsync()
- {
- while (true)
- {
- try
- {
- var received = await
_webSocketConnection.ReceiveMessageAsync().ConfigureAwait(false);
- await HandleReceivedAsync(received).ConfigureAwait(false);
- }
- catch (Exception e)
- {
- await
CloseConnectionBecauseOfFailureAsync(e).ConfigureAwait(false);
- break;
- }
- }
+#if NET6_0_OR_GREATER
+ var handler = new SocketsHttpHandler
+ {
+ PooledConnectionIdleTimeout = settings.IdleConnectionTimeout,
+ MaxConnectionsPerServer = settings.MaxConnectionsPerServer,
+ ConnectTimeout = settings.ConnectionTimeout,
+ KeepAlivePingTimeout = settings.KeepAliveInterval,
+ };
+ _httpClient = new HttpClient(handler);
+#else
+ _httpClient = new HttpClient();
+ _httpClient.Timeout = settings.ConnectionTimeout;
+#endif
}
- private async Task HandleReceivedAsync(byte[] received)
+ /// <summary>
+ /// Constructor that accepts a pre-configured HttpClient (for
testing).
+ /// </summary>
+ internal Connection(Uri uri, IMessageSerializer serializer,
+ ConnectionSettings settings, HttpClient httpClient)
{
- var receivedMsg = await
_messageSerializer.DeserializeMessageAsync(received).ConfigureAwait(false);
- if (receivedMsg == null)
- {
- throw new InvalidOperationException(
- "Received data deserialized into null object message.
Cannot operate on it.");
- }
-
- try
- {
- HandleReceivedMessage(receivedMsg);
- }
- catch (Exception e)
- {
- if (receivedMsg!.RequestId != null)
- {
-
if(_callbackByRequestId.TryRemove(receivedMsg.RequestId.Value, out var
responseHandler))
- {
- responseHandler?.HandleFailure(e);
-
- }
- if
(_cancellationByRequestId.TryRemove(receivedMsg.RequestId.Value, out var
cancellation))
- {
- cancellation.Dispose();
- }
- }
- }
+ _uri = uri;
+ _serializer = serializer;
+ _enableCompression = settings.EnableCompression;
+ _enableUserAgentOnConnect = settings.EnableUserAgentOnConnect;
+ _bulkResults = settings.BulkResults;
+ _httpClient = httpClient;
}
- private void HandleReceivedMessage(ResponseMessage<List<object>>
receivedMsg)
+ public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage
requestMessage,
+ CancellationToken cancellationToken = default)
{
- var status = receivedMsg.Status;
- status.ThrowIfStatusIndicatesError();
-
- if (status.Code == ResponseStatusCode.Authenticate)
- {
- Authenticate();
- return;
- }
+ var requestBytes = await
_serializer.SerializeMessageAsync(requestMessage, cancellationToken)
+ .ConfigureAwait(false);
- if (receivedMsg.RequestId == null) return;
+ using var content = new ByteArrayContent(requestBytes);
+ content.Headers.ContentType = new
MediaTypeHeaderValue(GraphBinaryMimeType);
- _callbackByRequestId.TryGetValue(receivedMsg.RequestId.Value, out
var responseHandler);
- if (status.Code != ResponseStatusCode.NoContent)
- {
- responseHandler?.HandleReceived(receivedMsg);
- }
+ using var httpRequest = new HttpRequestMessage(HttpMethod.Post,
_uri);
+ httpRequest.Content = content;
+ httpRequest.Headers.Accept.Add(new
MediaTypeWithQualityHeaderValue(GraphBinaryMimeType));
- if (status.Code == ResponseStatusCode.Success || status.Code ==
ResponseStatusCode.NoContent)
+ if (_enableCompression)
{
- if
(_cancellationByRequestId.TryRemove(receivedMsg.RequestId.Value, out var
cancellation))
- {
- cancellation.Dispose();
- }
- responseHandler?.Finalize(status.Attributes);
- _callbackByRequestId.TryRemove(receivedMsg.RequestId.Value,
out _);
+ httpRequest.Headers.AcceptEncoding.Add(new
StringWithQualityHeaderValue("deflate"));
}
- }
-
- private void Authenticate()
- {
- if (string.IsNullOrEmpty(_username) ||
string.IsNullOrEmpty(_password))
- throw new InvalidOperationException(
- $"The Gremlin Server requires authentication, but no
credentials are specified - username: {_username}, password: {_password}.");
-
- var message =
RequestMessage.Build(Tokens.OpsAuthentication).Processor(Tokens.ProcessorTraversal)
- .AddArgument(Tokens.ArgsSasl, SaslArgument()).Create();
- _writeQueue.Enqueue((message, CancellationToken.None));
- BeginSendingMessages();
- }
-
- private string SaslArgument()
- {
- var auth = $"\0{_username}\0{_password}";
- var authBytes = Encoding.UTF8.GetBytes(auth);
- return Convert.ToBase64String(authBytes);
- }
-
- private void BeginSendingMessages()
- {
- if (Interlocked.CompareExchange(ref _writeInProgress, 1, 0) != 0)
- return;
- SendMessagesFromQueueAsync().Forget();
- }
-
- private async Task SendMessagesFromQueueAsync()
- {
- while (_writeQueue.TryDequeue(out var msg))
+ if (_enableUserAgentOnConnect)
{
- try
- {
- await SendMessageAsync(msg.msg,
msg.cancellationToken).ConfigureAwait(false);
- }
- catch (OperationCanceledException e) when
(msg.cancellationToken == e.CancellationToken)
- {
- // Send was cancelled for this message -> silently catch
as we want to continue sending from this
- // connection. The task responsible for submitting this
message will be cancelled by the
- // `ResponseHandlerForSingleRequestMessage`.
- }
- catch (Exception e)
- {
- await
CloseConnectionBecauseOfFailureAsync(e).ConfigureAwait(false);
- break;
- }
+ httpRequest.Headers.TryAddWithoutValidation("User-Agent",
Utils.UserAgent);
}
- Interlocked.CompareExchange(ref _writeInProgress, 0, 1);
- // Since the loop ended and the write in progress was set to 0
- // a new item could have been added, write queue can contain items
at this time
- if (!_writeQueue.IsEmpty && Interlocked.CompareExchange(ref
_writeInProgress, 1, 0) == 0)
+ if (_bulkResults)
{
- await SendMessagesFromQueueAsync().ConfigureAwait(false);
+ httpRequest.Headers.Add("bulkResults", "true");
}
- }
- private async Task CloseConnectionBecauseOfFailureAsync(Exception
exception)
- {
- EmptyWriteQueue();
- await CloseAsync().ConfigureAwait(false);
- NotifyAboutConnectionFailure(exception);
- }
+ // Future: apply interceptors here
- private void EmptyWriteQueue()
- {
- while (_writeQueue.TryDequeue(out _))
- {
- }
- }
+ using var response = await _httpClient.SendAsync(httpRequest,
cancellationToken)
+ .ConfigureAwait(false);
- private void NotifyAboutConnectionFailure(Exception exception)
- {
- foreach (var cb in _callbackByRequestId.Values)
+ // If the HTTP status indicates an error and the response is not
GraphBinary
+ // (e.g. a proxy 502 or server 404 returning HTML/plain text),
fail fast with
+ // a clear message instead of letting the deserializer throw a
confusing parse error.
+ // When the Content-Type matches GraphBinary, fall through to
normal deserialization
+ // so the status footer in the GB4 response can surface the
application-level error.
+ if (!response.IsSuccessStatusCode &&
+ response.Content.Headers.ContentType?.MediaType !=
GraphBinaryMimeType)
{
- cb.HandleFailure(exception);
+ var errorBody = await
response.Content.ReadAsStringAsync().ConfigureAwait(false);
+ throw new HttpRequestException(
+ $"Gremlin Server returned HTTP {(int)response.StatusCode}:
{errorBody}");
}
- _callbackByRequestId.Clear();
- DisposeCancellationRegistrations();
- }
- private async Task SendMessageAsync(RequestMessage message,
CancellationToken cancellationToken)
- {
- if (_sessionEnabled)
- {
- message = RebuildSessionMessage(message);
- }
+ var responseBytes = await
ReadResponseBytesAsync(response).ConfigureAwait(false);
- var serializedMsg = await
_messageSerializer.SerializeMessageAsync(message, cancellationToken)
+ var responseMessage = await
_serializer.DeserializeMessageAsync(responseBytes, cancellationToken)
.ConfigureAwait(false);
-#if NET6_0_OR_GREATER
- if (message.Processor == Tokens.OpsAuthentication)
- {
- // Don't compress a message that contains credentials to
prevent attacks like CRIME or BREACH
- await
_webSocketConnection.SendMessageUncompressedAsync(serializedMsg,
cancellationToken).ConfigureAwait(false);
- return;
- }
-#endif
- await _webSocketConnection.SendMessageAsync(serializedMsg,
cancellationToken).ConfigureAwait(false);
+
+ return BuildResultSet<T>(responseMessage);
}
- private RequestMessage RebuildSessionMessage(RequestMessage message)
+ private static async Task<byte[]>
ReadResponseBytesAsync(HttpResponseMessage response)
{
- if (message.Processor == Tokens.OpsAuthentication)
+ using var stream = await
response.Content.ReadAsStreamAsync().ConfigureAwait(false);
+ if (response.Content.Headers.ContentEncoding.Contains("deflate"))
{
- return message;
+ using var deflateStream = new DeflateStream(stream,
CompressionMode.Decompress);
+ using var ms = new MemoryStream();
+ await deflateStream.CopyToAsync(ms).ConfigureAwait(false);
+ return ms.ToArray();
}
-
- var msgBuilder = RequestMessage.Build(message.Operation)
-
.OverrideRequestId(message.RequestId).Processor(Tokens.ProcessorSession);
- foreach(var kv in message.Arguments)
- {
- msgBuilder.AddArgument(kv.Key, kv.Value);
- }
- msgBuilder.AddArgument(Tokens.ArgsSession, _sessionId!);
- return msgBuilder.Create();
+ using var memoryStream = new MemoryStream();
+ await stream.CopyToAsync(memoryStream).ConfigureAwait(false);
+ return memoryStream.ToArray();
}
- public async Task CloseAsync()
+ private static ResultSet<T>
BuildResultSet<T>(ResponseMessage<List<object>> responseMessage)
Review Comment:
What's the point of this? Aren't you just copying one list into another?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]