NightOwl888 commented on code in PR #1182:
URL: https://github.com/apache/lucenenet/pull/1182#discussion_r2337789009
##########
src/Lucene.Net.Replicator/Http/HttpClientBase.cs:
##########
@@ -217,6 +240,31 @@ private string QueryString(string request, params string[]
parameters)
.Join("&",
parameters.Select(WebUtility.UrlEncode).InPairs((key, val) =>
string.Format("{0}={1}", key, val))));
}
+ // Add this property so subclasses can access the HttpClient instance
+ protected HttpClient Client => httpc;
+
+ // BuildUrl helpers (mirror the QueryString overloads)
+ protected virtual string BuildUrl(string action, string[] parameters)
Review Comment:
I am curious: why are we wrapping the private `QueryString()` method and
exposing it as protected here?
##########
src/Lucene.Net.Replicator/Http/HttpClientBase.cs:
##########
@@ -217,6 +240,31 @@ private string QueryString(string request, params string[]
parameters)
.Join("&",
parameters.Select(WebUtility.UrlEncode).InPairs((key, val) =>
string.Format("{0}={1}", key, val))));
}
+ // Add this property so subclasses can access the HttpClient instance
+ protected HttpClient Client => httpc;
+
+ // BuildUrl helpers (mirror the QueryString overloads)
+ protected virtual string BuildUrl(string action, string[] parameters)
+ {
+ // QueryString has signature: QueryString(string request, params
string[] parameters)
+ return QueryString(action, parameters);
+ }
+
+ protected virtual string BuildUrl(
Review Comment:
I am curious: why are we wrapping the private `QueryString()` method and
exposing it as protected here?
##########
src/Lucene.Net.Replicator/Http/HttpClientBase.cs:
##########
@@ -262,6 +310,37 @@ public virtual Stream
GetResponseStream(HttpResponseMessage response, bool consu
return result;
}
+ /// <summary>
Review Comment:
Note that in `GetResponseStream()` above, `net8.0` provides a synchronous
`HttpContent.ReadAsStream()` method (available since .NET 5) that we should be
calling instead.
Please add a new feature `FEATURE_HTTPCONTENT_READASSTREAM` to the root
`Directory.Build.targets`.
##########
src/Lucene.Net.Replicator/ReplicationClient.cs:
##########
@@ -151,6 +155,20 @@ public ReplicationClient(IReplicator replicator,
IReplicationHandler handler, IS
this.factory = factory;
}
+ /// <summary>
+ /// Constructor for async replicators.
+ /// </summary>
+ /// <param name="asyncReplicator"></param>
+ /// <param name="handler"></param>
+ /// <param name="factory"></param>
+ /// <exception cref="ArgumentNullException"></exception>
+ public ReplicationClient(IAsyncReplicator asyncReplicator,
IReplicationHandler handler, ISourceDirectoryFactory factory)
Review Comment:
I suppose it is unlikely a user will require both async and synchronous
methods at the same time. And it would clarify the `StartAsyncUpdateLoop` vs
`StartUpdateThread()` logic. Although, I think both of those could be
potentially combined.
I am hesitant to get on board with having completely separate
implementations, though. That is not typically how concrete components in the
BCL evolve. Usually, the synchronous and asynchronous methods exist side by
side.
##########
src/Lucene.Net.Replicator/Http/HttpClientBase.cs:
##########
@@ -262,6 +310,37 @@ public virtual Stream
GetResponseStream(HttpResponseMessage response, bool consu
return result;
}
+ /// <summary>
+ /// Internal utility: input stream of the provided response
asynchronously.
+ /// </summary>
+ /// <exception cref="IOException"></exception>
+ public virtual async Task<Stream>
GetResponseStreamAsync(HttpResponseMessage response, CancellationToken
cancellationToken = default)
+ {
+ #if NET8_0_OR_GREATER
Review Comment:
Please add a new feature
`FEATURE_HTTPCONTENT_READASSTREAM_CANCELLATIONTOKEN` to the root
`Directory.Build.targets` instead of using `NET8_0_OR_GREATER`.
##########
src/Lucene.Net.Replicator/Http/HttpClientBase.cs:
##########
@@ -262,6 +310,37 @@ public virtual Stream
GetResponseStream(HttpResponseMessage response, bool consu
return result;
}
+ /// <summary>
+ /// Internal utility: input stream of the provided response
asynchronously.
+ /// </summary>
+ /// <exception cref="IOException"></exception>
+ public virtual async Task<Stream>
GetResponseStreamAsync(HttpResponseMessage response, CancellationToken
cancellationToken = default)
+ {
+ #if NET8_0_OR_GREATER
+ Stream result = await
response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
+ #else
+ Stream result = await
response.Content.ReadAsStreamAsync().ConfigureAwait(false);
+ #endif
+ return result;
+ }
+
+ /// <summary>
+ /// Internal utility: input stream of the provided response
asynchronously, which optionally
+ /// consumes the response's resources when the input stream is
exhausted.
+ /// </summary>
+ /// <exception cref="IOException"></exception>
+ public virtual async Task<Stream>
GetResponseStreamAsync(HttpResponseMessage response, bool consume,
CancellationToken cancellationToken = default)
+ {
+ #if NET8_0_OR_GREATER
Review Comment:
Please add a new feature
`FEATURE_HTTPCONTENT_READASSTREAM_CANCELLATIONTOKEN` to the root
`Directory.Build.targets` instead of using `NET8_0_OR_GREATER`.
##########
src/Lucene.Net.Replicator/Http/HttpClientBase.cs:
##########
@@ -200,6 +200,29 @@ protected virtual HttpResponseMessage ExecuteGet(string
request, params string[]
return Execute(req);
}
+ /// <summary>
+ /// Execute a GET request asynchronously with an array of parameters.
+ /// </summary>
+ protected Task<HttpResponseMessage> ExecuteGetAsync(string action,
string[] parameters, CancellationToken cancellationToken)
Review Comment:
Perhaps we should fix `ExecutePost()` in this PR and roll #1176 in with
this? We will just have to add an `ExecutePostAsync()` method later, anyway.
##########
src/Lucene.Net.Replicator/ReplicationClient.cs:
##########
@@ -468,6 +695,25 @@ public virtual void UpdateNow()
}
}
+ /// <summary>
+ /// Executes the update operation asynchronously immediately,
regardless if an update thread is running or not.
+ /// </summary>
+ public virtual async Task UpdateNowAsync(CancellationToken
cancellationToken = default)
+ {
+ EnsureOpen();
+
+ // Acquire the same update lock to prevent concurrent updates
+ updateLock.Lock();
Review Comment:
According to ChatGPT, no, this is not safe. Awaiting asynchronous code while
holding a synchronous lock can lead to deadlocks.
Do note that in other places where the call is to `Task.Run()`, this is fine
to do.
### Options
1. Use `SemaphoreSlim` instead of `ReentrantLock`. Note that `SemaphoreSlim`
is not re-entrant, though.
- Safe across await.
- Prevents reentrant calls, which may actually be a feature (forces clear
lock discipline).
- But if you really need recursion/reentrancy, this won’t do.
2. Implement an async-compatible reentrant lock
- This is tricky but possible.
- You’d track:
- Owning Thread or Task.Id
- A recursion count
- A `SemaphoreSlim` for waiting
- On EnterAsync, if the current context already owns the lock, increment
recursion count and continue immediately. Otherwise, wait.
- On Exit, decrement recursion count and release only when it reaches 0.
Using `SemaphoreSlim` is certainly easier, but I have no idea whether
reentrancy is actually required by the design, or if that is just something
that happened by accident because `ReentrantLock` was the primitive that was
chosen for synchronization. It would be possible to refactor the design to
provide protected methods that are not protected by `SemaphoreSlim` so
subclasses can avoid reentrancy. It would be harder to implement subclasses,
but possible. I am not sure whether that covers general callers, though.
In the long run, implementing an `AsyncReentrantLock` could prove to be very
useful for converting other parts of Lucene to be async, and would cause less
pain for places where it is unknown whether reentrancy is a requirement.
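Here is what such an implementation could look like. Note that I haven't
tested this; it is straight out of ChatGPT. There may be a smarter way to do it
than using a lock, but if we keep it, we would need to replace `lock (this)` with
`UninterruptableMonitor.Enter()` and `UninterruptableMonitor.Exit()` in a
try/finally block (locking on a private object rather than `this`), to ensure it
doesn't throw on thread interrupt. Also, because a continuation after `await` can
resume on a different thread, ownership has to be tracked per async flow (for
example with `AsyncLocal<T>`) rather than by `ManagedThreadId`.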
<details>
<summary>AsyncReentrantLock - click to expand</summary>
```c#
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// A lightweight reentrant lock that can be acquired both synchronously
/// (<see cref="Lock"/>) and asynchronously (<see cref="LockAsync"/>).
/// </summary>
/// <remarks>
/// <para>
/// Ownership is tracked per logical flow of execution (an <see cref="AsyncLocal{T}"/> token)
/// rather than per thread. A continuation after <c>await</c> (especially with
/// <c>ConfigureAwait(false)</c>) may resume on a different thread. Keying ownership on the thread ID
/// would therefore make <see cref="Unlock"/> throw after a thread hop. It would also make a second
/// acquisition by the same flow wait on itself, and let unrelated work that happens to run on the
/// owner's thread "re-enter".
/// </para>
/// <para>
/// Consequences of <see cref="AsyncLocal{T}"/> semantics:
/// <list type="bullet">
/// <item><description>Call <see cref="Unlock"/> from the method that acquired the lock, or from code that
/// method calls or awaits. An <c>async</c> method that acquires the lock and then returns does not hand
/// ownership back to its caller.</description></item>
/// <item><description>Tasks and threads started while the lock is held inherit the owner's execution context.
/// They are treated as the owner, so they re-enter rather than wait. Start them under
/// <see cref="ExecutionContext.SuppressFlow"/> if they must be independent.</description></item>
/// </list>
/// </para>
/// </remarks>
public sealed class AsyncReentrantLock
{
    private readonly SemaphoreSlim _semaphore = new(1, 1);

    // Guards _ownerFlowId, _ownerThreadId and _recursionCount.
    // NOTE: in Lucene.NET, prefer UninterruptableMonitor.Enter/Exit (in try/finally) over `lock`,
    // so a thread interrupt cannot make these short critical sections throw.
    private readonly object _gate = new();

    // Token identifying the acquisition made by the current logical flow (null if it never acquired).
    private readonly AsyncLocal<object> _flowId = new();

    private object _ownerFlowId;
    private int? _ownerThreadId;
    private int _recursionCount;

    // Track waiting requests
    private int _waitingCount;

    /// <summary> Number of threads/tasks currently waiting for the lock. </summary>
    public int WaitingCount => Volatile.Read(ref _waitingCount);

    /// <summary> True if the lock is currently held. </summary>
    public bool IsLocked
    {
        get
        {
            lock (_gate)
            {
                return _ownerFlowId != null;
            }
        }
    }

    /// <summary>
    /// Managed thread ID of the thread that acquired the lock, or null if none.
    /// For async acquisitions the owning flow may since have moved to another thread,
    /// so treat this as diagnostic information only.
    /// </summary>
    public int? OwnerThreadId
    {
        get
        {
            lock (_gate)
            {
                return _ownerThreadId;
            }
        }
    }

    /// <summary> Recursion depth of the current owner (0 when unlocked). </summary>
    public int RecursionCount
    {
        get
        {
            lock (_gate)
            {
                return _recursionCount;
            }
        }
    }

    // -----------------------
    // Sync API
    // -----------------------

    /// <summary>
    /// Acquires the lock, blocking until it is available. Re-enters immediately
    /// if the current flow already owns it.
    /// </summary>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was canceled while waiting.</exception>
    public void Lock(CancellationToken cancellationToken = default)
    {
        if (TryReenter())
        {
            return;
        }

        object flowId = BeginAcquisition();
        int threadId = Thread.CurrentThread.ManagedThreadId;

        Interlocked.Increment(ref _waitingCount);
        try
        {
            _semaphore.Wait(cancellationToken);
        }
        finally
        {
            Interlocked.Decrement(ref _waitingCount);
        }

        TakeOwnership(flowId, threadId);
    }

    // -----------------------
    // Async API
    // -----------------------

    /// <summary>
    /// Acquires the lock asynchronously. Completes immediately (re-entering) if the
    /// current flow already owns it.
    /// </summary>
    /// <returns>A task that completes once the lock is held; it is canceled if
    /// <paramref name="cancellationToken"/> is canceled while waiting.</returns>
    public Task LockAsync(CancellationToken cancellationToken = default)
    {
        // Deliberately not an `async` method. The flow token must be written to the
        // caller's execution context, and an async method would revert that write on return.
        if (TryReenter())
        {
            return Task.CompletedTask;
        }

        object flowId = BeginAcquisition();
        return AcquireAsync(flowId, Thread.CurrentThread.ManagedThreadId, cancellationToken);
    }

    // Waits for the semaphore and then records the given flow as the owner.
    private async Task AcquireAsync(object flowId, int threadId, CancellationToken cancellationToken)
    {
        Interlocked.Increment(ref _waitingCount);
        try
        {
            await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            Interlocked.Decrement(ref _waitingCount);
        }

        TakeOwnership(flowId, threadId);
    }

    // -----------------------
    // Unlock
    // -----------------------

    /// <summary>
    /// Releases one level of ownership; the lock is freed when the recursion count reaches zero.
    /// </summary>
    /// <exception cref="SynchronizationLockException">The current flow does not own the lock.</exception>
    public void Unlock()
    {
        object current = _flowId.Value;
        bool releaseSemaphore = false;

        lock (_gate)
        {
            if (current is null || !ReferenceEquals(_ownerFlowId, current))
            {
                throw new SynchronizationLockException("Current thread does not own the lock.");
            }

            _recursionCount--;
            if (_recursionCount == 0)
            {
                _ownerFlowId = null;
                _ownerThreadId = null;
                releaseSemaphore = true;
            }
        }

        // Released outside the gate, so no waiter continuation can run while we hold it.
        if (releaseSemaphore)
        {
            _semaphore.Release();
        }
    }

    // Increments the recursion count if the current flow already owns the lock.
    private bool TryReenter()
    {
        object current = _flowId.Value;
        if (current is null)
        {
            return false;
        }

        lock (_gate)
        {
            if (!ReferenceEquals(_ownerFlowId, current))
            {
                return false;
            }

            _recursionCount++;
            return true;
        }
    }

    // Stores a fresh token in the caller's flow for a new (non-reentrant) acquisition.
    // A fresh token per acquisition means work forked from this flow earlier, which holds
    // an older token, is never mistaken for the new owner.
    // Must be called synchronously from Lock/LockAsync, never from inside an async method.
    private object BeginAcquisition()
    {
        object flowId = new object();
        _flowId.Value = flowId;
        return flowId;
    }

    // Records the given flow as owner after the semaphore has been acquired.
    private void TakeOwnership(object flowId, int threadId)
    {
        lock (_gate)
        {
            _ownerFlowId = flowId;
            _ownerThreadId = threadId;
            _recursionCount = 1;
        }
    }
}
```
</details>
<details>
<summary>AsyncReentrantLockTests - click to expand</summary>
```c#
using NUnit.Framework;
using System;
using System.Threading;
using System.Threading.Tasks;
[TestFixture]
public class AsyncReentrantLockTests
{
    // Starts a thread that does NOT inherit the caller's ExecutionContext, so it acts as a
    // genuinely independent lock owner rather than a continuation of the calling flow.
    private static Thread StartIndependentThread(Action body)
    {
        var thread = new Thread(() => body());
        using (ExecutionContext.SuppressFlow())
        {
            thread.Start();
        }
        return thread;
    }

    [Test]
    public void SyncLock_IsReentrant()
    {
        var l = new AsyncReentrantLock();
        l.Lock();
        Assert.That(l.IsLocked, Is.True);
        Assert.That(l.RecursionCount, Is.EqualTo(1));
        l.Lock();
        Assert.That(l.RecursionCount, Is.EqualTo(2));
        l.Unlock();
        Assert.That(l.RecursionCount, Is.EqualTo(1));
        l.Unlock();
        Assert.That(l.IsLocked, Is.False);
    }

    [Test]
    public async Task AsyncLock_IsReentrant()
    {
        var l = new AsyncReentrantLock();
        // Bounded wait: if reentrancy is broken, the second LockAsync fails instead of hanging.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        await l.LockAsync(cts.Token);
        Assert.That(l.IsLocked, Is.True);
        await Task.Yield(); // allow the flow to hop threads between acquisitions
        await l.LockAsync(cts.Token);
        Assert.That(l.RecursionCount, Is.EqualTo(2));
        l.Unlock();
        Assert.That(l.RecursionCount, Is.EqualTo(1));
        await Task.Yield();
        l.Unlock();
        Assert.That(l.IsLocked, Is.False);
    }

    [Test]
    public void ThrowsIfUnlockedByOtherThread()
    {
        var l = new AsyncReentrantLock();
        l.Lock();

        // An exception thrown on another thread never reaches Assert.Throws on this thread,
        // so capture it there and assert on it here.
        Exception caught = null;
        Thread t = StartIndependentThread(() =>
        {
            try
            {
                l.Unlock();
            }
            catch (Exception e)
            {
                caught = e;
            }
        });
        t.Join();

        Assert.That(caught, Is.TypeOf<SynchronizationLockException>());
        Assert.That(caught.Message, Does.Contain("does not own"));
        Assert.That(l.IsLocked, Is.True);
        l.Unlock();
    }

    [Test]
    public async Task LockAsync_BlocksUntilReleased()
    {
        var l = new AsyncReentrantLock();
        using var acquired = new ManualResetEventSlim();
        using var release = new ManualResetEventSlim();

        // The lock must be held by a different owner; from the owning flow LockAsync would re-enter.
        Thread holder = StartIndependentThread(() =>
        {
            l.Lock();
            acquired.Set();
            release.Wait();
            l.Unlock();
        });
        Assert.That(acquired.Wait(TimeSpan.FromSeconds(5)), Is.True);

        Task pending = l.LockAsync();
        Assert.That(pending.IsCompleted, Is.False); // still held by the other thread

        release.Set();
        await pending;
        Assert.That(l.IsLocked, Is.True);
        l.Unlock();
        Assert.That(l.IsLocked, Is.False);
        holder.Join();
    }

    [Test]
    public async Task WaitingCount_IsAccurate()
    {
        var l = new AsyncReentrantLock();
        l.Lock();

        // Waiters must be independent flows; from the owning flow LockAsync would re-enter.
        Task t1;
        Task t2;
        using (ExecutionContext.SuppressFlow())
        {
            t1 = Task.Run(async () =>
            {
                await l.LockAsync();
                l.Unlock();
            });
            t2 = Task.Run(async () =>
            {
                await l.LockAsync();
                l.Unlock();
            });
        }

        Assert.That(SpinWait.SpinUntil(() => l.WaitingCount == 2, TimeSpan.FromSeconds(5)), Is.True);

        l.Unlock(); // hand off; each waiter releases right after acquiring
        await Task.WhenAll(t1, t2);
        Assert.That(l.WaitingCount, Is.EqualTo(0));
        Assert.That(l.IsLocked, Is.False);
    }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]