NehanPathan commented on code in PR #1170:
URL: https://github.com/apache/lucenenet/pull/1170#discussion_r2307891036
##########
src/Lucene.Net.Replicator/Http/ReplicationService.cs:
##########
@@ -118,75 +121,128 @@ private static string
ExtractRequestParam(IReplicationRequest request, string pa
return param;
}
- // LUCENENET specific - copy method not used
-
- /// <summary>
- /// Executes the replication task.
- /// </summary>
- /// <exception cref="InvalidOperationException">required parameters
are missing</exception>
- public virtual void Perform(IReplicationRequest request,
IReplicationResponse response)
+ // method to avoid code duplication in sync and async Perform methods
+ private async Task ExecuteReplicationAsync(
+ IReplicationRequest request,
+ IReplicationResponse response,
+ Func<Stream, Task> copyStreamFunc,
+ Func<SessionToken, Task> writeTokenFunc,
+ Func<Task> flushFunc)
{
string[] pathElements = GetPathElements(request);
if (pathElements.Length != 2)
- {
throw ServletException.Create("invalid path, must contain
shard ID and action, e.g. */s1/update");
- }
if (!Enum.TryParse(pathElements[ACTION_IDX], true, out
ReplicationAction action))
- {
throw ServletException.Create("Unsupported action provided: "
+ pathElements[ACTION_IDX]);
- }
if (!replicators.TryGetValue(pathElements[SHARD_IDX], out
IReplicator replicator))
- {
throw ServletException.Create("unrecognized shard ID " +
pathElements[SHARD_IDX]);
- }
- // SOLR-8933 Don't close this stream.
try
{
switch (action)
{
case ReplicationAction.OBTAIN:
- string sessionId = ExtractRequestParam(request,
REPLICATE_SESSION_ID_PARAM);
- string fileName = ExtractRequestParam(request,
REPLICATE_FILENAME_PARAM);
- string source = ExtractRequestParam(request,
REPLICATE_SOURCE_PARAM);
- using (Stream stream =
replicator.ObtainFile(sessionId, source, fileName))
- stream.CopyTo(response.Body);
- break;
+ {
+ string sessionId = ExtractRequestParam(request,
REPLICATE_SESSION_ID_PARAM);
+ string fileName = ExtractRequestParam(request,
REPLICATE_FILENAME_PARAM);
+ string source = ExtractRequestParam(request,
REPLICATE_SOURCE_PARAM);
- case ReplicationAction.RELEASE:
- replicator.Release(ExtractRequestParam(request,
REPLICATE_SESSION_ID_PARAM));
- break;
+ using (Stream stream =
replicator.ObtainFile(sessionId, source, fileName))
+ await copyStreamFunc(stream);
+ break;
+ }
- case ReplicationAction.UPDATE:
- string currentVersion =
request.QueryParam(REPLICATE_VERSION_PARAM);
- SessionToken token =
replicator.CheckForUpdate(currentVersion);
- if (token is null)
+ case ReplicationAction.RELEASE:
{
- response.Body.Write(new byte[] { 0 }, 0, 1); //
marker for null token
+ replicator.Release(ExtractRequestParam(request,
REPLICATE_SESSION_ID_PARAM));
+ break;
}
- else
+
+ case ReplicationAction.UPDATE:
{
- response.Body.Write(new byte[] { 1 }, 0, 1);
- token.Serialize(new
DataOutputStream(response.Body));
+ string currentVersion =
request.QueryParam(REPLICATE_VERSION_PARAM);
+ SessionToken token =
replicator.CheckForUpdate(currentVersion);
+ await writeTokenFunc(token);
+ break;
}
- break;
- // LUCENENET specific:
default:
if (Debugging.AssertsEnabled) Debugging.Assert(false,
"Invalid ReplicationAction specified");
break;
}
}
catch (Exception)
{
- response.StatusCode = (int)HttpStatusCode.InternalServerError;
// propagate the failure
+ response.StatusCode = (int)HttpStatusCode.InternalServerError;
}
finally
{
- response.Flush();
+ await flushFunc();
}
}
+
+ // LUCENENET specific - copy method not used
+
+ /// <summary>
+ /// Executes the replication task.
+ /// </summary>
+ /// <exception cref="InvalidOperationException">required parameters
are missing</exception>
+ public virtual void Perform(IReplicationRequest request,
IReplicationResponse response)
+ {
+ ExecuteReplicationAsync(
+ request,
+ response,
+ stream => { stream.CopyTo(response.Body); return
Task.CompletedTask; },
+ token =>
+ {
+ if (token == null)
+ {
+ response.Body.Write(new byte[] { 0 }, 0, 1);
+ }
+ else
+ {
+ response.Body.Write(new byte[] { 1 }, 0, 1);
+ token.Serialize(new DataOutputStream(response.Body));
+ }
+ return Task.CompletedTask;
+ },
+ () => { response.Flush(); return Task.CompletedTask; }
+ ).GetAwaiter().GetResult(); // // keep sync behavior
+ }
+
+
+ /// <summary>
+ /// Executes the replication task asynchronously.
+ /// </summary>
+ /// <param name="request">The replication request containing action
and parameters.</param>
+ /// <param name="response">The replication response used to send data
back to the client.</param>
+ /// <param name="cancellationToken">A <see cref="CancellationToken"/>
to observe while performing the replication.</param>
+ /// <exception cref="InvalidOperationException">Thrown when required
parameters are missing or invalid.</exception>
+ public virtual Task PerformAsync(
+ IReplicationRequest request,
+ IReplicationResponse response,
+ CancellationToken cancellationToken = default)
+ {
+ return ExecuteReplicationAsync(
+ request,
+ response,
+ stream => stream.CopyToAsync(response.Body, 81920,
cancellationToken),
Review Comment:
@NightOwl888
https://learn.microsoft.com/en-us/dotnet/api/system.io.compression.gzipstream.copytoasync?view=net-9.0
I double-checked the docs: the default buffer size for
Stream.CopyToAsync is 81920 bytes (80 KB), not 8192. We used 81920
here intentionally, to match the framework default and avoid unnecessarily
reducing throughput.
Did you suggest 8192 for a specific performance or compatibility reason?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]