kgiusti commented on a change in pull request #693: DISPATCH-1579: open 
dedicated inter-router sessions for data and control links
URL: https://github.com/apache/qpid-dispatch/pull/693#discussion_r389857365
 
 

 ##########
 File path: src/router_node.c
 ##########
 @@ -786,18 +786,47 @@ static int AMQP_link_attach_handler(void* context, 
qd_link_t *link)
 
 
 /**
- * Handler for flow events on links
+ * Handler for flow events on links.  Flow updates include session window
+ * state, which needs to be checked for unblocking Q3.
  */
 static int AMQP_link_flow_handler(void* context, qd_link_t *link)
 {
-    qd_router_t *router = (qd_router_t*) context;
-    qdr_link_t  *rlink  = (qdr_link_t*) qd_link_get_context(link);
-    pn_link_t   *pnlink = qd_link_pn(link);
-
-    if (!rlink)
-        return 0;
+    qd_router_t *router  = (qd_router_t*) context;
+    pn_link_t   *pnlink  = qd_link_pn(link);
+    pn_session_t *pn_ssn = pn_link_session(pnlink);
+
+    bool link_flowed = false;
+    if (pn_ssn) {
+        qd_session_t *qd_ssn = pn_session_get_context(pn_ssn);
+        if (qd_ssn && qd_session_is_q3_blocked(qd_ssn)) {
+            // Q3 blocked - have we drained enough outgoing bytes?
+            const size_t q3_lower = BUFFER_SIZE * QD_QLIMIT_Q3_LOWER;
+            if (pn_session_outgoing_bytes(pn_ssn) < q3_lower) {
+                // yes.  We must now unblock all links that have been blocked 
by Q3
+                qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn);
+                qd_link_t *blink = DEQ_HEAD(*blinks);
+                while (blink) {
+                    qd_link_q3_unblock(blink);  // removes from blinks list!
+                    qdr_link_t *qdr_link = (qdr_link_t *) 
qd_link_get_context(blink);
+                    if (qdr_link) {
+                        pn_link_t *pn_link = qd_link_pn(blink);
+                        // signalling flow to the core causes the link to be 
activated
+                        qdr_link_flow(router->router_core, qdr_link, 
pn_link_remote_credit(pn_link), pn_link_get_drain(pn_link));
+                        if (blink == link)
+                            link_flowed = true;  // original link flowed - do 
not flow again
+                    }
+                    blink = DEQ_HEAD(*blinks);
+                }
+            }
+        }
+    }
 
-    qdr_link_flow(router->router_core, rlink, pn_link_remote_credit(pnlink), 
pn_link_get_drain(pnlink));
+    if (!link_flowed) {
 
 Review comment:
   will do

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to