Cliff has been bugging me for ages about transport events, sending
messages and memory bounding. The penny finally dropped, and I think it
deserves a wider audience:

The issue is bounding memory use in a proton sending application. AMQP
flow control (as shown in our examples) covers most cases, provided the
receiver sets a "reasonable" credit limit.

However, if the receiver sets infinite credit, or has a much bigger
notion of "reasonable", proton will buffer messages on the sender
without regard to sender constraints. It is quite plausible that
receiver and sender have very different memory constraints: one might
be a large hub server, the other embedded on millions of small devices
(webcams, for example).

The `on_sendable` or PN_FLOW event tells you the remote end has given
credit, so you can write a sender that waits for credit before sending.
I think we can use the C PN_TRANSPORT event in a similar way to limit
sender memory. Attached is a C example/explanation.
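
The check at the heart of that idea can be tried without proton. What
follows is a minimal sketch, assuming a hypothetical helper `can_buffer`
(not a proton API) that takes the bytes already buffered, the message
size and the limit L. A message that fits is accepted while the total
stays within L; an oversize message is accepted only when nothing else
is buffered.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical helper, not part of proton: may a message of msg_size bytes
   be buffered when `buffered` bytes are already queued and the limit is
   `limit`? */
static bool can_buffer(size_t buffered, size_t msg_size, size_t limit) {
    if (msg_size > limit)                 /* Oversize message */
        return buffered == 0;             /* Only if the buffer is empty */
    /* msg_size <= limit here, so the subtraction cannot underflow. */
    return buffered <= limit - msg_size;  /* buffered + msg_size <= limit */
}
```

Writing the comparison as a subtraction avoids overflow in
`buffered + msg_size` for very large sizes.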

Some of our language bindings don't expose TRANSPORT and we might want
to think of a more intuitive way to express this. Also, TRANSPORT is a
bit of a catch-all event: it does fire when data moves from session to
transport buffers, but it fires for other things too. We might want to
look at the event model.

Meantime I think the attached is a workable approach in C. I would love
to hear comments; this is something we should probably incorporate into
the language bindings.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
  The goal is to bound the buffering for sent messages per session to some limit L.

  If a message is bigger than L then we have to over-buffer temporarily, but we
  won't buffer for any more messages till the buffer falls below L again. [1][2]

  AMQP session flow control ensures data moves to the transport buffer
  respecting the session's outgoing-window, so the possible buffering per
  session is:

      L + outgoing-window (+ large-message-overflow)

  Setting L == outgoing-window seems like an obvious choice, but different L
  could be allowed if it made sense.
 */
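#include <stdbool.h>
#include <proton/link.h>
#include <proton/session.h>
#include <proton/types.h>

/* Send message on sender only if the link has credit and the session's
   buffered bytes stay within the limit. Returns true if the message was
   handed to proton, false if the caller should retry later. */
bool try_to_send(pn_link_t *sender, pn_bytes_t message) {
    if (!pn_link_is_sender(sender) || pn_link_credit(sender) <= 0)
        return false;                 /* No credit */

    pn_session_t *ssn = pn_link_session(sender);
    size_t limit = pn_session_get_outgoing_window(ssn);
    size_t buffered = pn_session_outgoing_bytes(ssn);
    bool can_send = buffered + message.size <= limit;
    if (!can_send && message.size > limit) /* Oversize */
        can_send = (buffered == 0);        /* Only if buffer empty */
    if (can_send)                          /* Bytes go on the current delivery, see [2] */
        pn_link_send(sender, message.start, message.size);
    return can_send;
}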

/* Sketch of application code. send_available sends as much as it can, up to
   credit limit, session buffer limit, or app runs out of messages.  We call
   send_available() whenever anything happens that might remove one of those
   limits.
*/

void send_available() {
    sender = ...;
    for (message = ...;;) { /* Application specific, iterate over available messages. */
        if (try_to_send(sender, message)) {
            /* Message was sent, remove from available list. */
        } else {
            /* Message not sent, leave on available list for next send_available() */
            break;
        }
    }
}

void handle_event(pn_event_t *e) {
    switch (pn_event_type(e)) {
        /* ...  */
      case PN_FLOW:
        /* Send whatever we can when there is credit. */
        send_available();
        break;
      case PN_TRANSPORT:
        /* Send whatever we can when there is session buffer space freed up. */
        send_available();
        break;
    }
}

void somewhere_in_the_app() {
    generate_available_message();
    send_available();           /* For single-threaded apps only */
    /* Multi-threaded apps must wakeup/notify/inject a call to send_available
       in the appropriate thread. */
}

/*
  [1] If all messages are oversize the buffering will work as follows: at least
  sizeof(message1)-outgoing-window must be sent on the network before we can
  start buffering message2. There can still be up to one outgoing-window of
  message1 in the transport buffer while we work on message2. This is not as
  efficient as being able to work on frame boundaries but probably not horrible.

  [2] pn_link_send does not really send messages, it sends bytes on the current
  delivery. It is normally used to send entire messages so we should allow the
  case of oversize messages, but it may also be possible to break a message into
  chunks.
*/
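
The send_available() loop and the oversize case in [1] can be exercised
without a network. The following is a rough simulation, not proton code:
`sim_session`, `sim_try_to_send`, `sim_send_available`, `sim_drain` and
`sim_run` are all hypothetical names, and `sim_drain` only stands in for
the moment a PN_TRANSPORT event would report freed buffer space. It
models the limit L alone and ignores the outgoing-window part of the
bound.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a session's send buffer; not a proton type. */
typedef struct {
    size_t limit;     /* L, the buffering limit in bytes */
    size_t buffered;  /* Bytes accepted but not yet drained */
    size_t peak;      /* High-water mark of buffered */
} sim_session;

/* Same rule as try_to_send, minus the credit check and the real send. */
static bool sim_try_to_send(sim_session *s, size_t size) {
    bool ok = (size > s->limit) ? (s->buffered == 0)
                                : (s->buffered <= s->limit - size);
    if (ok) {
        s->buffered += size;
        if (s->buffered > s->peak)
            s->peak = s->buffered;
    }
    return ok;
}

/* Send queued messages in order until one is refused.
   Returns the index of the first message still waiting. */
static size_t sim_send_available(sim_session *s, const size_t *sizes,
                                 size_t n, size_t next) {
    while (next < n && sim_try_to_send(s, sizes[next]))
        ++next;
    return next;
}

/* Pretend up to `bytes` bytes left the buffer. */
static void sim_drain(sim_session *s, size_t bytes) {
    s->buffered -= (bytes < s->buffered) ? bytes : s->buffered;
}

/* Push all n messages through, draining drain_step bytes between attempts.
   Returns the peak number of buffered bytes observed. */
static size_t sim_run(size_t limit, const size_t *sizes, size_t n,
                      size_t drain_step) {
    sim_session s = { limit, 0, 0 };
    size_t next = sim_send_available(&s, sizes, n, 0);
    while (next < n && drain_step > 0) {  /* drain_step == 0 would never progress */
        sim_drain(&s, drain_step);
        next = sim_send_available(&s, sizes, n, next);
    }
    return s.peak;
}
```

With a limit of 100 and messages of 40, 40 and 40 bytes the peak stays
at or below the limit. Adding a 250 byte message raises the peak to 250,
the temporary over-buffering described in [1], and nothing else is
accepted until the buffer has drained enough for the next message to fit.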
