NightOwl888 commented on code in PR #1182:
URL: https://github.com/apache/lucenenet/pull/1182#discussion_r2337789009


##########
src/Lucene.Net.Replicator/Http/HttpClientBase.cs:
##########
@@ -217,6 +240,31 @@ private string QueryString(string request, params string[] 
parameters)
                 .Join("&", 
parameters.Select(WebUtility.UrlEncode).InPairs((key, val) => 
string.Format("{0}={1}", key, val))));
         }
 
+        // Add this property so subclasses can access the HttpClient instance
+        protected HttpClient Client => httpc;
+
+        // BuildUrl helpers (mirror the QueryString overloads)
+        protected virtual string BuildUrl(string action, string[] parameters)

Review Comment:
   I am curious why we are wrapping the private `QueryString()` method and 
making it protected here.



##########
src/Lucene.Net.Replicator/Http/HttpClientBase.cs:
##########
@@ -217,6 +240,31 @@ private string QueryString(string request, params string[] 
parameters)
                 .Join("&", 
parameters.Select(WebUtility.UrlEncode).InPairs((key, val) => 
string.Format("{0}={1}", key, val))));
         }
 
+        // Add this property so subclasses can access the HttpClient instance
+        protected HttpClient Client => httpc;
+
+        // BuildUrl helpers (mirror the QueryString overloads)
+        protected virtual string BuildUrl(string action, string[] parameters)
+        {
+            // QueryString has signature: QueryString(string request, params 
string[] parameters)
+            return QueryString(action, parameters);
+        }
+
+        protected virtual string BuildUrl(

Review Comment:
   I am curious why we are wrapping the private `QueryString()` method and 
making it protected here.



##########
src/Lucene.Net.Replicator/Http/HttpClientBase.cs:
##########
@@ -262,6 +310,37 @@ public virtual Stream 
GetResponseStream(HttpResponseMessage response, bool consu
             return result;
         }
 
+        /// <summary>

Review Comment:
   Note that in `GetResponseStream()` above, `net8.0` has a synchronous 
`HttpContent.ReadAsStream()` overload that we should be calling there instead.
   
   Please add a new feature `FEATURE_HTTPCONTENT_READASSTREAM` to the root 
`Directory.Build.targets`.



##########
src/Lucene.Net.Replicator/ReplicationClient.cs:
##########
@@ -151,6 +155,20 @@ public ReplicationClient(IReplicator replicator, 
IReplicationHandler handler, IS
             this.factory = factory;
         }
 
+        /// <summary>
+        /// Constructor for async replicators.
+        /// </summary>
+        /// <param name="asyncReplicator"></param>
+        /// <param name="handler"></param>
+        /// <param name="factory"></param>
+        /// <exception cref="ArgumentNullException"></exception>
+        public ReplicationClient(IAsyncReplicator asyncReplicator, 
IReplicationHandler handler, ISourceDirectoryFactory factory)

Review Comment:
   I suppose it is unlikely a user will require both async and synchronous 
methods at the same time, and it would clarify the `StartAsyncUpdateLoop` vs 
`StartUpdateThread()` logic. Although I think both of those could potentially 
be combined.
   
   I am hesitant to get on board with having completely separate 
implementations, though. That is not typically how concrete components in the 
BCL evolve. Usually, the synchronous and asynchronous methods exist side by 
side.



##########
src/Lucene.Net.Replicator/Http/HttpClientBase.cs:
##########
@@ -262,6 +310,37 @@ public virtual Stream 
GetResponseStream(HttpResponseMessage response, bool consu
             return result;
         }
 
+        /// <summary>
+        /// Internal utility: input stream of the provided response 
asynchronously.
+        /// </summary>
+        /// <exception cref="IOException"></exception>
+        public virtual async Task<Stream> 
GetResponseStreamAsync(HttpResponseMessage response, CancellationToken 
cancellationToken = default)
+        {
+        #if NET8_0_OR_GREATER

Review Comment:
   Please add a new feature 
`FEATURE_HTTPCONTENT_READASSTREAM_CANCELLATIONTOKEN` to the root 
`Directory.Build.targets` instead of using `NET8_0_OR_GREATER`.



##########
src/Lucene.Net.Replicator/Http/HttpClientBase.cs:
##########
@@ -262,6 +310,37 @@ public virtual Stream 
GetResponseStream(HttpResponseMessage response, bool consu
             return result;
         }
 
+        /// <summary>
+        /// Internal utility: input stream of the provided response 
asynchronously.
+        /// </summary>
+        /// <exception cref="IOException"></exception>
+        public virtual async Task<Stream> 
GetResponseStreamAsync(HttpResponseMessage response, CancellationToken 
cancellationToken = default)
+        {
+        #if NET8_0_OR_GREATER
+            Stream result = await 
response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
+        #else
+            Stream result = await 
response.Content.ReadAsStreamAsync().ConfigureAwait(false);
+        #endif
+            return result;
+        }
+
+        /// <summary>
+        /// Internal utility: input stream of the provided response 
asynchronously, which optionally
+        /// consumes the response's resources when the input stream is 
exhausted.
+        /// </summary>
+        /// <exception cref="IOException"></exception>
+        public virtual async Task<Stream> 
GetResponseStreamAsync(HttpResponseMessage response, bool consume, 
CancellationToken cancellationToken = default)
+        {
+        #if NET8_0_OR_GREATER

Review Comment:
   Please add a new feature 
`FEATURE_HTTPCONTENT_READASSTREAM_CANCELLATIONTOKEN` to the root 
`Directory.Build.targets` instead of using `NET8_0_OR_GREATER`.



##########
src/Lucene.Net.Replicator/Http/HttpClientBase.cs:
##########
@@ -200,6 +200,29 @@ protected virtual HttpResponseMessage ExecuteGet(string 
request, params string[]
             return Execute(req);
         }
 
+        /// <summary>
+        /// Execute a GET request asynchronously with an array of parameters.
+        /// </summary>
+        protected Task<HttpResponseMessage> ExecuteGetAsync(string action, 
string[] parameters, CancellationToken cancellationToken)

Review Comment:
   Perhaps we should fix `ExecutePost()` in this PR and roll #1176 in with 
this? We will just have to add an `ExecutePostAsync()` method later, anyway.



##########
src/Lucene.Net.Replicator/ReplicationClient.cs:
##########
@@ -468,6 +695,25 @@ public virtual void UpdateNow()
             }
         }
 
+        /// <summary>
+        /// Executes the update operation asynchronously immediately, 
regardless if an update thread is running or not.
+        /// </summary>
+        public virtual async Task UpdateNowAsync(CancellationToken 
cancellationToken = default)
+        {
+            EnsureOpen();
+
+            // Acquire the same update lock to prevent concurrent updates
+            updateLock.Lock();

Review Comment:
   According to ChatGPT, no, this is not safe. Awaiting asynchronous code while 
holding a synchronous lock can lead to deadlocks.
   
   Do note that in other places where the call is to `Task.Run()`, this is fine 
to do.
   
   ### Options
   
   1. Use `SemaphoreSlim` instead of `ReentrantLock`. Note that `SemaphoreSlim` 
is not re-entrant, though.
      - Safe across await.
      - Prevents reentrant calls, which may actually be a feature (forces clear 
lock discipline).
      - But if you really need recursion/reentrancy, this won’t do.
   2. Implement an async-compatible reentrant lock
      - This is tricky but possible.
      - You’d track:
         - Owning Thread or Task.Id
         - A recursion count
         - A `SemaphoreSlim` for waiting
      - On EnterAsync, if the current context already owns the lock, increment 
recursion count and continue immediately. Otherwise, wait.
      - On Exit, decrement recursion count and release only when it reaches 0.
   
   Using `SemaphoreSlim` is certainly easier, but I have no idea whether 
reentrancy is actually required by the design, or whether it just happened by 
accident because `ReentrantLock` was the primitive chosen for synchronization. 
It would be possible to refactor the design to provide protected methods that 
are not guarded by the `SemaphoreSlim`, so subclasses can avoid reentrancy. 
That would make subclasses harder to implement, but it is possible. I am not 
sure whether that approach covers general callers, though.
   
   In the long run, implementing an `AsyncReentrantLock` could prove to be very 
useful for converting other parts of Lucene to be async, and would cause less 
pain for places where it is unknown whether reentrancy is a requirement. 
   
   Here is what such an implementation could look like. Note that I haven't 
tested this; it is straight out of ChatGPT. There may be a smarter way to do it 
than using a lock, but if we keep it, we would need to change `lock (this)` to 
`UninterruptableMonitor.Enter()` and `UninterruptableMonitor.Exit()` in a 
try/finally block, to ensure it doesn't throw on thread interrupt.
   
   <details>
   
   <summary>AsyncReentrantLock - click to expand</summary>
   
   ```c#
   using System;
   using System.Collections.Concurrent;
   using System.Collections.Generic;
   using System.Threading;
   using System.Threading.Tasks;
   
   /// <summary>
   /// A lightweight async-compatible reentrant lock.
   /// <para/>
   /// Ownership belongs to the logical (async) flow that acquired the lock, not to a thread.
   /// The owner may <c>await</c>, resume on a different thread, re-enter, and release.
   /// A flow is identified through an <see cref="AsyncLocal{T}"/>, which has two consequences:
   /// <list type="bullet">
   ///     <item><description><see cref="Unlock"/> must be called from the method that acquired
   ///     the lock (or from code it calls). An async caller does not observe an identity that was
   ///     established inside an async method it awaited.</description></item>
   ///     <item><description>Tasks and threads started by the owner while it holds the lock inherit
   ///     its identity and therefore count as the owner. Start them under
   ///     <see cref="ExecutionContext.SuppressFlow"/> if they must be excluded.</description></item>
   /// </list>
   /// </summary>
   public sealed class AsyncReentrantLock
   {
       private readonly SemaphoreSlim _semaphore = new(1, 1);

       // Guards _owner, _ownerThreadId and _recursionCount. A private gate instead of lock (this),
       // so outside code can never contend on the same monitor.
       // NOTE: inside Lucene.NET this should use UninterruptableMonitor.Enter/Exit in try/finally.
       private readonly object _syncRoot = new();

       // Identity of the current logical flow. Unlike ManagedThreadId, it survives awaits that
       // resume on another thread, and it differs between unrelated flows sharing a pool thread.
       private readonly AsyncLocal<object> _flowId = new();

       // Owner + recursion count (guarded by _syncRoot)
       private object _owner;
       private int? _ownerThreadId;
       private int _recursionCount;

       // Track waiting requests
       private int _waitingCount;

       /// <summary> Number of flows currently waiting for the lock. </summary>
       public int WaitingCount => Volatile.Read(ref _waitingCount);

       /// <summary> True if the lock is currently held. </summary>
       public bool IsLocked => Volatile.Read(ref _owner) != null;

       /// <summary>
       /// Managed thread ID on which the current owner acquired the lock, or null if unowned.
       /// Diagnostic only: the owner may since have continued on a different thread.
       /// </summary>
       public int? OwnerThreadId
       {
           get
           {
               lock (_syncRoot)
               {
                   return _ownerThreadId;
               }
           }
       }

       /// <summary> Recursion depth of the current owner (0 when unowned). </summary>
       public int RecursionCount
       {
           get
           {
               lock (_syncRoot)
               {
                   return _recursionCount;
               }
           }
       }

       // -----------------------
       // Sync API
       // -----------------------

       /// <summary>
       /// Acquires the lock, blocking the calling thread, or re-enters it if the current flow
       /// already owns it.
       /// </summary>
       /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was
       /// canceled while waiting.</exception>
       public void Lock(CancellationToken cancellationToken = default)
       {
           object flow = GetOrCreateFlowId();
           if (TryReenter(flow))
               return;

           Interlocked.Increment(ref _waitingCount);
           try
           {
               _semaphore.Wait(cancellationToken);
           }
           finally
           {
               Interlocked.Decrement(ref _waitingCount);
           }

           TakeOwnership(flow);
       }

       // -----------------------
       // Async API
       // -----------------------

       /// <summary>
       /// Acquires the lock asynchronously, or re-enters it if the current flow already owns it.
       /// </summary>
       /// <returns>A task that completes once the current flow owns the lock. It is canceled if
       /// <paramref name="cancellationToken"/> is canceled while waiting.</returns>
       public Task LockAsync(CancellationToken cancellationToken = default)
       {
           // Deliberately NOT an async method: an AsyncLocal assigned inside an async method is
           // reverted when that method returns. The caller would then never observe its flow id,
           // and could neither re-enter nor Unlock.
           object flow = GetOrCreateFlowId();
           if (TryReenter(flow))
               return Task.CompletedTask;

           return AcquireAsync(flow, cancellationToken);
       }

       // Slow path of LockAsync: waits for the semaphore, then records `flow` as the owner.
       private async Task AcquireAsync(object flow, CancellationToken cancellationToken)
       {
           Interlocked.Increment(ref _waitingCount);
           try
           {
               await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
           }
           finally
           {
               Interlocked.Decrement(ref _waitingCount);
           }

           TakeOwnership(flow);
       }

       // -----------------------
       // Unlock
       // -----------------------

       /// <summary>
       /// Releases one level of ownership. The lock becomes available to other flows when the
       /// recursion count reaches zero.
       /// </summary>
       /// <exception cref="SynchronizationLockException">The current flow does not own the lock.</exception>
       public void Unlock()
       {
           object flow = _flowId.Value;
           bool released;

           lock (_syncRoot)
           {
               // The null check matters: an unowned lock (_owner == null) must not match a flow
               // that has no identity yet.
               if (flow is null || !ReferenceEquals(_owner, flow))
                   throw new SynchronizationLockException("The current async flow does not own the lock.");

               released = --_recursionCount == 0;
               if (released)
               {
                   _owner = null;
                   _ownerThreadId = null;
               }
           }

           if (released)
           {
               // Forget this flow's identity. Tasks it spawned while holding the lock then cannot
               // pass for the owner of a later acquisition. (Unlock is synchronous, so this
               // assignment is visible to the caller.)
               _flowId.Value = null;
               _semaphore.Release();
           }
       }

       // Returns the current flow's identity, creating it on first use. Must only be called from
       // non-async methods, so the assignment lands in the caller's execution context.
       private object GetOrCreateFlowId()
       {
           object flow = _flowId.Value;
           if (flow is null)
           {
               flow = new object();
               _flowId.Value = flow;
           }
           return flow;
       }

       // Increments the recursion count if `flow` already owns the lock.
       private bool TryReenter(object flow)
       {
           lock (_syncRoot)
           {
               if (!ReferenceEquals(_owner, flow))
                   return false;

               _recursionCount++;
               return true;
           }
       }

       // Records `flow` as the owner; the caller must already hold the semaphore.
       private void TakeOwnership(object flow)
       {
           lock (_syncRoot)
           {
               _owner = flow;
               _ownerThreadId = Thread.CurrentThread.ManagedThreadId;
               _recursionCount = 1;
           }
       }
   }
   ```
   
   </details>
   
   <details>
   
   <summary>AsyncReentrantLockTests - click to expand</summary>
   
   ```c#
   using NUnit.Framework;
   using System;
   using System.Threading;
   using System.Threading.Tasks;
   
   [TestFixture]
   public class AsyncReentrantLockTests
   {
       [Test]
       public void SyncLock_IsReentrant()
       {
           var l = new AsyncReentrantLock();

           l.Lock();
           Assert.That(l.IsLocked, Is.True);
           Assert.That(l.RecursionCount, Is.EqualTo(1));

           l.Lock();
           Assert.That(l.RecursionCount, Is.EqualTo(2));

           l.Unlock();
           Assert.That(l.RecursionCount, Is.EqualTo(1));

           l.Unlock();
           Assert.That(l.IsLocked, Is.False);
       }

       [Test]
       public async Task AsyncLock_IsReentrant()
       {
           var l = new AsyncReentrantLock();
           // Bounds the re-entrant acquisition: a broken lock would otherwise wait on itself forever.
           using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

           await l.LockAsync();
           Assert.That(l.IsLocked, Is.True);

           // Let the continuation resume (possibly) on another thread: ownership must follow the
           // async flow, not the thread.
           await Task.Yield();

           await l.LockAsync(cts.Token);
           Assert.That(l.RecursionCount, Is.EqualTo(2));

           l.Unlock();
           Assert.That(l.RecursionCount, Is.EqualTo(1));

           l.Unlock();
           Assert.That(l.IsLocked, Is.False);
       }

       [Test]
       public void ThrowsIfUnlockedByUnrelatedFlow()
       {
           var l = new AsyncReentrantLock();
           l.Lock();

           // Assert.Throws cannot observe an exception thrown on another thread (it would be
           // unhandled and take down the test host), so capture it and assert afterwards.
           Exception caught = null;
           var t = new Thread(() =>
           {
               try
               {
                   l.Unlock();
               }
               catch (Exception e)
               {
                   caught = e;
               }
           });

           // Start the thread without our ExecutionContext so it is an unrelated flow;
           // otherwise it would inherit this flow's identity.
           using (ExecutionContext.SuppressFlow())
           {
               t.Start();
           }
           t.Join();

           Assert.That(caught, Is.TypeOf<SynchronizationLockException>());
           Assert.That(caught.Message, Does.Contain("does not own"));
           Assert.That(l.IsLocked, Is.True);

           l.Unlock();
       }

       [Test]
       public async Task LockAsync_BlocksUntilReleased()
       {
           var l = new AsyncReentrantLock();
           using var acquired = new ManualResetEventSlim();
           using var release = new ManualResetEventSlim();

           // Hold the lock in a separate, unrelated flow. Lock and Unlock run synchronously on the
           // same thread, so ownership is unambiguous.
           Task holder;
           using (ExecutionContext.SuppressFlow())
           {
               holder = Task.Run(() =>
               {
                   l.Lock();
                   acquired.Set();
                   release.Wait();
                   l.Unlock();
               });
           }
           acquired.Wait();

           Task pending = l.LockAsync();
           Assert.That(pending.IsCompleted, Is.False); // must wait for the holder

           release.Set();
           await pending;
           Assert.That(l.IsLocked, Is.True);

           l.Unlock();
           await holder;
       }

       [Test]
       public async Task WaitingCount_IsAccurate()
       {
           var l = new AsyncReentrantLock();
           l.Lock();

           // Contenders must be unrelated flows; LockAsync from this flow would simply re-enter.
           Task t1, t2;
           using (ExecutionContext.SuppressFlow())
           {
               t1 = Task.Run(() => AcquireAndReleaseAsync(l));
               t2 = Task.Run(() => AcquireAndReleaseAsync(l));
           }

           // Poll (bounded) instead of a fixed delay, so the test is neither flaky nor slow.
           var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(5);
           while (l.WaitingCount < 2 && DateTime.UtcNow < deadline)
               await Task.Delay(10);

           Assert.That(l.WaitingCount, Is.EqualTo(2));

           l.Unlock(); // each contender then acquires in turn and releases

           await Task.WhenAll(t1, t2);
           Assert.That(l.WaitingCount, Is.EqualTo(0));
           Assert.That(l.IsLocked, Is.False);
       }

       // Acquires and immediately releases the lock within a single async flow.
       private static async Task AcquireAndReleaseAsync(AsyncReentrantLock l)
       {
           await l.LockAsync();
           l.Unlock();
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to