PROTON-723: based on gordon's patch, added support for the coordinator target
git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/trunk@1635266 13f79535-47bb-0310-9956-ffa450edef68 Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4c6f2122 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4c6f2122 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4c6f2122 Branch: refs/heads/examples Commit: 4c6f21223d44ed585c5f25bf4f706d2c751a485a Parents: ca38e6c Author: Rafael H. Schloming <[email protected]> Authored: Wed Oct 29 19:27:57 2014 +0000 Committer: Rafael H. Schloming <[email protected]> Committed: Wed Oct 29 19:27:57 2014 +0000 ---------------------------------------------------------------------- proton-c/src/protocol.py | 2 + proton-c/src/transport/transport.c | 90 +- proton-c/src/transport/transport.c.orig | 2229 ++++++++++++++++++++++++++ proton-j/src/main/resources/cengine.py | 25 +- tests/python/proton_tests/engine.py | 10 +- 5 files changed, 2313 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4c6f2122/proton-c/src/protocol.py ---------------------------------------------------------------------- diff --git a/proton-c/src/protocol.py b/proton-c/src/protocol.py index df47b0a..685e63b 100644 --- a/proton-c/src/protocol.py +++ b/proton-c/src/protocol.py @@ -20,6 +20,7 @@ import mllib, os, sys doc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "transport.xml")) mdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "messaging.xml")) +tdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "transactions.xml")) sdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "security.xml")) def eq(attr, value): @@ -27,6 +28,7 @@ def eq(attr, value): TYPES = doc.query["amqp/section/type", eq("@class", "composite")] + \ mdoc.query["amqp/section/type", eq("@class", "composite")] + \ + tdoc.query["amqp/section/type", eq("@class", "composite")] + \ sdoc.query["amqp/section/type", eq("@class", "composite")] + \ mdoc.query["amqp/section/type", eq("@provides", "section")] RESTRICTIONS = {} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4c6f2122/proton-c/src/transport/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c index f37e636..d91b55a 100644 --- a/proton-c/src/transport/transport.c +++ b/proton-c/src/transport/transport.c @@ -695,7 +695,15 @@ int pn_do_attach(pn_dispatcher_t *disp) pn_terminus_set_timeout(rtgt, tgt_timeout); pn_terminus_set_dynamic(rtgt, tgt_dynamic); } else { - pn_terminus_set_type(rtgt, PN_UNSPECIFIED); + uint64_t code = 0; + pn_data_clear(link->remote_target.capabilities); + err = pn_scan_args(disp, "D.[.....D..DL[C]...]", &code, + link->remote_target.capabilities); + if (code == COORDINATOR) { + pn_terminus_set_type(rtgt, PN_COORDINATOR); + } else { + pn_terminus_set_type(rtgt, PN_UNSPECIFIED); + } } if (snd_settle) @@ -1323,34 +1331,58 @@ int pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) { pni_map_local_handle(link); const pn_distribution_mode_t dist_mode = link->source.distribution_mode; - int err = pn_post_frame(transport->disp, ssn_state->local_channel, - "DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH, - pn_string_get(link->name), - state->local_handle, - endpoint->type == RECEIVER, - link->snd_settle_mode, - link->rcv_settle_mode, - (bool) link->source.type, SOURCE, - pn_string_get(link->source.address), - link->source.durability, - expiry_symbol(link->source.expiry_policy), - link->source.timeout, - link->source.dynamic, - link->source.properties, - (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode), - link->source.filter, - link->source.outcomes, - link->source.capabilities, - (bool) link->target.type, TARGET, - pn_string_get(link->target.address), - link->target.durability, - expiry_symbol(link->target.expiry_policy), - link->target.timeout, - link->target.dynamic, - link->target.properties, - link->target.capabilities, - 0); - if (err) return err; + if (link->target.type == PN_COORDINATOR) { + int err = pn_post_frame(transport->disp, ssn_state->local_channel, + "DL[SIoBB?DL[SIsIoC?sCnCC]DL[C]nnI]", ATTACH, + pn_string_get(link->name), + state->local_handle, + endpoint->type == RECEIVER, + link->snd_settle_mode, + link->rcv_settle_mode, + (bool) link->source.type, SOURCE, + pn_string_get(link->source.address), + link->source.durability, + expiry_symbol(link->source.expiry_policy), + link->source.timeout, + link->source.dynamic, + link->source.properties, + (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode), + link->source.filter, + link->source.outcomes, + link->source.capabilities, + COORDINATOR, link->target.capabilities, + 0); + if (err) return err; + } else { + int err = pn_post_frame(transport->disp, ssn_state->local_channel, + "DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH, + pn_string_get(link->name), + state->local_handle, + endpoint->type == RECEIVER, + link->snd_settle_mode, + link->rcv_settle_mode, + (bool) link->source.type, SOURCE, + pn_string_get(link->source.address), + link->source.durability, + expiry_symbol(link->source.expiry_policy), + link->source.timeout, + link->source.dynamic, + link->source.properties, + (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode), + link->source.filter, + link->source.outcomes, + link->source.capabilities, + (bool) link->target.type, TARGET, + pn_string_get(link->target.address), + link->target.durability, + expiry_symbol(link->target.expiry_policy), + link->target.timeout, + link->target.dynamic, + link->target.properties, + link->target.capabilities, + 0); + if (err) return err; + } } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4c6f2122/proton-c/src/transport/transport.c.orig ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c.orig b/proton-c/src/transport/transport.c.orig new file mode 100644 index 0000000..f37e636 --- /dev/null +++ b/proton-c/src/transport/transport.c.orig @@ -0,0 +1,2229 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "engine/engine-internal.h" +#include <stdlib.h> +#include <string.h> +#include <proton/framing.h> +#include "protocol.h" +#include "dispatch_actions.h" + +#include <assert.h> +#include <stdarg.h> +#include <stdio.h> + +#include "sasl/sasl-internal.h" +#include "ssl/ssl-internal.h" +#include "platform.h" +#include "platform_fmt.h" + +static ssize_t transport_consume(pn_transport_t *transport); + +// delivery buffers + +void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next) +{ + db->deliveries = pn_hash(PN_OBJECT, 0, 0.75); + db->next = next; +} + +void pn_delivery_map_free(pn_delivery_map_t *db) +{ + pn_free(db->deliveries); +} + +pn_delivery_t *pn_delivery_map_get(pn_delivery_map_t *db, pn_sequence_t id) +{ + return (pn_delivery_t *) pn_hash_get(db->deliveries, id); +} + +static void pn_delivery_state_init(pn_delivery_state_t *ds, pn_delivery_t *delivery, pn_sequence_t id) +{ + ds->id = id; + ds->sent = false; + ds->init = true; +} + +pn_delivery_state_t *pn_delivery_map_push(pn_delivery_map_t *db, pn_delivery_t *delivery) +{ + pn_delivery_state_t *ds = &delivery->state; + pn_delivery_state_init(ds, delivery, db->next++); + pn_hash_put(db->deliveries, ds->id, delivery); + return ds; +} + +void pn_delivery_map_del(pn_delivery_map_t *db, pn_delivery_t *delivery) +{ + if (delivery->state.init) { + pn_hash_del(db->deliveries, delivery->state.id); + } + delivery->state.init = false; + delivery->state.sent = false; +} + +void pn_delivery_map_clear(pn_delivery_map_t *dm) +{ + pn_hash_t *hash = dm->deliveries; + for (pn_handle_t entry = pn_hash_head(hash); + entry; + entry = pn_hash_next(hash, entry)) + { + pn_delivery_t *dlv = (pn_delivery_t *) pn_hash_value(hash, entry); + pn_delivery_map_del(dm, dlv); + } + dm->next = 0; +} + +static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available); +static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available); +static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t available); +static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t available); +static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now); + +static void pni_default_tracer(pn_transport_t *transport, const char *message) +{ + fprintf(stderr, "[%p]:%s\n", (void *) transport, message); +} + +static void pn_transport_initialize(void *object) +{ + pn_transport_t *transport = (pn_transport_t *)object; + transport->freed = false; + transport->output_buf = NULL; + transport->output_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024; + transport->input_buf = NULL; + transport->input_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024; + transport->tracer = pni_default_tracer; + transport->header_count = 0; + transport->sasl = NULL; + transport->ssl = NULL; + transport->scratch = pn_string(NULL); + transport->disp = pn_dispatcher(0, transport); + transport->connection = NULL; + + pn_io_layer_t *io_layer = transport->io_layers; + while (io_layer != &transport->io_layers[PN_IO_AMQP]) { + io_layer->context = NULL; + io_layer->next = io_layer + 1; + io_layer->process_input = pn_io_layer_input_passthru; + io_layer->process_output = pn_io_layer_output_passthru; + io_layer->process_tick = pn_io_layer_tick_passthru; + io_layer->buffered_output = NULL; + io_layer->buffered_input = NULL; + ++io_layer; + } + + pn_io_layer_t *amqp = &transport->io_layers[PN_IO_AMQP]; + amqp->context = transport; + amqp->process_input = pn_input_read_amqp_header; + amqp->process_output = pn_output_write_amqp_header; + amqp->process_tick = pn_io_layer_tick_passthru; + amqp->buffered_output = NULL; + amqp->buffered_input = NULL; + amqp->next = NULL; + + transport->open_sent = false; + transport->open_rcvd = false; + transport->close_sent = false; + transport->close_rcvd = false; + transport->tail_closed = false; + transport->head_closed = false; + transport->remote_container = NULL; + transport->remote_hostname = NULL; + transport->local_max_frame = PN_DEFAULT_MAX_FRAME_SIZE; + transport->remote_max_frame = 0; + transport->channel_max = 0; + transport->remote_channel_max = 0; + transport->local_idle_timeout = 0; + transport->dead_remote_deadline = 0; + transport->last_bytes_input = 0; + transport->remote_idle_timeout = 0; + transport->keepalive_deadline = 0; + transport->last_bytes_output = 0; + transport->remote_offered_capabilities = pn_data(0); + transport->remote_desired_capabilities = pn_data(0); + transport->remote_properties = pn_data(0); + transport->disp_data = pn_data(0); + pn_condition_init(&transport->remote_condition); + pn_condition_init(&transport->condition); + transport->error = pn_error(); + + transport->local_channels = pn_hash(PN_OBJECT, 0, 0.75); + transport->remote_channels = pn_hash(PN_OBJECT, 0, 0.75); + + transport->bytes_input = 0; + transport->bytes_output = 0; + + transport->input_pending = 0; + transport->output_pending = 0; + + transport->done_processing = false; + + transport->posted_head_closed = false; + transport->posted_tail_closed = false; +} + +pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel) +{ + return (pn_session_t *) pn_hash_get(transport->remote_channels, channel); +} + +static void pni_map_remote_channel(pn_session_t *session, uint16_t channel) +{ + pn_transport_t *transport = session->connection->transport; + pn_hash_put(transport->remote_channels, channel, session); + session->state.remote_channel = channel; +} + +void pni_transport_unbind_handles(pn_hash_t *handles, bool reset_state); + +static void pni_unmap_remote_channel(pn_session_t *ssn) +{ + // XXX: should really update link state also + pni_transport_unbind_handles(ssn->state.remote_handles, false); + pn_transport_t *transport = ssn->connection->transport; + uint16_t channel = ssn->state.remote_channel; + ssn->state.remote_channel = -2; + // note: may free the session: + pn_hash_del(transport->remote_channels, channel); +} + + +static void pn_transport_finalize(void *object); +#define pn_transport_hashcode NULL +#define pn_transport_compare NULL +#define pn_transport_inspect NULL + +pn_transport_t *pn_transport() +{ + static const pn_class_t clazz = PN_CLASS(pn_transport); + pn_transport_t *transport = + (pn_transport_t *) pn_class_new(&clazz, sizeof(pn_transport_t)); + if (!transport) return NULL; + + transport->output_buf = (char *) malloc(transport->output_size); + if (!transport->output_buf) { + pn_transport_free(transport); + return NULL; + } + + transport->input_buf = (char *) malloc(transport->input_size); + if (!transport->input_buf) { + pn_transport_free(transport); + return NULL; + } + return transport; +} + +void pn_transport_free(pn_transport_t *transport) +{ + if (!transport) return; + assert(!transport->freed); + transport->freed = true; + // once the application frees the transport, no further I/O + // processing can be done to the connection: + pn_transport_unbind(transport); + pn_decref(transport); +} + +static void pn_transport_finalize(void *object) +{ + pn_transport_t *transport = (pn_transport_t *) object; + + pn_ssl_free(transport->ssl); + pn_sasl_free(transport->sasl); + pn_dispatcher_free(transport->disp); + free(transport->remote_container); + free(transport->remote_hostname); + pn_free(transport->remote_offered_capabilities); + pn_free(transport->remote_desired_capabilities); + pn_free(transport->remote_properties); + pn_free(transport->disp_data); + pn_condition_tini(&transport->remote_condition); + pn_condition_tini(&transport->condition); + pn_error_free(transport->error); + pn_free(transport->local_channels); + pn_free(transport->remote_channels); + if (transport->input_buf) free(transport->input_buf); + if (transport->output_buf) free(transport->output_buf); + pn_free(transport->scratch); +} + +int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection) +{ + assert(transport); + assert(connection); + + if (transport->connection) return PN_STATE_ERR; + if (connection->transport) return PN_STATE_ERR; + + transport->connection = connection; + connection->transport = transport; + + pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND); + + pn_incref(connection); + if (transport->open_rcvd) { + PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE); + pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_REMOTE_OPEN); + transport->disp->halt = false; + transport_consume(transport); // blech - testBindAfterOpen + } + + return 0; +} + +void pni_transport_unbind_handles(pn_hash_t *handles, bool reset_state) +{ + for (pn_handle_t h = pn_hash_head(handles); h; h = pn_hash_next(handles, h)) { + uintptr_t key = pn_hash_key(handles, h); + if (reset_state) { + pn_link_t *link = (pn_link_t *) pn_hash_value(handles, h); + pn_link_unbound(link); + } + pn_hash_del(handles, key); + } +} + +void pni_transport_unbind_channels(pn_hash_t *channels) +{ + for (pn_handle_t h = pn_hash_head(channels); h; h = pn_hash_next(channels, h)) { + uintptr_t key = pn_hash_key(channels, h); + pn_session_t *ssn = (pn_session_t *) pn_hash_value(channels, h); + pni_transport_unbind_handles(ssn->state.local_handles, true); + pni_transport_unbind_handles(ssn->state.remote_handles, true); + pn_session_unbound(ssn); + pn_hash_del(channels, key); + } +} + +int pn_transport_unbind(pn_transport_t *transport) +{ + assert(transport); + if (!transport->connection) return 0; + + + pn_connection_t *conn = transport->connection; + transport->connection = NULL; + + pn_collector_put(conn->collector, PN_OBJECT, conn, PN_CONNECTION_UNBOUND); + + // XXX: what happens if the endpoints are freed before we get here? + pn_session_t *ssn = pn_session_head(conn, 0); + while (ssn) { + pn_delivery_map_clear(&ssn->state.incoming); + pn_delivery_map_clear(&ssn->state.outgoing); + ssn = pn_session_next(ssn, 0); + } + + pn_endpoint_t *endpoint = conn->endpoint_head; + while (endpoint) { + pn_condition_clear(&endpoint->remote_condition); + pn_modified(conn, endpoint, true); + endpoint = endpoint->endpoint_next; + } + + pni_transport_unbind_channels(transport->local_channels); + pni_transport_unbind_channels(transport->remote_channels); + + pn_connection_unbound(conn); + pn_decref(conn); + return 0; +} + +pn_error_t *pn_transport_error(pn_transport_t *transport) +{ + assert(transport); + if (pn_condition_is_set(&transport->condition)) { + pn_error_format(transport->error, PN_ERR, "%s: %s", + pn_condition_get_name(&transport->condition), + pn_condition_get_description(&transport->condition)); + } else { + pn_error_clear(transport->error); + } + return transport->error; +} + +pn_condition_t *pn_transport_condition(pn_transport_t *transport) +{ + assert(transport); + return &transport->condition; +} + +static void pni_map_remote_handle(pn_link_t *link, uint32_t handle) +{ + link->state.remote_handle = handle; + pn_hash_put(link->session->state.remote_handles, handle, link); +} + +static void pni_unmap_remote_handle(pn_link_t *link) +{ + uintptr_t handle = link->state.remote_handle; + link->state.remote_handle = -2; + // may delete link: + pn_hash_del(link->session->state.remote_handles, handle); +} + +pn_link_t *pn_handle_state(pn_session_t *ssn, uint32_t handle) +{ + return (pn_link_t *) pn_hash_get(ssn->state.remote_handles, handle); +} + +bool pni_disposition_batchable(pn_disposition_t *disposition) +{ + switch (disposition->type) { + case PN_ACCEPTED: + return true; + case PN_RELEASED: + return true; + default: + return false; + } +} + +void pni_disposition_encode(pn_disposition_t *disposition, pn_data_t *data) +{ + pn_condition_t *cond = &disposition->condition; + switch (disposition->type) { + case PN_RECEIVED: + pn_data_put_list(data); + pn_data_enter(data); + pn_data_put_uint(data, disposition->section_number); + pn_data_put_ulong(data, disposition->section_offset); + pn_data_exit(data); + break; + case PN_ACCEPTED: + case PN_RELEASED: + return; + case PN_REJECTED: + pn_data_fill(data, "[?DL[sSC]]", pn_condition_is_set(cond), ERROR, + pn_condition_get_name(cond), + pn_condition_get_description(cond), + pn_condition_info(cond)); + break; + case PN_MODIFIED: + pn_data_fill(data, "[ooC]", + disposition->failed, + disposition->undeliverable, + disposition->annotations); + break; + default: + pn_data_copy(data, disposition->data); + break; + } +} + +int pn_post_close(pn_transport_t *transport, const char *condition, const char *description) +{ + pn_condition_t *cond = NULL; + if (transport->connection) { + cond = pn_connection_condition(transport->connection); + } + pn_data_t *info = NULL; + if (!condition && pn_condition_is_set(cond)) { + condition = pn_condition_get_name(cond); + description = pn_condition_get_description(cond); + info = pn_condition_info(cond); + } + + return pn_post_frame(transport->disp, 0, "DL[?DL[sSC]]", CLOSE, + (bool) condition, ERROR, condition, description, info); +} + +static pn_collector_t *pni_transport_collector(pn_transport_t *transport) +{ + if (transport->connection && transport->connection->collector) { + return transport->connection->collector; + } else { + return NULL; + } +} + +int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + char buf[1024]; + // XXX: result + vsnprintf(buf, 1024, fmt, ap); + va_end(ap); + if (!transport->close_sent) { + if (!transport->open_sent) { + pn_post_frame(transport->disp, 0, "DL[S]", OPEN, ""); + } + + pn_post_close(transport, condition, buf); + transport->close_sent = true; + } + transport->disp->halt = true; + pn_condition_set_name(&transport->condition, condition); + pn_condition_set_description(&transport->condition, buf); + pn_collector_t *collector = pni_transport_collector(transport); + pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_ERROR); + pn_transport_logf(transport, "ERROR %s %s", condition, buf); + return PN_ERR; +} + +static char *pn_bytes_strdup(pn_bytes_t str) +{ + return pn_strndup(str.start, str.size); +} + +int pn_do_open(pn_dispatcher_t *disp) +{ + pn_transport_t *transport = disp->transport; + pn_connection_t *conn = transport->connection; + bool container_q, hostname_q; + pn_bytes_t remote_container, remote_hostname; + pn_data_clear(transport->remote_offered_capabilities); + pn_data_clear(transport->remote_desired_capabilities); + pn_data_clear(transport->remote_properties); + int err = pn_scan_args(disp, "D.[?S?SIHI..CCC]", &container_q, + &remote_container, &hostname_q, &remote_hostname, + &transport->remote_max_frame, + &transport->remote_channel_max, + &transport->remote_idle_timeout, + transport->remote_offered_capabilities, + transport->remote_desired_capabilities, + transport->remote_properties); + if (err) return err; + if (transport->remote_max_frame > 0) { + if (transport->remote_max_frame < AMQP_MIN_MAX_FRAME_SIZE) { + pn_transport_logf(transport, "Peer advertised bad max-frame (%u), forcing to %u", + transport->remote_max_frame, AMQP_MIN_MAX_FRAME_SIZE); + transport->remote_max_frame = AMQP_MIN_MAX_FRAME_SIZE; + } + disp->remote_max_frame = transport->remote_max_frame; + pn_buffer_clear( disp->frame ); + } + if (container_q) { + transport->remote_container = pn_bytes_strdup(remote_container); + } else { + transport->remote_container = NULL; + } + if (hostname_q) { + transport->remote_hostname = pn_bytes_strdup(remote_hostname); + } else { + transport->remote_hostname = NULL; + } + + if (conn) { + PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE); + pn_collector_put(conn->collector, PN_OBJECT, conn, PN_CONNECTION_REMOTE_OPEN); + } else { + transport->disp->halt = true; + } + if (transport->remote_idle_timeout) + transport->io_layers[PN_IO_AMQP].process_tick = pn_tick_amqp; // enable timeouts + transport->open_rcvd = true; + return 0; +} + +int pn_do_begin(pn_dispatcher_t *disp) +{ + pn_transport_t *transport = disp->transport; + bool reply; + uint16_t remote_channel; + pn_sequence_t next; + int err = pn_scan_args(disp, "D.[?HI]", &reply, &remote_channel, &next); + if (err) return err; + + pn_session_t *ssn; + if (reply) { + // XXX: what if session is NULL? + ssn = (pn_session_t *) pn_hash_get(transport->local_channels, remote_channel); + } else { + ssn = pn_session(transport->connection); + } + ssn->state.incoming_transfer_count = next; + pni_map_remote_channel(ssn, disp->channel); + PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_ACTIVE); + pn_collector_put(transport->connection->collector, PN_OBJECT, ssn, PN_SESSION_REMOTE_OPEN); + return 0; +} + +pn_link_t *pn_find_link(pn_session_t *ssn, pn_bytes_t name, bool is_sender) +{ + pn_endpoint_type_t type = is_sender ? SENDER : RECEIVER; + + for (size_t i = 0; i < pn_list_size(ssn->links); i++) + { + pn_link_t *link = (pn_link_t *) pn_list_get(ssn->links, i); + if (link->endpoint.type == type && + !strncmp(name.start, pn_string_get(link->name), name.size)) + { + return link; + } + } + return NULL; +} + +static pn_expiry_policy_t symbol2policy(pn_bytes_t symbol) +{ + if (!symbol.start) + return PN_EXPIRE_WITH_SESSION; + + if (!strncmp(symbol.start, "link-detach", symbol.size)) + return PN_EXPIRE_WITH_LINK; + if (!strncmp(symbol.start, "session-end", symbol.size)) + return PN_EXPIRE_WITH_SESSION; + if (!strncmp(symbol.start, "connection-close", symbol.size)) + return PN_EXPIRE_WITH_CONNECTION; + if (!strncmp(symbol.start, "never", symbol.size)) + return PN_EXPIRE_NEVER; + + return PN_EXPIRE_WITH_SESSION; +} + +static pn_distribution_mode_t symbol2dist_mode(const pn_bytes_t symbol) +{ + if (!symbol.start) + return PN_DIST_MODE_UNSPECIFIED; + + if (!strncmp(symbol.start, "move", symbol.size)) + return PN_DIST_MODE_MOVE; + if (!strncmp(symbol.start, "copy", symbol.size)) + return PN_DIST_MODE_COPY; + + return PN_DIST_MODE_UNSPECIFIED; +} + +static const char *dist_mode2symbol(const pn_distribution_mode_t mode) +{ + switch (mode) + { + case PN_DIST_MODE_COPY: + return "copy"; + case PN_DIST_MODE_MOVE: + return "move"; + default: + return NULL; + } +} + +int pn_terminus_set_address_bytes(pn_terminus_t *terminus, pn_bytes_t address) +{ + assert(terminus); + return pn_string_setn(terminus->address, address.start, address.size); +} + +int pn_do_attach(pn_dispatcher_t *disp) +{ + pn_transport_t *transport = disp->transport; + pn_bytes_t name; + uint32_t handle; + bool is_sender; + pn_bytes_t source, target; + pn_durability_t src_dr, tgt_dr; + pn_bytes_t src_exp, tgt_exp; + pn_seconds_t src_timeout, tgt_timeout; + bool src_dynamic, tgt_dynamic; + pn_sequence_t idc; + pn_bytes_t dist_mode; + bool snd_settle, rcv_settle; + uint8_t snd_settle_mode, rcv_settle_mode; + int err = pn_scan_args(disp, "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..I]", &name, &handle, + &is_sender, + &snd_settle, &snd_settle_mode, + &rcv_settle, &rcv_settle_mode, + &source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode, + &target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic, + &idc); + if (err) return err; + char strbuf[128]; // avoid malloc for most link names + char *strheap = (name.size >= sizeof(strbuf)) ? (char *) malloc(name.size + 1) : NULL; + char *strname = strheap ? strheap : strbuf; + strncpy(strname, name.start, name.size); + strname[name.size] = '\0'; + + pn_session_t *ssn = pn_channel_state(transport, disp->channel); + if (!ssn) { + pn_do_error(transport, "amqp:connection:no-session", "attach without a session"); + return PN_EOS; + } + pn_link_t *link = pn_find_link(ssn, name, is_sender); + if (!link) { + if (is_sender) { + link = (pn_link_t *) pn_sender(ssn, strname); + } else { + link = (pn_link_t *) pn_receiver(ssn, strname); + } + } + + if (strheap) { + free(strheap); + } + + pni_map_remote_handle(link, handle); + PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE); + pn_terminus_t *rsrc = &link->remote_source; + if (source.start || src_dynamic) { + pn_terminus_set_type(rsrc, PN_SOURCE); + pn_terminus_set_address_bytes(rsrc, source); + pn_terminus_set_durability(rsrc, src_dr); + pn_terminus_set_expiry_policy(rsrc, symbol2policy(src_exp)); + pn_terminus_set_timeout(rsrc, src_timeout); + pn_terminus_set_dynamic(rsrc, src_dynamic); + pn_terminus_set_distribution_mode(rsrc, symbol2dist_mode(dist_mode)); + } else { + pn_terminus_set_type(rsrc, PN_UNSPECIFIED); + } + pn_terminus_t *rtgt = &link->remote_target; + if (target.start || tgt_dynamic) { + pn_terminus_set_type(rtgt, PN_TARGET); + pn_terminus_set_address_bytes(rtgt, target); + pn_terminus_set_durability(rtgt, tgt_dr); + pn_terminus_set_expiry_policy(rtgt, symbol2policy(tgt_exp)); + pn_terminus_set_timeout(rtgt, tgt_timeout); + pn_terminus_set_dynamic(rtgt, tgt_dynamic); + } else { + pn_terminus_set_type(rtgt, PN_UNSPECIFIED); + } + + if (snd_settle) + link->remote_snd_settle_mode = snd_settle_mode; + if (rcv_settle) + link->remote_rcv_settle_mode = rcv_settle_mode; + + pn_data_clear(link->remote_source.properties); + pn_data_clear(link->remote_source.filter); + pn_data_clear(link->remote_source.outcomes); + pn_data_clear(link->remote_source.capabilities); + pn_data_clear(link->remote_target.properties); + pn_data_clear(link->remote_target.capabilities); + + err = pn_scan_args(disp, "D.[.....D.[.....C.C.CC]D.[.....CC]", + link->remote_source.properties, + link->remote_source.filter, + link->remote_source.outcomes, + link->remote_source.capabilities, + link->remote_target.properties, + link->remote_target.capabilities); + if (err) return err; + + pn_data_rewind(link->remote_source.properties); + pn_data_rewind(link->remote_source.filter); + pn_data_rewind(link->remote_source.outcomes); + pn_data_rewind(link->remote_source.capabilities); + pn_data_rewind(link->remote_target.properties); + pn_data_rewind(link->remote_target.capabilities); + + if (!is_sender) { + link->state.delivery_count = idc; + } + + pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_REMOTE_OPEN); + return 0; +} + +int pn_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t *link); + +// free the delivery +static void pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery) +{ + assert(!delivery->work); + pn_clear_tpwork(delivery); + pn_real_settle(delivery); +} + +int pn_do_transfer(pn_dispatcher_t *disp) +{ + // XXX: multi transfer + pn_transport_t *transport = disp->transport; + uint32_t handle; + pn_bytes_t tag; + bool id_present; + pn_sequence_t id; + bool settled; + bool more; + int err = pn_scan_args(disp, "D.[I?Iz.oo]", &handle, &id_present, &id, &tag, + &settled, &more); + if (err) return err; + pn_session_t *ssn = pn_channel_state(transport, disp->channel); + + if (!ssn->state.incoming_window) { + return pn_do_error(transport, "amqp:session:window-violation", "incoming session window exceeded"); + } + + pn_link_t *link = pn_handle_state(ssn, handle); + pn_delivery_t *delivery; + if (link->unsettled_tail && !link->unsettled_tail->done) { + delivery = link->unsettled_tail; + } else { + pn_delivery_map_t *incoming = &ssn->state.incoming; + + if (!ssn->state.incoming_init) { + incoming->next = id; + ssn->state.incoming_init = true; + ssn->incoming_deliveries++; + } + + delivery = pn_delivery(link, pn_dtag(tag.start, tag.size)); + pn_delivery_state_t *state = pn_delivery_map_push(incoming, delivery); + if (id_present && id != state->id) { + return pn_do_error(transport, "amqp:session:invalid-field", + "sequencing error, expected delivery-id %u, got %u", + state->id, id); + } + + link->state.delivery_count++; + link->state.link_credit--; + link->queued++; + + // XXX: need to fill in remote state: delivery->remote.state = ...; + delivery->remote.settled = settled; + if (settled) { + delivery->updated = true; + pn_work_update(transport->connection, delivery); + } + } + + pn_buffer_append(delivery->bytes, disp->payload, disp->size); + ssn->incoming_bytes += disp->size; + delivery->done = !more; + + ssn->state.incoming_transfer_count++; + ssn->state.incoming_window--; + + // XXX: need better policy for when to refresh window + if (!ssn->state.incoming_window && (int32_t) link->state.local_handle >= 0) { + pn_post_flow(transport, ssn, link); + } + + pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY); + return 0; +} + +int pn_do_flow(pn_dispatcher_t *disp) +{ + pn_transport_t *transport = disp->transport; + pn_sequence_t onext, inext, delivery_count; + uint32_t iwin, owin, link_credit; + uint32_t handle; + bool inext_init, handle_init, dcount_init, drain; + int err = pn_scan_args(disp, "D.[?IIII?I?II.o]", &inext_init, &inext, &iwin, + &onext, &owin, &handle_init, &handle, &dcount_init, + &delivery_count, &link_credit, &drain); + if (err) return err; + + pn_session_t *ssn = pn_channel_state(transport, disp->channel); + + if (inext_init) { + ssn->state.remote_incoming_window = inext + iwin - ssn->state.outgoing_transfer_count; + } else { + ssn->state.remote_incoming_window = iwin; + } + + if (handle_init) { + pn_link_t *link = pn_handle_state(ssn, handle); + if (link->endpoint.type == SENDER) { + pn_sequence_t receiver_count; + if (dcount_init) { + receiver_count = delivery_count; + } else { + // our initial delivery count + receiver_count = 0; + } + pn_sequence_t old = link->state.link_credit; + link->state.link_credit = receiver_count + link_credit - link->state.delivery_count; + link->credit += link->state.link_credit - old; + link->drain = drain; + pn_delivery_t *delivery = pn_link_current(link); + if (delivery) pn_work_update(transport->connection, delivery); + } else { + pn_sequence_t delta = delivery_count - link->state.delivery_count; + if (delta > 0) { + link->state.delivery_count += delta; + link->state.link_credit -= delta; + link->credit -= delta; + link->drained += delta; + } + } + + pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_FLOW); + } + + return 0; +} + +#define SCAN_ERROR_DEFAULT ("D.[D.[sSC]") +#define SCAN_ERROR_DETACH ("D.[..D.[sSC]") +#define SCAN_ERROR_DISP ("[D.[sSC]") + +static int pn_scan_error(pn_data_t *data, pn_condition_t *condition, const char *fmt) +{ + pn_bytes_t cond; + pn_bytes_t desc; + pn_condition_clear(condition); + int err = pn_data_scan(data, fmt, &cond, &desc, condition->info); + if (err) return err; + pn_string_setn(condition->name, cond.start, cond.size); + pn_string_setn(condition->description, desc.start, desc.size); + pn_data_rewind(condition->info); + return 0; +} + +int pn_do_disposition(pn_dispatcher_t *disp) +{ + pn_transport_t *transport = disp->transport; + bool role; + pn_sequence_t first, last; + uint64_t type = 0; + bool last_init, settled, type_init; + pn_data_clear(transport->disp_data); + int err = pn_scan_args(disp, "D.[oI?IoD?LC]", &role, &first, &last_init, + &last, &settled, &type_init, &type, + transport->disp_data); + if (err) return err; + if (!last_init) last = first; + + pn_session_t *ssn = pn_channel_state(transport, disp->channel); + pn_delivery_map_t *deliveries; + if (role) { + deliveries = &ssn->state.outgoing; + } else { + deliveries = &ssn->state.incoming; + } + + pn_data_rewind(transport->disp_data); + bool remote_data = (pn_data_next(transport->disp_data) && + pn_data_get_list(transport->disp_data) > 0); + + for (pn_sequence_t id = first; id <= last; id++) { + pn_delivery_t *delivery = pn_delivery_map_get(deliveries, id); + pn_disposition_t *remote = &delivery->remote; + if (delivery) { + if (type_init) remote->type = type; + if (remote_data) { + switch (type) { + case PN_RECEIVED: + pn_data_rewind(transport->disp_data); + pn_data_next(transport->disp_data); + pn_data_enter(transport->disp_data); + if (pn_data_next(transport->disp_data)) + remote->section_number = pn_data_get_uint(transport->disp_data); + if (pn_data_next(transport->disp_data)) + remote->section_offset = pn_data_get_ulong(transport->disp_data); + break; + case PN_ACCEPTED: + break; + case PN_REJECTED: + err = pn_scan_error(transport->disp_data, &remote->condition, SCAN_ERROR_DISP); + if (err) return err; + break; + case PN_RELEASED: + break; + case PN_MODIFIED: + pn_data_rewind(transport->disp_data); + pn_data_next(transport->disp_data); + pn_data_enter(transport->disp_data); + if (pn_data_next(transport->disp_data)) + remote->failed = pn_data_get_bool(transport->disp_data); + if (pn_data_next(transport->disp_data)) + remote->undeliverable = pn_data_get_bool(transport->disp_data); + pn_data_narrow(transport->disp_data); + pn_data_clear(remote->data); + pn_data_appendn(remote->annotations, transport->disp_data, 1); + pn_data_widen(transport->disp_data); + break; + default: + pn_data_copy(remote->data, transport->disp_data); + break; + } + } + remote->settled = settled; + delivery->updated = true; + pn_work_update(transport->connection, delivery); + + pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY); + } + } + + return 0; +} + +int pn_do_detach(pn_dispatcher_t *disp) +{ + pn_transport_t *transport = disp->transport; + uint32_t handle; + bool closed; + int err = pn_scan_args(disp, "D.[Io]", &handle, &closed); + if (err) return err; + + pn_session_t *ssn = pn_channel_state(transport, disp->channel); + if (!ssn) { + return pn_do_error(transport, "amqp:invalid-field", "no such channel: %u", disp->channel); + } + pn_link_t *link = pn_handle_state(ssn, handle); + if (!link) { + return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle); + } + + err = pn_scan_error(disp->args, &link->endpoint.remote_condition, SCAN_ERROR_DETACH); + if (err) return err; + + if (closed) + { + PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_CLOSED); + pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_REMOTE_CLOSE); + } else { + pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_REMOTE_DETACH); + } + + pni_unmap_remote_handle(link); + return 0; +} + +int pn_do_end(pn_dispatcher_t *disp) +{ + pn_transport_t *transport = disp->transport; + pn_session_t *ssn = pn_channel_state(transport, disp->channel); + int err = pn_scan_error(disp->args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT); + if (err) return err; + PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED); + pn_collector_put(transport->connection->collector, PN_OBJECT, ssn, PN_SESSION_REMOTE_CLOSE); + pni_unmap_remote_channel(ssn); + return 0; +} + +int pn_do_close(pn_dispatcher_t *disp) +{ + pn_transport_t *transport = disp->transport; + pn_connection_t *conn = transport->connection; + int err = pn_scan_error(disp->args, &transport->remote_condition, SCAN_ERROR_DEFAULT); + if (err) return err; + transport->close_rcvd = true; + PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED); + pn_collector_put(transport->connection->collector, PN_OBJECT, conn, PN_CONNECTION_REMOTE_CLOSE); + return 0; +} + +// deprecated +ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available) +{ + if (!transport) return PN_ARG_ERR; + if (available == 0) { + return pn_transport_close_tail(transport); + } + const size_t original = available; + ssize_t capacity = pn_transport_capacity(transport); + if (capacity < 0) return capacity; + while (available && capacity) { + char *dest = pn_transport_tail(transport); + assert(dest); + size_t count = pn_min( (size_t)capacity, available ); + memmove( dest, bytes, count ); + available -= count; + bytes += count; + int rc = pn_transport_process( transport, count ); + if (rc < 0) return rc; + capacity = pn_transport_capacity(transport); + if (capacity < 0) return capacity; + } + + return original - available; +} + +static void pni_maybe_post_closed(pn_transport_t *transport) +{ + pn_collector_t *collector = pni_transport_collector(transport); + if (transport->posted_head_closed && transport->posted_tail_closed) { + pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_CLOSED); + } +} + +// process pending input until none remaining or EOS +static ssize_t transport_consume(pn_transport_t *transport) +{ + pn_io_layer_t *io_layer = transport->io_layers; + size_t consumed = 0; + + while (transport->input_pending || transport->tail_closed) { + ssize_t n; + n = io_layer->process_input( io_layer, + transport->input_buf + consumed, + transport->input_pending ); + if (n > 0) { + consumed += n; + transport->input_pending -= n; + } else if (n == 0) { + break; + } else { + assert(n == PN_EOS); + if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) + pn_transport_log(transport, " <- EOS"); + transport->input_pending = 0; // XXX ??? + if (!transport->posted_tail_closed) { + pn_collector_t *collector = pni_transport_collector(transport); + pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_TAIL_CLOSED); + transport->posted_tail_closed = true; + pni_maybe_post_closed(transport); + } + return n; + } + } + + if (transport->input_pending && consumed) { + memmove( transport->input_buf, &transport->input_buf[consumed], transport->input_pending ); + } + + return consumed; +} + +static ssize_t pn_input_read_header(pn_transport_t *transport, const char *bytes, size_t available, + const char *header, size_t size, const char *protocol, + ssize_t (*next)(pn_io_layer_t *, const char *, size_t)) +{ + const char *point = header + transport->header_count; + int delta = pn_min(available, size - transport->header_count); + if (!available || memcmp(bytes, point, delta)) { + char quoted[1024]; + pn_quote_data(quoted, 1024, bytes, available); + pn_do_error(transport, "amqp:connection:framing-error", + "%s header mismatch: '%s'%s", protocol, quoted, + available ? "" : " (connection aborted)"); + return PN_EOS; + } else { + transport->header_count += delta; + if (transport->header_count == size) { + transport->header_count = 0; + transport->io_layers[PN_IO_AMQP].process_input = next; + + if (transport->disp->trace & PN_TRACE_FRM) + pn_transport_logf(transport, " <- %s", protocol); + } + return delta; + } +} + +#define AMQP_HEADER ("AMQP\x00\x01\x00\x00") + +static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available) +{ + pn_transport_t *transport = (pn_transport_t *)io_layer->context; + return pn_input_read_header(transport, bytes, available, AMQP_HEADER, 8, + "AMQP", pn_input_read_amqp); +} + +static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available) +{ + pn_transport_t *transport = (pn_transport_t *)io_layer->context; + if (transport->close_rcvd) { + if (available > 0) { + pn_do_error(transport, "amqp:connection:framing-error", "data after close"); + return PN_EOS; + } + } + + if (!available) { + pn_do_error(transport, "amqp:connection:framing-error", "connection aborted"); + return PN_EOS; + } + + + ssize_t n = pn_dispatcher_input(transport->disp, bytes, available); + if (n < 0) { + //return pn_error_set(transport->error, n, "dispatch error"); + return PN_EOS; + } else if (transport->close_rcvd) { + return PN_EOS; + } else { + return n; + } +} + +/* process AMQP related timer events */ +static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now) +{ + pn_timestamp_t timeout = 0; + pn_transport_t *transport = (pn_transport_t *)io_layer->context; + + if (transport->local_idle_timeout) { + if (transport->dead_remote_deadline == 0 || + transport->last_bytes_input != transport->bytes_input) { + transport->dead_remote_deadline = now + transport->local_idle_timeout; + transport->last_bytes_input = transport->bytes_input; + } else if (transport->dead_remote_deadline <= now) { + transport->dead_remote_deadline = now + transport->local_idle_timeout; + // Note: AMQP-1.0 really should define a generic "timeout" error, but does not. + pn_do_error(transport, "amqp:resource-limit-exceeded", "local-idle-timeout expired"); + } + timeout = transport->dead_remote_deadline; + } + + // Prevent remote idle timeout as describe by AMQP 1.0: + if (transport->remote_idle_timeout && !transport->close_sent) { + if (transport->keepalive_deadline == 0 || + transport->last_bytes_output != transport->bytes_output) { + transport->keepalive_deadline = now + (pn_timestamp_t)(transport->remote_idle_timeout/2.0); + transport->last_bytes_output = transport->bytes_output; + } else if (transport->keepalive_deadline <= now) { + transport->keepalive_deadline = now + (pn_timestamp_t)(transport->remote_idle_timeout/2.0); + if (transport->disp->available == 0) { // no outbound data pending + // so send empty frame (and account for it!) + pn_post_frame(transport->disp, 0, ""); + transport->last_bytes_output += transport->disp->available; + } + } + timeout = pn_timestamp_min( timeout, transport->keepalive_deadline ); + } + + return timeout; +} + +int pn_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) +{ + if (endpoint->type == CONNECTION) + { + if (!(endpoint->state & PN_LOCAL_UNINIT) && !transport->open_sent) + { + // as per the recommendation in the spec, advertise half our + // actual timeout to the remote + const pn_millis_t idle_timeout = transport->local_idle_timeout + ? (transport->local_idle_timeout/2) + : 0; + pn_connection_t *connection = (pn_connection_t *) endpoint; + const char *cid = pn_string_get(connection->container); + int err = pn_post_frame(transport->disp, 0, "DL[SS?I?H?InnCCC]", OPEN, + cid ? cid : "", + pn_string_get(connection->hostname), + // if not zero, advertise our max frame size and idle timeout + (bool)transport->local_max_frame, transport->local_max_frame, + (bool)transport->channel_max, transport->channel_max, + (bool)idle_timeout, idle_timeout, + connection->offered_capabilities, + connection->desired_capabilities, + connection->properties); + if (err) return err; + transport->open_sent = true; + } + } + + return 0; +} + +static uint16_t allocate_alias(pn_hash_t *aliases) +{ + for (uint32_t i = 0; i < 65536; i++) { + if (!pn_hash_get(aliases, i)) { + return i; + } + } + + assert(false); + return 0; +} + +size_t pn_session_outgoing_window(pn_session_t *ssn) +{ + uint32_t size = ssn->connection->transport->remote_max_frame; + if (!size) { + return ssn->outgoing_deliveries; + } else { + pn_sequence_t frames = ssn->outgoing_bytes/size; + if (ssn->outgoing_bytes % size) { + frames++; + } + return pn_max(frames, ssn->outgoing_deliveries); + } +} + +size_t pn_session_incoming_window(pn_session_t *ssn) +{ + uint32_t size = ssn->connection->transport->local_max_frame; + if (!size) { + return 2147483647; // biggest legal value + } else { + return (ssn->incoming_capacity - ssn->incoming_bytes)/size; + } +} + +static void pni_map_local_channel(pn_session_t *ssn) +{ + pn_transport_t *transport = ssn->connection->transport; + pn_session_state_t *state = &ssn->state; + uint16_t channel = allocate_alias(transport->local_channels); + state->local_channel = channel; + pn_hash_put(transport->local_channels, channel, ssn); +} + +int pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) +{ + if (endpoint->type == SESSION && transport->open_sent) + { + pn_session_t *ssn = (pn_session_t *) endpoint; + pn_session_state_t *state = &ssn->state; + if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1) + { + pni_map_local_channel(ssn); + state->incoming_window = pn_session_incoming_window(ssn); + state->outgoing_window = pn_session_outgoing_window(ssn); + pn_post_frame(transport->disp, state->local_channel, "DL[?HIII]", BEGIN, + ((int16_t) state->remote_channel >= 0), state->remote_channel, + state->outgoing_transfer_count, + state->incoming_window, + state->outgoing_window); + } + } + + return 0; +} + +static const char *expiry_symbol(pn_expiry_policy_t policy) +{ + switch (policy) + { + case PN_EXPIRE_WITH_LINK: + return "link-detach"; + case PN_EXPIRE_WITH_SESSION: + return NULL; + case PN_EXPIRE_WITH_CONNECTION: + return "connection-close"; + case PN_EXPIRE_NEVER: + return "never"; + } + return NULL; +} + +static void pni_map_local_handle(pn_link_t *link) { + pn_link_state_t *state = &link->state; + pn_session_state_t *ssn_state = &link->session->state; + state->local_handle = allocate_alias(ssn_state->local_handles); + pn_hash_put(ssn_state->local_handles, state->local_handle, link); +} + +int pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) +{ + if (transport->open_sent && (endpoint->type == SENDER || + endpoint->type == RECEIVER)) + { + pn_link_t *link = (pn_link_t *) endpoint; + pn_session_state_t *ssn_state = &link->session->state; + pn_link_state_t *state = &link->state; + if (((int16_t) ssn_state->local_channel >= 0) && + !(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == (uint32_t) -1) + { + pni_map_local_handle(link); + const pn_distribution_mode_t dist_mode = link->source.distribution_mode; + int err = pn_post_frame(transport->disp, ssn_state->local_channel, + "DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH, + pn_string_get(link->name), + state->local_handle, + endpoint->type == RECEIVER, + link->snd_settle_mode, + link->rcv_settle_mode, + (bool) link->source.type, SOURCE, + pn_string_get(link->source.address), + link->source.durability, + expiry_symbol(link->source.expiry_policy), + link->source.timeout, + link->source.dynamic, + link->source.properties, + (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode), + link->source.filter, + link->source.outcomes, + link->source.capabilities, + (bool) link->target.type, TARGET, + pn_string_get(link->target.address), + link->target.durability, + expiry_symbol(link->target.expiry_policy), + link->target.timeout, + link->target.dynamic, + link->target.properties, + link->target.capabilities, + 0); + if (err) return err; + } + } + + return 0; +} + +int pn_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t *link) +{ + ssn->state.incoming_window = pn_session_incoming_window(ssn); + ssn->state.outgoing_window = pn_session_outgoing_window(ssn); + bool linkq = (bool) link; + pn_link_state_t *state = &link->state; + return pn_post_frame(transport->disp, ssn->state.local_channel, "DL[?IIII?I?I?In?o]", FLOW, + (int16_t) ssn->state.remote_channel >= 0, ssn->state.incoming_transfer_count, + ssn->state.incoming_window, + ssn->state.outgoing_transfer_count, + ssn->state.outgoing_window, + linkq, linkq ? state->local_handle : 0, + linkq, linkq ? state->delivery_count : 0, + linkq, linkq ? state->link_credit : 0, + linkq, linkq ? link->drain : false); +} + +int pn_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint) +{ + if (endpoint->type == RECEIVER && endpoint->state & PN_LOCAL_ACTIVE) + { + pn_link_t *rcv = (pn_link_t *) endpoint; + pn_session_t *ssn = rcv->session; + pn_link_state_t *state = &rcv->state; + if ((int16_t) ssn->state.local_channel >= 0 && + (int32_t) state->local_handle >= 0 && + ((rcv->drain || state->link_credit != rcv->credit - rcv->queued) || !ssn->state.incoming_window)) { + state->link_credit = rcv->credit - rcv->queued; + return pn_post_flow(transport, ssn, rcv); + } + } + + return 0; +} + +int pn_flush_disp(pn_transport_t *transport, pn_session_t *ssn) +{ + uint64_t code = ssn->state.disp_code; + bool settled = ssn->state.disp_settled; + if (ssn->state.disp) { + int err = pn_post_frame(transport->disp, ssn->state.local_channel, "DL[oIIo?DL[]]", DISPOSITION, + ssn->state.disp_type, ssn->state.disp_first, ssn->state.disp_last, + settled, (bool)code, code); + if (err) return err; + ssn->state.disp_type = 0; + ssn->state.disp_code = 0; + ssn->state.disp_settled = 0; + ssn->state.disp_first = 0; + ssn->state.disp_last = 0; + ssn->state.disp = false; + } + return 0; +} + +int pn_post_disp(pn_transport_t *transport, pn_delivery_t *delivery) +{ + pn_link_t *link = delivery->link; + pn_session_t *ssn = link->session; + pn_session_state_t *ssn_state = &ssn->state; + pn_modified(transport->connection, &link->session->endpoint, false); + pn_delivery_state_t *state = &delivery->state; + assert(state->init); + bool role = (link->endpoint.type == RECEIVER); + uint64_t code = delivery->local.type; + + if (!code && !delivery->local.settled) { + return 0; + } + + if (!pni_disposition_batchable(&delivery->local)) { + pn_data_clear(transport->disp_data); + pni_disposition_encode(&delivery->local, transport->disp_data); + return pn_post_frame(transport->disp, ssn->state.local_channel, + "DL[oIIo?DLC]", DISPOSITION, + role, state->id, state->id, delivery->local.settled, + (bool)code, code, transport->disp_data); + } + + if (ssn_state->disp && code == ssn_state->disp_code && + delivery->local.settled == ssn_state->disp_settled && + ssn_state->disp_type == role) { + if (state->id == ssn_state->disp_first - 1) { + ssn_state->disp_first = state->id; + return 0; + } else if (state->id == ssn_state->disp_last + 1) { + ssn_state->disp_last = state->id; + return 0; + } + } + + if (ssn_state->disp) { + int err = pn_flush_disp(transport, ssn); + if (err) return err; + } + + ssn_state->disp_type = role; + ssn_state->disp_code = code; + ssn_state->disp_settled = delivery->local.settled; + ssn_state->disp_first = state->id; + ssn_state->disp_last = state->id; + ssn_state->disp = true; + + return 0; +} + +int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery, bool *settle) +{ + *settle = false; + pn_link_t *link = delivery->link; + pn_session_state_t *ssn_state = &link->session->state; + pn_link_state_t *link_state = &link->state; + bool xfr_posted = false; + if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) { + pn_delivery_state_t *state = &delivery->state; + if (!state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) && + ssn_state->remote_incoming_window > 0 && link_state->link_credit > 0) { + if (!state->init) { + state = pn_delivery_map_push(&ssn_state->outgoing, delivery); + } + + pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes); + pn_set_payload(transport->disp, bytes.start, bytes.size); + pn_bytes_t tag = pn_buffer_bytes(delivery->tag); + int count = pn_post_transfer_frame(transport->disp, + ssn_state->local_channel, + link_state->local_handle, + state->id, &tag, + 0, // message-format + delivery->local.settled, + !delivery->done, + ssn_state->remote_incoming_window); + if (count < 0) return count; + xfr_posted = true; + ssn_state->outgoing_transfer_count += count; + ssn_state->remote_incoming_window -= count; + + int sent = bytes.size - transport->disp->output_size; + pn_buffer_trim(delivery->bytes, sent, 0); + link->session->outgoing_bytes -= sent; + if (!pn_buffer_size(delivery->bytes) && delivery->done) { + state->sent = true; + link_state->delivery_count++; + link_state->link_credit--; + link->queued--; + link->session->outgoing_deliveries--; + } + + pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_FLOW); + } + } + + pn_delivery_state_t *state = delivery->state.init ? &delivery->state : NULL; + if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote.settled + && state && state->sent && !xfr_posted) { + int err = pn_post_disp(transport, delivery); + if (err) return err; + } + + *settle = delivery->local.settled && state && state->sent; + return 0; +} + +int pn_process_tpwork_receiver(pn_transport_t *transport, pn_delivery_t *delivery, bool *settle) +{ + *settle = false; + pn_link_t *link = delivery->link; + // XXX: need to prevent duplicate disposition sending + pn_session_t *ssn = link->session; + if ((int16_t) ssn->state.local_channel >= 0 && !delivery->remote.settled && delivery->state.init) { + int err = pn_post_disp(transport, delivery); + if (err) return err; + } + + // XXX: need to centralize this policy and improve it + if (!ssn->state.incoming_window) { + int err = pn_post_flow(transport, ssn, link); + if (err) return err; + } + + *settle = delivery->local.settled; + return 0; +} + +int pn_process_tpwork(pn_transport_t *transport, pn_endpoint_t *endpoint) +{ + if (endpoint->type == CONNECTION && !transport->close_sent) + { + pn_connection_t *conn = (pn_connection_t *) endpoint; + pn_delivery_t *delivery = conn->tpwork_head; + while (delivery) + { + pn_delivery_t *tp_next = delivery->tpwork_next; + bool settle = false; + + pn_link_t *link = delivery->link; + pn_delivery_map_t *dm = NULL; + if (pn_link_is_sender(link)) { + dm = &link->session->state.outgoing; + int err = pn_process_tpwork_sender(transport, delivery, &settle); + if (err) return err; + } else { + dm = &link->session->state.incoming; + int err = pn_process_tpwork_receiver(transport, delivery, &settle); + if (err) return err; + } + + if (settle) { + pn_full_settle(dm, delivery); + } else if (!pn_delivery_buffered(delivery)) { + pn_clear_tpwork(delivery); + } + + delivery = tp_next; + } + } + + return 0; +} + +int pn_process_flush_disp(pn_transport_t *transport, pn_endpoint_t *endpoint) +{ + if (endpoint->type == SESSION) { + pn_session_t *session = (pn_session_t *) endpoint; + pn_session_state_t *state = &session->state; + if ((int16_t) state->local_channel >= 0 && !transport->close_sent) + { + int err = pn_flush_disp(transport, session); + if (err) return err; + } + } + + return 0; +} + +int pn_process_flow_sender(pn_transport_t *transport, pn_endpoint_t *endpoint) +{ + if (endpoint->type == SENDER && endpoint->state & PN_LOCAL_ACTIVE) + { + pn_link_t *snd = (pn_link_t *) endpoint; + pn_session_t *ssn = snd->session; + pn_link_state_t *state = &snd->state; + if ((int16_t) ssn->state.local_channel >= 0 && + (int32_t) state->local_handle >= 0 && + snd->drain && snd->drained) { + pn_delivery_t *tail = snd->unsettled_tail; + if (!tail || !pn_delivery_buffered(tail)) { + state->delivery_count += state->link_credit; + state->link_credit = 0; + snd->drained = 0; + return pn_post_flow(transport, ssn, snd); + } + } + } + + return 0; +} + +static void pni_unmap_local_handle(pn_link_t *link) { + pn_link_state_t *state = &link->state; + uintptr_t handle = state->local_handle; + state->local_handle = -2; + // may delete link + pn_hash_del(link->session->state.local_handles, handle); +} + +int pn_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) +{ + if (endpoint->type == SENDER || endpoint->type == RECEIVER) + { + pn_link_t *link = (pn_link_t *) endpoint; + pn_session_t *session = link->session; + pn_session_state_t *ssn_state = &session->state; + pn_link_state_t *state = &link->state; + if (((endpoint->state & PN_LOCAL_CLOSED) || link->detached) && (int32_t) state->local_handle >= 0 && + (int16_t) ssn_state->local_channel >= 0 && !transport->close_sent) { + if (pn_link_is_sender(link) && pn_link_queued(link) && + (int32_t) state->remote_handle != -2 && + (int16_t) ssn_state->remote_channel != -2 && + !transport->close_rcvd) return 0; + + const char *name = NULL; + const char *description = NULL; + pn_data_t *info = NULL; + + if (pn_condition_is_set(&endpoint->condition)) { + name = pn_condition_get_name(&endpoint->condition); + description = pn_condition_get_description(&endpoint->condition); + info = pn_condition_info(&endpoint->condition); + } + + int err = + pn_post_frame(transport->disp, ssn_state->local_channel, + "DL[Io?DL[sSC]]", DETACH, state->local_handle, !link->detached, + (bool)name, ERROR, name, description, info); + if (err) return err; + pni_unmap_local_handle(link); + } + + pn_clear_modified(transport->connection, endpoint); + } + + return 0; +} + +bool pn_pointful_buffering(pn_transport_t *transport, pn_session_t *session) +{ + if (transport->close_rcvd) return false; + if (!transport->open_rcvd) return true; + + pn_connection_t *conn = transport->connection; + pn_link_t *link = pn_link_head(conn, 0); + while (link) { + if (pn_link_is_sender(link) && pn_link_queued(link) > 0) { + pn_session_t *ssn = link->session; + if (session && session == ssn) { + if ((int32_t) link->state.remote_handle != -2 && + (int16_t) session->state.remote_channel != -2) { + return true; + } + } + } + link = pn_link_next(link, 0); + } + + return false; +} + +static void pni_unmap_local_channel(pn_session_t *ssn) { + // XXX: should really update link state also + pni_transport_unbind_handles(ssn->state.local_handles, false); + pn_transport_t *transport = ssn->connection->transport; + pn_session_state_t *state = &ssn->state; + uintptr_t channel = state->local_channel; + state->local_channel = -2; + // may delete session + pn_hash_del(transport->local_channels, channel); +} + +int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) +{ + if (endpoint->type == SESSION) + { + pn_session_t *session = (pn_session_t *) endpoint; + pn_session_state_t *state = &session->state; + if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0 + && !transport->close_sent) + { + if (pn_pointful_buffering(transport, session)) { + return 0; + } + + const char *name = NULL; + const char *description = NULL; + pn_data_t *info = NULL; + + if (pn_condition_is_set(&endpoint->condition)) { + name = pn_condition_get_name(&endpoint->condition); + description = pn_condition_get_description(&endpoint->condition); + info = pn_condition_info(&endpoint->condition); + } + + int err = pn_post_frame(transport->disp, state->local_channel, "DL[?DL[sSC]]", END, + (bool) name, ERROR, name, description, info); + if (err) return err; + pni_unmap_local_channel(session); + } + + pn_clear_modified(transport->connection, endpoint); + } + return 0; +} + +int pn_process_conn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) +{ + if (endpoint->type == CONNECTION) + { + if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) { + if (pn_pointful_buffering(transport, NULL)) return 0; + int err = pn_post_close(transport, NULL, NULL); + if (err) return err; + transport->close_sent = true; + } + + pn_clear_modified(transport->connection, endpoint); + } + return 0; +} + +int pn_phase(pn_transport_t *transport, int (*phase)(pn_transport_t *, pn_endpoint_t *)) +{ + pn_connection_t *conn = transport->connection; + pn_endpoint_t *endpoint = conn->transport_head; + while (endpoint) + { + pn_endpoint_t *next = endpoint->transport_next; + int err = phase(transport, endpoint); + if (err) return err; + endpoint = next; + } + return 0; +} + +int pn_process(pn_transport_t *transport) +{ + int err; + if ((err = pn_phase(transport, pn_process_conn_setup))) return err; + if ((err = pn_phase(transport, pn_process_ssn_setup))) return err; + if ((err = pn_phase(transport, pn_process_link_setup))) return err; + if ((err = pn_phase(transport, pn_process_flow_receiver))) return err; + + // XXX: this has to happen two times because we might settle stuff + // on the first pass and create space for more work to be done on the + // second pass + if ((err = pn_phase(transport, pn_process_tpwork))) return err; + if ((err = pn_phase(transport, pn_process_tpwork))) return err; + + if ((err = pn_phase(transport, pn_process_flush_disp))) return err; + + if ((err = pn_phase(transport, pn_process_flow_sender))) return err; + if ((err = pn_phase(transport, pn_process_link_teardown))) return err; + if ((err = pn_phase(transport, pn_process_ssn_teardown))) return err; + if ((err = pn_phase(transport, pn_process_conn_teardown))) return err; + + if (transport->connection->tpwork_head) { + pn_modified(transport->connection, &transport->connection->endpoint, false); + } + + return 0; +} + +static ssize_t pn_output_write_header(pn_transport_t *transport, + char *bytes, size_t size, + const char *header, size_t hdrsize, + const char *protocol, + ssize_t (*next)(pn_io_layer_t *, char *, size_t)) +{ + if (transport->disp->trace & PN_TRACE_FRM) + pn_transport_logf(transport, " -> %s", protocol); + assert(size >= hdrsize); + memmove(bytes, header, hdrsize); + transport->io_layers[PN_IO_AMQP].process_output = next; + return hdrsize; +} + +static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t size) +{ + pn_transport_t *transport = (pn_transport_t *)io_layer->context; + return pn_output_write_header(transport, bytes, size, AMQP_HEADER, 8, "AMQP", + pn_output_write_amqp); +} + +static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t size) +{ + pn_transport_t *transport = (pn_transport_t *)io_layer->context; + if (transport->connection && !transport->done_processing) { + int err = pn_process(transport); + if (err) { + pn_transport_logf(transport, "process error %i", err); + transport->done_processing = true; + } + } + + // write out any buffered data _before_ returning PN_EOS, else we + // could truncate an outgoing Close frame containing a useful error + // status + if (!transport->disp->available && transport->close_sent) { + return PN_EOS; + } + + return pn_dispatcher_output(transport->disp, bytes, size); +} + +// generate outbound data, return amount of pending output else error +static ssize_t transport_produce(pn_transport_t *transport) +{ + pn_io_layer_t *io_layer = transport->io_layers; + ssize_t space = transport->output_size - transport->output_pending; + + if (space <= 0) { // can we expand the buffer? + int more = 0; + if (!transport->remote_max_frame) // no limit, so double it + more = transport->output_size; + else if (transport->remote_max_frame > transport->output_size) + more = pn_min(transport->output_size, transport->remote_max_frame - transport->output_size); + if (more) { + char *newbuf = (char *)realloc( transport->output_buf, transport->output_size + more ); + if (newbuf) { + transport->output_buf = newbuf; + transport->output_size += more; + space += more; + } + } + } + + while (space > 0) { + ssize_t n; + n = io_layer->process_output( io_layer, + &transport->output_buf[transport->output_pending], + space ); + if (n > 0) { + space -= n; + transport->output_pending += n; + } else if (n == 0) { + break; + } else { + if (transport->output_pending) + break; // return what is available + if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) { + if (n < 0) { + pn_transport_log(transport, " -> EOS"); + } + /*else + pn_transport_logf(transport, " -> EOS (%" PN_ZI ") %s", n, + pn_error_text(transport->error));*/ + } + return n; + } + } + return transport->output_pending; +} + +// deprecated +ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size) +{ + if (!transport) return PN_ARG_ERR; + ssize_t available = pn_transport_pending(transport); + if (available > 0) { + available = (ssize_t) pn_min( (size_t)available, size ); + memmove( bytes, pn_transport_head(transport), available ); + pn_transport_pop( transport, (size_t) available ); + } + return available; +} + + +void pn_transport_trace(pn_transport_t *transport, pn_trace_t trace) +{ + if (transport->sasl) pn_sasl_trace(transport->sasl, trace); + if (transport->ssl) pn_ssl_trace(transport->ssl, trace); + transport->disp->trace = trace; +} + +void pn_transport_set_tracer(pn_transport_t *transport, pn_tracer_t tracer) +{ + assert(transport); + assert(tracer); + + transport->tracer = tracer; +} + +pn_tracer_t pn_transport_get_tracer(pn_transport_t *transport) +{ + assert(transport); + return transport->tracer; +} + +void pn_transport_set_context(pn_transport_t *transport, void *context) +{ + assert(transport); + transport->context = context; +} + +void *pn_transport_get_context(pn_transport_t *transport) +{ + assert(transport); + return transport->context; +} + +void pn_transport_log(pn_transport_t *transport, const char *message) +{ + assert(transport); + transport->tracer(transport, message); +} + +void pn_transport_logf(pn_transport_t *transport, const char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + pn_string_vformat(transport->scratch, fmt, ap); + va_end(ap); + + pn_transport_log(transport, pn_string_get(transport->scratch)); +} + +uint16_t pn_transport_get_channel_max(pn_transport_t *transport) +{ + return transport->channel_max; +} + +void pn_transport_set_channel_max(pn_transport_t *transport, uint16_t channel_max) +{ + transport->channel_max = channel_max; +} + +uint16_t pn_transport_remote_channel_max(pn_transport_t *transport) +{ + return transport->remote_channel_max; +} + +uint32_t pn_transport_get_max_frame(pn_transport_t *transport) +{ + return transport->local_max_frame; +} + +void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size) +{ + // if size == 0, no advertised limit to input frame size. + if (size && size < AMQP_MIN_MAX_FRAME_SIZE) + size = AMQP_MIN_MAX_FRAME_SIZE; + transport->local_max_frame = size; +} + +uint32_t pn_transport_get_remote_max_frame(pn_transport_t *transport) +{ + return transport->remote_max_frame; +} + +pn_millis_t pn_transport_get_idle_timeout(pn_transport_t *transport) +{ + return transport->local_idle_timeout; +} + +void pn_transport_set_idle_timeout(pn_transport_t *transport, pn_millis_t timeout) +{ + transport->local_idle_timeout = timeout; + transport->io_layers[PN_IO_AMQP].process_tick = pn_tick_amqp; +} + +pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport) +{ + return transport->remote_idle_timeout; +} + +pn_timestamp_t pn_transport_tick(pn_transport_t *transport, pn_timestamp_t now) +{ + pn_io_layer_t *io_layer = transport->io_layers; + return io_layer->process_tick( io_layer, now ); +} + +uint64_t pn_transport_get_frames_output(const pn_transport_t *transport) +{ + if (transport && transport->disp) + return transport->disp->output_frames_ct; + return 0; +} + +uint64_t pn_transport_get_frames_input(const pn_transport_t *transport) +{ + if (transport && transport->disp) + return transport->disp->input_frames_ct; + return 0; +} + +/** Pass through input handler */ +ssize_t pn_io_layer_input_passthru(pn_io_layer_t *io_layer, const char *data, size_t available) +{ + pn_io_layer_t *next = io_layer->next; + if (next) + return next->process_input( next, data, available ); + return PN_EOS; +} + +/** Pass through output handler */ +ssize_t pn_io_layer_output_passthru(pn_io_layer_t *io_layer, char *bytes, size_t size) +{ + pn_io_layer_t *next = io_layer->next; + if (next) + return next->process_output( next, bytes, size ); + return PN_EOS; +} + +/** Pass through tick handler */ +pn_timestamp_t pn_io_layer_tick_passthru(pn_io_layer_t *io_layer, pn_timestamp_t now) +{ + pn_io_layer_t *next = io_layer->next; + if (next) + return next->process_tick( next, now ); + return 0; +} + + +/// + +// input +ssize_t pn_transport_capacity(pn_transport_t *transport) /* <0 == done */ +{ + if (transport->tail_closed) return PN_EOS; + //if (pn_error_code(transport->error)) return pn_error_code(transport->error); + + ssize_t capacity = transport->input_size - transport->input_pending; + if ( capacity<=0 ) { + // can we expand the size of the input buffer? + int more = 0; + if (!transport->local_max_frame) { // no limit (ha!) + more = transport->input_size; + } else if (transport->local_max_frame > transport->input_size) { + more = pn_min(transport->input_size, transport->local_max_frame - transport->input_size); + } + if (more) { + char *newbuf = (char *) realloc( transport->input_buf, transport->input_size + more ); + if (newbuf) { + transport->input_buf = newbuf; + transport->input_size += more; + capacity += more; + } + } + } + return capacity; +} + + +char *pn_transport_tail(pn_transport_t *transport) +{ + if (transport && transport->input_pending < transport->input_size) { + return &transport->input_buf[transport->input_pending]; + } + return NULL; +} + +ssize_t pn_transport_push(pn_transport_t *transport, const char *src, size_t size) +{ + assert(transport); + + ssize_t capacity = pn_transport_capacity(transport); + if (capacity < 0) { + return capacity; + } else if (size > (size_t) capacity) { + size = capacity; + } + + char *dst = pn_transport_tail(transport); + assert(dst); + memmove(dst, src, size); + + int n = pn_transport_process(transport, size); + if (n < 0) { + return n; + } else { + return size; + } +} + +void pni_close_tail(pn_transport_t *transport) +{ + if (!transport->tail_closed) { + transport->tail_closed = true; + } +} + +int pn_transport_process(pn_transport_t *transport, size_t size) +{ + assert(transport); + size = pn_min( size, (transport->input_size - transport->input_pending) ); + transport->input_pending += size; + transport->bytes_input += size; + + ssize_t n = transport_consume( transport ); + if (n == PN_EOS) { + pni_close_tail(transport); + } + + if (n < 0 && n != PN_EOS) return n; + return 0; +} + +// input stream has closed +int pn_transport_close_tail(pn_transport_t *transport) +{ + pni_close_tail(transport); + transport_consume( transport ); + return 0; + // XXX: what if not all input processed at this point? do we care??? +} + +// output +ssize_t pn_transport_pending(pn_transport_t *transport) /* <0 == done */ +{ + assert(transport); + if (transport->head_closed) return PN_EOS; + return transport_produce( transport ); +} + +const char *pn_transport_head(pn_transport_t *transport) +{ + if (transport && transport->output_pending) { + return transport->output_buf; + } + return NULL; +} + +ssize_t pn_transport_peek(pn_transport_t *transport, char *dst, size_t size) +{ + assert(transport); + + ssize_t pending = pn_transport_pending(transport); + if (pending < 0) { + return pending; + } else if (size > (size_t) pending) { + size = pending; + } + + if (pending > 0) { + const char *src = pn_transport_head(transport); + assert(src); + memmove(dst, src, size); + } + + return size; +} + +void pn_transport_pop(pn_transport_t *transport, size_t size) +{ + if (transport) { + assert( transport->output_pending >= size ); + transport->output_pending -= size; + transport->bytes_output += size; + if (transport->output_pending) { + memmove( transport->output_buf, &transport->output_buf[size], + transport->output_pending ); + } + + if (!transport->output_pending && pn_transport_pending(transport) < 0 && + !transport->posted_head_closed) { + pn_collector_t *collector = pni_transport_collector(transport); + pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_HEAD_CLOSED); + transport->posted_head_closed = true; + pni_maybe_post_closed(transport); + } + } +} + +int pn_transport_close_head(pn_transport_t *transport) +{ + size_t pending = pn_transport_pending(transport); + transport->head_closed = true; + pn_transport_pop(transport, pending); + return 0; +} + +// true if the transport will not generate further output +bool pn_transport_quiesced(pn_transport_t *transport) +{ + if (!transport) return true; + ssize_t pending = pn_transport_pending(transport); + if (pending < 0) return true; // output done + else if (pending > 0) return false; + // no pending at transport, but check if data is buffered in I/O layers + pn_io_layer_t *io_layer = transport->io_layers; + while (io_layer != &transport->io_layers[PN_IO_LAYER_CT]) { + if (io_layer->buffered_output && io_layer->buffered_output( io_layer )) + return false; + ++io_layer; + } + return true; +} + +bool pn_transport_closed(pn_transport_t *transport) +{ + assert(transport); + ssize_t capacity = pn_transport_capacity(transport); + ssize_t pending = pn_transport_pending(transport); + return capacity < 0 && pending < 0; +} + +pn_connection_t *pn_transport_connection(pn_transport_t *transport) +{ + assert(transport); + return transport->connection; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4c6f2122/proton-j/src/main/resources/cengine.py ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/cengine.py b/proton-j/src/main/resources/cengine.py index 63603bc..f9d4ddb 100644 --- a/proton-j/src/main/resources/cengine.py +++ b/proton-j/src/main/resources/cengine.py @@ -419,21 +419,22 @@ class pn_terminus: def decode(self, impl): if impl is not None: self.type = TERMINUS_TYPES_J2P[impl.__class__] - self.address = impl.getAddress() - self.durability = DURABILITY_J2P[impl.getDurable()] - self.expiry_policy = EXPIRY_POLICY_J2P[impl.getExpiryPolicy()] - self.timeout = impl.getTimeout().longValue() - self.dynamic = impl.getDynamic() - obj2dat(impl.getDynamicNodeProperties(), self.properties) - array2dat(impl.getCapabilities(), PN_SYMBOL, self.capabilities) - if self.type == PN_SOURCE: - self.distribution_mode = DISTRIBUTION_MODE_J2P[impl.getDistributionMode()] - array2dat(impl.getOutcomes(), PN_SYMBOL, self.outcomes) - obj2dat(impl.getFilter(), self.filter) + if self.type in (PN_SOURCE, PN_TARGET): + self.address = impl.getAddress() + self.durability = DURABILITY_J2P[impl.getDurable()] + self.expiry_policy = EXPIRY_POLICY_J2P[impl.getExpiryPolicy()] + self.timeout = impl.getTimeout().longValue() + self.dynamic = impl.getDynamic() + obj2dat(impl.getDynamicNodeProperties(), self.properties) + array2dat(impl.getCapabilities(), PN_SYMBOL, self.capabilities) + if self.type == PN_SOURCE: + self.distribution_mode = DISTRIBUTION_MODE_J2P[impl.getDistributionMode()] + array2dat(impl.getOutcomes(), PN_SYMBOL, self.outcomes) + obj2dat(impl.getFilter(), self.filter) def encode(self): impl = TERMINUS_TYPES_P2J[self.type]() - if impl is not None: + if self.type in (PN_SOURCE, PN_TARGET): impl.setAddress(self.address) impl.setDurable(DURABILITY_P2J[self.durability]) impl.setExpiryPolicy(EXPIRY_POLICY_P2J[self.expiry_policy]) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4c6f2122/tests/python/proton_tests/engine.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py index d35f518..eec73d0 100644 --- a/tests/python/proton_tests/engine.py +++ b/tests/python/proton_tests/engine.py @@ -547,6 +547,9 @@ class LinkTest(Test): def test_target(self): self._test_source_target(None, TerminusConfig(address="target")) + def test_coordinator(self): + self._test_source_target(None, TerminusConfig(type=Terminus.COORDINATOR)) + def test_source_target_full(self): self._test_source_target(TerminusConfig(address="source", timeout=3, @@ -619,8 +622,8 @@ class LinkTest(Test): class TerminusConfig: - def __init__(self, address=None, timeout=None, durability=None, filter=None, - capabilities=None, dynamic=False, dist_mode=None): + def __init__(self, type=None, address=None, timeout=None, durability=None, + filter=None, capabilities=None, dynamic=False, dist_mode=None): self.address = address self.timeout = timeout self.durability = durability @@ -628,8 +631,11 @@ class TerminusConfig: self.capabilities = capabilities self.dynamic = dynamic self.dist_mode = dist_mode + self.type = type def __call__(self, terminus): + if self.type is not None: + terminus.type = self.type if self.address is not None: terminus.address = self.address if self.timeout is not None: --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
