Copilot commented on code in PR #3286:
URL: https://github.com/apache/brpc/pull/3286#discussion_r3212664914


##########
src/bthread/rwlock.cpp:
##########
@@ -296,69 +407,65 @@ __BEGIN_DECLS
 
 int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
                         const bthread_rwlockattr_t* __restrict) {
-    int rc = bthread_sem_init(&rwlock->reader_sema, 0);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        return rc;
+    rwlock->writer_wait_count = bthread::butex_create_checked<unsigned>();
+    rwlock->lock_word = bthread::butex_create_checked<unsigned>();
+    if (NULL == rwlock->writer_wait_count || NULL == rwlock->lock_word) {
+        LOG(ERROR) << "Out of memory";
+        return ENOMEM;
     }
-    bthread_sem_disable_csite(&rwlock->reader_sema);
-    rc = bthread_sem_init(&rwlock->writer_sema, 0);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        bthread_sem_destroy(&rwlock->reader_sema);
-        return rc;
-    }
-    bthread_sem_disable_csite(&rwlock->writer_sema);
-
-    rwlock->reader_count = 0;
-    rwlock->reader_wait = 0;
-    rwlock->wlock_flag = false;
+    *rwlock->writer_wait_count = 0;
+    *rwlock->lock_word = 0;
 
     bthread_mutexattr_t attr;
     bthread_mutexattr_init(&attr);
+    // Disable csite on the inner queue mutex so the writer's wait time is
+    // accounted exactly once -- by the rwlock layer, not double-counted via
+    // the inner mutex.
     bthread_mutexattr_disable_csite(&attr);
-    rc = bthread_mutex_init(&rwlock->write_queue_mutex, &attr);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        bthread_sem_destroy(&rwlock->reader_sema);
-        bthread_sem_destroy(&rwlock->writer_sema);
-        return rc;
-    }
+    bthread_mutex_init(&rwlock->writer_queue_mutex, &attr);
     bthread_mutexattr_destroy(&attr);

Review Comment:
   bthread_rwlock_init() leaks resources and may report success on failure: if 
only one of the two butex_create_checked() calls succeeds, the successfully 
created butex is leaked when ENOMEM is returned; and the return value of 
bthread_mutex_init() is ignored (so init can return 0 even when mutex init 
fails). Please handle partial-allocation cleanup, check bthread_mutex_init()'s 
return code, and on failure destroy any butexes already created before 
returning the error code.



##########
src/bthread/rwlock.cpp:
##########
@@ -296,69 +407,65 @@ __BEGIN_DECLS
 
 int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
                         const bthread_rwlockattr_t* __restrict) {
-    int rc = bthread_sem_init(&rwlock->reader_sema, 0);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        return rc;
+    rwlock->writer_wait_count = bthread::butex_create_checked<unsigned>();
+    rwlock->lock_word = bthread::butex_create_checked<unsigned>();
+    if (NULL == rwlock->writer_wait_count || NULL == rwlock->lock_word) {
+        LOG(ERROR) << "Out of memory";
+        return ENOMEM;
     }
-    bthread_sem_disable_csite(&rwlock->reader_sema);
-    rc = bthread_sem_init(&rwlock->writer_sema, 0);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        bthread_sem_destroy(&rwlock->reader_sema);
-        return rc;
-    }
-    bthread_sem_disable_csite(&rwlock->writer_sema);
-
-    rwlock->reader_count = 0;
-    rwlock->reader_wait = 0;
-    rwlock->wlock_flag = false;
+    *rwlock->writer_wait_count = 0;
+    *rwlock->lock_word = 0;
 
     bthread_mutexattr_t attr;
     bthread_mutexattr_init(&attr);
+    // Disable csite on the inner queue mutex so the writer's wait time is
+    // accounted exactly once -- by the rwlock layer, not double-counted via
+    // the inner mutex.
     bthread_mutexattr_disable_csite(&attr);
-    rc = bthread_mutex_init(&rwlock->write_queue_mutex, &attr);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        bthread_sem_destroy(&rwlock->reader_sema);
-        bthread_sem_destroy(&rwlock->writer_sema);
-        return rc;
-    }
+    bthread_mutex_init(&rwlock->writer_queue_mutex, &attr);
     bthread_mutexattr_destroy(&attr);
 
-    bthread::make_contention_site_invalid(&rwlock->writer_csite);
-
     return 0;
 }
 
 int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) {
-    bthread_sem_destroy(&rwlock->reader_sema);
-    bthread_sem_destroy(&rwlock->writer_sema);
-    bthread_mutex_destroy(&rwlock->write_queue_mutex);
+    bthread::butex_destroy(rwlock->writer_wait_count);
+    bthread::butex_destroy(rwlock->lock_word);
     return 0;

Review Comment:
   bthread_rwlock_destroy() currently destroys the two butexes but never calls 
bthread_mutex_destroy(&writer_queue_mutex). Since bthread_mutex_init allocates 
an internal butex, skipping destroy leaks that memory and may leave internal 
state uncollected. Please destroy writer_queue_mutex as part of rwlock 
destruction (and consider setting writer_wait_count and lock_word to NULL 
afterwards to avoid an accidental double-destroy).



##########
src/bthread/rwlock.cpp:
##########
@@ -296,69 +407,65 @@ __BEGIN_DECLS
 
 int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
                         const bthread_rwlockattr_t* __restrict) {
-    int rc = bthread_sem_init(&rwlock->reader_sema, 0);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        return rc;
+    rwlock->writer_wait_count = bthread::butex_create_checked<unsigned>();
+    rwlock->lock_word = bthread::butex_create_checked<unsigned>();
+    if (NULL == rwlock->writer_wait_count || NULL == rwlock->lock_word) {
+        LOG(ERROR) << "Out of memory";
+        return ENOMEM;
     }
-    bthread_sem_disable_csite(&rwlock->reader_sema);
-    rc = bthread_sem_init(&rwlock->writer_sema, 0);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        bthread_sem_destroy(&rwlock->reader_sema);
-        return rc;
-    }
-    bthread_sem_disable_csite(&rwlock->writer_sema);
-
-    rwlock->reader_count = 0;
-    rwlock->reader_wait = 0;
-    rwlock->wlock_flag = false;
+    *rwlock->writer_wait_count = 0;
+    *rwlock->lock_word = 0;
 
     bthread_mutexattr_t attr;
     bthread_mutexattr_init(&attr);
+    // Disable csite on the inner queue mutex so the writer's wait time is
+    // accounted exactly once -- by the rwlock layer, not double-counted via
+    // the inner mutex.
     bthread_mutexattr_disable_csite(&attr);
-    rc = bthread_mutex_init(&rwlock->write_queue_mutex, &attr);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        bthread_sem_destroy(&rwlock->reader_sema);
-        bthread_sem_destroy(&rwlock->writer_sema);
-        return rc;
-    }
+    bthread_mutex_init(&rwlock->writer_queue_mutex, &attr);
     bthread_mutexattr_destroy(&attr);
 
-    bthread::make_contention_site_invalid(&rwlock->writer_csite);
-
     return 0;
 }
 
 int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) {
-    bthread_sem_destroy(&rwlock->reader_sema);
-    bthread_sem_destroy(&rwlock->writer_sema);
-    bthread_mutex_destroy(&rwlock->write_queue_mutex);
+    bthread::butex_destroy(rwlock->writer_wait_count);
+    bthread::butex_destroy(rwlock->lock_word);
     return 0;
 }
 
 int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) {
-    return bthread::rwlock_rdlock(rwlock);
+    return bthread::rwlock_rdlock(rwlock, false, NULL);
 }
 
 int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
-    return bthread::rwlock_tryrdlock(rwlock);
+    return bthread::rwlock_rdlock(rwlock, true, NULL);
 }
 
 int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
                                const struct timespec* __restrict abstime) {
-    return bthread::rwlock_timedrdlock(rwlock, abstime);
+    return bthread::rwlock_rdlock(rwlock, false, abstime);
 }
 
 int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) {
-    return bthread::rwlock_wrlock(rwlock);
+    return bthread::rwlock_wrlock(rwlock, false, NULL);
 }
 
 int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock) {
-    return bthread::rwlock_trywrlock(rwlock);
+    return bthread::rwlock_wrlock(rwlock, true, NULL);
 }
 
 int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
                                const struct timespec* __restrict abstime) {
-    return bthread::rwlock_timedwrlock(rwlock, abstime);
+    return bthread::rwlock_wrlock(rwlock, false, abstime);
+}
+
+int bthread_rwlock_unrlock(bthread_rwlock_t* rwlock) {
+    return bthread::rwlock_unrdlock(rwlock);
+}
+
+int bthread_rwlock_unwlock(bthread_rwlock_t* rwlock) {
+    return bthread::rwlock_unwrlock(rwlock);
 }
 

Review Comment:
   bthread_rwlock_unrlock() / bthread_rwlock_unwlock() are added as extern "C" 
symbols, but they are not declared in public headers (and the names look like 
typos of unrdlock/unwrlock). If these are intended to be public APIs, please 
add declarations + documentation; otherwise, remove them to avoid exporting 
unused, undocumented symbols into the library's ABI.
   



##########
src/bthread/types.h:
##########
@@ -225,16 +225,19 @@ typedef struct bthread_sem_t {
 typedef struct bthread_rwlock_t {
 #if defined(__cplusplus)
     bthread_rwlock_t()
-        : reader_count(0), reader_wait(0), wlock_flag(false), writer_csite{} {}
+        : writer_wait_count(0), lock_word(NULL) {}
     DISALLOW_COPY_AND_ASSIGN(bthread_rwlock_t);
 #endif
-    bthread_sem_t reader_sema; // Semaphore for readers to wait for completing 
writers.
-    bthread_sem_t writer_sema; // Semaphore for writers to wait for completing 
readers.
-    int reader_count; // Number of pending readers.
-    int reader_wait; // Number of departing readers.
-    bool wlock_flag; // Flag used to indicate that a write lock has been held.
-    bthread_mutex_t write_queue_mutex; // Held if there are pending writers.
-    bthread_contention_site_t writer_csite;
+    // Count of the bthread who holding write lock yet.
+    unsigned* writer_wait_count;
+    // Held if there are pending writers.
+    bthread_mutex_t writer_queue_mutex;
+    // Bit-packed atomic lock word (used as a butex):
+    //   bit 31  : 1 if the write lock is held, 0 otherwise.
+    //   bit 0~30: number of readers currently holding the read lock.
+    //   0       : unlocked.
+    // The high bit and the low 31 bits are mutually exclusive.
+    unsigned* lock_word;
 } bthread_rwlock_t;

Review Comment:
   This change replaces the fields in bthread_rwlock_t with new pointers/mutex, 
which changes the public struct layout in src/bthread/types.h and is a breaking 
ABI change for any code compiled against an older brpc that allocates 
bthread_rwlock_t. The PR description currently leaves "Breaking backward 
compatibility" blank; please explicitly call out this ABI break (or preserve 
layout if ABI stability is required).



##########
src/bthread/types.h:
##########
@@ -225,16 +225,19 @@ typedef struct bthread_sem_t {
 typedef struct bthread_rwlock_t {
 #if defined(__cplusplus)
     bthread_rwlock_t()
-        : reader_count(0), reader_wait(0), wlock_flag(false), writer_csite{} {}
+        : writer_wait_count(0), lock_word(NULL) {}
     DISALLOW_COPY_AND_ASSIGN(bthread_rwlock_t);
 #endif
-    bthread_sem_t reader_sema; // Semaphore for readers to wait for completing 
writers.
-    bthread_sem_t writer_sema; // Semaphore for writers to wait for completing 
readers.
-    int reader_count; // Number of pending readers.
-    int reader_wait; // Number of departing readers.
-    bool wlock_flag; // Flag used to indicate that a write lock has been held.
-    bthread_mutex_t write_queue_mutex; // Held if there are pending writers.
-    bthread_contention_site_t writer_csite;
+    // Count of the bthread who holding write lock yet.

Review Comment:
   The comment on writer_wait_count is inaccurate and grammatically incorrect: 
this counter tracks all writers "in flight" (waiting on the queue mutex / 
waiting for lock_word==0 / holding the write lock), not "bthread who holding 
write lock". Please update the comment to match the actual semantics to avoid 
future misuse.
   



##########
src/bthread/rwlock.cpp:
##########
@@ -17,273 +17,384 @@
 
 #include "bvar/collector.h"
 #include "bthread/rwlock.h"
+#include "bthread/mutex.h"
 #include "bthread/butex.h"
 
 namespace bthread {
 
-// A `bthread_rwlock_t' is a reader/writer mutual exclusion lock,
-// which is a bthread implementation of golang RWMutex.
-// The lock can be held by an arbitrary number of readers or a single writer.
-// For details, see 
https://github.com/golang/go/blob/master/src/sync/rwmutex.go
-
-// Define in bthread/mutex.cpp
+// Defined in bthread/mutex.cpp; reused here so that bthread_rwlock_t
+// participates in the global ContentionProfiler just like bthread_mutex_t
+// and bthread_sem_t.
 class ContentionProfiler;
 extern ContentionProfiler* g_cp;
 extern bvar::CollectorSpeedLimit g_cp_sl;
-extern bool is_contention_site_valid(const bthread_contention_site_t& cs);
-extern void make_contention_site_invalid(bthread_contention_site_t* cs);
 extern void submit_contention(const bthread_contention_site_t& csite, int64_t 
now_ns);
 
-// It is enough for readers. If the reader exceeds this value,
-// need to use `int64_t' instead of `int'.
-const int RWLockMaxReaders = 1 << 30;
-
-// For reading.
-static int rwlock_rdlock_impl(bthread_rwlock_t* __restrict rwlock,
-                              const struct timespec* __restrict abstime) {
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-        ->fetch_add(1, butil::memory_order_acquire) + 1;
-    // Fast path.
-    if (reader_count >= 0) {
-        CHECK_LT(reader_count, RWLockMaxReaders);
-        return 0;
-    }
-
-    // Slow path.
+// Lazily arm sampling on first contention. Caller must declare
+// `size_t sampling_range' and `int64_t start_ns' in scope:
+//   start_ns ==  0 -> not yet decided
+//   start_ns == -1 -> decided NOT to sample (profiler off / not selected)
+//   start_ns  >  0 -> sampling armed; value is the wall-clock start time
+#define BTHREAD_RWLOCK_MAYBE_START_SAMPLING                                    
   \
+    do {                                                                       
   \
+        if (start_ns == 0) {                                                   
   \
+            if (g_cp != NULL) {                                                
   \
+                sampling_range = bvar::is_collectable(&g_cp_sl);               
   \
+                start_ns = bvar::is_sampling_range_valid(sampling_range) ?     
   \
+                    butil::cpuwide_time_ns() : -1;                             
   \
+            } else {                                                           
   \
+                start_ns = -1;                                                 
   \
+            }                                                                  
   \
+        }                                                                      
   \
+    } while (0)
 
-    // Don't sample when contention profiler is off.
-    if (NULL == bthread::g_cp) {
-        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
-    }
-    // Ask Collector if this (contended) locking should be sampled.
-    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
-    if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample.
-        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
+// Submit one contention sample if sampling was armed for this attempt.
+// `start_ns > 0' is the convention used everywhere in this file to indicate
+// that BTHREAD_RWLOCK_MAYBE_START_SAMPLING actually decided to sample.
+// No-op otherwise. Force-inlined so the uncontended fast path stays cheap.
+static BUTIL_FORCE_INLINE void submit_contention_if_sampled(
+        int64_t start_ns, size_t sampling_range) {
+    if (start_ns > 0) {
+        const int64_t end_ns = butil::cpuwide_time_ns();
+        const bthread_contention_site_t csite{end_ns - start_ns, 
sampling_range};
+        submit_contention(csite, end_ns);
     }
-
-    // Sample.
-    const int64_t start_ns = butil::cpuwide_time_ns();
-    int rc = bthread_sem_timedwait(&rwlock->reader_sema, abstime);
-    const int64_t end_ns = butil::cpuwide_time_ns();
-    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
-    // Submit `csite' for each reader immediately after
-    // owning rdlock to avoid the contention of `csite'.
-    bthread::submit_contention(csite, end_ns);
-
-    return rc;
-}
-
-static inline int rwlock_rdlock(bthread_rwlock_t* rwlock) {
-    return rwlock_rdlock_impl(rwlock, NULL);
 }
 
-static inline int rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
-                                     const struct timespec* __restrict 
abstime) {
-    return rwlock_rdlock_impl(rwlock, abstime);
-}
+// bthread RWLock
+// writer-priority implementation overview
+// Three synchronization fields are used:
+//
+//   * `lock_word' (32-bit butex):
+//       bit 31  : 1 if the write lock is held, 0 otherwise.
+//       bit 0~30: number of readers currently holding the read lock.
+//       Mutually exclusive: when bit 31 is set, the lower 31 bits are 0.
+//
+//   * `writer_wait_count' (32-bit butex):
+//       Number of writers that have entered wrlock() but not yet finished
+//       (i.e. currently waiting for the mutex / waiting for lock_word==0 /
+//       holding the write lock). Each writer accounts for itself: it is
+//       incremented at the very beginning of wrlock() and decremented at
+//       the very end of unwrlock()/cleanup().
+//       Readers consult this field to implement writer-priority: if any
+//       writer is "in flight", new readers yield by waiting on it.
+//
+//   * `writer_queue_mutex' (bthread_mutex_t):
+//       Serializes writers so that at most one writer races for `lock_word'
+//       at any time. Other writers queue up on this mutex.
+//
+// Wakeup channels:
+//   * Readers waiting on writers   -> wait on  writer_wait_count, woken by 
unwrlock/cleanup
+//   * Writers waiting on readers   -> wait on  lock_word, woken by unrdlock
+//   * Writers waiting on writers   -> wait on  writer_queue_mutex
+
+int rwlock_rdlock(bthread_rwlock_t* rwlock, bool try_lock,
+                  const struct timespec* abstime) {

Review Comment:
   The new helper functions 
rwlock_rdlock/rwlock_unrdlock/rwlock_wrlock/rwlock_unwrlock/rwlock_unlock now 
have external linkage (they are not static and are not in an unnamed 
namespace). Since these are internal implementation details only used by this 
TU via the C wrappers, consider making them static (or moving into an unnamed 
namespace) to avoid exporting unintended C++ symbols and reduce ABI surface.
   



##########
src/bthread/rwlock.cpp:
##########
@@ -17,273 +17,384 @@
 
 #include "bvar/collector.h"
 #include "bthread/rwlock.h"
+#include "bthread/mutex.h"
 #include "bthread/butex.h"
 
 namespace bthread {
 
-// A `bthread_rwlock_t' is a reader/writer mutual exclusion lock,
-// which is a bthread implementation of golang RWMutex.
-// The lock can be held by an arbitrary number of readers or a single writer.
-// For details, see 
https://github.com/golang/go/blob/master/src/sync/rwmutex.go
-
-// Define in bthread/mutex.cpp
+// Defined in bthread/mutex.cpp; reused here so that bthread_rwlock_t
+// participates in the global ContentionProfiler just like bthread_mutex_t
+// and bthread_sem_t.
 class ContentionProfiler;
 extern ContentionProfiler* g_cp;
 extern bvar::CollectorSpeedLimit g_cp_sl;
-extern bool is_contention_site_valid(const bthread_contention_site_t& cs);
-extern void make_contention_site_invalid(bthread_contention_site_t* cs);
 extern void submit_contention(const bthread_contention_site_t& csite, int64_t 
now_ns);
 
-// It is enough for readers. If the reader exceeds this value,
-// need to use `int64_t' instead of `int'.
-const int RWLockMaxReaders = 1 << 30;
-
-// For reading.
-static int rwlock_rdlock_impl(bthread_rwlock_t* __restrict rwlock,
-                              const struct timespec* __restrict abstime) {
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-        ->fetch_add(1, butil::memory_order_acquire) + 1;
-    // Fast path.
-    if (reader_count >= 0) {
-        CHECK_LT(reader_count, RWLockMaxReaders);
-        return 0;
-    }
-
-    // Slow path.
+// Lazily arm sampling on first contention. Caller must declare
+// `size_t sampling_range' and `int64_t start_ns' in scope:
+//   start_ns ==  0 -> not yet decided
+//   start_ns == -1 -> decided NOT to sample (profiler off / not selected)
+//   start_ns  >  0 -> sampling armed; value is the wall-clock start time
+#define BTHREAD_RWLOCK_MAYBE_START_SAMPLING                                    
   \
+    do {                                                                       
   \
+        if (start_ns == 0) {                                                   
   \
+            if (g_cp != NULL) {                                                
   \
+                sampling_range = bvar::is_collectable(&g_cp_sl);               
   \
+                start_ns = bvar::is_sampling_range_valid(sampling_range) ?     
   \
+                    butil::cpuwide_time_ns() : -1;                             
   \
+            } else {                                                           
   \
+                start_ns = -1;                                                 
   \
+            }                                                                  
   \
+        }                                                                      
   \
+    } while (0)
 
-    // Don't sample when contention profiler is off.
-    if (NULL == bthread::g_cp) {
-        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
-    }
-    // Ask Collector if this (contended) locking should be sampled.
-    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
-    if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample.
-        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
+// Submit one contention sample if sampling was armed for this attempt.
+// `start_ns > 0' is the convention used everywhere in this file to indicate
+// that BTHREAD_RWLOCK_MAYBE_START_SAMPLING actually decided to sample.
+// No-op otherwise. Force-inlined so the uncontended fast path stays cheap.
+static BUTIL_FORCE_INLINE void submit_contention_if_sampled(
+        int64_t start_ns, size_t sampling_range) {
+    if (start_ns > 0) {
+        const int64_t end_ns = butil::cpuwide_time_ns();
+        const bthread_contention_site_t csite{end_ns - start_ns, 
sampling_range};
+        submit_contention(csite, end_ns);
     }
-
-    // Sample.
-    const int64_t start_ns = butil::cpuwide_time_ns();
-    int rc = bthread_sem_timedwait(&rwlock->reader_sema, abstime);
-    const int64_t end_ns = butil::cpuwide_time_ns();
-    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
-    // Submit `csite' for each reader immediately after
-    // owning rdlock to avoid the contention of `csite'.
-    bthread::submit_contention(csite, end_ns);
-
-    return rc;
-}
-
-static inline int rwlock_rdlock(bthread_rwlock_t* rwlock) {
-    return rwlock_rdlock_impl(rwlock, NULL);
 }
 
-static inline int rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
-                                     const struct timespec* __restrict 
abstime) {
-    return rwlock_rdlock_impl(rwlock, abstime);
-}
+// bthread RWLock
+// writer-priority implementation overview
+// Three synchronization fields are used:
+//
+//   * `lock_word' (32-bit butex):
+//       bit 31  : 1 if the write lock is held, 0 otherwise.
+//       bit 0~30: number of readers currently holding the read lock.
+//       Mutually exclusive: when bit 31 is set, the lower 31 bits are 0.
+//
+//   * `writer_wait_count' (32-bit butex):
+//       Number of writers that have entered wrlock() but not yet finished
+//       (i.e. currently waiting for the mutex / waiting for lock_word==0 /
+//       holding the write lock). Each writer accounts for itself: it is
+//       incremented at the very beginning of wrlock() and decremented at
+//       the very end of unwrlock()/cleanup().
+//       Readers consult this field to implement writer-priority: if any
+//       writer is "in flight", new readers yield by waiting on it.
+//
+//   * `writer_queue_mutex' (bthread_mutex_t):
+//       Serializes writers so that at most one writer races for `lock_word'
+//       at any time. Other writers queue up on this mutex.
+//
+// Wakeup channels:
+//   * Readers waiting on writers   -> wait on  writer_wait_count, woken by 
unwrlock/cleanup
+//   * Writers waiting on readers   -> wait on  lock_word, woken by unrdlock
+//   * Writers waiting on writers   -> wait on  writer_queue_mutex
+
+int rwlock_rdlock(bthread_rwlock_t* rwlock, bool try_lock,
+                  const struct timespec* abstime) {
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    auto writer_wait_count = 
(butil::atomic<unsigned>*)rwlock->writer_wait_count;
+
+    // Sampling state for the contention profiler (lazily armed on first
+    // contention so that the uncontended fast path stays cheap):
+    //   start_ns  == 0  -> not yet decided
+    //   start_ns  == -1 -> decided NOT to sample
+    //   start_ns  >  0  -> sampling armed; submit on exit
+    // Each reader samples independently and submits once on its own way out;
+    // we deliberately do NOT use rwlock->writer_csite here because that field
+    // is exclusively owned by the writer.
+    size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
+    int64_t start_ns = 0;
+    int rc = 0;
 
-// Returns 0 if the lock was acquired, otherwise errno.
-static  inline int rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
     while (true) {
-        int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-            ->load(butil::memory_order_relaxed);
-        if (reader_count < 0) {
-            // Failed to acquire the read lock because there is a writer.
-            return EBUSY;
-        }
-        if (((butil::atomic<int>*)&rwlock->reader_count)
-                ->compare_exchange_weak(reader_count, reader_count + 1,
-                                        butil::memory_order_acquire,
-                                        butil::memory_order_relaxed)) {
-            return 0;
+        // Writer-priority: if any writer is in flight, yield to it.
+        // `relaxed' is sufficient here because:
+        //   - There is no payload data published via writer_wait_count; data
+        //     visibility is established via the acquire-CAS on `lock_word' 
below
+        //     paired with the release-CAS in unwrlock().
+        //   - butex_wait() will re-check the expected value before sleeping,
+        //     so we cannot lose a wakeup even if `w' is slightly stale.
+        unsigned w = writer_wait_count->load(butil::memory_order_relaxed);
+        if (w > 0) {
+            if (try_lock) {
+                // Don't sample tryrdlock failures: they are by design a
+                // non-blocking probe, not a contention event.
+                return EBUSY;
+            }
+            // We are about to block on writer_wait_count; arm sampling
+            // before parking so the wait time is included in the report.
+            BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+            if (butex_wait(writer_wait_count, w, abstime) < 0 &&
+                errno != EWOULDBLOCK && errno != EINTR) {
+                rc = errno;
+                break;
+            }
+            continue;
         }
-    }
-}
 
-static inline int rwlock_unrdlock(bthread_rwlock_t* rwlock) {
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-        ->fetch_add(-1, butil::memory_order_relaxed) - 1;
-    // Fast path.
-    if (reader_count >= 0) {
-        return 0;
+        // No writer in flight: try to add ourselves to the reader count.
+        // 2^31 readers should be enough for any realistic workload.
+        unsigned l = lock_word->load(butil::memory_order_relaxed);
+        if ((l >> 31) == 0) {
+            // Acquire on success synchronizes-with the release-CAS in
+            // unwrlock(), so any data written by the previous writer is
+            // visible to us before we start reading.
+            if (lock_word->compare_exchange_weak(l, l + 1,
+                                                 butil::memory_order_acquire,
+                                                 butil::memory_order_relaxed)) 
{
+                rc = 0;
+                break;
+            }
+            // CAS failed (likely another reader bumped r): retry.
+        } else if (try_lock) {
+            // Write lock is currently held.
+            return EBUSY;
+        } else {
+            // Write lock currently held but not yet self-accounted as a
+            // pending writer (very narrow window inside wrlock). Arm
+            // sampling now so the spin/wait until writer_wait_count >= 1
+            // is also accounted for.
+            BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+        }
+        // Otherwise (write lock held but not try_lock): spin once more.
+        // The next iteration will observe writer_wait_count >= 1 (writers
+        // self-account in writer_wait_count for the entire wrlock lifetime),
+        // and we will block on it instead of busy spinning.
     }
-    // Slow path.
 
-    if (BAIDU_UNLIKELY(reader_count + 1 == 0 || reader_count + 1 == 
-RWLockMaxReaders)) {
-        CHECK(false) << "rwlock_unrdlock of unlocked rwlock";
-        return EINVAL;
-    }
+    // Submit one contention sample for this reader (success or failure).
+    submit_contention_if_sampled(start_ns, sampling_range);
+    return rc;
+}
 
-    // A writer is pending.
-    int reader_wait = ((butil::atomic<int>*)&rwlock->reader_wait)
-        ->fetch_add(-1, butil::memory_order_relaxed) - 1;
-    if (reader_wait != 0) {
+int rwlock_unrdlock(bthread_rwlock_t* rwlock) {
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    while (true) {
+        unsigned l = lock_word->load(butil::memory_order_relaxed);
+        // Misuse detection: the caller must currently hold a read lock.
+        // r == 0           -> no lock is held (double unlock?)
+        // (r >> 31) != 0   -> write lock is held, not read lock
+        if (l == 0 || (l >> 31) != 0) {
+            LOG(ERROR) << "Invalid unrlock!";
+            return -1;

Review Comment:
   rwlock_unrdlock() reports misuse by returning -1 and logging "Invalid 
unrlock!". The public rwlock APIs in this file otherwise return 0 or a positive 
errno value (e.g. EINVAL/EBUSY/ETIMEDOUT). Returning -1 breaks that contract 
and makes error handling inconsistent for callers. Please return a proper errno 
(likely EINVAL) and consider improving the log message (typo: "unrlock" vs 
"unrdlock").
   



##########
src/bthread/rwlock.cpp:
##########
@@ -17,273 +17,384 @@
 
 #include "bvar/collector.h"
 #include "bthread/rwlock.h"
+#include "bthread/mutex.h"
 #include "bthread/butex.h"
 
 namespace bthread {
 
-// A `bthread_rwlock_t' is a reader/writer mutual exclusion lock,
-// which is a bthread implementation of golang RWMutex.
-// The lock can be held by an arbitrary number of readers or a single writer.
-// For details, see https://github.com/golang/go/blob/master/src/sync/rwmutex.go
-
-// Define in bthread/mutex.cpp
+// Defined in bthread/mutex.cpp; reused here so that bthread_rwlock_t
+// participates in the global ContentionProfiler just like bthread_mutex_t
+// and bthread_sem_t.
 class ContentionProfiler;
 extern ContentionProfiler* g_cp;
 extern bvar::CollectorSpeedLimit g_cp_sl;
-extern bool is_contention_site_valid(const bthread_contention_site_t& cs);
-extern void make_contention_site_invalid(bthread_contention_site_t* cs);
extern void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns);
 
-// It is enough for readers. If the reader exceeds this value,
-// need to use `int64_t' instead of `int'.
-const int RWLockMaxReaders = 1 << 30;
-
-// For reading.
-static int rwlock_rdlock_impl(bthread_rwlock_t* __restrict rwlock,
-                              const struct timespec* __restrict abstime) {
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-        ->fetch_add(1, butil::memory_order_acquire) + 1;
-    // Fast path.
-    if (reader_count >= 0) {
-        CHECK_LT(reader_count, RWLockMaxReaders);
-        return 0;
-    }
-
-    // Slow path.
+// Lazily arm sampling on first contention. Caller must declare
+// `size_t sampling_range' and `int64_t start_ns' in scope:
+//   start_ns ==  0 -> not yet decided
+//   start_ns == -1 -> decided NOT to sample (profiler off / not selected)
+//   start_ns  >  0 -> sampling armed; value is the wall-clock start time
+#define BTHREAD_RWLOCK_MAYBE_START_SAMPLING                                    \
+    do {                                                                       \
+        if (start_ns == 0) {                                                   \
+            if (g_cp != NULL) {                                                \
+                sampling_range = bvar::is_collectable(&g_cp_sl);               \
+                start_ns = bvar::is_sampling_range_valid(sampling_range) ?     \
+                    butil::cpuwide_time_ns() : -1;                             \
+            } else {                                                           \
+                start_ns = -1;                                                 \
+            }                                                                  \
+        }                                                                      \
+    } while (0)
 
-    // Don't sample when contention profiler is off.
-    if (NULL == bthread::g_cp) {
-        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
-    }
-    // Ask Collector if this (contended) locking should be sampled.
-    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
-    if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample.
-        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
+// Submit one contention sample if sampling was armed for this attempt.
+// `start_ns > 0' is the convention used everywhere in this file to indicate
+// that BTHREAD_RWLOCK_MAYBE_START_SAMPLING actually decided to sample.
+// No-op otherwise. Force-inlined so the uncontended fast path stays cheap.
+static BUTIL_FORCE_INLINE void submit_contention_if_sampled(
+        int64_t start_ns, size_t sampling_range) {
+    if (start_ns > 0) {
+        const int64_t end_ns = butil::cpuwide_time_ns();
+        const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
+        submit_contention(csite, end_ns);
     }
-
-    // Sample.
-    const int64_t start_ns = butil::cpuwide_time_ns();
-    int rc = bthread_sem_timedwait(&rwlock->reader_sema, abstime);
-    const int64_t end_ns = butil::cpuwide_time_ns();
-    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
-    // Submit `csite' for each reader immediately after
-    // owning rdlock to avoid the contention of `csite'.
-    bthread::submit_contention(csite, end_ns);
-
-    return rc;
-}
-
-static inline int rwlock_rdlock(bthread_rwlock_t* rwlock) {
-    return rwlock_rdlock_impl(rwlock, NULL);
 }
 
-static inline int rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
-                                     const struct timespec* __restrict abstime) {
-    return rwlock_rdlock_impl(rwlock, abstime);
-}
+// bthread RWLock
+// writer-priority implementation overview
+// Three synchronization fields are used:
+//
+//   * `lock_word' (32-bit butex):
+//       bit 31  : 1 if the write lock is held, 0 otherwise.
+//       bit 0~30: number of readers currently holding the read lock.
+//       Mutually exclusive: when bit 31 is set, the lower 31 bits are 0.
+//
+//   * `writer_wait_count' (32-bit butex):
+//       Number of writers that have entered wrlock() but not yet finished
+//       (i.e. currently waiting for the mutex / waiting for lock_word==0 /
+//       holding the write lock). Each writer accounts for itself: it is
+//       incremented at the very beginning of wrlock() and decremented at
+//       the very end of unwrlock()/cleanup().
+//       Readers consult this field to implement writer-priority: if any
+//       writer is "in flight", new readers yield by waiting on it.
+//
+//   * `writer_queue_mutex' (bthread_mutex_t):
+//       Serializes writers so that at most one writer races for `lock_word'
+//       at any time. Other writers queue up on this mutex.
+//
+// Wakeup channels:
+//   * Readers waiting on writers   -> wait on  writer_wait_count, woken by unwrlock/cleanup
+//   * Writers waiting on readers   -> wait on  lock_word, woken by unrdlock
+//   * Writers waiting on writers   -> wait on  writer_queue_mutex
+
+int rwlock_rdlock(bthread_rwlock_t* rwlock, bool try_lock,
+                  const struct timespec* abstime) {
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    auto writer_wait_count = (butil::atomic<unsigned>*)rwlock->writer_wait_count;
+
+    // Sampling state for the contention profiler (lazily armed on first
+    // contention so that the uncontended fast path stays cheap):
+    //   start_ns  == 0  -> not yet decided
+    //   start_ns  == -1 -> decided NOT to sample
+    //   start_ns  >  0  -> sampling armed; submit on exit
+    // Each reader samples independently and submits once on its own way out;
+    // we deliberately do NOT use rwlock->writer_csite here because that field
+    // is exclusively owned by the writer.
+    size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
+    int64_t start_ns = 0;
+    int rc = 0;
 
-// Returns 0 if the lock was acquired, otherwise errno.
-static  inline int rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
     while (true) {
-        int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-            ->load(butil::memory_order_relaxed);
-        if (reader_count < 0) {
-            // Failed to acquire the read lock because there is a writer.
-            return EBUSY;
-        }
-        if (((butil::atomic<int>*)&rwlock->reader_count)
-                ->compare_exchange_weak(reader_count, reader_count + 1,
-                                        butil::memory_order_acquire,
-                                        butil::memory_order_relaxed)) {
-            return 0;
+        // Writer-priority: if any writer is in flight, yield to it.
+        // `relaxed' is sufficient here because:
+        //   - There is no payload data published via writer_wait_count; data
+//     visibility is established via the acquire-CAS on `lock_word' below
+        //     paired with the release-CAS in unwrlock().
+        //   - butex_wait() will re-check the expected value before sleeping,
+        //     so we cannot lose a wakeup even if `w' is slightly stale.
+        unsigned w = writer_wait_count->load(butil::memory_order_relaxed);
+        if (w > 0) {
+            if (try_lock) {
+                // Don't sample tryrdlock failures: they are by design a
+                // non-blocking probe, not a contention event.
+                return EBUSY;
+            }
+            // We are about to block on writer_wait_count; arm sampling
+            // before parking so the wait time is included in the report.
+            BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+            if (butex_wait(writer_wait_count, w, abstime) < 0 &&
+                errno != EWOULDBLOCK && errno != EINTR) {
+                rc = errno;
+                break;
+            }
+            continue;
         }
-    }
-}
 
-static inline int rwlock_unrdlock(bthread_rwlock_t* rwlock) {
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-        ->fetch_add(-1, butil::memory_order_relaxed) - 1;
-    // Fast path.
-    if (reader_count >= 0) {
-        return 0;
+        // No writer in flight: try to add ourselves to the reader count.
+        // 2^31 readers should be enough for any realistic workload.
+        unsigned l = lock_word->load(butil::memory_order_relaxed);
+        if ((l >> 31) == 0) {
+            // Acquire on success synchronizes-with the release-CAS in
+            // unwrlock(), so any data written by the previous writer is
+            // visible to us before we start reading.
+            if (lock_word->compare_exchange_weak(l, l + 1,
+                                                 butil::memory_order_acquire,
+                                                 butil::memory_order_relaxed)) {
+                rc = 0;
+                break;
+            }

Review Comment:
   rwlock_rdlock() increments the reader count with `l + 1` without guarding 
against overflow of the low 31 bits. If the reader count ever reaches 
0x7fffffff, the next increment will flip bit 31 and corrupt the state by 
appearing as a write lock held. The previous implementation had an explicit 
max-readers check; please add a similar guard or a CHECK to prevent 
overflow/corruption.



##########
test/bthread_rwlock_unittest.cpp:
##########
@@ -286,6 +287,253 @@ TEST(RWLockTest, mix_thread_types) {
     ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
 }
 
+// Tests below verify the writer-priority semantics and the cleanup path
+// guarded by the design notes in bthread/rwlock.cpp.
+struct WriterPriorityArgs {
+    bthread_rwlock_t* rw;
+    butil::atomic<int>* order;
+    int my_order; // sequence number captured inside the critical section
+    int hold_us;
+};
+
+void* wp_writer_fn(void* arg) {
+    auto* a = (WriterPriorityArgs*)arg;
+    EXPECT_EQ(0, bthread_rwlock_wrlock(a->rw));
+    a->my_order = a->order->fetch_add(1, butil::memory_order_relaxed);
+    bthread_usleep(a->hold_us);
+    EXPECT_EQ(0, bthread_rwlock_unlock(a->rw));
+    return NULL;
+}
+
+void* wp_reader_fn(void* arg) {
+    auto* a = (WriterPriorityArgs*)arg;
+    EXPECT_EQ(0, bthread_rwlock_rdlock(a->rw));
+    a->my_order = a->order->fetch_add(1, butil::memory_order_relaxed);
+    bthread_usleep(a->hold_us);
+    EXPECT_EQ(0, bthread_rwlock_unlock(a->rw));
+    return NULL;
+}
+
+// Verifies the writer-priority invariant guarded by the order
+// "unlock writer_queue_mutex BEFORE fetch_sub(writer_wait_count)" in
+// rwlock_unwrlock(): once a writer is queued, any new reader arriving
+// later MUST yield to that writer.
+TEST(RWLockTest, writer_priority) {
+    bthread_setconcurrency(8);
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    // (1) Main thread holds the read lock first.
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&rw));
+
+    butil::atomic<int> order(0);
+    WriterPriorityArgs warg  {&rw, &order, -1, 5000};
+    WriterPriorityArgs r2arg {&rw, &order, -1, 0};
+
+    // (2) Start a writer; it should park inside wrlock() because the read
+    //     lock is held. Sleep long enough for it to fetch_add into
+    //     writer_wait_count and reach the butex_wait on `lock_word'.
+    bthread_t wth;
+    ASSERT_EQ(0, bthread_start_urgent(&wth, NULL, wp_writer_fn, &warg));
+    bthread_usleep(50 * 1000);
+
+    // (3) Now spawn a fresh reader. By writer-priority it MUST observe
+    //     writer_wait_count > 0 and park on it (NOT join the active read
+    //     lock).
+    bthread_t r2th;
+    ASSERT_EQ(0, bthread_start_urgent(&r2th, NULL, wp_reader_fn, &r2arg));
+    bthread_usleep(50 * 1000);
+
+    // (4) Release the original read lock. The writer should win the race
+    //     and complete BEFORE the queued reader.
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+    bthread_join(wth, NULL);
+    bthread_join(r2th, NULL);
+
+    EXPECT_GE(warg.my_order, 0);
+    EXPECT_GE(r2arg.my_order, 0);
+    EXPECT_LT(warg.my_order, r2arg.my_order)
+        << "Writer-priority violated: writer entered with order="
+        << warg.my_order << " but late reader entered with order="
+        << r2arg.my_order;
+
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+void* wp_timed_wrlock_short(void* arg) {
+    auto* rw = (bthread_rwlock_t*)arg;
+    timespec ts = butil::milliseconds_from_now(50);
+    EXPECT_EQ(ETIMEDOUT, bthread_rwlock_timedwrlock(rw, &ts));
+    return NULL;
+}
+
+// Verifies the cleanup path of rwlock_wrlock_cleanup(): after multiple
+// writers fail with ETIMEDOUT, writer_wait_count must be back to 0 so
+// that subsequent readers are not blocked by leftover "ghost shares".
+TEST(RWLockTest, wrlock_failure_does_not_leak_writer_count) {
+    bthread_setconcurrency(8);
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    // Hold the read lock so every wrlock attempt must block on `lock_word'.
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&rw));
+
+    const int N = 8;
+    bthread_t wth[N];
+    for (int i = 0; i < N; ++i) {
+        ASSERT_EQ(0, bthread_start_urgent(&wth[i], NULL, wp_timed_wrlock_short, &rw));
+    }
+    // Wait for all timed wrlock attempts to time out and run cleanup.
+    for (int i = 0; i < N; ++i) {
+        bthread_join(wth[i], NULL);
+    }
+
+    // Release the read lock; from this point on no writer is in flight,
+    // so a new reader MUST acquire the lock immediately.
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+    timespec ts = butil::milliseconds_from_now(500);
+    butil::Timer t;
+    t.start();
+    ASSERT_EQ(0, bthread_rwlock_timedrdlock(&rw, &ts));
+    t.stop();
+    EXPECT_LT(t.m_elapsed(), 100)
+        << "Reader was blocked for " << t.m_elapsed() << "ms; "
+        << "writer_wait_count was likely leaked by the cleanup path.";
+
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+struct DataConsistencyArgs {
+    bthread_rwlock_t* rw;
+    int64_t* shared;       // protected by rw
+    int64_t local_inc;     // writer: number of increments this thread did
+    int64_t observed_max;  // reader: max value observed
+    bool is_writer;
+};
+
+void* dc_worker(void* arg) {
+    auto* a = (DataConsistencyArgs*)arg;
+    while (!g_stopped) {
+        if (a->is_writer) {
+            EXPECT_EQ(0, bthread_rwlock_wrlock(a->rw));
+            ++(*a->shared);
+            ++a->local_inc;

Review Comment:
   The new tests spin on and modify `g_stopped` from multiple threads, but 
`g_stopped` is a plain bool. This is a C++ data race (undefined behavior) and 
can cause flaky tests on some architectures/optimizations. Please make 
`g_stopped` (and similar flags like g_started/PerfArgs::ready if relied upon 
across threads) an atomic with at least relaxed loads/stores (or use proper 
synchronization).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to