This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 547588739d8b81f8bd7bf76cdfa425254b1a7ffd Author: Andrew Stitcher <astitc...@apache.org> AuthorDate: Tue Oct 29 16:44:18 2024 -0400 PROTON-2873: Add transaction disposition To support work on transaction support add a specific transactional disposition type. So that you can avoid making custom dispositions in this case. --- c/include/proton/disposition.h | 57 +++++++++++++++++++++++++++ c/src/core/emitters.h | 17 ++++++++ c/src/core/engine-internal.h | 7 ++++ c/src/core/engine.c | 56 ++++++++++++++++++++++++++ c/src/core/transport.c | 14 +++++++ c/tools/codec-generator/specs.json | 2 + python/cproton.h | 7 ++++ python/cproton.py | 14 ++++++- python/proton/__init__.py | 3 +- python/proton/_delivery.py | 78 +++++++++++++++++++++++++++++++++++-- python/tests/proton_tests/engine.py | 22 ++++++++++- 11 files changed, 269 insertions(+), 8 deletions(-) diff --git a/c/include/proton/disposition.h b/c/include/proton/disposition.h index d809b2100..e3b7a7800 100644 --- a/c/include/proton/disposition.h +++ b/c/include/proton/disposition.h @@ -90,6 +90,12 @@ typedef struct pn_disposition_t pn_disposition_t; */ #define PN_MODIFIED (0x0000000000000027) +/** + * The PN_TRANSACTIONAL_STATE delivery state is a non terminal state + * indicating the transactional state of a delivery. + */ +#define PN_TRANSACTIONAL_STATE (0x0000000000000034) + /** * Get the type of a disposition. * @@ -248,6 +254,13 @@ typedef struct pn_rejected_disposition_t pn_rejected_disposition_t; */ typedef struct pn_modified_disposition_t pn_modified_disposition_t; +/** + * A transactional delivery disposition + * + * This represents transactional delivery state. 
+ */ +typedef struct pn_transactional_disposition_t pn_transactional_disposition_t; + /** * A custom delivery disposition * @@ -291,6 +304,15 @@ PN_EXTERN pn_rejected_disposition_t *pn_rejected_disposition(pn_disposition_t *d */ PN_EXTERN pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition); +/** + * Convert a delivery disposition to a transactional disposition + * + * @param[in] disposition delivery disposition object + * @return a pointer to the transactional disposition or NULL + * if the disposition is not that type + */ +PN_EXTERN pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition); + /** * Access the disposition as a raw pn_data_t. * @@ -419,6 +441,41 @@ PN_EXTERN void pn_modified_disposition_set_undeliverable(pn_modified_disposition */ PN_EXTERN pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *disposition); + +/** + * Get the transaction id for a transactional disposition + * + * @param[in] disposition a transactional disposition object + * @return the transaction id + */ +PN_EXTERN pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition); + +/** + * Set the transaction id for a transactional disposition + * + * @param[in] disposition a transactional disposition object + * @param[in] id the transaction id + */ +PN_EXTERN void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id); + +/** + * Get the provisional outcome of the delivery if the transaction is committed successfully. + * + * @param[in] disposition a transactional disposition object + * @return the provisional outcome of the transaction + */ +PN_EXTERN uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition); + +/** + * Set the provisional outcome of the delivery if the transaction is committed successfully. 
+ * + * Only terminal disposition states are allowed (::PN_ACCEPTED, ::PN_REJECTED, ::PN_RELEASED, ::PN_MODIFIED) + * + * @param[in] disposition a transactional disposition object + * @param[in] outcome the provisional outcome of the transaction + */ +PN_EXTERN void pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t *disposition, uint64_t outcome); + /** * @} */ diff --git a/c/src/core/emitters.h b/c/src/core/emitters.h index 2cd59a64c..f20bdff10 100644 --- a/c/src/core/emitters.h +++ b/c/src/core/emitters.h @@ -647,6 +647,19 @@ static inline void emit_modified_disposition(pni_emitter_t* emitter, pni_compoun } } +static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_disposition_t *disposition); + +static inline void emit_transactional_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_transactional_disposition_t *disposition){ + for (bool small_encoding = true; ; small_encoding = false) { + pni_compound_context c = emit_list(emitter, compound0, small_encoding, true); + pni_compound_context compound = c; + emit_binary_bytes(emitter, &compound, disposition->id); + emit_raw(emitter, &compound, disposition->outcome_raw); + emit_end_list(emitter, &compound, small_encoding); + if (encode_succeeded(emitter, &compound)) break; + } +} + static inline void emit_custom_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_custom_disposition_t *disposition){ emit_descriptor(emitter, compound0, disposition->type); if ((disposition->data && pn_data_size(disposition->data) == 0) || @@ -687,6 +700,10 @@ static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context emit_descriptor(emitter, compound0, AMQP_DESC_MODIFIED); emit_modified_disposition(emitter, compound0, &disposition->u.s_modified); return; + case PN_DISP_TRANSACTIONAL: + emit_descriptor(emitter, compound0, AMQP_DESC_TRANSACTIONAL_STATE); + emit_transactional_disposition(emitter, compound0, 
&disposition->u.s_transactional); + return; case PN_DISP_CUSTOM: emit_custom_disposition(emitter, compound0, &disposition->u.s_custom); return; diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index da563813a..f10e6a8f0 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -341,6 +341,7 @@ typedef enum pn_disposition_type_t { PN_DISP_REJECTED = PN_REJECTED, PN_DISP_RELEASED = PN_RELEASED, PN_DISP_MODIFIED = PN_MODIFIED, + PN_DISP_TRANSACTIONAL = PN_TRANSACTIONAL_STATE, } pn_disposition_type_t; struct pn_received_disposition_t { @@ -359,6 +360,11 @@ struct pn_modified_disposition_t { bool undeliverable; }; +struct pn_transactional_disposition_t { + pn_bytes_t id; + pn_bytes_t outcome_raw; +}; + struct pn_custom_disposition_t { pn_data_t *data; pn_bytes_t data_raw; @@ -370,6 +376,7 @@ struct pn_disposition_t { struct pn_received_disposition_t s_received; struct pn_rejected_disposition_t s_rejected; struct pn_modified_disposition_t s_modified; + struct pn_transactional_disposition_t s_transactional; struct pn_custom_disposition_t s_custom; } u; uint16_t type; diff --git a/c/src/core/engine.c b/c/src/core/engine.c index 7ccb308b9..d7fb091d4 100644 --- a/c/src/core/engine.c +++ b/c/src/core/engine.c @@ -27,6 +27,7 @@ #include "consumers.h" #include "core/frame_consumers.h" #include "emitters.h" +#include "core/frame_generators.h" #include "fixed_string.h" #include "framing.h" #include "memory.h" @@ -1597,6 +1598,10 @@ static void pn_disposition_finalize(pn_disposition_t *ds) pn_data_free(ds->u.s_custom.data); pn_bytes_free(ds->u.s_custom.data_raw); break; + case PN_DISP_TRANSACTIONAL: + pn_bytes_free(ds->u.s_transactional.id); + pn_bytes_free(ds->u.s_transactional.outcome_raw); + break; } } @@ -1868,6 +1873,9 @@ void pni_disposition_to_raw(pn_disposition_t *disposition) { case PN_DISP_MODIFIED: emit_modified_disposition(&emitter, &compound, &disposition->u.s_modified); break; + case PN_DISP_TRANSACTIONAL: + 
emit_transactional_disposition(&emitter, &compound, &disposition->u.s_transactional); + break; } if (type != PN_DISP_EMPTY) { @@ -2005,6 +2013,13 @@ pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition return &disposition->u.s_modified; } +pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition) +{ + if (disposition->type==PN_DISP_EMPTY) disposition->type = PN_DISP_TRANSACTIONAL; + else if (disposition->type!=PN_DISP_TRANSACTIONAL) return NULL; + return &disposition->u.s_transactional; +} + pn_data_t *pn_custom_disposition_data(pn_custom_disposition_t *disposition) { assert(disposition); @@ -2084,6 +2099,44 @@ pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *dispos return disposition->annotations; } +pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition) +{ + assert(disposition); + return disposition->id; +} + +void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id) +{ + assert(disposition); + pn_bytes_free(disposition->id); + disposition->id = pn_bytes_dup(id); +} + +uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition) +{ + assert(disposition); + if (disposition->outcome_raw.size) { + bool qtype = false; + uint64_t type; + pn_amqp_decode_DQLq(disposition->outcome_raw, &qtype, &type); + if (qtype) { + return type; + } + } + return PN_DISP_EMPTY; +} + +void pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t *disposition, uint64_t type) +{ + assert(disposition); + // Generate a described LIST0 directly - this needs a max of 11 bytes + char outcome_scratch[11]; + pn_rwbytes_t scratch = {.size=sizeof(outcome_scratch), .start=outcome_scratch}; + pn_bytes_t outcome_raw = pn_amqp_encode_DLEe(&scratch, type); + pn_bytes_free(disposition->outcome_raw); + disposition->outcome_raw = pn_bytes_dup(outcome_raw); +} + pn_delivery_tag_t 
pn_delivery_tag(pn_delivery_t *delivery) { if (delivery) { @@ -2420,6 +2473,7 @@ void pn_delivery_update(pn_delivery_t *delivery, uint64_t state) case PN_RECEIVED: case PN_MODIFIED: case PN_RELEASED: + case PN_TRANSACTIONAL_STATE: break; default: delivery->local.u.s_custom.type = state; @@ -2434,6 +2488,7 @@ void pn_delivery_update(pn_delivery_t *delivery, uint64_t state) case PN_RECEIVED: case PN_MODIFIED: case PN_RELEASED: + case PN_TRANSACTIONAL_STATE: delivery->local.type = state; break; default: @@ -2804,6 +2859,7 @@ const char *pn_disposition_type_name(uint64_t d) { case PN_REJECTED: return "rejected"; case PN_RELEASED: return "released"; case PN_MODIFIED: return "modified"; + case PN_TRANSACTIONAL_STATE: return "transactional_state"; default: return "unknown"; } } diff --git a/c/src/core/transport.c b/c/src/core/transport.c index 892e430c1..98d015e63 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -1602,6 +1602,20 @@ static void pni_amqp_decode_disposition (uint64_t type, pn_bytes_t disp_data, pn } break; } + case AMQP_DESC_TRANSACTIONAL_STATE: { + pn_bytes_t id; + bool qoutcome; + pn_bytes_t outcome_raw; + pn_amqp_decode_DqEzQRe(disp_data, &id, &qoutcome, &outcome_raw); + disp->type = PN_DISP_TRANSACTIONAL; + pn_bytes_free(disp->u.s_transactional.id); + disp->u.s_transactional.id = pn_bytes_dup(id); + disp->u.s_transactional.outcome_raw = (pn_bytes_t){0, NULL}; + if (qoutcome) { + disp->u.s_transactional.outcome_raw = pn_bytes_dup(outcome_raw); + } + break; + } default: { pn_bytes_t data_raw = (pn_bytes_t){0, NULL}; pn_amqp_decode_DqR(disp_data, &data_raw); diff --git a/c/tools/codec-generator/specs.json b/c/tools/codec-generator/specs.json index dddf6d6f8..82b4a013f 100644 --- a/c/tools/codec-generator/specs.json +++ b/c/tools/codec-generator/specs.json @@ -2,6 +2,7 @@ "fill_specs": [ "R", "DLR", + "DL[]", "DL[c]", "DL[?HIIII]", "DL[?IIII?I?I?In?o]", @@ -44,6 +45,7 @@ "D.[s]", "D.[z]", "D.[Bz]", + "D.[z?R]", "D.[R]", "D?L.", "D?L?." 
diff --git a/python/cproton.h b/python/cproton.h index 59fc89944..76b9d87ab 100644 --- a/python/cproton.h +++ b/python/cproton.h @@ -410,11 +410,13 @@ typedef struct pn_custom_disposition_t pn_custom_disposition_t; typedef struct pn_received_disposition_t pn_received_disposition_t; typedef struct pn_rejected_disposition_t pn_rejected_disposition_t; typedef struct pn_modified_disposition_t pn_modified_disposition_t; +typedef struct pn_transactional_disposition_t pn_transactional_disposition_t; pn_custom_disposition_t *pn_custom_disposition(pn_disposition_t *disposition); pn_received_disposition_t *pn_received_disposition(pn_disposition_t *disposition); pn_rejected_disposition_t *pn_rejected_disposition(pn_disposition_t *disposition); pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition); +pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition); void pn_custom_disposition_set_type(pn_custom_disposition_t *disposition, uint64_t type); uint64_t pn_custom_disposition_get_type(pn_custom_disposition_t *disposition); @@ -429,6 +431,10 @@ void pn_modified_disposition_set_failed(pn_modified_disposition_t *disposition, _Bool pn_modified_disposition_is_undeliverable(pn_modified_disposition_t *disposition); void pn_modified_disposition_set_undeliverable(pn_modified_disposition_t *disposition, _Bool undeliverable); pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *disposition); +pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition); +void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id); +uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition); +void pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t *disposition, uint64_t outcome); int pn_error_code(pn_error_t *error); const char *pn_error_text(pn_error_t *error); @@ -656,6 +662,7 @@ int 
pn_transport_unbind(pn_transport_t *transport); #define PN_REJECTED ... #define PN_RELEASED ... #define PN_MODIFIED ... +#define PN_TRANSACTIONAL_STATE ... // Default message priority #define PN_DEFAULT_PRIORITY ... diff --git a/python/cproton.py b/python/cproton.py index 1dea17ac1..656b13534 100644 --- a/python/cproton.py +++ b/python/cproton.py @@ -54,7 +54,7 @@ from cproton_ffi.lib import (PN_ACCEPTED, PN_ARRAY, PN_BINARY, PN_BOOL, PN_BYTE, PN_SSL_RESUME_UNKNOWN, PN_SSL_SHA1, PN_SSL_SHA256, PN_SSL_SHA512, PN_SSL_VERIFY_PEER, PN_SSL_VERIFY_PEER_NAME, PN_STRING, PN_SYMBOL, PN_TARGET, PN_TIMEOUT, PN_TIMER_TASK, PN_TIMESTAMP, PN_TRACE_DRV, - PN_TRACE_FRM, PN_TRACE_OFF, PN_TRACE_RAW, PN_TRANSPORT, + PN_TRACE_FRM, PN_TRACE_OFF, PN_TRACE_RAW, PN_TRANSACTIONAL_STATE, PN_TRANSPORT, PN_TRANSPORT_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_TAIL_CLOSED, PN_UBYTE, PN_UINT, PN_ULONG, PN_UNSPECIFIED, PN_USHORT, PN_UUID, PN_VERSION_MAJOR, PN_VERSION_MINOR, @@ -145,7 +145,9 @@ from cproton_ffi.lib import (PN_ACCEPTED, PN_ARRAY, PN_BINARY, PN_BOOL, PN_BYTE, pn_terminus_properties, pn_terminus_set_distribution_mode, pn_terminus_set_durability, pn_terminus_set_dynamic, pn_terminus_set_expiry_policy, pn_terminus_set_timeout, - pn_terminus_set_type, pn_transport, pn_transport_attachments, + pn_terminus_set_type, pn_transactional_disposition, + pn_transactional_disposition_get_outcome_type, + pn_transactional_disposition_set_outcome_type, pn_transport, pn_transport_attachments, pn_transport_bind, pn_transport_capacity, pn_transport_close_head, pn_transport_close_tail, pn_transport_closed, pn_transport_condition, pn_transport_connection, pn_transport_error, @@ -763,3 +765,11 @@ def pn_ssl_get_peer_hostname(ssl, size): def pn_ssl_set_peer_hostname(ssl, hostname): return lib.pn_ssl_set_peer_hostname(ssl, string2utf8(hostname)) + + +def pn_transactional_disposition_get_id(disp): + return bytes2pybytes(lib.pn_transactional_disposition_get_id(disp)) + + +def 
pn_transactional_disposition_set_id(disp, id): + return lib.pn_transactional_disposition_set_id(disp, py2bytes(id)) diff --git a/python/proton/__init__.py b/python/proton/__init__.py index 2b7c54cca..67c4a66f2 100644 --- a/python/proton/__init__.py +++ b/python/proton/__init__.py @@ -37,7 +37,7 @@ from ._condition import Condition from ._data import UNDESCRIBED, Array, Data, Described, char, symbol, timestamp, ubyte, ushort, uint, ulong, \ byte, short, int32, float32, decimal32, decimal64, decimal128, AnnotationDict, PropertyDict, SymbolList from ._delivery import Delivery, Disposition, DispositionType, CustomDisposition, RejectedDisposition, \ - ModifiedDisposition, ReceivedDisposition + ModifiedDisposition, ReceivedDisposition, TransactionalDisposition from ._endpoints import Endpoint, Connection, Session, Link, Receiver, Sender, Terminus from ._events import Collector, Event, EventType from ._exceptions import ProtonException, MessageException, DataException, TransportException, \ @@ -89,6 +89,7 @@ __all__ = [ "Terminus", "Timeout", "Interrupt", + "TransactionalDisposition", "Transport", "TransportException", "Url", diff --git a/python/proton/_delivery.py b/python/proton/_delivery.py index 1425b582b..81f2fda3c 100644 --- a/python/proton/_delivery.py +++ b/python/proton/_delivery.py @@ -17,9 +17,9 @@ # under the License. 
# -from cproton import (PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, PN_REJECTED, PN_RELEASED, pn_delivery_abort, - pn_delivery_aborted, pn_delivery_attachments, pn_delivery_link, pn_delivery_local, - pn_delivery_local_state, +from cproton import (PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, PN_REJECTED, PN_RELEASED, PN_TRANSACTIONAL_STATE, + pn_delivery_abort, pn_delivery_aborted, pn_delivery_attachments, pn_delivery_link, + pn_delivery_local, pn_delivery_local_state, pn_delivery_partial, pn_delivery_pending, pn_delivery_readable, pn_delivery_remote, pn_delivery_remote_state, pn_delivery_settle, pn_delivery_settled, pn_delivery_tag, pn_delivery_update, pn_delivery_updated, @@ -43,7 +43,12 @@ from cproton import (PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, PN_REJECTED, PN_RELE pn_modified_disposition_set_failed, pn_modified_disposition_is_undeliverable, pn_modified_disposition_set_undeliverable, - pn_modified_disposition_annotations) + pn_modified_disposition_annotations, + pn_transactional_disposition, + pn_transactional_disposition_get_id, + pn_transactional_disposition_set_id, + pn_transactional_disposition_get_outcome_type, + pn_transactional_disposition_set_outcome_type) from ._condition import cond2obj, obj2cond, Condition from ._data import dat2obj, obj2dat @@ -98,6 +103,12 @@ class DispositionType(IntEnum): delivery being settled. 
""" + TRANSACTIONAL_STATE = PN_TRANSACTIONAL_STATE + """ + A non-terminal delivery state indicating the transactional + state of a delivery + """ + @classmethod def or_int(cls, i: int) -> Union[int, 'DispositionType']: return cls(i) if i in cls._value2member_map_ else i @@ -119,6 +130,7 @@ class Disposition: REJECTED = DispositionType.REJECTED RELEASED = DispositionType.RELEASED MODIFIED = DispositionType.MODIFIED + TRANSACTIONAL_STATE = DispositionType.TRANSACTIONAL_STATE class RemoteDisposition(Disposition): @@ -133,6 +145,8 @@ class RemoteDisposition(Disposition): return super().__new__(RemoteRejectedDisposition) elif state == cls.MODIFIED: return super().__new__(RemoteModifiedDisposition) + elif state == cls.TRANSACTIONAL_STATE: + return super().__new__(RemoteTransactionalDisposition) else: return super().__new__(RemoteCustomDisposition) @@ -237,6 +251,29 @@ class RemoteModifiedDisposition(RemoteDisposition): ModifiedDisposition(self._failed, self._undeliverable, self._annotations).apply_to(local_disposition) +class RemoteTransactionalDisposition(RemoteDisposition): + + def __init__(self, delivery_impl): + impl = pn_transactional_disposition(pn_delivery_remote(delivery_impl)) + self._id = pn_transactional_disposition_get_id(impl) + self._outcome_type = pn_transactional_disposition_get_outcome_type(impl) + + @property + def type(self) -> Union[int, DispositionType]: + return Disposition.TRANSACTIONAL_STATE + + @property + def id(self): + return self._id + + @property + def outcome_type(self): + return self._outcome_type + + def apply_to(self, local_disposition: 'LocalDisposition'): + TransactionalDisposition(self._id, self._outcome_type).apply_to(local_disposition) + + class LocalDisposition(Disposition): def __init__(self, delivery_impl): @@ -430,6 +467,39 @@ class ModifiedDisposition(LocalDisposition): obj2dat(self._annotations, pn_modified_disposition_annotations(disp)) +class TransactionalDisposition(LocalDisposition): + + def __init__(self, id, 
outcome_type=None): + self._id = id + self._outcome_type = outcome_type + + @property + def type(self) -> Union[int, DispositionType]: + return Disposition.TRANSACTIONAL_STATE + + @property + def id(self): + return self._id + + @id.setter + def id(self, id): + self._id = id + + @property + def outcome_type(self): + return self._outcome_type + + @outcome_type.setter + def outcome_type(self, type): + self._outcome_type = type + + def apply_to(self, local_disposition: LocalDisposition): + disp = pn_transactional_disposition(local_disposition._impl) + pn_transactional_disposition_set_id(disp, self._id) + if self._outcome_type: + pn_transactional_disposition_set_outcome_type(disp, self._outcome_type) + + class Delivery(Wrapper): """ Tracks and/or records the delivery of a message over a link. diff --git a/python/tests/proton_tests/engine.py b/python/tests/proton_tests/engine.py index 894a910ff..94010d61e 100644 --- a/python/tests/proton_tests/engine.py +++ b/python/tests/proton_tests/engine.py @@ -24,7 +24,7 @@ from typing import Union from proton import Array, Condition, Collector, Connection, Data, Delivery, Disposition, DispositionType, Endpoint, \ Event, CustomDisposition, Link, ModifiedDisposition, PropertyDict, ReceivedDisposition, RejectedDisposition, \ - SASL, SessionException, SymbolList, Terminus, Transport, UNDESCRIBED, symbol + SASL, SessionException, SymbolList, Terminus, TransactionalDisposition, Transport, UNDESCRIBED, symbol from proton.reactor import Container from . 
import common @@ -2410,6 +2410,23 @@ class NewCustomTester(DispositionTester): assert dlv.remote.data == self._data, (dlv.data, self._data) +class TransactionalTester(DispositionTester): + def __init__(self, id, outcome_type): + self._id = id + self._outcome_type = outcome_type + super().__init__(Disposition.TRANSACTIONAL_STATE) + + def apply(self, dlv: Delivery): + dlv.local = TransactionalDisposition(self._id, self._outcome_type) + dlv.update() + + def check(self, dlv: Delivery): + assert dlv.remote_state == self._type + assert dlv.remote.type == self._type + assert dlv.remote.id == self._id + assert dlv.remote.outcome_type == self._outcome_type + + class DeliveryTest(Test): def tearDown(self): @@ -2488,6 +2505,9 @@ class DeliveryTest(Test): self._testDisposition(NewModifiedTester(failed=True, undeliverable=True, annotations={"key": "value"})) + def testTransactional(self): + self._testDisposition(TransactionalTester(id=b'1324xxx', outcome_type=Disposition.ACCEPTED)) + def testCustom(self): self._testDisposition(CustomTester(0x12345, [1, 2, 3])) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org