This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit b19fbee7fc03d8e14791382416105fd4aed7ddb7 Author: Andrew Stitcher <[email protected]> AuthorDate: Thu Jun 12 18:54:41 2025 -0400 PROTON-2898: [C} Some examples using pn_proactor_import_socket There is an example for both client and server ends. --- c/examples/CMakeLists.txt | 8 + c/examples/direct-fd.c | 414 ++++++++++++++++++++++++++++++++++++++++++++++ c/examples/send-fd.c | 274 ++++++++++++++++++++++++++++++ 3 files changed, 696 insertions(+) diff --git a/c/examples/CMakeLists.txt b/c/examples/CMakeLists.txt index 7488a34d0..a1418fd1f 100644 --- a/c/examples/CMakeLists.txt +++ b/c/examples/CMakeLists.txt @@ -30,3 +30,11 @@ foreach (name broker send receive direct send-abort send-ssl raw_echo raw_connec set_target_properties(c-${name} PROPERTIES OUTPUT_NAME ${name}) endforeach() + +foreach (name send-fd direct-fd) + add_executable(c-${name} ${name}.c) + target_link_libraries(c-${name} Proton::core Proton::proactor Threads::Threads ${PLATFORM_LIBS}) + set_target_properties(c-${name} PROPERTIES + OUTPUT_NAME ${name} + C_EXTENSIONS ON) +endforeach() diff --git a/c/examples/direct-fd.c b/c/examples/direct-fd.c new file mode 100644 index 000000000..592a90aa6 --- /dev/null +++ b/c/examples/direct-fd.c @@ -0,0 +1,414 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/condition.h> +#include <proton/connection.h> +#include <proton/delivery.h> +#include <proton/link.h> +#include <proton/listener.h> +#include <proton/netaddr.h> +#include <proton/message.h> +#include <proton/object.h> +#include <proton/proactor.h> +#include <proton/proactor_ext.h> +#include <proton/sasl.h> +#include <proton/session.h> +#include <proton/transport.h> + +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> + +#ifdef _WIN32 +# include <winsock2.h> +# include <ws2tcpip.h> +# include <windows.h> +void perror_wsa(const char* message) { + static char error_msg[256]; // Buffer for the error message + DWORD error_code = WSAGetLastError(); + + int length = FormatMessage( + FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, + error_code, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + error_msg, + sizeof(error_msg), + NULL + ); + + if (length > 0) { + fprintf(stderr, "%s: %s\n", message, error_msg); + } + else { + fprintf(stderr, "%s: Unknown error code %ld\n", message, error_code); + } +} +# undef perror +# define perror perror_wsa +#else +# include <netdb.h> +# include <sys/socket.h> +# include <sys/types.h> +# include <unistd.h> +# define INVALID_SOCKET (-1) +# define SOCKET_ERROR (-1) +# define closesocket close +#endif // _WIN32 + +typedef struct app_data_t { + const char *host, *port; + const char *amqp_address; + const char *container_id; + int message_count; + + pn_proactor_t *proactor; + pn_listener_t *listener; + pn_rwbytes_t msgin, msgout; /* Buffers for incoming/outgoing messages */ + + /* Sender values */ + int sent; + int acknowledged; + pn_link_t *sender; + + /* Receiver values */ + int received; +} app_data_t; + +static const int BATCH = 1000; /* Batch size for unlimited receive */ + +static int exit_code = 0; + +/* Close the connection and the listener so so we will get a + * PN_PROACTOR_INACTIVE event and exit, once all outstanding events + * are processed. + */ +static void close_all(pn_connection_t *c, app_data_t *app) { + if (c) pn_connection_close(c); + if (app->listener) pn_listener_close(app->listener); +} + +static void check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) { + if (pn_condition_is_set(cond)) { + fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); + close_all(pn_event_connection(e), app); + exit_code = 1; + } +} + +/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */ +static void send_message(app_data_t *app, pn_link_t *sender) { + /* Construct a message with the map { "sequence": app.sent } */ + pn_message_t* message = pn_message(); + pn_data_t* body = pn_message_body(message); + pn_message_set_id(message, (pn_atom_t){.type=PN_ULONG, .u.as_ulong=app->sent}); + pn_data_put_map(body); + pn_data_enter(body); + pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); + pn_data_put_int(body, app->sent); /* The sequence number */ + pn_data_exit(body); + if (pn_message_send(message, sender, &app->msgout) < 0) { + fprintf(stderr, "send error: %s\n", pn_error_text(pn_message_error(message))); + exit_code = 1; + } + pn_message_free(message); +} + +static void decode_message(pn_rwbytes_t data) { + pn_message_t *m = pn_message(); + int err = pn_message_decode(m, data.start, data.size); + if (!err) { + /* Print the decoded message */ + char *s = pn_tostring(pn_message_body(m)); + printf("%s\n", s); + fflush(stdout); + free(s); + pn_message_free(m); + free(data.start); + } else { + fprintf(stderr, "decode error: %s\n", pn_error_text(pn_message_error(m))); + exit_code = 1; + } +} + +/* This function handles events when we are acting as the receiver */ +static void handle_receive(app_data_t *app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_LINK_REMOTE_OPEN: { + pn_link_t *l = pn_event_link(event); + pn_link_open(l); + pn_link_flow(l, app->message_count ? app->message_count : BATCH); + } break; + + case PN_DELIVERY: { /* Incoming message data */ + pn_delivery_t *d = pn_event_delivery(event); + if (pn_delivery_readable(d)) { + pn_link_t *l = pn_delivery_link(d); + size_t size = pn_delivery_pending(d); + pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */ + ssize_t recv; + m->size += size; + m->start = (char*)realloc(m->start, m->size); + recv = pn_link_recv(l, m->start, m->size); + if (recv == PN_ABORTED) { + printf("Message aborted\n"); + fflush(stdout); + m->size = 0; /* Forget the data we accumulated */ + pn_delivery_settle(d); /* Free the delivery so we can receive the next message */ + pn_link_flow(l, 1); /* Replace credit for aborted message */ + } else if (recv < 0 && recv != PN_EOS) { /* Unexpected error */ + pn_condition_format(pn_link_condition(l), "broker", "PN_DELIVERY error: %s", pn_code(recv)); + pn_link_close(l); /* Unexpected error, close the link */ + } else if (!pn_delivery_partial(d)) { /* Message is complete */ + decode_message(*m); + *m = pn_rwbytes_null; + pn_delivery_update(d, PN_ACCEPTED); + pn_delivery_settle(d); /* settle and free d */ + if (app->message_count == 0) { + /* receive forever - see if more credit is needed */ + if (pn_link_credit(l) < BATCH/2) { + pn_link_flow(l, BATCH - pn_link_credit(l)); + } + } else if (++app->received >= app->message_count) { + printf("%d messages received\n", app->received); + close_all(pn_event_connection(event), app); + } + } + } + break; + } + default: + break; + } +} + +/* This function handles events when we are acting as the sender */ +static void handle_send(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_LINK_REMOTE_OPEN: { + pn_link_t* l = pn_event_link(event); + pn_terminus_set_address(pn_link_target(l), app->amqp_address); + pn_link_open(l); + } break; + + case PN_LINK_FLOW: { + /* The peer has given us some credit, now we can send messages */ + pn_link_t *sender = pn_event_link(event); + while (pn_link_credit(sender) > 0 && app->sent < app->message_count) { + ++app->sent; + /* Use sent counter as unique delivery tag. */ + pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); + send_message(app, sender); + } + break; + } + + case PN_DELIVERY: { + /* We received acknowledgement from the peer that a message was delivered. */ + pn_delivery_t* d = pn_event_delivery(event); + if (pn_delivery_remote_state(d) == PN_ACCEPTED) { + if (++app->acknowledged == app->message_count) { + printf("%d messages sent and acknowledged\n", app->acknowledged); + close_all(pn_event_connection(event), app); + } + } + } break; + + default: + break; + } +} + +/* Handle all events, delegate to handle_send or handle_receive depending on link mode. + Return true to continue, false to exit +*/ +static bool handle(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_LISTENER_OPEN: { + char port[256]; /* Get the listening port */ + pn_netaddr_host_port(pn_listener_addr(pn_event_listener(event)), NULL, 0, port, sizeof(port)); + printf("listening on %s\n", port); + fflush(stdout); + break; + } + case PN_LISTENER_ACCEPT: + pn_listener_accept2(pn_event_listener(event), NULL, NULL); + break; + + case PN_CONNECTION_INIT: + pn_connection_set_container(pn_event_connection(event), app->container_id); + break; + + case PN_CONNECTION_BOUND: { + /* Turn off security */ + pn_transport_t *t = pn_event_transport(event); + pn_transport_require_auth(t, false); + pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS"); + break; + } + case PN_CONNECTION_REMOTE_OPEN: { + pn_connection_open(pn_event_connection(event)); /* Complete the open */ + break; + } + + case PN_SESSION_REMOTE_OPEN: { + pn_session_open(pn_event_session(event)); + break; + } + + case PN_TRANSPORT_CLOSED: + check_condition(event, pn_transport_condition(pn_event_transport(event)), app); + break; + + case PN_CONNECTION_REMOTE_CLOSE: + check_condition(event, pn_connection_remote_condition(pn_event_connection(event)), app); + pn_connection_close(pn_event_connection(event)); /* Return the close */ + break; + + case PN_SESSION_REMOTE_CLOSE: + check_condition(event, pn_session_remote_condition(pn_event_session(event)), app); + pn_session_close(pn_event_session(event)); /* Return the close */ + pn_session_free(pn_event_session(event)); + break; + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: + check_condition(event, pn_link_remote_condition(pn_event_link(event)), app); + pn_link_close(pn_event_link(event)); /* Return the close */ + pn_link_free(pn_event_link(event)); + break; + + case PN_PROACTOR_TIMEOUT: + /* Wake the sender's connection */ + pn_connection_wake(pn_session_connection(pn_link_session(app->sender))); + break; + + case PN_LISTENER_CLOSE: + app->listener = NULL; /* Listener is closed */ + check_condition(event, pn_listener_condition(pn_event_listener(event)), app); + break; + + case PN_PROACTOR_INACTIVE: + return false; + break; + + default: { + pn_link_t *l = pn_event_link(event); + if (l) { /* Only delegate link-related events */ + if (pn_link_is_sender(l)) { + handle_send(app, event); + } else { + handle_receive(app, event); + } + } + } + } + return exit_code == 0; +} + +void run(app_data_t *app) { + /* Loop and handle events */ + do { + pn_event_batch_t *events = pn_proactor_wait(app->proactor); + pn_event_t *e; + for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { + if (!handle(app, e)) { + return; + } + } + pn_proactor_done(app->proactor, events); + } while(true); +} + +inline static bool check_error(int err, const char* message) { + if (err == SOCKET_ERROR) { + perror(message); + } + return err == SOCKET_ERROR; +} + +inline static bool check_socket_error(pn_socket_t err, const char* message) { + if (err == INVALID_SOCKET) { + perror(message); + } + return err == INVALID_SOCKET; +} + +inline static bool checked_gai(const char* node, const char* port, struct addrinfo* hints, struct addrinfo** ai) { + int err = getaddrinfo(node, port, hints, ai); + if (err != 0) { + printf("getaddrinfo: %s", gai_strerror(err)); + } + return err != 0; +} + +static pn_socket_t accept_socket(const char* host, const char* port) { + struct addrinfo *ai; + if (checked_gai(host, port, + &(struct addrinfo){.ai_family=AF_UNSPEC, .ai_socktype=SOCK_STREAM, .ai_flags=AI_PASSIVE | AI_ALL}, + &ai)) + return INVALID_SOCKET; + pn_socket_t l = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (check_socket_error(l, "socket")) goto error1; + int err = bind(l, ai->ai_addr, ai->ai_addrlen); + if (check_error(err, "bind")) goto error2; + err = listen(l, 8); + if (check_error(err, "listen")) goto error2; + pn_socket_t s = accept(l, NULL, NULL); + if (check_socket_error(s, "accept")) goto error2; + closesocket(l); + freeaddrinfo(ai); + return s; + +error2: + closesocket(l); +error1: + freeaddrinfo(ai); + return INVALID_SOCKET; +} + +int main(int argc, char **argv) { + struct app_data_t app = { + .container_id = argv[0], /* Should be unique */ + .host = (argc > 1) ? argv[1] : NULL, + .port = (argc > 2) ? argv[2] : "5672", + .amqp_address = (argc > 3) ? argv[3] : "examples", + .message_count = (argc > 4) ? atoi(argv[4]) : 10, + .proactor = pn_proactor()}; + + pn_socket_t s = accept_socket(app.host, app.port); + if (s==INVALID_SOCKET) goto error; + + pn_transport_t *t = pn_transport(); + pn_transport_set_server(t); + pn_proactor_import_socket(app.proactor, NULL, t, s); + run(&app); + +error: + pn_proactor_free(app.proactor); + free(app.msgout.start); + free(app.msgin.start); + return exit_code; +} diff --git a/c/examples/send-fd.c b/c/examples/send-fd.c new file mode 100644 index 000000000..e40fc0f80 --- /dev/null +++ b/c/examples/send-fd.c @@ -0,0 +1,274 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/connection.h> +#include <proton/condition.h> +#include <proton/delivery.h> +#include <proton/link.h> +#include <proton/message.h> +#include <proton/proactor.h> +#include <proton/proactor_ext.h> +#include <proton/session.h> +#include <proton/transport.h> + +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> + +#ifdef _WIN32 +# include <winsock2.h> +# include <ws2tcpip.h> +# include <windows.h> +void perror_wsa(const char* message) { + char error_msg[256]; // Buffer for the error message + DWORD error_code = WSAGetLastError(); + + int length = FormatMessage( + FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, + error_code, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + error_msg, + sizeof(error_msg), + NULL + ); + + if (length > 0) { + fprintf(stderr, "%s: %s\n", message, error_msg); + } + else { + fprintf(stderr, "%s: Unknown error code %ld\n", message, error_code); + } +} +# undef perror +# define perror perror_wsa +#else +# include <errno.h> +# include <netdb.h> +# include <sys/socket.h> +# include <sys/types.h> +# include <unistd.h> +# define INVALID_SOCKET (-1) +# define SOCKET_ERROR (-1) +# define closesocket close +#endif + +typedef struct app_data_t { + const char *host, *port; + const char *amqp_address; + const char *container_id; + int message_count; + + pn_proactor_t *proactor; + pn_message_t *message; + pn_rwbytes_t message_buffer; + int sent; + int acknowledged; +} app_data_t; + +static int exit_code = 0; + +static void check_condition(pn_event_t *e, pn_condition_t *cond) { + if (pn_condition_is_set(cond)) { + fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); + pn_connection_close(pn_event_connection(e)); + exit_code = 1; + } +} + +/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */ +static void send_message(app_data_t* app, pn_link_t *sender) { + /* Construct a message with the map { "sequence": app.sent } */ + pn_data_t* body; + pn_message_clear(app->message); + body = pn_message_body(app->message); + pn_message_set_id(app->message, (pn_atom_t){.type=PN_ULONG, .u.as_ulong=app->sent}); + pn_data_put_map(body); + pn_data_enter(body); + pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); + pn_data_put_int(body, app->sent); /* The sequence number */ + pn_data_exit(body); + if (pn_message_send(app->message, sender, &app->message_buffer) < 0) { + fprintf(stderr, "error sending message: %s\n", pn_error_text(pn_message_error(app->message))); + exit(1); + } +} + +/* Returns true to continue, false if finished */ +static bool handle(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_CONNECTION_INIT: { + pn_connection_t* c = pn_event_connection(event); + pn_session_t* s = pn_session(pn_event_connection(event)); + pn_connection_set_container(c, app->container_id); + pn_connection_open(c); + pn_session_open(s); + { + pn_link_t* l = pn_sender(s, "my_sender"); + pn_terminus_set_address(pn_link_target(l), app->amqp_address); + pn_link_open(l); + break; + } + } + + case PN_LINK_FLOW: { + /* The peer has given us some credit, now we can send messages */ + pn_link_t *sender = pn_event_link(event); + while (pn_link_credit(sender) > 0 && app->sent < app->message_count) { + ++app->sent; + /* Use sent counter as unique delivery tag. */ + pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); + send_message(app, sender); + } + break; + } + + case PN_DELIVERY: { + /* We received acknowledgement from the peer that a message was delivered. */ + pn_delivery_t* d = pn_event_delivery(event); + if (pn_delivery_remote_state(d) == PN_ACCEPTED) { + pn_delivery_settle(d); + if (++app->acknowledged == app->message_count) { + printf("%d messages sent and acknowledged\n", app->acknowledged); + pn_connection_close(pn_event_connection(event)); + /* Continue handling events till we receive TRANSPORT_CLOSED */ + } + } else { + fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d)); + pn_connection_close(pn_event_connection(event)); + exit_code=1; + } + break; + } + + case PN_TRANSPORT_CLOSED: + check_condition(event, pn_transport_condition(pn_event_transport(event))); + break; + + case PN_CONNECTION_REMOTE_CLOSE: + check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_SESSION_REMOTE_CLOSE: + check_condition(event, pn_session_remote_condition(pn_event_session(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: + check_condition(event, pn_link_remote_condition(pn_event_link(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_PROACTOR_INACTIVE: + return false; + + default: break; + } + return true; +} + +void run(app_data_t *app) { + /* Loop and handle events */ + do { + pn_event_batch_t *events = pn_proactor_wait(app->proactor); + pn_event_t *e; + for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { + if (!handle(app, e)) { + return; + } + } + pn_proactor_done(app->proactor, events); + } while(true); +} + +inline static bool check_error(int err, const char* message) { + if (err == SOCKET_ERROR) { + perror(message); + } + return err == SOCKET_ERROR; +} + +inline static bool check_socket_error(pn_socket_t err, const char* message) { + if (err == INVALID_SOCKET) { + perror(message); + } + return err == INVALID_SOCKET; +} + +inline static bool checked_gai(const char *node, const char *port, struct addrinfo *hints, struct addrinfo **ai) { + int err = getaddrinfo(node, port, hints, ai); + if (err != 0) { + printf("getaddrinfo: %s", gai_strerror(err)); + } + return err != 0; +} + +static pn_socket_t connect_socket(const char *host, const char *port) { + struct addrinfo *ai; + if (checked_gai(host, port, &(struct addrinfo){.ai_family = AF_UNSPEC, .ai_socktype = SOCK_STREAM}, &ai)) return INVALID_SOCKET; + struct addrinfo *ai_ = ai; + while (true) { + pn_socket_t s = socket(ai_->ai_family, ai_->ai_socktype, ai_->ai_protocol); + if (s != INVALID_SOCKET) { + int err = connect(s, ai_->ai_addr, ai_->ai_addrlen); + if (err == 0) { + freeaddrinfo(ai); + return s; + } + } + ai_ = ai_->ai_next; + if (ai_) { + closesocket(s); + continue; + } + perror("connect"); + closesocket(s); + freeaddrinfo(ai); + return INVALID_SOCKET; + } +} + +int main(int argc, char **argv) { + struct app_data_t app = { + .container_id = argv[0], /* Should be unique */ + .host = (argc > 1) ? argv[1] : NULL, + .port = (argc > 2) ? argv[2] : "5672", + .amqp_address = (argc > 3) ? argv[3] : "examples", + .message_count = (argc > 4) ? atoi(argv[4]) : 10, + .message = pn_message(), + .proactor = pn_proactor()}; + + pn_socket_t s = connect_socket(app.host, app.port); + if (s == INVALID_SOCKET) goto error; + + pn_proactor_import_socket(app.proactor, NULL, NULL, s); + run(&app); + +error: + pn_proactor_free(app.proactor); + free(app.message_buffer.start); + pn_message_free(app.message); + return exit_code; +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
