This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 239a39eb2d04f0588081975a4f173bb5f121d1fa
Author: Cliff Jansen <cliffjan...@apache.org>
AuthorDate: Sun Nov 21 12:37:32 2021 -0800

    PROTON-2362: epoll proactor fix for tsan_tr1.txt.  Check earmark edge case at same time and with same lock as for unassign_thread.
---
 c/src/proactor/epoll-internal.h       |   3 +-
 c/src/proactor/epoll.c                | 114 ++++++++++++++++++----------------
 c/src/proactor/epoll_raw_connection.c |   4 +-
 3 files changed, 65 insertions(+), 56 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 5f7ed9b..f0f57af 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -348,7 +348,7 @@ int pclosefd(pn_proactor_t *p, int fd);
 void proactor_add(task_t *tsk);
 bool proactor_remove(task_t *tsk);
 
-bool unassign_thread(tslot_t *ts, tslot_state new_state);
+bool unassign_thread(pn_proactor_t *p, tslot_t *ts, tslot_state new_state, tslot_t **resume_thread);
 
 void task_init(task_t *tsk, task_type_t t, pn_proactor_t *p);
 static void task_finalize(task_t* tsk) {
@@ -385,6 +385,7 @@ void pni_timer_manager_finalize(pni_timer_manager_t *tm);
 pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool sched_ready);
 void pni_pconnection_timeout(pconnection_t *pc);
 void pni_proactor_timeout(pn_proactor_t *p);
+void pni_resume(pn_proactor_t *p, tslot_t *ts);
 
 // Generic wake primitives for a task.
 
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 1adaeb3..c481274 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -406,7 +406,26 @@ static void resume(pn_proactor_t *p, tslot_t *ts) {
     pthread_cond_signal(&ts->cond);
   }
   unlock(&ts->mutex);
+}
+
+// Call with no lock
+void pni_resume(pn_proactor_t *p, tslot_t *ts) {
+  resume(p, ts);
+}
+
+// Call with shed_lock held
+// Caller must resume() return value if not null
+static tslot_t *resume_one_thread(pn_proactor_t *p) {
+  // If pn_proactor_get has an early return, we need to resume one suspended thread (if any)
+  // to be the new poller.
 
+  tslot_t *ts = p->suspend_list_head;
+  if (ts) {
+    LL_REMOVE(p, suspend_list, ts);
+    p->suspend_list_count--;
+    ts->state = PROCESSING;
+  }
+  return ts;
 }
 
 // Call with sched lock
@@ -445,11 +464,14 @@ static bool reschedule(task_t *tsk) {
   return notify;
 }
 
-// Call with sched lock
-bool unassign_thread(tslot_t *ts, tslot_state new_state) {
+// Call with sched lock.
+// If true returned, caller must call notify_poller() after releasing the lock.
+// If resume_thread is set, the caller must call resume() after releasing the lock.
+bool unassign_thread(pn_proactor_t *p, tslot_t *ts, tslot_state new_state, tslot_t **resume_thread) {
   task_t *tsk = ts->task;
   bool notify = false;
   bool deleting = (ts->state == DELETING);
+  *resume_thread = NULL;
   ts->task = NULL;
   ts->state = new_state;
   if (tsk) {
@@ -460,7 +482,6 @@ bool unassign_thread(tslot_t *ts, tslot_state new_state) {
   // Check if unseen events or schedule() calls occurred while task was working.
 
   if (tsk && !deleting) {
-    pn_proactor_t *p = tsk->proactor;
     ts->prev_task = tsk;
     if (tsk->sched_pending) {
       // Make sure the task is already scheduled or put it on the ready list
@@ -482,6 +503,19 @@ bool unassign_thread(tslot_t *ts, tslot_state new_state) {
       }
     }
   }
+
+  // Earmark drain accounting.
+  if (ts->earmark_override) {
+    // This thread "stole" the task previously assigned to thread ts->earmark_override.
+    if (ts->earmark_override->generation == ts->earmark_override_gen) {
+      // Other (overridden) thread not seen since this thread completed the pending work on the task.
+      // Thread is perhaps gone forever, which may leave us short of a poller thread and hanging.
+      // Find a thread to resume if available.  Worst case is a spurious resume/suspend by an idle thread.
+      *resume_thread = resume_one_thread(p);
+    }
+    ts->earmark_override = NULL;
+  }
+
   return notify;
 }
 
@@ -1014,10 +1048,11 @@ static void pconnection_done(pconnection_t *pc) {
       pconnection_cleanup(pc);
       // pc may be undefined now
       lock(&p->sched_mutex);
-      notify = unassign_thread(ts, UNUSED);
+      tslot_t *resume_thread;
+      notify = unassign_thread(p, ts, UNUSED, &resume_thread);
       unlock(&p->sched_mutex);
-      if (notify)
-        notify_poller(p);
+      if (notify) notify_poller(p);
+      if (resume_thread) resume(p, resume_thread);
       return;
     }
   }
@@ -1029,10 +1064,12 @@ static void pconnection_done(pconnection_t *pc) {
 
   if (wanted) pconnection_rearm(pc, wanted);  // May free pc on another thread.  Return without touching pc again.
   lock(&p->sched_mutex);
-  if (unassign_thread(ts, UNUSED))
+  tslot_t *resume_thread;
+  if (unassign_thread(p, ts, UNUSED, &resume_thread))
     notify = true;
   unlock(&p->sched_mutex);
   if (notify) notify_poller(p);
+  if (resume_thread) resume(p, resume_thread);
   return;
 }
 
@@ -1803,20 +1840,23 @@ static void listener_done(pn_listener_t *l) {
     unlock(&l->task.mutex);
     pn_listener_free(l);
     lock(&p->sched_mutex);
-    notify = unassign_thread(ts, UNUSED);
+    tslot_t *resume_thread;
+    notify = unassign_thread(p, ts, UNUSED, &resume_thread);
     unlock(&p->sched_mutex);
-    if (notify)
-      notify_poller(p);
+    if (notify) notify_poller(p);
+    if (resume_thread) resume(p, resume_thread);
     return;
   } else if (n_events || listener_has_event(l))
     notify = schedule(&l->task);
   unlock(&l->task.mutex);
 
   lock(&p->sched_mutex);
-  if (unassign_thread(ts, UNUSED))
+  tslot_t *resume_thread;
+  if (unassign_thread(p, ts, UNUSED, &resume_thread))
     notify = true;
   unlock(&p->sched_mutex);
   if (notify) notify_poller(p);
+  if (resume_thread) resume(p, resume_thread);
 }
 
 pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
@@ -2167,21 +2207,6 @@ static tslot_t *find_tslot(pn_proactor_t *p) {
   return ts;
 }
 
-// Call with shed_lock held
-// Caller must resume() return value if not null
-static tslot_t *resume_one_thread(pn_proactor_t *p) {
-  // If pn_proactor_get has an early return, we need to resume one suspended thread (if any)
-  // to be the new poller.
-
-  tslot_t *ts = p->suspend_list_head;
-  if (ts) {
-    LL_REMOVE(p, suspend_list, ts);
-    p->suspend_list_count--;
-    ts->state = PROCESSING;
-  }
-  return ts;
-}
-
 // Called with sched lock, returns with sched lock still held.
 static pn_event_batch_t *process(task_t *tsk) {
   bool tsk_ready = false;
@@ -2421,10 +2446,12 @@ static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) {
         unlock(&p->sched_mutex);
         return batch;
       }
-      bool notify = unassign_thread(ts, PROCESSING);
-      if (notify) {
+      tslot_t *resume_thread;
+      bool notify = unassign_thread(p, ts, PROCESSING, &resume_thread);
+      if (notify || resume_thread) {
         unlock(&p->sched_mutex);
-        notify_poller(p);
+        if (notify) notify_poller(p);
+        if (resume_thread) resume(p, resume_thread);
         lock(&p->sched_mutex);
       }
       continue;  // Long time may have passed.  Back to beginning.
@@ -2654,44 +2681,23 @@ pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
   return next_event_batch(p, false);
 }
 
-// Call with no locks
-static inline void check_earmark_override(pn_proactor_t *p, tslot_t *ts) {
-  if (!ts || !ts->earmark_override)
-    return;
-  if (ts->earmark_override->generation == ts->earmark_override_gen) {
-    // Other (overridden) thread not seen since this thread started and finished the event batch.
-    // Thread is perhaps gone forever, which may leave us short of a poller thread
-    lock(&p->sched_mutex);
-    tslot_t *res_ts = resume_one_thread(p);
-    unlock(&p->sched_mutex);
-    if (res_ts) resume(p, res_ts);
-  }
-  ts->earmark_override = NULL;
-}
-
 void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
   pconnection_t *pc = batch_pconnection(batch);
   if (pc) {
-    tslot_t *ts = pc->task.runner;
     pconnection_done(pc);
     // pc possibly freed/invalid
-    check_earmark_override(p, ts);
     return;
   }
   pn_listener_t *l = batch_listener(batch);
   if (l) {
-    tslot_t *ts = l->task.runner;
     listener_done(l);
     // l possibly freed/invalid
-    check_earmark_override(p, ts);
     return;
   }
   praw_connection_t *rc = pni_batch_raw_connection(batch);
   if (rc) {
-    tslot_t *ts = pni_raw_connection_task(rc)->runner;
     pni_raw_connection_done(rc);
     // rc possibly freed/invalid
-    check_earmark_override(p, ts);
     return;
   }
   pn_proactor_t *bp = batch_proactor(batch);
@@ -2712,13 +2718,13 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
     unlock(&p->task.mutex);
     lock(&p->sched_mutex);
     tslot_t *ts = p->task.runner;
-    if (unassign_thread(ts, UNUSED))
+    tslot_t *resume_thread;
+    if (unassign_thread(p, ts, UNUSED, &resume_thread))
       notify = true;
     unlock(&p->sched_mutex);
 
-    if (notify)
-      notify_poller(p);
-    check_earmark_override(p, ts);
+    if (notify) notify_poller(p);
+    if (resume_thread) resume(p, resume_thread);
     return;
   }
 }
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 99c5dd9..94d6460 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -421,7 +421,9 @@ void pni_raw_connection_done(praw_connection_t *rc) {
   }
 
   lock(&p->sched_mutex);
-  notify |= unassign_thread(ts, UNUSED);
+  tslot_t *resume_thread;
+  notify |= unassign_thread(p, ts, UNUSED, &resume_thread);
   unlock(&p->sched_mutex);
   if (notify) notify_poller(p);
+  if (resume_thread) pni_resume(p, resume_thread);
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to