This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push: new 72fccd6 PROTON-1914: early settlement of inbound streamed message. This closes #279 new 3c638f9 PROTON-1914: early settlement of inbound streamed message. This closes #279 72fccd6 is described below commit 72fccd66580dc510540010794e1f05f8a57183f2 Author: Cliff Jansen <cliffjan...@apache.org> AuthorDate: Fri Dec 11 10:29:30 2020 -0800 PROTON-1914: early settlement of inbound streamed message. This closes #279 --- c/src/core/engine-internal.h | 2 + c/src/core/engine.c | 2 + c/src/core/transport.c | 78 ++++++++++++++++++++++++++++---------- c/tests/connection_driver_test.cpp | 23 +++++++---- 4 files changed, 78 insertions(+), 27 deletions(-) diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index 11718c9..832d29d 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -305,6 +305,7 @@ struct pn_link_t { pn_sequence_t available; pn_sequence_t credit; pn_sequence_t queued; + pn_sequence_t more_id; int drained; // number of drained credits uint8_t snd_settle_mode; uint8_t rcv_settle_mode; @@ -313,6 +314,7 @@ struct pn_link_t { bool drain_flag_mode; // receiver only bool drain; bool detached; + bool more_pending; }; struct pn_disposition_t { diff --git a/c/src/core/engine.c b/c/src/core/engine.c index bfc8613..1aa1992 100644 --- a/c/src/core/engine.c +++ b/c/src/core/engine.c @@ -1177,6 +1177,7 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) link->available = 0; link->credit = 0; link->queued = 0; + link->more_id = 0; link->drain = false; link->drain_flag_mode = true; link->drained = 0; @@ -1186,6 +1187,7 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) link->remote_snd_settle_mode = PN_SND_MIXED; link->remote_rcv_settle_mode = PN_RCV_FIRST; link->detached = false; + link->more_pending = false; link->properties = 0; link->remote_properties = 0; diff --git a/c/src/core/transport.c 
b/c/src/core/transport.c index fe6ebf1..0467eef 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -1519,12 +1519,36 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann if (!link) { return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle); } - pn_delivery_t *delivery; - if (link->unsettled_tail && !link->unsettled_tail->done) { - delivery = link->unsettled_tail; - if (settled_set && !settled && delivery->remote.settled) - return pn_do_error(transport, "amqp:invalid-field", "invalid transition from settled to unsettled"); + pn_delivery_t *delivery = NULL; + bool new_delivery = false; + if (link->more_pending) { + // Ongoing multiframe delivery. + if (link->unsettled_tail && !link->unsettled_tail->done) { + delivery = link->unsettled_tail; + if (settled_set && !settled && delivery->remote.settled) + return pn_do_error(transport, "amqp:invalid-field", "invalid transition from settled to unsettled"); + if (id_present && id != delivery->state.id) + return pn_do_error(transport, "amqp:invalid-field", "invalid delivery-id for a continuation transfer"); + } else { + // Application has already settled. Delivery is no more. + // Ignore content and look for transition to a new delivery. + if (!id_present || id == link->more_id) { + // Still old delivery. + if (!more || aborted) + link->more_pending = false; + } else { + // New id. 
+ new_delivery = true; + link->more_pending = false; + } + } } else { + new_delivery = true; + } + + if (new_delivery) { + assert(!link->more_pending); + assert(delivery == NULL); pn_delivery_map_t *incoming = &ssn->state.incoming; if (!ssn->state.incoming_init) { @@ -1550,17 +1574,38 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann link->queued++; } - pn_buffer_append(delivery->bytes, payload->start, payload->size); - ssn->incoming_bytes += payload->size; - delivery->done = !more; + if (delivery) { + pn_buffer_append(delivery->bytes, payload->start, payload->size); + if (more) { + if (!link->more_pending) { + // First frame of a multi-frame transfer. Remember at link level. + link->more_pending = true; + assert(id_present); // Id MUST be set on first frame, and already checked above. + link->more_id = id; + } + delivery->done = false; + } + else + delivery->done = true; + + // XXX: need to fill in remote state: delivery->remote.state = ...; + if (settled && !delivery->remote.settled) { + delivery->remote.settled = settled; + delivery->updated = true; + pn_work_update(transport->connection, delivery); + } - // XXX: need to fill in remote state: delivery->remote.state = ...; - if (settled && !delivery->remote.settled) { - delivery->remote.settled = settled; - delivery->updated = true; - pn_work_update(transport->connection, delivery); + if ((delivery->aborted = aborted)) { + delivery->remote.settled = true; + delivery->done = true; + delivery->updated = true; + link->more_pending = false; + pn_work_update(transport->connection, delivery); + } + pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY); } + ssn->incoming_bytes += payload->size; ssn->state.incoming_transfer_count++; ssn->state.incoming_window--; @@ -1569,13 +1614,6 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann pni_post_flow(transport, ssn, link); } - if ((delivery->aborted = aborted)) { - 
delivery->remote.settled = true; - delivery->done = true; - delivery->updated = true; - pn_work_update(transport->connection, delivery); - } - pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY); return 0; } diff --git a/c/tests/connection_driver_test.cpp b/c/tests/connection_driver_test.cpp index f8e3345..8a86db5 100644 --- a/c/tests/connection_driver_test.cpp +++ b/c/tests/connection_driver_test.cpp @@ -1,6 +1,6 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - h * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the @@ -525,10 +525,9 @@ TEST_CASE("driver_duplicate_link_client", "[!hide][!shouldfail]") { cond_empty()); } -/* Settling an incomplete delivery should not cause an error - This test will fail till PROTON-1914 is fixed +/* Settling an incomplete delivery should not cause an error. 
*/ -TEST_CASE("driver_settle_incomplete_receiver", "[!hide][!shouldfail]") { +TEST_CASE("driver_settle_incomplete_receiver") { send_client_handler client; delivery_handler server; pn_test::driver_pair d(client, server); @@ -544,24 +543,34 @@ TEST_CASE("driver_settle_incomplete_receiver", "[!hide][!shouldfail]") { /* Send/receive a frame */ CHECK(sizeof(data) == pn_link_send(snd, data, sizeof(data))); + server.log_clear(); d.run(); CHECK_THAT(ETYPES(PN_DELIVERY), Equals(server.log_clear())); CHECK(sizeof(data) == pn_link_recv(rcv, rbuf, sizeof(data))); d.run(); - /* Settle the receiver's delivery */ + /* Settle early while the sender is still sending */ pn_delivery_settle(pn_link_current(rcv)); + CHECK(sizeof(data) == pn_link_send(snd, data, sizeof(data))); d.run(); CHECK_THAT(*pn_connection_remote_condition(d.client.connection), cond_empty()); CHECK_THAT(*pn_connection_condition(d.server.connection), cond_empty()); - /* Send/receive a frame, should not cause error */ + pn_delivery_settle(pn_link_current(snd)); + + /* Send/receive a new message, should not cause error */ + pn_link_flow(rcv, 1); + d.run(); + pn_delivery(snd, pn_bytes("2")); /* Prepare to send */ CHECK(sizeof(data) == pn_link_send(snd, data, sizeof(data))); + server.log_clear(); d.run(); CHECK_THAT(ETYPES(PN_DELIVERY), Equals(server.log_clear())); CHECK(sizeof(data) == pn_link_recv(rcv, rbuf, sizeof(data))); - d.run(); + pn_delivery_tag_t tag = pn_delivery_tag(pn_link_current(rcv)); + CHECK(tag.size == 1); + CHECK(tag.start[0] == '2'); CHECK_THAT(*pn_connection_remote_condition(d.client.connection), cond_empty()); CHECK_THAT(*pn_connection_condition(d.server.connection), cond_empty()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org