Repository: qpid-proton Updated Branches: refs/heads/master 8d1d20eb3 -> 0c11d11cd
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go index f653525..8f678ca 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go @@ -24,11 +24,14 @@ under the License. package event import ( + "qpid.apache.org/proton/internal" "time" + "unsafe" ) // #include <proton/types.h> // #include <proton/event.h> +// #include <stdlib.h> // #include <proton/session.h> // #include <proton/link.h> // #include <proton/delivery.h> @@ -40,47 +43,57 @@ import "C" type Event struct{ pn *C.pn_event_t } -func (e Event) isNil() bool { return e.pn == nil } -func (e Event) Type() EventType { return EventType(C.pn_event_type(e.pn)) } -func (e Event) Connection() Connection { return Connection{C.pn_event_connection(e.pn)} } -func (e Event) Session() Session { return Session{C.pn_event_session(e.pn)} } -func (e Event) Link() Link { return Link{C.pn_event_link(e.pn)} } -func (e Event) Delivery() Delivery { return Delivery{C.pn_event_delivery(e.pn)} } -func (e Event) Transport() Transport { return Transport{C.pn_event_transport(e.pn)} } -func (e Event) String() string { return e.Type().String() } +func (e Event) IsNil() bool { return e.pn == nil } +func (e Event) Type() EventType { + return EventType(C.pn_event_type(e.pn)) +} +func (e Event) Connection() Connection { + return Connection{C.pn_event_connection(e.pn)} +} +func (e Event) Session() Session { + return Session{C.pn_event_session(e.pn)} +} +func (e Event) Link() Link { + return Link{C.pn_event_link(e.pn)} +} +func (e Event) Delivery() Delivery { + return Delivery{C.pn_event_delivery(e.pn)} +} +func (e Event) String() string { 
return e.Type().String() } type EventType int const ( - EConnectionInit EventType = C.PN_CONNECTION_INIT - EConnectionBound EventType = C.PN_CONNECTION_BOUND - EConnectionUnbound EventType = C.PN_CONNECTION_UNBOUND - EConnectionLocalOpen EventType = C.PN_CONNECTION_LOCAL_OPEN - EConnectionRemoteOpen EventType = C.PN_CONNECTION_REMOTE_OPEN - EConnectionLocalClose EventType = C.PN_CONNECTION_LOCAL_CLOSE - EConnectionRemoteClose EventType = C.PN_CONNECTION_REMOTE_CLOSE - EConnectionFinal EventType = C.PN_CONNECTION_FINAL - ESessionInit EventType = C.PN_SESSION_INIT - ESessionLocalOpen EventType = C.PN_SESSION_LOCAL_OPEN - ESessionRemoteOpen EventType = C.PN_SESSION_REMOTE_OPEN - ESessionLocalClose EventType = C.PN_SESSION_LOCAL_CLOSE - ESessionRemoteClose EventType = C.PN_SESSION_REMOTE_CLOSE - ESessionFinal EventType = C.PN_SESSION_FINAL - ELinkInit EventType = C.PN_LINK_INIT - ELinkLocalOpen EventType = C.PN_LINK_LOCAL_OPEN - ELinkRemoteOpen EventType = C.PN_LINK_REMOTE_OPEN - ELinkLocalClose EventType = C.PN_LINK_LOCAL_CLOSE - ELinkRemoteClose EventType = C.PN_LINK_REMOTE_CLOSE - ELinkLocalDetach EventType = C.PN_LINK_LOCAL_DETACH - ELinkRemoteDetach EventType = C.PN_LINK_REMOTE_DETACH - ELinkFlow EventType = C.PN_LINK_FLOW - ELinkFinal EventType = C.PN_LINK_FINAL - EDelivery EventType = C.PN_DELIVERY - ETransport EventType = C.PN_TRANSPORT - ETransportError EventType = C.PN_TRANSPORT_ERROR - ETransportHeadClosed EventType = C.PN_TRANSPORT_HEAD_CLOSED - ETransportTailClosed EventType = C.PN_TRANSPORT_TAIL_CLOSED - ETransportClosed EventType = C.PN_TRANSPORT_CLOSED + EConnectionInit EventType = C.PN_CONNECTION_INIT + EConnectionBound EventType = C.PN_CONNECTION_BOUND + EConnectionUnbound EventType = C.PN_CONNECTION_UNBOUND + EConnectionLocalOpen EventType = C.PN_CONNECTION_LOCAL_OPEN + EConnectionRemoteOpen EventType = C.PN_CONNECTION_REMOTE_OPEN + EConnectionLocalClose EventType = C.PN_CONNECTION_LOCAL_CLOSE + EConnectionRemoteClose EventType = 
C.PN_CONNECTION_REMOTE_CLOSE + EConnectionFinal EventType = C.PN_CONNECTION_FINAL + ESessionInit EventType = C.PN_SESSION_INIT + ESessionLocalOpen EventType = C.PN_SESSION_LOCAL_OPEN + ESessionRemoteOpen EventType = C.PN_SESSION_REMOTE_OPEN + ESessionLocalClose EventType = C.PN_SESSION_LOCAL_CLOSE + ESessionRemoteClose EventType = C.PN_SESSION_REMOTE_CLOSE + ESessionFinal EventType = C.PN_SESSION_FINAL + ELinkInit EventType = C.PN_LINK_INIT + ELinkLocalOpen EventType = C.PN_LINK_LOCAL_OPEN + ELinkRemoteOpen EventType = C.PN_LINK_REMOTE_OPEN + ELinkLocalClose EventType = C.PN_LINK_LOCAL_CLOSE + ELinkRemoteClose EventType = C.PN_LINK_REMOTE_CLOSE + ELinkLocalDetach EventType = C.PN_LINK_LOCAL_DETACH + ELinkRemoteDetach EventType = C.PN_LINK_REMOTE_DETACH + ELinkFlow EventType = C.PN_LINK_FLOW + ELinkFinal EventType = C.PN_LINK_FINAL + EDelivery EventType = C.PN_DELIVERY + ETransport EventType = C.PN_TRANSPORT + ETransportAuthenticated EventType = C.PN_TRANSPORT_AUTHENTICATED + ETransportError EventType = C.PN_TRANSPORT_ERROR + ETransportHeadClosed EventType = C.PN_TRANSPORT_HEAD_CLOSED + ETransportTailClosed EventType = C.PN_TRANSPORT_TAIL_CLOSED + ETransportClosed EventType = C.PN_TRANSPORT_CLOSED ) func (e EventType) String() string { @@ -136,6 +149,8 @@ func (e EventType) String() string { return "Delivery" case C.PN_TRANSPORT: return "Transport" + case C.PN_TRANSPORT_AUTHENTICATED: + return "TransportAuthenticated" case C.PN_TRANSPORT_ERROR: return "TransportError" case C.PN_TRANSPORT_HEAD_CLOSED: @@ -152,21 +167,43 @@ func (e EventType) String() string { type Session struct{ pn *C.pn_session_t } -func (s Session) isNil() bool { return s.pn == nil } -func (s Session) Free() { C.pn_session_free(s.pn) } -func (s Session) State() State { return State(C.pn_session_state(s.pn)) } -func (s Session) Error() error { return pnError(C.pn_session_error(s.pn)) } -func (s Session) Condition() Condition { return Condition{C.pn_session_condition(s.pn)} } -func (s Session) 
RemoteCondition() Condition { return Condition{C.pn_session_remote_condition(s.pn)} } -func (s Session) Connection() Connection { return Connection{C.pn_session_connection(s.pn)} } -func (s Session) Open() { C.pn_session_open(s.pn) } -func (s Session) Close() { C.pn_session_close(s.pn) } -func (s Session) IncomingCapacity() uint { return uint(C.pn_session_get_incoming_capacity(s.pn)) } +func (s Session) IsNil() bool { return s.pn == nil } +func (s Session) Free() { + C.pn_session_free(s.pn) +} +func (s Session) State() State { + return State(C.pn_session_state(s.pn)) +} +func (s Session) Error() error { + return internal.PnError(unsafe.Pointer(C.pn_session_error(s.pn))) +} +func (s Session) Condition() Condition { + return Condition{C.pn_session_condition(s.pn)} +} +func (s Session) RemoteCondition() Condition { + return Condition{C.pn_session_remote_condition(s.pn)} +} +func (s Session) Connection() Connection { + return Connection{C.pn_session_connection(s.pn)} +} +func (s Session) Open() { + C.pn_session_open(s.pn) +} +func (s Session) Close() { + C.pn_session_close(s.pn) +} +func (s Session) IncomingCapacity() uint { + return uint(C.pn_session_get_incoming_capacity(s.pn)) +} func (s Session) SetIncomingCapacity(capacity uint) { C.pn_session_set_incoming_capacity(s.pn, C.size_t(capacity)) } -func (s Session) OutgoingBytes() uint { return uint(C.pn_session_outgoing_bytes(s.pn)) } -func (s Session) IncomingBytes() uint { return uint(C.pn_session_incoming_bytes(s.pn)) } +func (s Session) OutgoingBytes() uint { + return uint(C.pn_session_outgoing_bytes(s.pn)) +} +func (s Session) IncomingBytes() uint { + return uint(C.pn_session_incoming_bytes(s.pn)) +} func (s Session) Next(state State) Session { return Session{C.pn_session_next(s.pn, C.pn_state_t(state))} } @@ -185,11 +222,11 @@ func (e SndSettleMode) String() string { switch e { case C.PN_SND_UNSETTLED: - return "PnSndUnsettled" + return "SndUnsettled" case C.PN_SND_SETTLED: - return "PnSndSettled" + return 
"SndSettled" case C.PN_SND_MIXED: - return "PnSndMixed" + return "SndMixed" } return "unknown" } @@ -205,43 +242,97 @@ func (e RcvSettleMode) String() string { switch e { case C.PN_RCV_FIRST: - return "PnRcvFirst" + return "RcvFirst" case C.PN_RCV_SECOND: - return "PnRcvSecond" + return "RcvSecond" } return "unknown" } type Link struct{ pn *C.pn_link_t } -func (l Link) isNil() bool { return l.pn == nil } -func (l Link) Free() { C.pn_link_free(l.pn) } -func (l Link) Name() string { return C.GoString(C.pn_link_name(l.pn)) } -func (l Link) IsSender() bool { return bool(C.pn_link_is_sender(l.pn)) } -func (l Link) IsReceiver() bool { return bool(C.pn_link_is_receiver(l.pn)) } -func (l Link) State() State { return State(C.pn_link_state(l.pn)) } -func (l Link) Error() error { return pnError(C.pn_link_error(l.pn)) } -func (l Link) Condition() Condition { return Condition{C.pn_link_condition(l.pn)} } -func (l Link) RemoteCondition() Condition { return Condition{C.pn_link_remote_condition(l.pn)} } -func (l Link) Session() Session { return Session{C.pn_link_session(l.pn)} } -func (l Link) Next(state State) Link { return Link{C.pn_link_next(l.pn, C.pn_state_t(state))} } -func (l Link) Open() { C.pn_link_open(l.pn) } -func (l Link) Close() { C.pn_link_close(l.pn) } -func (l Link) Detach() { C.pn_link_detach(l.pn) } -func (l Link) Source() Terminus { return Terminus{C.pn_link_source(l.pn)} } -func (l Link) Target() Terminus { return Terminus{C.pn_link_target(l.pn)} } -func (l Link) RemoteSource() Terminus { return Terminus{C.pn_link_remote_source(l.pn)} } -func (l Link) RemoteTarget() Terminus { return Terminus{C.pn_link_remote_target(l.pn)} } -func (l Link) Current() Delivery { return Delivery{C.pn_link_current(l.pn)} } -func (l Link) Advance() bool { return bool(C.pn_link_advance(l.pn)) } -func (l Link) Credit() int { return int(C.pn_link_credit(l.pn)) } -func (l Link) Queued() int { return int(C.pn_link_queued(l.pn)) } -func (l Link) RemoteCredit() int { return 
int(C.pn_link_remote_credit(l.pn)) } -func (l Link) IsDrain() bool { return bool(C.pn_link_get_drain(l.pn)) } -func (l Link) Drained() int { return int(C.pn_link_drained(l.pn)) } -func (l Link) Available() int { return int(C.pn_link_available(l.pn)) } -func (l Link) SndSettleMode() SndSettleMode { return SndSettleMode(C.pn_link_snd_settle_mode(l.pn)) } -func (l Link) RcvSettleMode() RcvSettleMode { return RcvSettleMode(C.pn_link_rcv_settle_mode(l.pn)) } +func (l Link) IsNil() bool { return l.pn == nil } +func (l Link) Free() { + C.pn_link_free(l.pn) +} +func (l Link) Name() string { + return C.GoString(C.pn_link_name(l.pn)) +} +func (l Link) IsSender() bool { + return bool(C.pn_link_is_sender(l.pn)) +} +func (l Link) IsReceiver() bool { + return bool(C.pn_link_is_receiver(l.pn)) +} +func (l Link) State() State { + return State(C.pn_link_state(l.pn)) +} +func (l Link) Error() error { + return internal.PnError(unsafe.Pointer(C.pn_link_error(l.pn))) +} +func (l Link) Condition() Condition { + return Condition{C.pn_link_condition(l.pn)} +} +func (l Link) RemoteCondition() Condition { + return Condition{C.pn_link_remote_condition(l.pn)} +} +func (l Link) Session() Session { + return Session{C.pn_link_session(l.pn)} +} +func (l Link) Next(state State) Link { + return Link{C.pn_link_next(l.pn, C.pn_state_t(state))} +} +func (l Link) Open() { + C.pn_link_open(l.pn) +} +func (l Link) Close() { + C.pn_link_close(l.pn) +} +func (l Link) Detach() { + C.pn_link_detach(l.pn) +} +func (l Link) Source() Terminus { + return Terminus{C.pn_link_source(l.pn)} +} +func (l Link) Target() Terminus { + return Terminus{C.pn_link_target(l.pn)} +} +func (l Link) RemoteSource() Terminus { + return Terminus{C.pn_link_remote_source(l.pn)} +} +func (l Link) RemoteTarget() Terminus { + return Terminus{C.pn_link_remote_target(l.pn)} +} +func (l Link) Current() Delivery { + return Delivery{C.pn_link_current(l.pn)} +} +func (l Link) Advance() bool { + return bool(C.pn_link_advance(l.pn)) +} +func (l 
Link) Credit() int { + return int(C.pn_link_credit(l.pn)) +} +func (l Link) Queued() int { + return int(C.pn_link_queued(l.pn)) +} +func (l Link) RemoteCredit() int { + return int(C.pn_link_remote_credit(l.pn)) +} +func (l Link) IsDrain() bool { + return bool(C.pn_link_get_drain(l.pn)) +} +func (l Link) Drained() int { + return int(C.pn_link_drained(l.pn)) +} +func (l Link) Available() int { + return int(C.pn_link_available(l.pn)) +} +func (l Link) SndSettleMode() SndSettleMode { + return SndSettleMode(C.pn_link_snd_settle_mode(l.pn)) +} +func (l Link) RcvSettleMode() RcvSettleMode { + return RcvSettleMode(C.pn_link_rcv_settle_mode(l.pn)) +} func (l Link) SetSndSettleMode(mode SndSettleMode) { C.pn_link_set_snd_settle_mode(l.pn, C.pn_snd_settle_mode_t(mode)) } @@ -254,79 +345,165 @@ func (l Link) RemoteSndSettleMode() SndSettleMode { func (l Link) RemoteRcvSettleMode() RcvSettleMode { return RcvSettleMode(C.pn_link_remote_rcv_settle_mode(l.pn)) } -func (l Link) Unsettled() int { return int(C.pn_link_unsettled(l.pn)) } -func (l Link) Offered(credit int) { C.pn_link_offered(l.pn, C.int(credit)) } -func (l Link) Flow(credit int) { C.pn_link_flow(l.pn, C.int(credit)) } -func (l Link) Drain(credit int) { C.pn_link_drain(l.pn, C.int(credit)) } -func (l Link) SetDrain(drain bool) { C.pn_link_set_drain(l.pn, C.bool(drain)) } -func (l Link) Draining() bool { return bool(C.pn_link_draining(l.pn)) } +func (l Link) Unsettled() int { + return int(C.pn_link_unsettled(l.pn)) +} +func (l Link) Offered(credit int) { + C.pn_link_offered(l.pn, C.int(credit)) +} +func (l Link) Flow(credit int) { + C.pn_link_flow(l.pn, C.int(credit)) +} +func (l Link) Drain(credit int) { + C.pn_link_drain(l.pn, C.int(credit)) +} +func (l Link) SetDrain(drain bool) { + C.pn_link_set_drain(l.pn, C.bool(drain)) +} +func (l Link) Draining() bool { + return bool(C.pn_link_draining(l.pn)) +} // Wrappers for declarations in delivery.h type Delivery struct{ pn *C.pn_delivery_t } -func (d Delivery) isNil() bool 
{ return d.pn == nil } -func (d Delivery) Tag() DeliveryTag { return DeliveryTag{C.pn_delivery_tag(d.pn)} } -func (d Delivery) Link() Link { return Link{C.pn_delivery_link(d.pn)} } -func (d Delivery) Local() Disposition { return Disposition{C.pn_delivery_local(d.pn)} } -func (d Delivery) LocalState() uint64 { return uint64(C.pn_delivery_local_state(d.pn)) } -func (d Delivery) Remote() Disposition { return Disposition{C.pn_delivery_remote(d.pn)} } -func (d Delivery) RemoteState() uint64 { return uint64(C.pn_delivery_remote_state(d.pn)) } -func (d Delivery) Settled() bool { return bool(C.pn_delivery_settled(d.pn)) } -func (d Delivery) Pending() uint { return uint(C.pn_delivery_pending(d.pn)) } -func (d Delivery) Partial() bool { return bool(C.pn_delivery_partial(d.pn)) } -func (d Delivery) Writable() bool { return bool(C.pn_delivery_writable(d.pn)) } -func (d Delivery) Readable() bool { return bool(C.pn_delivery_readable(d.pn)) } -func (d Delivery) Updated() bool { return bool(C.pn_delivery_updated(d.pn)) } -func (d Delivery) Update(state uint64) { C.pn_delivery_update(d.pn, C.uint64_t(state)) } -func (d Delivery) Clear() { C.pn_delivery_clear(d.pn) } -func (d Delivery) Settle() { C.pn_delivery_settle(d.pn) } -func (d Delivery) Dump() { C.pn_delivery_dump(d.pn) } -func (d Delivery) Buffered() bool { return bool(C.pn_delivery_buffered(d.pn)) } +func (d Delivery) IsNil() bool { return d.pn == nil } +func (d Delivery) Tag() DeliveryTag { + return DeliveryTag{C.pn_delivery_tag(d.pn)} +} +func (d Delivery) Link() Link { + return Link{C.pn_delivery_link(d.pn)} +} +func (d Delivery) Local() Disposition { + return Disposition{C.pn_delivery_local(d.pn)} +} +func (d Delivery) LocalState() uint64 { + return uint64(C.pn_delivery_local_state(d.pn)) +} +func (d Delivery) Remote() Disposition { + return Disposition{C.pn_delivery_remote(d.pn)} +} +func (d Delivery) RemoteState() uint64 { + return uint64(C.pn_delivery_remote_state(d.pn)) +} +func (d Delivery) Settled() bool { + 
return bool(C.pn_delivery_settled(d.pn)) +} +func (d Delivery) Pending() uint { + return uint(C.pn_delivery_pending(d.pn)) +} +func (d Delivery) Partial() bool { + return bool(C.pn_delivery_partial(d.pn)) +} +func (d Delivery) Writable() bool { + return bool(C.pn_delivery_writable(d.pn)) +} +func (d Delivery) Readable() bool { + return bool(C.pn_delivery_readable(d.pn)) +} +func (d Delivery) Updated() bool { + return bool(C.pn_delivery_updated(d.pn)) +} +func (d Delivery) Update(state uint64) { + C.pn_delivery_update(d.pn, C.uint64_t(state)) +} +func (d Delivery) Clear() { + C.pn_delivery_clear(d.pn) +} +func (d Delivery) Settle() { + C.pn_delivery_settle(d.pn) +} +func (d Delivery) Dump() { + C.pn_delivery_dump(d.pn) +} +func (d Delivery) Buffered() bool { + return bool(C.pn_delivery_buffered(d.pn)) +} // Wrappers for declarations in disposition.h type Disposition struct{ pn *C.pn_disposition_t } -func (d Disposition) isNil() bool { return d.pn == nil } -func (d Disposition) Type() uint64 { return uint64(C.pn_disposition_type(d.pn)) } -func (d Disposition) Condition() Condition { return Condition{C.pn_disposition_condition(d.pn)} } -func (d Disposition) Data() Data { return Data{C.pn_disposition_data(d.pn)} } -func (d Disposition) SectionNumber() uint32 { return uint32(C.pn_disposition_get_section_number(d.pn)) } -func (d Disposition) SetSectionNumber(section_number uint32) { +func (d Disposition) IsNil() bool { return d.pn == nil } +func (d Disposition) Type() uint64 { + return uint64(C.pn_disposition_type(d.pn)) +} +func (d Disposition) Condition() Condition { + return Condition{C.pn_disposition_condition(d.pn)} +} +func (d Disposition) Data() Data { + return Data{C.pn_disposition_data(d.pn)} +} +func (d Disposition) SectionNumber() uint16 { + return uint16(C.pn_disposition_get_section_number(d.pn)) +} +func (d Disposition) SetSectionNumber(section_number uint16) { C.pn_disposition_set_section_number(d.pn, C.uint32_t(section_number)) } -func (d Disposition) 
SectionOffset() uint64 { return uint64(C.pn_disposition_get_section_offset(d.pn)) } +func (d Disposition) SectionOffset() uint64 { + return uint64(C.pn_disposition_get_section_offset(d.pn)) +} func (d Disposition) SetSectionOffset(section_offset uint64) { C.pn_disposition_set_section_offset(d.pn, C.uint64_t(section_offset)) } -func (d Disposition) IsFailed() bool { return bool(C.pn_disposition_is_failed(d.pn)) } -func (d Disposition) SetFailed(failed bool) { C.pn_disposition_set_failed(d.pn, C.bool(failed)) } -func (d Disposition) IsUndeliverable() bool { return bool(C.pn_disposition_is_undeliverable(d.pn)) } +func (d Disposition) IsFailed() bool { + return bool(C.pn_disposition_is_failed(d.pn)) +} +func (d Disposition) SetFailed(failed bool) { + C.pn_disposition_set_failed(d.pn, C.bool(failed)) +} +func (d Disposition) IsUndeliverable() bool { + return bool(C.pn_disposition_is_undeliverable(d.pn)) +} func (d Disposition) SetUndeliverable(undeliverable bool) { C.pn_disposition_set_undeliverable(d.pn, C.bool(undeliverable)) } -func (d Disposition) Annotations() Data { return Data{C.pn_disposition_annotations(d.pn)} } +func (d Disposition) Annotations() Data { + return Data{C.pn_disposition_annotations(d.pn)} +} // Wrappers for declarations in condition.h type Condition struct{ pn *C.pn_condition_t } -func (c Condition) isNil() bool { return c.pn == nil } -func (c Condition) IsSet() bool { return bool(C.pn_condition_is_set(c.pn)) } -func (c Condition) Clear() { C.pn_condition_clear(c.pn) } -func (c Condition) Name() string { return C.GoString(C.pn_condition_get_name(c.pn)) } +func (c Condition) IsNil() bool { return c.pn == nil } +func (c Condition) IsSet() bool { + return bool(C.pn_condition_is_set(c.pn)) +} +func (c Condition) Clear() { + C.pn_condition_clear(c.pn) +} +func (c Condition) Name() string { + return C.GoString(C.pn_condition_get_name(c.pn)) +} func (c Condition) SetName(name string) int { - return int(C.pn_condition_set_name(c.pn, C.CString(name))) + 
nameC := C.CString(name) + defer C.free(unsafe.Pointer(nameC)) + + return int(C.pn_condition_set_name(c.pn, nameC)) +} +func (c Condition) Description() string { + return C.GoString(C.pn_condition_get_description(c.pn)) } -func (c Condition) Description() string { return C.GoString(C.pn_condition_get_description(c.pn)) } func (c Condition) SetDescription(description string) int { - return int(C.pn_condition_set_description(c.pn, C.CString(description))) + descriptionC := C.CString(description) + defer C.free(unsafe.Pointer(descriptionC)) + + return int(C.pn_condition_set_description(c.pn, descriptionC)) +} +func (c Condition) Info() Data { + return Data{C.pn_condition_info(c.pn)} +} +func (c Condition) IsRedirect() bool { + return bool(C.pn_condition_is_redirect(c.pn)) +} +func (c Condition) RedirectHost() string { + return C.GoString(C.pn_condition_redirect_host(c.pn)) +} +func (c Condition) RedirectPort() int { + return int(C.pn_condition_redirect_port(c.pn)) } -func (c Condition) Info() Data { return Data{C.pn_condition_info(c.pn)} } -func (c Condition) IsRedirect() bool { return bool(C.pn_condition_is_redirect(c.pn)) } -func (c Condition) RedirectHost() string { return C.GoString(C.pn_condition_redirect_host(c.pn)) } -func (c Condition) RedirectPort() int { return int(C.pn_condition_redirect_port(c.pn)) } // Wrappers for declarations in terminus.h @@ -343,13 +520,13 @@ func (e TerminusType) String() string { switch e { case C.PN_UNSPECIFIED: - return "PnUnspecified" + return "Unspecified" case C.PN_SOURCE: - return "PnSource" + return "Source" case C.PN_TARGET: - return "PnTarget" + return "Target" case C.PN_COORDINATOR: - return "PnCoordinator" + return "Coordinator" } return "unknown" } @@ -366,11 +543,11 @@ func (e Durability) String() string { switch e { case C.PN_NONDURABLE: - return "PnNondurable" + return "Nondurable" case C.PN_CONFIGURATION: - return "PnConfiguration" + return "Configuration" case C.PN_DELIVERIES: - return "PnDeliveries" + return 
"Deliveries" } return "unknown" } @@ -388,13 +565,13 @@ func (e ExpiryPolicy) String() string { switch e { case C.PN_EXPIRE_WITH_LINK: - return "PnExpireWithLink" + return "ExpireWithLink" case C.PN_EXPIRE_WITH_SESSION: - return "PnExpireWithSession" + return "ExpireWithSession" case C.PN_EXPIRE_WITH_CONNECTION: - return "PnExpireWithConnection" + return "ExpireWithConnection" case C.PN_EXPIRE_NEVER: - return "PnExpireNever" + return "ExpireNever" } return "unknown" } @@ -411,30 +588,39 @@ func (e DistributionMode) String() string { switch e { case C.PN_DIST_MODE_UNSPECIFIED: - return "PnDistModeUnspecified" + return "DistModeUnspecified" case C.PN_DIST_MODE_COPY: - return "PnDistModeCopy" + return "DistModeCopy" case C.PN_DIST_MODE_MOVE: - return "PnDistModeMove" + return "DistModeMove" } return "unknown" } type Terminus struct{ pn *C.pn_terminus_t } -func (t Terminus) isNil() bool { return t.pn == nil } -func (t Terminus) Type() TerminusType { return TerminusType(C.pn_terminus_get_type(t.pn)) } +func (t Terminus) IsNil() bool { return t.pn == nil } +func (t Terminus) Type() TerminusType { + return TerminusType(C.pn_terminus_get_type(t.pn)) +} func (t Terminus) SetType(type_ TerminusType) int { return int(C.pn_terminus_set_type(t.pn, C.pn_terminus_type_t(type_))) } -func (t Terminus) Address() string { return C.GoString(C.pn_terminus_get_address(t.pn)) } +func (t Terminus) Address() string { + return C.GoString(C.pn_terminus_get_address(t.pn)) +} func (t Terminus) SetAddress(address string) int { - return int(C.pn_terminus_set_address(t.pn, C.CString(address))) + addressC := C.CString(address) + defer C.free(unsafe.Pointer(addressC)) + + return int(C.pn_terminus_set_address(t.pn, addressC)) } func (t Terminus) SetDistributionMode(mode DistributionMode) int { return int(C.pn_terminus_set_distribution_mode(t.pn, C.pn_distribution_mode_t(mode))) } -func (t Terminus) Durability() Durability { return Durability(C.pn_terminus_get_durability(t.pn)) } +func (t Terminus) 
Durability() Durability { + return Durability(C.pn_terminus_get_durability(t.pn)) +} func (t Terminus) SetDurability(durability Durability) int { return int(C.pn_terminus_set_durability(t.pn, C.pn_durability_t(durability))) } @@ -450,56 +636,114 @@ func (t Terminus) Timeout() time.Duration { func (t Terminus) SetTimeout(timeout time.Duration) int { return int(C.pn_terminus_set_timeout(t.pn, C.pn_seconds_t(timeout))) } -func (t Terminus) IsDynamic() bool { return bool(C.pn_terminus_is_dynamic(t.pn)) } +func (t Terminus) IsDynamic() bool { + return bool(C.pn_terminus_is_dynamic(t.pn)) +} func (t Terminus) SetDynamic(dynamic bool) int { return int(C.pn_terminus_set_dynamic(t.pn, C.bool(dynamic))) } -func (t Terminus) Properties() Data { return Data{C.pn_terminus_properties(t.pn)} } -func (t Terminus) Capabilities() Data { return Data{C.pn_terminus_capabilities(t.pn)} } -func (t Terminus) Outcomes() Data { return Data{C.pn_terminus_outcomes(t.pn)} } -func (t Terminus) Filter() Data { return Data{C.pn_terminus_filter(t.pn)} } -func (t Terminus) Copy(src Terminus) int { return int(C.pn_terminus_copy(t.pn, src.pn)) } +func (t Terminus) Properties() Data { + return Data{C.pn_terminus_properties(t.pn)} +} +func (t Terminus) Capabilities() Data { + return Data{C.pn_terminus_capabilities(t.pn)} +} +func (t Terminus) Outcomes() Data { + return Data{C.pn_terminus_outcomes(t.pn)} +} +func (t Terminus) Filter() Data { + return Data{C.pn_terminus_filter(t.pn)} +} +func (t Terminus) Copy(src Terminus) int { + return int(C.pn_terminus_copy(t.pn, src.pn)) +} // Wrappers for declarations in connection.h type Connection struct{ pn *C.pn_connection_t } -func (c Connection) isNil() bool { return c.pn == nil } -func (c Connection) Free() { C.pn_connection_free(c.pn) } -func (c Connection) Release() { C.pn_connection_release(c.pn) } -func (c Connection) Error() error { return pnError(C.pn_connection_error(c.pn)) } -func (c Connection) State() State { return 
State(C.pn_connection_state(c.pn)) } -func (c Connection) Open() { C.pn_connection_open(c.pn) } -func (c Connection) Close() { C.pn_connection_close(c.pn) } -func (c Connection) Reset() { C.pn_connection_reset(c.pn) } -func (c Connection) Condition() Condition { return Condition{C.pn_connection_condition(c.pn)} } +func (c Connection) IsNil() bool { return c.pn == nil } +func (c Connection) Free() { + C.pn_connection_free(c.pn) +} +func (c Connection) Release() { + C.pn_connection_release(c.pn) +} +func (c Connection) Error() error { + return internal.PnError(unsafe.Pointer(C.pn_connection_error(c.pn))) +} +func (c Connection) State() State { + return State(C.pn_connection_state(c.pn)) +} +func (c Connection) Open() { + C.pn_connection_open(c.pn) +} +func (c Connection) Close() { + C.pn_connection_close(c.pn) +} +func (c Connection) Reset() { + C.pn_connection_reset(c.pn) +} +func (c Connection) Condition() Condition { + return Condition{C.pn_connection_condition(c.pn)} +} func (c Connection) RemoteCondition() Condition { return Condition{C.pn_connection_remote_condition(c.pn)} } -func (c Connection) Container() string { return C.GoString(C.pn_connection_get_container(c.pn)) } +func (c Connection) Container() string { + return C.GoString(C.pn_connection_get_container(c.pn)) +} func (c Connection) SetContainer(container string) { - C.pn_connection_set_container(c.pn, C.CString(container)) + containerC := C.CString(container) + defer C.free(unsafe.Pointer(containerC)) + + C.pn_connection_set_container(c.pn, containerC) +} +func (c Connection) SetUser(user string) { + userC := C.CString(user) + defer C.free(unsafe.Pointer(userC)) + + C.pn_connection_set_user(c.pn, userC) +} +func (c Connection) SetPassword(password string) { + passwordC := C.CString(password) + defer C.free(unsafe.Pointer(passwordC)) + + C.pn_connection_set_password(c.pn, passwordC) +} +func (c Connection) User() string { + return C.GoString(C.pn_connection_get_user(c.pn)) +} +func (c Connection) 
Hostname() string { + return C.GoString(C.pn_connection_get_hostname(c.pn)) } -func (c Connection) Hostname() string { return C.GoString(C.pn_connection_get_hostname(c.pn)) } func (c Connection) SetHostname(hostname string) { - C.pn_connection_set_hostname(c.pn, C.CString(hostname)) + hostnameC := C.CString(hostname) + defer C.free(unsafe.Pointer(hostnameC)) + + C.pn_connection_set_hostname(c.pn, hostnameC) } func (c Connection) RemoteContainer() string { return C.GoString(C.pn_connection_remote_container(c.pn)) } -func (c Connection) RemoteHostname() string { return C.GoString(C.pn_connection_remote_hostname(c.pn)) } +func (c Connection) RemoteHostname() string { + return C.GoString(C.pn_connection_remote_hostname(c.pn)) +} func (c Connection) OfferedCapabilities() Data { return Data{C.pn_connection_offered_capabilities(c.pn)} } func (c Connection) DesiredCapabilities() Data { return Data{C.pn_connection_desired_capabilities(c.pn)} } -func (c Connection) Properties() Data { return Data{C.pn_connection_properties(c.pn)} } +func (c Connection) Properties() Data { + return Data{C.pn_connection_properties(c.pn)} +} func (c Connection) RemoteOfferedCapabilities() Data { return Data{C.pn_connection_remote_offered_capabilities(c.pn)} } func (c Connection) RemoteDesiredCapabilities() Data { return Data{C.pn_connection_remote_desired_capabilities(c.pn)} } -func (c Connection) RemoteProperties() Data { return Data{C.pn_connection_remote_properties(c.pn)} } -func (c Connection) Transport() Transport { return Transport{C.pn_connection_transport(c.pn)} } +func (c Connection) RemoteProperties() Data { + return Data{C.pn_connection_remote_properties(c.pn)} +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go new file mode 100644 index 0000000..01ba890 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go @@ -0,0 +1,125 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Internal implementation details - ignore. +package internal + +// #cgo LDFLAGS: -lqpid-proton +// #include <proton/error.h> +// #include <proton/codec.h> +import "C" + +import ( + "fmt" + "runtime" + "sync" + "sync/atomic" + "unsafe" +) + +// Error type for all proton errors. 
+type Error string + +// Error prefixes error message with proton: +func (e Error) Error() string { + return "proton: " + string(e) +} + +// Errorf creates an Error with a formatted message +func Errorf(format string, a ...interface{}) Error { + return Error(fmt.Sprintf(format, a...)) +} + +type PnErrorCode int + +func (e PnErrorCode) String() string { + switch e { + case C.PN_EOS: + return "end-of-data" + case C.PN_ERR: + return "error" + case C.PN_OVERFLOW: + return "overflow" + case C.PN_UNDERFLOW: + return "underflow" + case C.PN_STATE_ERR: + return "bad-state" + case C.PN_ARG_ERR: + return "invalid-argument" + case C.PN_TIMEOUT: + return "timeout" + case C.PN_INTR: + return "interrupted" + case C.PN_INPROGRESS: + return "in-progress" + default: + return fmt.Sprintf("unknown-error(%d)", e) + } +} + +func PnError(p unsafe.Pointer) error { + e := (*C.pn_error_t)(p) + if e == nil || C.pn_error_code(e) == 0 { + return nil + } + return Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e))) +} + +// DoRecover is called to recover from internal panics +func DoRecover(err *error) { + r := recover() + switch r := r.(type) { + case nil: // We are not recovering + return + case runtime.Error: // Don't catch runtime.Error + panic(r) + case error: + *err = r + default: + panic(r) + } +} + +// panicIf panics if condition is true, the panic value is Errorf(fmt, args...) +func panicIf(condition bool, fmt string, args ...interface{}) { + if condition { + panic(Errorf(fmt, args...)) + } +} + +// FirstError is a goroutine-safe error holder that keeps the first error that is set. +type FirstError struct { + err atomic.Value + once sync.Once +} + +// Set the error if not already set. +func (e *FirstError) Set(err error) { + e.once.Do(func() { e.err.Store(err) }) +} + +// Get the error. 
+func (e *FirstError) Get() error { + v := e.err.Load() + if v != nil { + return v.(error) + } else { + return nil + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/interop_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/interop_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/interop_test.go index d1703db..67d804c 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/interop_test.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/interop_test.go @@ -35,7 +35,7 @@ import ( func assertEqual(want interface{}, got interface{}) { if !reflect.DeepEqual(want, got) { - panic(errorf("%#v != %#v", want, got)) + panic(fmt.Errorf("%#v != %#v", want, got)) } } @@ -48,7 +48,7 @@ func assertNil(err interface{}) { func getReader(name string) (r io.Reader) { r, err := os.Open("../../../../../../tests/interop/" + name + ".amqp") if err != nil { - panic(errorf("Can't open %#v: %v", name, err)) + panic(fmt.Errorf("Can't open %#v: %v", name, err)) } return } @@ -197,7 +197,7 @@ func TestStrings(t *testing.T) { assertDecode(d, "", &sym) remains = remaining(d) if remains != "" { - panic(errorf("leftover: %s", remains)) + t.Fatalf("leftover: %s", remains) } // Test some error handling @@ -220,7 +220,7 @@ func TestStrings(t *testing.T) { t.Error(err) } _, err = Unmarshal([]byte("foobar"), nil) - if !strings.Contains(err.Error(), "invalid argument") { + if !strings.Contains(err.Error(), "invalid-argument") { t.Error(err) } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/marshal.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/proton/marshal.go index 74bd8e2..e4f230d 100644 --- 
a/proton-c/bindings/go/src/qpid.apache.org/proton/marshal.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/marshal.go @@ -24,10 +24,19 @@ import "C" import ( "io" + "qpid.apache.org/proton/internal" "reflect" "unsafe" ) +func dataError(prefix string, data *C.pn_data_t) error { + err := internal.PnError(unsafe.Pointer(C.pn_data_error(data))) + if err != nil { + err = internal.Errorf("%s: %s", prefix, err.(internal.Error)) + } + return err +} + /* Marshal encodes a Go value as AMQP data in buffer. If buffer is nil, or is not large enough, a new buffer is created. @@ -71,7 +80,7 @@ TODO Go types: array, slice, struct Go types that cannot be marshaled: complex64/128, uintptr, function, interface, channel */ func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) { - defer doRecover(&err) + defer internal.DoRecover(&err) data := C.pn_data(0) defer C.pn_data_free(data) put(data, v) @@ -92,7 +101,7 @@ func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) { const minEncode = 256 // overflow is returned when an encoding function can't fit data in the buffer. -var overflow = errorf("buffer too small") +var overflow = internal.Errorf("buffer too small") // encodeFn encodes into buffer[0:len(buffer)]. // Returns buffer with length adjusted for data encoded. 
@@ -173,7 +182,7 @@ func put(data *C.pn_data_t, v interface{}) { case reflect.Slice: putList(data, v) default: - panic(errorf("cannot marshal %s to AMQP", reflect.TypeOf(v))) + panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v))) } } err := dataError("marshal", data) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/message.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/message.go b/proton-c/bindings/go/src/qpid.apache.org/proton/message.go index 0788e11..44be51d 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/message.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/message.go @@ -25,12 +25,15 @@ package proton import "C" import ( - "qpid.apache.org/proton/event" - "sync/atomic" + "qpid.apache.org/proton/internal" "time" + "unsafe" ) +// FIXME aconway 2015-04-28: Do we need the interface or can we just export the struct? + // Message is the interface to an AMQP message. +// Instances of this interface contain a pointer to the underlying struct. type Message interface { /** * Inferred indicates how the message content @@ -156,11 +159,9 @@ type Message interface { // the message is encoded into it, otherwise a new buffer is created. // Returns the buffer containing the message. Encode(buffer []byte) ([]byte, error) - - // Send the message over an outgoing link - Send(event.Link) (event.Delivery, error) } +// NewMessage creates a new message instance. The returned interface contains a pointer. func NewMessage() Message { pn := C.pn_message() // Pick up default setting from C message. 
defer C.pn_message_free(pn) @@ -310,30 +311,15 @@ func DecodeMessage(data []byte) (Message, error) { pnMsg := C.pn_message() defer C.pn_message_free(pnMsg) if len(data) == 0 { - return nil, errorf("empty buffer for decode") + return nil, internal.Errorf("empty buffer for decode") } if C.pn_message_decode(pnMsg, cPtr(data), cLen(data)) < 0 { - return nil, pnError("decoding message", C.pn_message_error(pnMsg)) + return nil, internal.Errorf("decoding message: %s", + internal.PnError(unsafe.Pointer(C.pn_message_error(pnMsg)))) } return goMessage(pnMsg), nil } -// EventMessage decodes the message containined in a delivery event. -func EventMessage(e event.Event) (Message, error) { - // FIXME aconway 2015-04-07: temporary - // defer doRecover(&err) - delivery := e.Delivery() - if !delivery.Readable() || delivery.Partial() { - return nil, errorf("attempting to get incomplete message") - } - data := make([]byte, delivery.Pending()) - result := delivery.Link().Recv(data) - if result != len(data) { - return nil, errorf("cannot receive message: %s", pnErrorName(result)) - } - return DecodeMessage(data) -} - // Encode the message into bufffer. // If buffer is nil or len(buffer) is not sufficient to encode the message a larger // buffer will be returned. @@ -347,42 +333,10 @@ func (m *message) Encode(buffer []byte) ([]byte, error) { case result == C.PN_OVERFLOW: return buf, overflow case result < 0: - return buf, errorf("cannot encode message: %s", pnErrorName(int(result))) + return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result)) default: return buf[:len], nil } } return encodeGrow(buffer, encode) } - -// FIXME aconway 2015-04-08: proper handling of delivery tags. 
-var tag uint64 - -func getTag() string { - return string(atomic.AddUint64(&tag, 1)) -} - -func (m *message) Send(link event.Link) (event.Delivery, error) { - if !link.IsSender() { - return event.Delivery{}, errorf("attempt to send message on receiving link") - } - // FIXME aconway 2015-04-08: buffering, error handling - delivery := link.Delivery(getTag()) - bytes, err := m.Encode(nil) - if err != nil { - return event.Delivery{}, errorf("cannot send mesage %s", err) - } - result := link.Send(bytes) - link.Advance() - if result != len(bytes) { - if result < 0 { - return delivery, errorf("send failed %v", pnErrorName(result)) - } else { - return delivery, errorf("send incomplete %v of %v", result, len(bytes)) - } - } - if link.RemoteSndSettleMode() == event.PnSndSettled { // FIXME aconway 2015-04-08: enum names - delivery.Settle() - } - return delivery, nil -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/doc.go new file mode 100644 index 0000000..c815f4e --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/doc.go @@ -0,0 +1,28 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +/* +Package messaging provides a procedural, concurrent Go API for exchanging AMQP messages. +*/ +package messaging + +// #cgo LDFLAGS: -lqpid-proton +import "C" + +// Just for package comment http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/example_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/example_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/example_test.go new file mode 100644 index 0000000..02302b6 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/example_test.go @@ -0,0 +1,268 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Tests to verify that example code behaves as expected. 
+package messaging + +import ( + "bufio" + "bytes" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net" + "os" + "os/exec" + "path" + "path/filepath" + "reflect" + "testing" + "time" +) + +func panicIf(err error) { + if err != nil { + panic(err) + } +} + +// A demo broker process +type broker struct { + cmd *exec.Cmd + addr string + runerr chan error + err error +} + +// Try to connect to the broker to verify it is ready, give up after a timeout +func (b *broker) check() error { + dialer := net.Dialer{Deadline: time.Now().Add(time.Second * 10)} + for { + c, err := dialer.Dial("tcp", b.addr) + if err == nil { // Success + c.Close() + return nil + } + select { + case runerr := <-b.runerr: // Broker exited. + return runerr + default: + } + if neterr, ok := err.(net.Error); ok && neterr.Timeout() { // Running but timed out + b.stop() + return fmt.Errorf("timed out waiting for broker") + } + time.Sleep(time.Second / 10) + } +} + +// Start the demo broker, wait till it is listening on *addr. No-op if already started. +func (b *broker) start() error { + build("event/broker.go") + if b.cmd == nil { // Not already started + // FIXME aconway 2015-04-30: better way to pick/configure a broker address. + b.addr = fmt.Sprintf(":%d", rand.Intn(10000)+10000) + b.cmd = exec.Command(exepath("broker"), "-addr", b.addr, "-verbose", "0") + b.runerr = make(chan error) + // Change the -verbose setting above to see broker output on stdout/stderr. + b.cmd.Stderr, b.cmd.Stdout = os.Stderr, os.Stdout + go func() { + b.runerr <- b.cmd.Run() + }() + b.err = b.check() + } + return b.err +} + +func (b *broker) stop() { + if b != nil && b.cmd != nil { + b.cmd.Process.Kill() + b.cmd.Wait() + } +} + +// FIXME aconway 2015-04-30: redo all assert/panic tests with checkEqual style. +func checkEqual(want interface{}, got interface{}) error { + if reflect.DeepEqual(want, got) { + return nil + } + return fmt.Errorf("%#v != %#v", want, got) +} + +// runCommand returns an exec.Cmd to run an example. 
+func exampleCommand(prog string, arg ...string) *exec.Cmd { + build(prog + ".go") + cmd := exec.Command(exepath(prog), arg...) + cmd.Stderr = os.Stderr + return cmd +} + +// Run an example Go program, return the combined output as a string. +func runExample(prog string, arg ...string) (string, error) { + cmd := exampleCommand(prog, arg...) + out, err := cmd.Output() + return string(out), err +} + +func prefix(prefix string, err error) error { + if err != nil { + return fmt.Errorf("%s: %s", prefix, err) + } + return nil +} + +func runExampleWant(want string, prog string, args ...string) error { + out, err := runExample(prog, args...) + if err != nil { + return fmt.Errorf("%s failed: %s: %s", prog, err, out) + } + return prefix(prog, checkEqual(want, out)) +} + +func exampleArgs(args ...string) []string { + return append(args, testBroker.addr+"/foo", testBroker.addr+"/bar", testBroker.addr+"/baz") +} + +// Send then receive +func TestExampleSendReceive(t *testing.T) { + if testing.Short() { + t.Skip("Skip demo tests in short mode") + } + testBroker.start() + err := runExampleWant( + "send: Received all 15 acknowledgements\n", + "send", + exampleArgs("-count", "5", "-verbose", "1")...) + if err != nil { + t.Fatal(err) + } + err = runExampleWant( + "receive: Listening\nreceive: Received 15 messages\n", + "receive", + exampleArgs("-verbose", "1", "-count", "15")...) + if err != nil { + t.Fatal(err) + } +} + +var ready error + +func init() { ready = fmt.Errorf("Ready") } + +// Run receive in a goroutine. +// Send ready on errchan when it is listening. +// Send final error when it is done. +// Returns the Cmd, caller must Wait() +func goReceiveWant(errchan chan<- error, want string, arg ...string) *exec.Cmd { + cmd := exampleCommand("receive", arg...) 
+ go func() { + pipe, err := cmd.StdoutPipe() + if err != nil { + errchan <- err + return + } + out := bufio.NewReader(pipe) + cmd.Start() + line, err := out.ReadString('\n') + if err != nil && err != io.EOF { + errchan <- err + return + } + listening := "receive: Listening\n" + if line != listening { + errchan <- checkEqual(listening, line) + return + } + errchan <- ready + buf := bytes.Buffer{} + io.Copy(&buf, out) // Collect the rest of the output + errchan <- checkEqual(want, buf.String()) + close(errchan) + }() + return cmd +} + +// Start receiver first, wait till it is running, then send. +func TestExampleReceiveSend(t *testing.T) { + if testing.Short() { + t.Skip("Skip demo tests in short mode") + } + testBroker.start() + recvErr := make(chan error) + recvCmd := goReceiveWant(recvErr, + "receive: Received 15 messages\n", + exampleArgs("-count", "15", "-verbose", "1")...) + defer func() { + recvCmd.Process.Kill() + recvCmd.Wait() + }() + if err := <-recvErr; err != ready { // Wait for receiver ready + t.Fatal(err) + } + err := runExampleWant( + "send: Received all 15 acknowledgements\n", + "send", + exampleArgs("-count", "5", "-verbose", "1")...) 
+ if err != nil { + t.Fatal(err) + } + if err := <-recvErr; err != nil { + t.Fatal(err) + } +} + +func exepath(relative string) string { + if binDir == "" { + panic("bindir not set, cannot run example binaries") + } + return path.Join(binDir, relative) +} + +var testBroker *broker +var binDir, exampleDir string +var built map[string]bool + +func init() { + built = make(map[string]bool) +} + +func build(prog string) { + if !built[prog] { + build := exec.Command("go", "build", path.Join(exampleDir, prog)) + build.Dir = binDir + out, err := build.CombinedOutput() + if err != nil { + panic(fmt.Errorf("%v: %s", err, out)) + } + built[prog] = true + } +} + +func TestMain(m *testing.M) { + var err error + exampleDir, err = filepath.Abs("../../../../../../../examples/go") + panicIf(err) + binDir, err = ioutil.TempDir("", "example_test.go") + panicIf(err) + defer os.Remove(binDir) // Clean up binaries + testBroker = &broker{} // Broker is started on-demand by tests. + defer testBroker.stop() + os.Exit(m.Run()) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/handler.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/handler.go b/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/handler.go new file mode 100644 index 0000000..e0d9d7e --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/handler.go @@ -0,0 +1,70 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package messaging + +import ( + "qpid.apache.org/proton" + "qpid.apache.org/proton/event" +) + +// FIXME aconway 2015-04-28: cleanup - exposing delivery vs. disposition. + +type acksMap map[event.Delivery]chan Disposition +type receiverMap map[event.Link]chan proton.Message + +type handler struct { + connection *Connection + acks acksMap + receivers receiverMap +} + +func newHandler(c *Connection) *handler { + return &handler{c, make(acksMap), make(receiverMap)} +} + +func (h *handler) Handle(t event.MessagingEventType, e event.Event) error { + switch t { + // FIXME aconway 2015-04-29: handle errors. + case event.MConnectionClosed: + for _, ack := range h.acks { + // FIXME aconway 2015-04-29: communicate error info + close(ack) + } + + case event.MSettled: + ack := h.acks[e.Delivery()] + if ack != nil { + ack <- Disposition(e.Delivery().Remote().Type()) + close(ack) + delete(h.acks, e.Delivery()) + } + + case event.MMessage: + r := h.receivers[e.Link()] + if r != nil { + m, _ := event.DecodeMessage(e) + // FIXME aconway 2015-04-29: hack, direct send, possible blocking. + r <- m + } else { + // FIXME aconway 2015-04-29: Message with no receiver - log? panic? deadletter? drop? 
+ } + } + return nil +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/messaging.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/messaging.go b/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/messaging.go new file mode 100644 index 0000000..d32aada --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/messaging/messaging.go @@ -0,0 +1,250 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package messaging + +// #include <proton/disposition.h> +import "C" + +import ( + "net" + "qpid.apache.org/proton" + "qpid.apache.org/proton/event" +) + +// Connection is a connection to a remote AMQP endpoint. +// +// You can set exported fields to configure the connection before calling +// Connection.Open() +// +type Connection struct { + // Server = true means the connection will do automatic protocol detection. + Server bool + + // FIXME aconway 2015-04-17: Other parameters to set up SSL, SASL etc. + + handler *handler + pump *event.Pump + session Session +} + +// Make an AMQP connection over a net.Conn connection. 
+// +// Use Connection.Close() to close the Connection, this will also close conn. +// Using conn.Close() directly will cause an abrupt disconnect rather than an +// orderly AMQP close. +// +func (c *Connection) Open(conn net.Conn) (err error) { + c.handler = newHandler(c) + c.pump, err = event.NewPump(conn, + event.NewMessagingDelegator(c.handler), + ) + if err != nil { + return err + } + if c.Server { + c.pump.Server() + } + go c.pump.Run() + return nil +} + +// Connect opens a default client connection. It is a shortcut for +// c := &Connection +// c.Open() +// +func Connect(conn net.Conn) (*Connection, error) { + c := &Connection{} + err := c.Open(conn) + return c, err +} + +// Close the connection. +// +// Connections must be closed to clean up resources and stop associated goroutines. +func (c *Connection) Close() error { return c.pump.Close() } + +// DefaultSession returns a default session for the connection. +// +// It is created on the first call to DefaultSession() and returned from all subsequent calls. +// Use Session() for more control over creating sessions. +// +func (c *Connection) DefaultSession() (s Session, err error) { + if c.session.e.IsNil() { + c.session, err = c.Session() + } + return c.session, err +} + +type sessionErr struct { + s event.Session + err error +} + +// Session creates a new session. +func (c *Connection) Session() (Session, error) { + connection := c.pump.Connection() + result := make(chan sessionErr) + c.pump.Inject <- func() { + s, err := connection.Session() + if err == nil { + s.Open() + } + result <- sessionErr{s, err} + } + se := <-result + return Session{se.s, c.pump}, se.err +} + +// FIXME aconway 2015-04-27: set sender name, options etc. + +// Sender creates a Sender that will send messages to the address addr. 
+func (c *Connection) Sender(addr string) (s Sender, err error) { + session, err := c.DefaultSession() + if err != nil { + return Sender{}, err + } + result := make(chan Sender) + c.pump.Inject <- func() { + link := session.e.Sender(linkNames.Next()) + if link.IsNil() { + err = session.e.Error() + } else { + link.Target().SetAddress(addr) + // FIXME aconway 2015-04-27: link options? + link.Open() + } + result <- Sender{Link{c, link}} + } + return <-result, err +} + +// Receiver returns a receiver that will receive messages sent to address addr. +func (c *Connection) Receiver(addr string) (r Receiver, err error) { + // FIXME aconway 2015-04-29: move code to session, in link.go? + session, err := c.DefaultSession() + if err != nil { + return Receiver{}, err + } + result := make(chan Receiver) + c.pump.Inject <- func() { + link := session.e.Receiver(linkNames.Next()) + if link.IsNil() { + err = session.e.Error() + } else { + link.Source().SetAddress(addr) + // FIXME aconway 2015-04-27: link options? + link.Open() + } + // FIXME aconway 2015-04-29: hack to avoid blocking, need proper buffering linked to flow control + rchan := make(chan proton.Message, 1000) + c.handler.receivers[link] = rchan + result <- Receiver{Link{c, link}, rchan} + } + return <-result, err +} + +// FIXME aconway 2015-04-29: counter per session. +var linkNames proton.UidCounter + +// Session is an AMQP session, it contains Senders and Receivers. +// Every Connection has a DefaultSession, you can create additional sessions +// with Connection.Session() +type Session struct { + e event.Session + pump *event.Pump +} + +// FIXME aconway 2015-05-05: REWORK Sender/receiver/session. + +// Disposition indicates the outcome of a settled message delivery. 
+type Disposition uint64 + +const ( + // Message was accepted by the receiver + Accepted Disposition = C.PN_ACCEPTED + // Message was rejected as invalid by the receiver + Rejected = C.PN_REJECTED + // Message was not processed by the receiver but may be processed by some other receiver. + Released = C.PN_RELEASED +) + +// String human readable name for a Disposition. +func (d Disposition) String() string { + switch d { + case Accepted: + return "Accepted" + case Rejected: + return "Rejected" + case Released: + return "Released" + default: + return "Unknown" + } +} + +// FIXME aconway 2015-04-29: How to signal errors via ack channels. + +// An Acknowledgement is a channel which will receive the Disposition of the message +// when it is acknowledged. The channel is closed after the disposition is sent. +type Acknowledgement <-chan Disposition + +// Link has common data and methods for Sender and Receiver links. +type Link struct { + connection *Connection + elink event.Link +} + +// Sender sends messages. +type Sender struct { + Link +} + +// FIXME aconway 2015-04-28: allow user to specify delivery tag. +// FIXME aconway 2015-04-28: should we provide a sending channel rather than a send function? + +// Send sends a message. If d is not nil, the disposition is retured on d. +// If d is nil the message is sent pre-settled and no disposition is returned. +func (s *Sender) Send(m proton.Message) (ack Acknowledgement, err error) { + ackChan := make(chan Disposition, 1) + ack = ackChan + s.connection.pump.Inject <- func() { + // FIXME aconway 2015-04-28: flow control & credit, buffer or fail? + delivery, err := s.elink.Send(m) + if err == nil { // FIXME aconway 2015-04-28: error handling + s.connection.handler.acks[delivery] = ackChan + } + } + return ack, nil +} + +// Close the sender. +func (s *Sender) Close() error { return nil } // FIXME aconway 2015-04-27: close/free + +// Receiver receives messages via the channel Receive. 
+type Receiver struct { + Link + // Channel of messages + Receive <-chan proton.Message +} + +// FIXME aconway 2015-04-29: settlement - ReceivedMessage with Settle() method? + +// Close the Receiver. +func (r *Receiver) Close() error { return nil } // FIXME aconway 2015-04-29: close/free http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/uid.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/uid.go b/proton-c/bindings/go/src/qpid.apache.org/proton/uid.go new file mode 100644 index 0000000..de80846 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/uid.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Generating unique IDs for various things. + +package proton + +import ( + "strconv" + "sync/atomic" +) + +// A simple atomic counter to generate unique 64 bit IDs. +type UidCounter struct{ count uint64 } + +// NextInt gets the next uint64 value from the atomic counter. +func (uc *UidCounter) NextInt() uint64 { + return atomic.AddUint64(&uc.count, 1) +} + +// Next gets the next integer value encoded as a base32 string, safe for NUL terminated C strings. 
+func (uc *UidCounter) Next() string { + return strconv.FormatUint(uc.NextInt(), 32) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/unmarshal.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/unmarshal.go b/proton-c/bindings/go/src/qpid.apache.org/proton/unmarshal.go index af6c933..f904aae 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/unmarshal.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/unmarshal.go @@ -24,13 +24,35 @@ import "C" import ( "bytes" + "fmt" "io" + "qpid.apache.org/proton/internal" "reflect" "unsafe" ) const minDecode = 1024 +// Error returned if AMQP data cannot be unmarshaled as the desired Go type. +type BadUnmarshal struct { + // The name of the AMQP type. + AMQPType string + // The Go type. + GoType reflect.Type +} + +func newBadUnmarshal(pnType C.pn_type_t, v interface{}) *BadUnmarshal { + return &BadUnmarshal{pnTypeString(pnType), reflect.TypeOf(v)} +} + +func (e BadUnmarshal) Error() string { + if e.GoType.Kind() != reflect.Ptr { + return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType) + } else { + return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType) + } +} + // // Decoding from a pn_data_t // @@ -67,7 +89,7 @@ func (d *Decoder) Buffered() io.Reader { // See the documentation for Unmarshal for details about the conversion of AMQP into a Go value. // func (d *Decoder) Decode(v interface{}) (err error) { - defer doRecover(&err) + defer internal.DoRecover(&err) data := C.pn_data(0) defer C.pn_data_free(data) var n int @@ -148,12 +170,12 @@ Maps: currently we cannot unmarshal AMQP maps with unhashable key types, need an representation for those. 
*/ func Unmarshal(bytes []byte, v interface{}) (n int, err error) { - defer doRecover(&err) + defer internal.DoRecover(&err) data := C.pn_data(0) defer C.pn_data_free(data) n = unmarshal(data, bytes, v) if n == 0 { - err = errorf("not enough data") + err = internal.Errorf("not enough data") } return } @@ -524,7 +546,7 @@ func decode(data *C.pn_data_t, bytes []byte) int { C.pn_error_clear(C.pn_data_error(data)) return 0 } else if n <= 0 { - panic(errorf("unmarshal %s", pnErrorName(n))) + panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n))) } return n } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/url.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/url.go b/proton-c/bindings/go/src/qpid.apache.org/proton/url.go index 72be720..5bac6ac 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/url.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/url.go @@ -35,6 +35,7 @@ import "C" import ( "net" "net/url" + "qpid.apache.org/proton/internal" "unsafe" ) @@ -53,7 +54,7 @@ func ParseURL(s string) (u *url.URL, err error) { defer C.free(unsafe.Pointer(cstr)) pnUrl := C.pn_url_parse(cstr) if pnUrl == nil { - return nil, errorf("bad URL %#v", s) + return nil, internal.Errorf("bad URL %#v", s) } defer C.pn_url_free(pnUrl) @@ -65,7 +66,7 @@ func ParseURL(s string) (u *url.URL, err error) { path := C.GoString(C.pn_url_get_path(pnUrl)) if err != nil { - return nil, errorf("bad URL %#v: %s", s, err) + return nil, internal.Errorf("bad URL %#v: %s", s, err) } if scheme == "" { scheme = amqp http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/python/proton/handlers.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py index 6836788..bbbdd1e 100644 --- 
a/proton-c/bindings/python/proton/handlers.py +++ b/proton-c/bindings/python/proton/handlers.py @@ -543,7 +543,7 @@ class CFlowController(WrappedHandler): def __init__(self, window=1024): WrappedHandler.__init__(self, lambda: pn_flowcontroller(window)) -class CHandshaker(WrappedHandler): +oclass CHandshaker(WrappedHandler): def __init__(self): WrappedHandler.__init__(self, pn_handshaker) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
