This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 547588739d8b81f8bd7bf76cdfa425254b1a7ffd
Author: Andrew Stitcher <astitc...@apache.org>
AuthorDate: Tue Oct 29 16:44:18 2024 -0400

    PROTON-2873: Add transaction disposition
    
    To support work on transaction support add a specific transactional
    disposition type. So that you can avoid making custom dispositions
    in this case.
---
 c/include/proton/disposition.h      | 57 +++++++++++++++++++++++++++
 c/src/core/emitters.h               | 17 ++++++++
 c/src/core/engine-internal.h        |  7 ++++
 c/src/core/engine.c                 | 56 ++++++++++++++++++++++++++
 c/src/core/transport.c              | 14 +++++++
 c/tools/codec-generator/specs.json  |  2 +
 python/cproton.h                    |  7 ++++
 python/cproton.py                   | 14 ++++++-
 python/proton/__init__.py           |  3 +-
 python/proton/_delivery.py          | 78 +++++++++++++++++++++++++++++++++++--
 python/tests/proton_tests/engine.py | 22 ++++++++++-
 11 files changed, 269 insertions(+), 8 deletions(-)

diff --git a/c/include/proton/disposition.h b/c/include/proton/disposition.h
index d809b2100..e3b7a7800 100644
--- a/c/include/proton/disposition.h
+++ b/c/include/proton/disposition.h
@@ -90,6 +90,12 @@ typedef struct pn_disposition_t pn_disposition_t;
  */
 #define PN_MODIFIED (0x0000000000000027)
 
+/**
+ * The PN_TRANSACTIONAL_STATE delivery state is a non terminal state
+ * indicating the transactional state of a delivery.
+ */
+#define PN_TRANSACTIONAL_STATE (0x0000000000000034)
+
 /**
  * Get the type of a disposition.
  *
@@ -248,6 +254,13 @@ typedef struct pn_rejected_disposition_t 
pn_rejected_disposition_t;
  */
 typedef struct pn_modified_disposition_t pn_modified_disposition_t;
 
+/**
+ * A transactional delivery disposition
+ *
+ * This represents transactional delivery state.
+ */
+typedef struct pn_transactional_disposition_t pn_transactional_disposition_t;
+
 /**
  * A custom delivery disposition
  *
@@ -291,6 +304,15 @@ PN_EXTERN pn_rejected_disposition_t 
*pn_rejected_disposition(pn_disposition_t *d
  */
 PN_EXTERN pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t 
*disposition);
 
+/**
+ * Convert a delivery disposition to a transactional disposition
+ *
+ * @param[in] disposition delivery disposition object
+ * @return a pointer to the transactional disposition or NULL
+ * if the disposition is not that type
+ */
+PN_EXTERN pn_transactional_disposition_t 
*pn_transactional_disposition(pn_disposition_t *disposition);
+
 /**
  * Access the disposition as a raw pn_data_t.
  *
@@ -419,6 +441,41 @@ PN_EXTERN void 
pn_modified_disposition_set_undeliverable(pn_modified_disposition
  */
 PN_EXTERN pn_data_t 
*pn_modified_disposition_annotations(pn_modified_disposition_t *disposition);
 
+
+/**
+ * Get the transaction id for a transactional disposition
+ *
+ * @param[in] disposition a transactional disposition object
+ * @return the transaction id
+ */
+PN_EXTERN pn_bytes_t 
pn_transactional_disposition_get_id(pn_transactional_disposition_t 
*disposition);
+
+/**
+ * Set the transaction id for a transactional disposition
+ *
+ * @param[in] disposition a transactional disposition object
+ * @param[in] id the transaction id
+ */
+PN_EXTERN void 
pn_transactional_disposition_set_id(pn_transactional_disposition_t 
*disposition, pn_bytes_t id);
+
+/**
+ * Get the provisional outcome of the delivery if the transaction is committed 
successfully.
+ *
+ * @param[in] disposition a transactional disposition object
+ * @return the provisional outcome of the transaction
+ */
+PN_EXTERN uint64_t 
pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t 
*disposition);
+
+/**
+ * Set the provisional outcome of the del;ivery if the transaction is 
committed successfully.
+ *
+ * Only terminal disposition states are allowed (::PN_ACCEPTED, ::PN_REJECTED, 
::PN_RELEASED, ::PN_MODIFIED)
+ *
+ * @param[in] disposition a transactional disposition object
+ * @param[in] outcome the provisional outcome of the transaction
+ */
+PN_EXTERN void 
pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t 
*disposition, uint64_t outcome);
+
 /**
  * @}
  */
diff --git a/c/src/core/emitters.h b/c/src/core/emitters.h
index 2cd59a64c..f20bdff10 100644
--- a/c/src/core/emitters.h
+++ b/c/src/core/emitters.h
@@ -647,6 +647,19 @@ static inline void 
emit_modified_disposition(pni_emitter_t* emitter, pni_compoun
   }
 }
 
+static inline void emit_disposition(pni_emitter_t* emitter, 
pni_compound_context* compound0, pn_disposition_t *disposition);
+
+static inline void emit_transactional_disposition(pni_emitter_t* emitter, 
pni_compound_context* compound0, pn_transactional_disposition_t *disposition){
+  for (bool small_encoding = true; ; small_encoding = false) {
+    pni_compound_context c = emit_list(emitter, compound0, small_encoding, 
true);
+    pni_compound_context compound = c;
+    emit_binary_bytes(emitter, &compound, disposition->id);
+    emit_raw(emitter, &compound, disposition->outcome_raw);
+    emit_end_list(emitter, &compound, small_encoding);
+    if (encode_succeeded(emitter, &compound)) break;
+  }
+}
+
 static inline void emit_custom_disposition(pni_emitter_t* emitter, 
pni_compound_context* compound0, pn_custom_disposition_t *disposition){
   emit_descriptor(emitter, compound0, disposition->type);
   if ((disposition->data && pn_data_size(disposition->data) == 0) ||
@@ -687,6 +700,10 @@ static inline void emit_disposition(pni_emitter_t* 
emitter, pni_compound_context
       emit_descriptor(emitter, compound0, AMQP_DESC_MODIFIED);
       emit_modified_disposition(emitter, compound0, 
&disposition->u.s_modified);
       return;
+    case PN_DISP_TRANSACTIONAL:
+      emit_descriptor(emitter, compound0, AMQP_DESC_TRANSACTIONAL_STATE);
+      emit_transactional_disposition(emitter, compound0, 
&disposition->u.s_transactional);
+      return;
     case PN_DISP_CUSTOM:
       emit_custom_disposition(emitter, compound0, &disposition->u.s_custom);
       return;
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index da563813a..f10e6a8f0 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -341,6 +341,7 @@ typedef enum pn_disposition_type_t {
   PN_DISP_REJECTED = PN_REJECTED,
   PN_DISP_RELEASED = PN_RELEASED,
   PN_DISP_MODIFIED = PN_MODIFIED,
+  PN_DISP_TRANSACTIONAL = PN_TRANSACTIONAL_STATE,
 } pn_disposition_type_t;
 
 struct pn_received_disposition_t {
@@ -359,6 +360,11 @@ struct pn_modified_disposition_t {
   bool undeliverable;
 };
 
+struct pn_transactional_disposition_t {
+  pn_bytes_t id;
+  pn_bytes_t outcome_raw;
+};
+
 struct pn_custom_disposition_t {
   pn_data_t *data;
   pn_bytes_t data_raw;
@@ -370,6 +376,7 @@ struct pn_disposition_t {
     struct pn_received_disposition_t s_received;
     struct pn_rejected_disposition_t s_rejected;
     struct pn_modified_disposition_t s_modified;
+    struct pn_transactional_disposition_t s_transactional;
     struct pn_custom_disposition_t   s_custom;
   } u;
   uint16_t type;
diff --git a/c/src/core/engine.c b/c/src/core/engine.c
index 7ccb308b9..d7fb091d4 100644
--- a/c/src/core/engine.c
+++ b/c/src/core/engine.c
@@ -27,6 +27,7 @@
 #include "consumers.h"
 #include "core/frame_consumers.h"
 #include "emitters.h"
+#include "core/frame_generators.h"
 #include "fixed_string.h"
 #include "framing.h"
 #include "memory.h"
@@ -1597,6 +1598,10 @@ static void pn_disposition_finalize(pn_disposition_t *ds)
       pn_data_free(ds->u.s_custom.data);
       pn_bytes_free(ds->u.s_custom.data_raw);
       break;
+    case PN_DISP_TRANSACTIONAL:
+      pn_bytes_free(ds->u.s_transactional.id);
+      pn_bytes_free(ds->u.s_transactional.outcome_raw);
+      break;
   }
 }
 
@@ -1868,6 +1873,9 @@ void pni_disposition_to_raw(pn_disposition_t 
*disposition) {
     case PN_DISP_MODIFIED:
       emit_modified_disposition(&emitter, &compound, 
&disposition->u.s_modified);
       break;
+    case PN_DISP_TRANSACTIONAL:
+      emit_transactional_disposition(&emitter, &compound, 
&disposition->u.s_transactional);
+      break;
   }
 
   if (type != PN_DISP_EMPTY) {
@@ -2005,6 +2013,13 @@ pn_modified_disposition_t 
*pn_modified_disposition(pn_disposition_t *disposition
   return &disposition->u.s_modified;
 }
 
+pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t 
*disposition)
+{
+  if (disposition->type==PN_DISP_EMPTY) disposition->type = 
PN_DISP_TRANSACTIONAL;
+  else if (disposition->type!=PN_DISP_TRANSACTIONAL) return NULL;
+  return &disposition->u.s_transactional;
+}
+
 pn_data_t *pn_custom_disposition_data(pn_custom_disposition_t *disposition)
 {
   assert(disposition);
@@ -2084,6 +2099,44 @@ pn_data_t 
*pn_modified_disposition_annotations(pn_modified_disposition_t *dispos
   return disposition->annotations;
 }
 
+pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t 
*disposition)
+{
+  assert(disposition);
+  return disposition->id;
+}
+
+void pn_transactional_disposition_set_id(pn_transactional_disposition_t 
*disposition, pn_bytes_t id)
+{
+  assert(disposition);
+  pn_bytes_free(disposition->id);
+  disposition->id = pn_bytes_dup(id);
+}
+
+uint64_t 
pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t 
*disposition)
+{
+  assert(disposition);
+  if (disposition->outcome_raw.size) {
+    bool qtype = false;
+    uint64_t type;
+    pn_amqp_decode_DQLq(disposition->outcome_raw, &qtype, &type);
+    if (qtype) {
+      return type;
+    }
+  }
+  return PN_DISP_EMPTY;
+}
+
+void 
pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t 
*disposition, uint64_t type)
+{
+  assert(disposition);
+  // Generate a described LIST0 directly - this needs a max of 11 bytes
+  char outcome_scratch[11];
+  pn_rwbytes_t scratch = {.size=sizeof(outcome_scratch), 
.start=outcome_scratch};
+  pn_bytes_t outcome_raw = pn_amqp_encode_DLEe(&scratch, type);
+  pn_bytes_free(disposition->outcome_raw);
+  disposition->outcome_raw = pn_bytes_dup(outcome_raw);
+}
+
 pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
 {
   if (delivery) {
@@ -2420,6 +2473,7 @@ void pn_delivery_update(pn_delivery_t *delivery, uint64_t 
state)
       case PN_RECEIVED:
       case PN_MODIFIED:
       case PN_RELEASED:
+      case PN_TRANSACTIONAL_STATE:
         break;
       default:
         delivery->local.u.s_custom.type = state;
@@ -2434,6 +2488,7 @@ void pn_delivery_update(pn_delivery_t *delivery, uint64_t 
state)
     case PN_RECEIVED:
     case PN_MODIFIED:
     case PN_RELEASED:
+    case PN_TRANSACTIONAL_STATE:
       delivery->local.type = state;
       break;
     default:
@@ -2804,6 +2859,7 @@ const char *pn_disposition_type_name(uint64_t d) {
    case PN_REJECTED: return "rejected";
    case PN_RELEASED: return "released";
    case PN_MODIFIED: return "modified";
+   case PN_TRANSACTIONAL_STATE: return "transactional_state";
    default: return "unknown";
   }
 }
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index 892e430c1..98d015e63 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -1602,6 +1602,20 @@ static void pni_amqp_decode_disposition (uint64_t type, 
pn_bytes_t disp_data, pn
       }
       break;
     }
+    case AMQP_DESC_TRANSACTIONAL_STATE: {
+      pn_bytes_t id;
+      bool qoutcome;
+      pn_bytes_t outcome_raw;
+      pn_amqp_decode_DqEzQRe(disp_data, &id, &qoutcome, &outcome_raw);
+      disp->type = PN_DISP_TRANSACTIONAL;
+      pn_bytes_free(disp->u.s_transactional.id);
+      disp->u.s_transactional.id = pn_bytes_dup(id);
+      disp->u.s_transactional.outcome_raw = (pn_bytes_t){0, NULL};
+      if (qoutcome) {
+        disp->u.s_transactional.outcome_raw = pn_bytes_dup(outcome_raw);
+      }
+      break;
+    }
     default: {
       pn_bytes_t data_raw = (pn_bytes_t){0, NULL};
       pn_amqp_decode_DqR(disp_data, &data_raw);
diff --git a/c/tools/codec-generator/specs.json 
b/c/tools/codec-generator/specs.json
index dddf6d6f8..82b4a013f 100644
--- a/c/tools/codec-generator/specs.json
+++ b/c/tools/codec-generator/specs.json
@@ -2,6 +2,7 @@
   "fill_specs": [
     "R",
     "DLR",
+    "DL[]",
     "DL[c]",
     "DL[?HIIII]",
     "DL[?IIII?I?I?In?o]",
@@ -44,6 +45,7 @@
     "D.[s]",
     "D.[z]",
     "D.[Bz]",
+    "D.[z?R]",
     "D.[R]",
     "D?L.",
     "D?L?."
diff --git a/python/cproton.h b/python/cproton.h
index 59fc89944..76b9d87ab 100644
--- a/python/cproton.h
+++ b/python/cproton.h
@@ -410,11 +410,13 @@ typedef struct pn_custom_disposition_t 
pn_custom_disposition_t;
 typedef struct pn_received_disposition_t pn_received_disposition_t;
 typedef struct pn_rejected_disposition_t pn_rejected_disposition_t;
 typedef struct pn_modified_disposition_t pn_modified_disposition_t;
+typedef struct pn_transactional_disposition_t pn_transactional_disposition_t;
 
 pn_custom_disposition_t *pn_custom_disposition(pn_disposition_t *disposition);
 pn_received_disposition_t *pn_received_disposition(pn_disposition_t 
*disposition);
 pn_rejected_disposition_t *pn_rejected_disposition(pn_disposition_t 
*disposition);
 pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t 
*disposition);
+pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t 
*disposition);
 
 void pn_custom_disposition_set_type(pn_custom_disposition_t *disposition, 
uint64_t type);
 uint64_t pn_custom_disposition_get_type(pn_custom_disposition_t *disposition);
@@ -429,6 +431,10 @@ void 
pn_modified_disposition_set_failed(pn_modified_disposition_t *disposition,
 _Bool pn_modified_disposition_is_undeliverable(pn_modified_disposition_t 
*disposition);
 void pn_modified_disposition_set_undeliverable(pn_modified_disposition_t 
*disposition, _Bool undeliverable);
 pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t 
*disposition);
+pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t 
*disposition);
+void pn_transactional_disposition_set_id(pn_transactional_disposition_t 
*disposition, pn_bytes_t id);
+uint64_t 
pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t 
*disposition);
+void 
pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t 
*disposition, uint64_t outcome);
 
 int pn_error_code(pn_error_t *error);
 const char *pn_error_text(pn_error_t *error);
@@ -656,6 +662,7 @@ int pn_transport_unbind(pn_transport_t *transport);
 #define PN_REJECTED ...
 #define PN_RELEASED ...
 #define PN_MODIFIED ...
+#define PN_TRANSACTIONAL_STATE ...
 
 // Default message priority
 #define PN_DEFAULT_PRIORITY ...
diff --git a/python/cproton.py b/python/cproton.py
index 1dea17ac1..656b13534 100644
--- a/python/cproton.py
+++ b/python/cproton.py
@@ -54,7 +54,7 @@ from cproton_ffi.lib import (PN_ACCEPTED, PN_ARRAY, 
PN_BINARY, PN_BOOL, PN_BYTE,
                              PN_SSL_RESUME_UNKNOWN, PN_SSL_SHA1, 
PN_SSL_SHA256, PN_SSL_SHA512,
                              PN_SSL_VERIFY_PEER, PN_SSL_VERIFY_PEER_NAME, 
PN_STRING, PN_SYMBOL,
                              PN_TARGET, PN_TIMEOUT, PN_TIMER_TASK, 
PN_TIMESTAMP, PN_TRACE_DRV,
-                             PN_TRACE_FRM, PN_TRACE_OFF, PN_TRACE_RAW, 
PN_TRANSPORT,
+                             PN_TRACE_FRM, PN_TRACE_OFF, PN_TRACE_RAW, 
PN_TRANSACTIONAL_STATE, PN_TRANSPORT,
                              PN_TRANSPORT_CLOSED, PN_TRANSPORT_ERROR, 
PN_TRANSPORT_HEAD_CLOSED,
                              PN_TRANSPORT_TAIL_CLOSED, PN_UBYTE, PN_UINT, 
PN_ULONG, PN_UNSPECIFIED,
                              PN_USHORT, PN_UUID, PN_VERSION_MAJOR, 
PN_VERSION_MINOR,
@@ -145,7 +145,9 @@ from cproton_ffi.lib import (PN_ACCEPTED, PN_ARRAY, 
PN_BINARY, PN_BOOL, PN_BYTE,
                              pn_terminus_properties, 
pn_terminus_set_distribution_mode,
                              pn_terminus_set_durability, 
pn_terminus_set_dynamic,
                              pn_terminus_set_expiry_policy, 
pn_terminus_set_timeout,
-                             pn_terminus_set_type, pn_transport, 
pn_transport_attachments,
+                             pn_terminus_set_type, 
pn_transactional_disposition,
+                             pn_transactional_disposition_get_outcome_type,
+                             pn_transactional_disposition_set_outcome_type, 
pn_transport, pn_transport_attachments,
                              pn_transport_bind, pn_transport_capacity, 
pn_transport_close_head,
                              pn_transport_close_tail, pn_transport_closed, 
pn_transport_condition,
                              pn_transport_connection, pn_transport_error,
@@ -763,3 +765,11 @@ def pn_ssl_get_peer_hostname(ssl, size):
 
 def pn_ssl_set_peer_hostname(ssl, hostname):
     return lib.pn_ssl_set_peer_hostname(ssl, string2utf8(hostname))
+
+
+def pn_transactional_disposition_get_id(disp):
+    return bytes2pybytes(lib.pn_transactional_disposition_get_id(disp))
+
+
+def pn_transactional_disposition_set_id(disp, id):
+    return lib.pn_transactional_disposition_set_id(disp, py2bytes(id))
diff --git a/python/proton/__init__.py b/python/proton/__init__.py
index 2b7c54cca..67c4a66f2 100644
--- a/python/proton/__init__.py
+++ b/python/proton/__init__.py
@@ -37,7 +37,7 @@ from ._condition import Condition
 from ._data import UNDESCRIBED, Array, Data, Described, char, symbol, 
timestamp, ubyte, ushort, uint, ulong, \
     byte, short, int32, float32, decimal32, decimal64, decimal128, 
AnnotationDict, PropertyDict, SymbolList
 from ._delivery import Delivery, Disposition, DispositionType, 
CustomDisposition, RejectedDisposition, \
-    ModifiedDisposition, ReceivedDisposition
+    ModifiedDisposition, ReceivedDisposition, TransactionalDisposition
 from ._endpoints import Endpoint, Connection, Session, Link, Receiver, Sender, 
Terminus
 from ._events import Collector, Event, EventType
 from ._exceptions import ProtonException, MessageException, DataException, 
TransportException, \
@@ -89,6 +89,7 @@ __all__ = [
     "Terminus",
     "Timeout",
     "Interrupt",
+    "TransactionalDisposition",
     "Transport",
     "TransportException",
     "Url",
diff --git a/python/proton/_delivery.py b/python/proton/_delivery.py
index 1425b582b..81f2fda3c 100644
--- a/python/proton/_delivery.py
+++ b/python/proton/_delivery.py
@@ -17,9 +17,9 @@
 # under the License.
 #
 
-from cproton import (PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, PN_REJECTED, 
PN_RELEASED, pn_delivery_abort,
-                     pn_delivery_aborted, pn_delivery_attachments, 
pn_delivery_link, pn_delivery_local,
-                     pn_delivery_local_state,
+from cproton import (PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, PN_REJECTED, 
PN_RELEASED, PN_TRANSACTIONAL_STATE,
+                     pn_delivery_abort, pn_delivery_aborted, 
pn_delivery_attachments, pn_delivery_link,
+                     pn_delivery_local, pn_delivery_local_state,
                      pn_delivery_partial, pn_delivery_pending, 
pn_delivery_readable, pn_delivery_remote,
                      pn_delivery_remote_state,
                      pn_delivery_settle, pn_delivery_settled, pn_delivery_tag, 
pn_delivery_update, pn_delivery_updated,
@@ -43,7 +43,12 @@ from cproton import (PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, 
PN_REJECTED, PN_RELE
                      pn_modified_disposition_set_failed,
                      pn_modified_disposition_is_undeliverable,
                      pn_modified_disposition_set_undeliverable,
-                     pn_modified_disposition_annotations)
+                     pn_modified_disposition_annotations,
+                     pn_transactional_disposition,
+                     pn_transactional_disposition_get_id,
+                     pn_transactional_disposition_set_id,
+                     pn_transactional_disposition_get_outcome_type,
+                     pn_transactional_disposition_set_outcome_type)
 
 from ._condition import cond2obj, obj2cond, Condition
 from ._data import dat2obj, obj2dat
@@ -98,6 +103,12 @@ class DispositionType(IntEnum):
     delivery being settled.
     """
 
+    TRANSACTIONAL_STATE = PN_TRANSACTIONAL_STATE
+    """
+    A non-terminal delivery state indicating the transactional
+    state of a delivery
+    """
+
     @classmethod
     def or_int(cls, i: int) -> Union[int, 'DispositionType']:
         return cls(i) if i in cls._value2member_map_ else i
@@ -119,6 +130,7 @@ class Disposition:
     REJECTED = DispositionType.REJECTED
     RELEASED = DispositionType.RELEASED
     MODIFIED = DispositionType.MODIFIED
+    TRANSACTIONAL_STATE = DispositionType.TRANSACTIONAL_STATE
 
 
 class RemoteDisposition(Disposition):
@@ -133,6 +145,8 @@ class RemoteDisposition(Disposition):
             return super().__new__(RemoteRejectedDisposition)
         elif state == cls.MODIFIED:
             return super().__new__(RemoteModifiedDisposition)
+        elif state == cls.TRANSACTIONAL_STATE:
+            return super().__new__(RemoteTransactionalDisposition)
         else:
             return super().__new__(RemoteCustomDisposition)
 
@@ -237,6 +251,29 @@ class RemoteModifiedDisposition(RemoteDisposition):
         ModifiedDisposition(self._failed, self._undeliverable, 
self._annotations).apply_to(local_disposition)
 
 
+class RemoteTransactionalDisposition(RemoteDisposition):
+
+    def __init__(self, delivery_impl):
+        impl = pn_transactional_disposition(pn_delivery_remote(delivery_impl))
+        self._id = pn_transactional_disposition_get_id(impl)
+        self._outcome_type = 
pn_transactional_disposition_get_outcome_type(impl)
+
+    @property
+    def type(self) -> Union[int, DispositionType]:
+        return Disposition.TRANSACTIONAL_STATE
+
+    @property
+    def id(self):
+        return self._id
+
+    @property
+    def outcome_type(self):
+        return self._outcome_type
+
+    def apply_to(self, local_disposition: 'LocalDisposition'):
+        TransactionalDisposition(self._id, 
self._outcome_type).apply_to(local_disposition)
+
+
 class LocalDisposition(Disposition):
 
     def __init__(self, delivery_impl):
@@ -430,6 +467,39 @@ class ModifiedDisposition(LocalDisposition):
         obj2dat(self._annotations, pn_modified_disposition_annotations(disp))
 
 
+class TransactionalDisposition(LocalDisposition):
+
+    def __init__(self, id, outcome_type=None):
+        self._id = id
+        self._outcome_type = outcome_type
+
+    @property
+    def type(self) -> Union[int, DispositionType]:
+        return Disposition.TRANSACTIONAL_STATE
+
+    @property
+    def id(self):
+        return self._id
+
+    @id.setter
+    def id(self, id):
+        self._id = id
+
+    @property
+    def outcome_type(self):
+        return self._outcome_type
+
+    @outcome_type.setter
+    def outcome_type(self, type):
+        self._outcome_type = type
+
+    def apply_to(self, local_disposition: LocalDisposition):
+        disp = pn_transactional_disposition(local_disposition._impl)
+        pn_transactional_disposition_set_id(disp, self._id)
+        if self._outcome_type:
+            pn_transactional_disposition_set_outcome_type(disp, 
self._outcome_type)
+
+
 class Delivery(Wrapper):
     """
     Tracks and/or records the delivery of a message over a link.
diff --git a/python/tests/proton_tests/engine.py 
b/python/tests/proton_tests/engine.py
index 894a910ff..94010d61e 100644
--- a/python/tests/proton_tests/engine.py
+++ b/python/tests/proton_tests/engine.py
@@ -24,7 +24,7 @@ from typing import Union
 
 from proton import Array, Condition, Collector, Connection, Data, Delivery, 
Disposition, DispositionType, Endpoint, \
     Event, CustomDisposition, Link, ModifiedDisposition, PropertyDict, 
ReceivedDisposition, RejectedDisposition, \
-    SASL, SessionException, SymbolList, Terminus, Transport, UNDESCRIBED, 
symbol
+    SASL, SessionException, SymbolList, Terminus, TransactionalDisposition, 
Transport, UNDESCRIBED, symbol
 from proton.reactor import Container
 
 from . import common
@@ -2410,6 +2410,23 @@ class NewCustomTester(DispositionTester):
         assert dlv.remote.data == self._data, (dlv.data, self._data)
 
 
+class TransactionalTester(DispositionTester):
+    def __init__(self, id, outcome_type):
+        self._id = id
+        self._outcome_type = outcome_type
+        super().__init__(Disposition.TRANSACTIONAL_STATE)
+
+    def apply(self, dlv: Delivery):
+        dlv.local = TransactionalDisposition(self._id, self._outcome_type)
+        dlv.update()
+
+    def check(self, dlv: Delivery):
+        assert dlv.remote_state == self._type
+        assert dlv.remote.type == self._type
+        assert dlv.remote.id == self._id
+        assert dlv.remote.outcome_type == self._outcome_type
+
+
 class DeliveryTest(Test):
 
     def tearDown(self):
@@ -2488,6 +2505,9 @@ class DeliveryTest(Test):
         self._testDisposition(NewModifiedTester(failed=True, 
undeliverable=True,
                                                 annotations={"key": "value"}))
 
+    def testTransactional(self):
+        self._testDisposition(TransactionalTester(id=b'1324xxx', 
outcome_type=Disposition.ACCEPTED))
+
     def testCustom(self):
         self._testDisposition(CustomTester(0x12345, [1, 2, 3]))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to