This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 8a53cbbe22ab8aff7017714ca131949ed33fe345
Author: Ted Ross <tr...@apache.org>
AuthorDate: Mon Jun 22 13:17:38 2020 -0400

    Dataplane: Added implementation of qd_message_release_body.
---
 include/qpid/dispatch/buffer.h |  9 +++++++++
 src/message.c                  | 30 ++++++++++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/include/qpid/dispatch/buffer.h b/include/qpid/dispatch/buffer.h
index 4de49bd..e93b5df 100644
--- a/include/qpid/dispatch/buffer.h
+++ b/include/qpid/dispatch/buffer.h
@@ -147,6 +147,15 @@ static inline uint32_t qd_buffer_set_fanout(qd_buffer_t 
*buf, uint32_t value)
 }
 
 /**
+ * Get the fanout value on the buffer.
+ * @return the count
+ */
+static inline uint32_t qd_buffer_get_fanout(const qd_buffer_t *buf)
+{
+    return buf->bfanout;
+}
+
+/**
  * Increase the fanout by 1. How many receivers should this buffer be sent to.
  * @return the _old_ count (pre increment)
  */
diff --git a/src/message.c b/src/message.c
index 55d134e..78f2888 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2292,6 +2292,36 @@ int qd_message_read_body(qd_message_t *in_msg, 
pn_raw_buffer_t* buffers, int len
 }
 
 
+void qd_message_release_body(qd_message_t *msg, pn_raw_buffer_t *buffers, int 
buffer_count)
+{
+    qd_message_pvt_t     *pvt     = (qd_message_pvt_t*) msg;
+    qd_message_content_t *content = MSG_CONTENT(msg);
+    qd_buffer_t          *buf;
+
+    LOCK(content->lock);
+    //
+    // Decrement the buffer fanout for each of the referenced buffers.
+    //
+    if (pvt->is_fanout) {
+        for (int i = 0; i < buffer_count; i++) {
+            buf = (qd_buffer_t*) buffers[i].context;
+            qd_buffer_dec_fanout(buf);
+        }
+    }
+
+    //
+    // Free buffers at the head of the list that have zero refcounts.
+    //
+    buf = DEQ_HEAD(content->buffers);
+    while (buf && qd_buffer_get_fanout(buf) == 0) {
+        DEQ_REMOVE_HEAD(content->buffers);
+        qd_buffer_free(buf);
+        buf = DEQ_HEAD(content->buffers);
+    }
+    UNLOCK(content->lock);
+}
+
+
 qd_parsed_field_t *qd_message_get_ingress    (qd_message_t *msg)
 {
     return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to