http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/c/examples/CMakeLists.txt b/c/examples/CMakeLists.txt new file mode 100644 index 0000000..6f732a6 --- /dev/null +++ b/c/examples/CMakeLists.txt @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +cmake_minimum_required (VERSION 2.8.12) + +set (Proton_DIR ${CMAKE_CURRENT_SOURCE_DIR}) +find_package(Proton REQUIRED Core Proactor) +set(CMAKE_THREAD_PREFER_PTHREAD TRUE) +find_package(Threads REQUIRED) + +include_directories(${Proton_INCLUDE_DIRS}) +add_definitions(${Proton_DEFINITIONS}) + +foreach (name broker send receive direct send-abort send-ssl) + add_executable(c-${name} ${name}.c) + target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) + set_target_properties(c-${name} PROPERTIES + OUTPUT_NAME ${name}) +endforeach() + + +# Add a test to run all examples + +# Make correct environment to find test executables and valgrind. 
+if(WIN32) + set(test_path "$<TARGET_FILE_DIR:c-broker>;$<TARGET_FILE_DIR:qpid-proton-core>;$<TARGET_FILE_DIR:qpid-proton-proactor>") +else() + set(test_path "$<TARGET_FILE_DIR:c-broker>:$ENV{PATH}") +endif() + +# Set result to a native search path - used by examples and binding tests. +# args after result are directories or search paths. +macro(set_search_path result) + set(${result} ${ARGN}) + if (UNIX) + string(REPLACE ";" ":" ${result} "${${result}}") # native search path separators. + endif() + file(TO_NATIVE_PATH "${${result}}" ${result}) # native slash separators +endmacro() + +# Add the tools directory for the 'proctest' module, followed by any PYTHONPATH +# that is set in the environment when CMake is configured. +set_search_path(EXAMPLE_PYTHONPATH "${CMAKE_SOURCE_DIR}/tools/python" "$ENV{PYTHONPATH}") +set(EXAMPLE_ENV "PYTHONPATH=${EXAMPLE_PYTHONPATH}") + +# Run example_test.py from the source directory with PATH and PYTHONPATH set up as above. +add_test( + NAME c-example-tests + COMMAND ${PN_ENV_SCRIPT} ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV} -- + ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/ProtonConfig.cmake ---------------------------------------------------------------------- diff --git a/c/examples/ProtonConfig.cmake b/c/examples/ProtonConfig.cmake new file mode 100644 index 0000000..2343e24 --- /dev/null +++ b/c/examples/ProtonConfig.cmake @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Note that this file is used *only* when building the examples within +# the proton source tree not when the examples are installed separately +# from it (for example in an OS distribution package). +# +# So if you find this file installed on your system something went wrong +# with the packaging and/or package installation. +# +# For a packaged installation the equivalent file is created by the source +# tree build and installed in the appropriate place for cmake on that system. 
+ +set (Proton_VERSION ${PN_VERSION}) + +set (Proton_INCLUDE_DIRS ${CMAKE_SOURCE_DIR}/c/include) +set (Proton_LIBRARIES ${C_EXAMPLE_LINK_FLAGS} qpid-proton) +set (Proton_DEFINITIONS ${C_EXAMPLE_FLAGS}) +set (Proton_FOUND True) + +set (Proton_Core_INCLUDE_DIRS ${CMAKE_SOURCE_DIR}/c/include) +set (Proton_Core_LIBRARIES ${C_EXAMPLE_LINK_FLAGS} qpid-proton-core) +set (Proton_Core_DEFINITIONS ${C_EXAMPLE_FLAGS}) +set (Proton_Core_FOUND True) + +if (${HAS_PROACTOR}) + set (Proton_Proactor_INCLUDE_DIRS ${CMAKE_SOURCE_DIR}/c/include) + set (Proton_Proactor_LIBRARIES ${C_EXAMPLE_LINK_FLAGS} qpid-proton-proactor) + set (Proton_Proactor_DEFINITIONS ${C_EXAMPLE_FLAGS}) + set (Proton_Proactor_FOUND True) +endif() + +# Check for all required components +foreach(comp ${Proton_FIND_COMPONENTS}) + if(NOT Proton_${comp}_FOUND) + if(Proton_FIND_REQUIRED_${comp}) + set(Proton_FOUND FALSE) + set(Proton_NOT_FOUND_MESSAGE "Didn't find required component ${comp}") + endif() + endif() +endforeach() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/README.dox ---------------------------------------------------------------------- diff --git a/c/examples/README.dox b/c/examples/README.dox new file mode 100644 index 0000000..a548d35 --- /dev/null +++ b/c/examples/README.dox @@ -0,0 +1,21 @@ +/** + * @example send.c + * + * Send a fixed number of messages to the "examples" node. + * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker. + * + * @example receive.c + * + * Subscribes to the "examples" node and prints the message bodies received. + * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker. 
+ * + * @example direct.c + * + * A server that can be used to demonstrate direct (no broker) peer-to-peer communication + * It can accept an incoming connection from either the @ref send.c or @ref receive.c examples + * and will act as the directly-connected counterpart (receive or send) + * + * @example broker.c + * + * A simple multithreaded broker that works with the @ref send.c and @ref receive.c examples. + */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/broker.c ---------------------------------------------------------------------- diff --git a/c/examples/broker.c b/c/examples/broker.c new file mode 100644 index 0000000..852fa3a --- /dev/null +++ b/c/examples/broker.c @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "thread.h" + +#include <proton/engine.h> +#include <proton/listener.h> +#include <proton/netaddr.h> +#include <proton/proactor.h> +#include <proton/sasl.h> +#include <proton/ssl.h> +#include <proton/transport.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +/* The ssl-certs subdir must be in the current directory for an ssl-enabled broker */ +#define SSL_FILE(NAME) "ssl-certs/" NAME +#define SSL_PW "tserverpw" +/* Windows vs. OpenSSL certificates */ +#if defined(_WIN32) +# define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.p12") +# define SET_CREDENTIALS(DOMAIN, NAME) \ + pn_ssl_domain_set_credentials(DOMAIN, SSL_FILE(NAME "-full.p12"), "", SSL_PW) +#else +# define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.pem") +# define SET_CREDENTIALS(DOMAIN, NAME) \ + pn_ssl_domain_set_credentials(DOMAIN, CERTIFICATE(NAME), SSL_FILE(NAME "-private-key.pem"), SSL_PW) +#endif + +/* Simple re-sizable vector that acts as a queue */ +#define VEC(T) struct { T* data; size_t len, cap; } + +#define VEC_INIT(V) \ + do { \ + void **vp = (void**)&V.data; \ + V.len = 0; \ + V.cap = 16; \ + *vp = malloc(V.cap * sizeof(*V.data)); \ + } while(0) + +#define VEC_FINAL(V) free(V.data) + +#define VEC_PUSH(V, X) \ + do { \ + if (V.len == V.cap) { \ + void **vp = (void**)&V.data; \ + V.cap *= 2; \ + *vp = realloc(V.data, V.cap * sizeof(*V.data)); \ + } \ + V.data[V.len++] = X; \ + } while(0) \ + +#define VEC_POP(V) \ + do { \ + if (V.len > 0) \ + memmove(V.data, V.data+1, (--V.len)*sizeof(*V.data)); \ + } while(0) + +/* Simple thread-safe queue implementation */ +typedef struct queue_t { + pthread_mutex_t lock; + char *name; + VEC(pn_rwbytes_t) messages; /* Messages on the queue_t */ + VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */ + struct queue_t *next; /* Next queue in chain */ + size_t sent; /* Count of messages sent, used as delivery tag */ +} queue_t; + +static void queue_init(queue_t *q, const char* name, 
queue_t *next) { + pthread_mutex_init(&q->lock, NULL); + q->name = (char*)malloc(strlen(name)+1); + memcpy(q->name, name, strlen(name)+1); + VEC_INIT(q->messages); + VEC_INIT(q->waiting); + q->next = next; + q->sent = 0; +} + +static void queue_destroy(queue_t *q) { + size_t i; + pthread_mutex_destroy(&q->lock); + for (i = 0; i < q->messages.len; ++i) + free(q->messages.data[i].start); + VEC_FINAL(q->messages); + for (i = 0; i < q->waiting.len; ++i) + pn_decref(q->waiting.data[i]); + VEC_FINAL(q->waiting); + free(q->name); +} + +/* Send a message on s, or record s as waiting if there are no messages to send. + Called in s dispatch loop, assumes s has credit. +*/ +static void queue_send(queue_t *q, pn_link_t *s) { + pn_rwbytes_t m = { 0 }; + size_t tag = 0; + pthread_mutex_lock(&q->lock); + if (q->messages.len == 0) { /* Empty, record connection as waiting */ + /* Record connection for wake-up if not already on the list. */ + pn_connection_t *c = pn_session_connection(pn_link_session(s)); + size_t i = 0; + for (; i < q->waiting.len && q->waiting.data[i] != c; ++i) + ; + if (i == q->waiting.len) { + VEC_PUSH(q->waiting, c); + } + } else { + m = q->messages.data[0]; + VEC_POP(q->messages); + tag = ++q->sent; + } + pthread_mutex_unlock(&q->lock); + if (m.start) { + pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag))); + pn_link_send(s, m.start, m.size); + pn_link_advance(s); + pn_delivery_settle(d); /* Pre-settled: unreliable, there will be no ack/ */ + free(m.start); + } +} + +/* Use the connection context pointer as a boolean flag to indicate we need to check queues */ +void set_check_queues(pn_connection_t *c, bool check) { + pn_connection_set_context(c, (void*)check); +} + +bool get_check_queues(pn_connection_t *c) { + return (bool)pn_connection_get_context(c); +} + +/* Use a buffer per link to accumulate message data - message can arrive in multiple deliveries, + and the broker can receive messages on many concurrently. 
*/ +pn_rwbytes_t *message_buffer(pn_link_t *l) { + if (!pn_link_get_context(l)) { + pn_link_set_context(l, calloc(1, sizeof(pn_rwbytes_t))); + } + return (pn_rwbytes_t*)pn_link_get_context(l); +} + +/* Put a message on the queue, called in receiver dispatch loop. + If the queue was previously empty, notify waiting senders. +*/ +static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) { + pthread_mutex_lock(&q->lock); + VEC_PUSH(q->messages, m); + if (q->messages.len == 1) { /* Was empty, notify waiting connections */ + size_t i; + for (i = 0; i < q->waiting.len; ++i) { + pn_connection_t *c = q->waiting.data[i]; + set_check_queues(c, true); + pn_connection_wake(c); /* Wake the connection */ + } + q->waiting.len = 0; + } + pthread_mutex_unlock(&q->lock); +} + +/* Thread safe set of queues */ +typedef struct queues_t { + pthread_mutex_t lock; + queue_t *queues; + size_t sent; +} queues_t; + +void queues_init(queues_t *qs) { + pthread_mutex_init(&qs->lock, NULL); + qs->queues = NULL; +} + +void queues_destroy(queues_t *qs) { + while (qs->queues) { + queue_t *q = qs->queues; + qs->queues = qs->queues->next; + queue_destroy(q); + free(q); + } + pthread_mutex_destroy(&qs->lock); +} + +/** Get or create the named queue. */ +queue_t* queues_get(queues_t *qs, const char* name) { + queue_t *q; + pthread_mutex_lock(&qs->lock); + for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next) + ; + if (!q) { + q = (queue_t*)malloc(sizeof(queue_t)); + queue_init(q, name, qs->queues); + qs->queues = q; + } + pthread_mutex_unlock(&qs->lock); + return q; +} + +/* The broker implementation */ +typedef struct broker_t { + pn_proactor_t *proactor; + size_t threads; + const char *container_id; /* AMQP container-id */ + queues_t queues; + bool finished; + pn_ssl_domain_t *ssl_domain; +} broker_t; + +void broker_stop(broker_t *b) { + /* Interrupt the proactor to stop the working threads. 
*/ + pn_proactor_interrupt(b->proactor); +} + +/* Try to send if link is a sender, has credit and names a source queue. + A link whose source has no address is skipped: there is no queue to look up. +*/ +static void link_send(broker_t *b, pn_link_t *s) { + if (pn_link_is_sender(s) && pn_link_credit(s) > 0) { + const char *qname = pn_terminus_get_address(pn_link_source(s)); + if (qname) { /* queues_get() requires a non-NULL name */ + queue_t *q = queues_get(&b->queues, qname); + queue_send(q, s); + } + } +} + +/* Remove connection c from the queue's list of waiting connections, if present. */ +static void queue_unsub(queue_t *q, pn_connection_t *c) { + size_t i; + pthread_mutex_lock(&q->lock); + for (i = 0; i < q->waiting.len; ++i) { + if (q->waiting.data[i] == c){ + q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */ + VEC_POP(q->waiting); + break; + } + } + pthread_mutex_unlock(&q->lock); +} + +/* Unsubscribe from the queue of interest to this link. */ +static void link_unsub(broker_t *b, pn_link_t *s) { + if (pn_link_is_sender(s)) { + const char *qname = pn_terminus_get_address(pn_link_source(s)); + if (qname) { + queue_t *q = queues_get(&b->queues, qname); + queue_unsub(q, pn_session_connection(pn_link_session(s))); + } + } +} + +/* Unsubscribe every link on the connection from its queue. */ +static void connection_unsub(broker_t *b, pn_connection_t *c) { + pn_link_t *l; + for (l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0)) + link_unsub(b, l); +} + +/* Unsubscribe every link that belongs to session ssn from its queue. */ +static void session_unsub(broker_t *b, pn_session_t *ssn) { + pn_connection_t *c = pn_session_connection(ssn); + pn_link_t *l; + for (l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0)) { + if (pn_link_session(l) == ssn) + link_unsub(b, l); + } +} + +/* Print the event name and the condition to stderr if the condition is set. */ +static void check_condition(pn_event_t *e, pn_condition_t *cond) { + if (pn_condition_is_set(cond)) { + fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); + } +} + +const int WINDOW=5; /* Very small incoming credit window, to show flow control in action */ + +static void handle(broker_t* b, pn_event_t* e) { + pn_connection_t *c = pn_event_connection(e); + + switch (pn_event_type(e)) { + + 
case PN_LISTENER_OPEN: { + char port[256]; /* Get the listening port */ + pn_netaddr_host_port(pn_listener_addr(pn_event_listener(e)), NULL, 0, port, sizeof(port)); + printf("listening on %s\n", port); + fflush(stdout); + break; + } + case PN_LISTENER_ACCEPT: { + /* Configure a transport to allow SSL and SASL connections. See ssl_domain setup in main() */ + pn_transport_t *t = pn_transport(); + pn_transport_require_auth(t, false); + pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS"); + if (b->ssl_domain) { + pn_ssl_init(pn_ssl(t), b->ssl_domain, NULL); + } + pn_listener_accept2(pn_event_listener(e), NULL, t); + break; + } + case PN_CONNECTION_INIT: + pn_connection_set_container(c, b->container_id); + break; + + case PN_CONNECTION_REMOTE_OPEN: { + pn_connection_open(pn_event_connection(e)); /* Complete the open */ + break; + } + case PN_CONNECTION_WAKE: { + if (get_check_queues(c)) { + int flags = PN_LOCAL_ACTIVE&PN_REMOTE_ACTIVE; + pn_link_t *l; + set_check_queues(c, false); + for (l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags)) + link_send(b, l); + } + break; + } + case PN_SESSION_REMOTE_OPEN: { + pn_session_open(pn_event_session(e)); + break; + } + case PN_LINK_REMOTE_OPEN: { + pn_link_t *l = pn_event_link(e); + if (pn_link_is_sender(l)) { + const char *source = pn_terminus_get_address(pn_link_remote_source(l)); + pn_terminus_set_address(pn_link_source(l), source); + } else { + const char* target = pn_terminus_get_address(pn_link_remote_target(l)); + pn_terminus_set_address(pn_link_target(l), target); + pn_link_flow(l, WINDOW); + } + pn_link_open(l); + break; + } + case PN_LINK_FLOW: { + link_send(b, pn_event_link(e)); + break; + } + case PN_LINK_FINAL: { + pn_rwbytes_t *buf = (pn_rwbytes_t*)pn_link_get_context(pn_event_link(e)); + if (buf) { + free(buf->start); + free(buf); + } + break; + } + case PN_DELIVERY: { /* Incoming message data */ + pn_delivery_t *d = pn_event_delivery(e); + if (pn_delivery_readable(d)) { + pn_link_t *l = 
pn_delivery_link(d); + size_t size = pn_delivery_pending(d); + pn_rwbytes_t* m = message_buffer(l); /* Append data to incoming message buffer */ + ssize_t recv; + m->size += size; + m->start = (char*)realloc(m->start, m->size); + recv = pn_link_recv(l, m->start, m->size); + if (recv == PN_ABORTED) { /* */ + fprintf(stderr, "Message aborted\n"); + fflush(stderr); + m->size = 0; /* Forget the data we accumulated */ + pn_delivery_settle(d); /* Free the delivery so we can receive the next message */ + pn_link_flow(l, WINDOW - pn_link_credit(l)); /* Replace credit for the aborted message */ + } else if (recv < 0 && recv != PN_EOS) { /* Unexpected error */ + pn_condition_format(pn_link_condition(l), "broker", "PN_DELIVERY error: %s", pn_code((int)recv)); + pn_link_close(l); /* Unexpected error, close the link */ + } else if (!pn_delivery_partial(d)) { /* Message is complete */ + const char *qname = pn_terminus_get_address(pn_link_target(l)); + queue_receive(b->proactor, queues_get(&b->queues, qname), *m); + *m = pn_rwbytes_null; /* Reset the buffer for the next message*/ + pn_delivery_update(d, PN_ACCEPTED); + pn_delivery_settle(d); + pn_link_flow(l, WINDOW - pn_link_credit(l)); + } + } + break; + } + + case PN_TRANSPORT_CLOSED: + check_condition(e, pn_transport_condition(pn_event_transport(e))); + connection_unsub(b, pn_event_connection(e)); + break; + + case PN_CONNECTION_REMOTE_CLOSE: + check_condition(e, pn_connection_remote_condition(pn_event_connection(e))); + pn_connection_close(pn_event_connection(e)); + break; + + case PN_SESSION_REMOTE_CLOSE: + check_condition(e, pn_session_remote_condition(pn_event_session(e))); + session_unsub(b, pn_event_session(e)); + pn_session_close(pn_event_session(e)); + pn_session_free(pn_event_session(e)); + break; + + case PN_LINK_REMOTE_CLOSE: + check_condition(e, pn_link_remote_condition(pn_event_link(e))); + link_unsub(b, pn_event_link(e)); + pn_link_close(pn_event_link(e)); + pn_link_free(pn_event_link(e)); + break; + + case 
PN_LISTENER_CLOSE: + check_condition(e, pn_listener_condition(pn_event_listener(e))); + broker_stop(b); + break; + + case PN_PROACTOR_INACTIVE: /* listener and all connections closed */ + broker_stop(b); + break; + + case PN_PROACTOR_INTERRUPT: + b->finished = true; + pn_proactor_interrupt(b->proactor); /* Pass along the interrupt to the other threads */ + break; + + default: + break; + } +} + +static void* broker_thread(void *void_broker) { + broker_t *b = (broker_t*)void_broker; + do { + pn_event_batch_t *events = pn_proactor_wait(b->proactor); + pn_event_t *e; + while ((e = pn_event_batch_next(events))) { + handle(b, e); + } + pn_proactor_done(b->proactor, events); + } while(!b->finished); + return NULL; +} + +int main(int argc, char **argv) { + const char *host = (argc > 1) ? argv[1] : ""; + const char *port = (argc > 2) ? argv[2] : "amqp"; + + broker_t b = {0}; + b.proactor = pn_proactor(); + queues_init(&b.queues); + b.container_id = argv[0]; + b.threads = 4; + b.ssl_domain = pn_ssl_domain(PN_SSL_MODE_SERVER); + SET_CREDENTIALS(b.ssl_domain, "tserver"); + pn_ssl_domain_allow_unsecured_client(b.ssl_domain); /* Allow SSL and plain connections */ + { + /* Listen on addr */ + char addr[PN_MAX_ADDR]; + pn_proactor_addr(addr, sizeof(addr), host, port); + pn_proactor_listen(b.proactor, pn_listener(), addr, 16); + } + + { + /* Start n-1 threads */ + pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), b.threads); + size_t i; + for (i = 0; i < b.threads-1; ++i) { + pthread_create(&threads[i], NULL, broker_thread, &b); + } + broker_thread(&b); /* Use the main thread too. 
*/ + /* Join the other threads */ + for (i = 0; i < b.threads-1; ++i) { + pthread_join(threads[i], NULL); + } + pn_proactor_free(b.proactor); + free(threads); + pn_ssl_domain_free(b.ssl_domain); + return 0; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/direct.c ---------------------------------------------------------------------- diff --git a/c/examples/direct.c b/c/examples/direct.c new file mode 100644 index 0000000..6d8642c --- /dev/null +++ b/c/examples/direct.c @@ -0,0 +1,347 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include <proton/condition.h> +#include <proton/connection.h> +#include <proton/delivery.h> +#include <proton/link.h> +#include <proton/listener.h> +#include <proton/netaddr.h> +#include <proton/message.h> +#include <proton/proactor.h> +#include <proton/sasl.h> +#include <proton/session.h> +#include <proton/transport.h> + +#include <stdio.h> +#include <stdlib.h> + +typedef struct app_data_t { + const char *host, *port; + const char *amqp_address; + const char *container_id; + int message_count; + + pn_proactor_t *proactor; + pn_listener_t *listener; + pn_rwbytes_t msgin, msgout; /* Buffers for incoming/outgoing messages */ + + /* Sender values */ + int sent; + int acknowledged; + pn_link_t *sender; + + /* Receiver values */ + int received; +} app_data_t; + +static const int BATCH = 1000; /* Batch size for unlimited receive */ + +static int exit_code = 0; + +/* Close the connection and the listener so we will get a + * PN_PROACTOR_INACTIVE event and exit, once all outstanding events + * are processed. + */ +static void close_all(pn_connection_t *c, app_data_t *app) { + if (c) pn_connection_close(c); + if (app->listener) pn_listener_close(app->listener); +} + +static void check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) { + if (pn_condition_is_set(cond)) { + fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); + close_all(pn_event_connection(e), app); + exit_code = 1; + } +} + +/* Create a message with a map { "sequence" : number }, encode it and return the encoded buffer. 
*/ +static pn_bytes_t encode_message(app_data_t* app) { + /* Construct a message with the map { "sequence": app.sent } */ + pn_message_t* message = pn_message(); + pn_data_t* body = pn_message_body(message); + pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */ + pn_data_put_map(body); + pn_data_enter(body); + pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); + pn_data_put_int(body, app->sent); /* The sequence number */ + pn_data_exit(body); + + /* encode the message, expanding the encode buffer as needed */ + if (app->msgout.start == NULL) { + static const size_t initial_size = 128; + app->msgout = pn_rwbytes(initial_size, (char*)malloc(initial_size)); + } + /* app->msgout is the total buffer space available. */ + /* mbuf wil point at just the portion used by the encoded message */ + { + pn_rwbytes_t mbuf = pn_rwbytes(app->msgout.size, app->msgout.start); + int status = 0; + while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { + app->msgout.size *= 2; + app->msgout.start = (char*)realloc(app->msgout.start, app->msgout.size); + mbuf.size = app->msgout.size; + } + if (status != 0) { + fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); + exit(1); + } + pn_message_free(message); + return pn_bytes(mbuf.size, mbuf.start); + } +} + +static void decode_message(pn_rwbytes_t data) { + pn_message_t *m = pn_message(); + int err = pn_message_decode(m, data.start, data.size); + if (!err) { + /* Print the decoded message */ + pn_string_t *s = pn_string(NULL); + pn_inspect(pn_message_body(m), s); + printf("%s\n", pn_string_get(s)); + fflush(stdout); + pn_free(s); + pn_message_free(m); + free(data.start); + } else { + fprintf(stderr, "decode_message: %s\n", pn_code(err)); + exit_code = 1; + } +} + +/* This function handles events when we are acting as the receiver */ +static void handle_receive(app_data_t *app, pn_event_t* event) { + switch 
(pn_event_type(event)) { + + case PN_LINK_REMOTE_OPEN: { + pn_link_t *l = pn_event_link(event); + pn_link_open(l); + pn_link_flow(l, app->message_count ? app->message_count : BATCH); + } break; + + case PN_DELIVERY: { /* Incoming message data */ + pn_delivery_t *d = pn_event_delivery(event); + if (pn_delivery_readable(d)) { + pn_link_t *l = pn_delivery_link(d); + size_t size = pn_delivery_pending(d); + pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */ + ssize_t recv; + m->size += size; + m->start = (char*)realloc(m->start, m->size); + recv = pn_link_recv(l, m->start, m->size); + if (recv == PN_ABORTED) { + fprintf(stderr, "Message aborted\n"); + fflush(stderr); + m->size = 0; /* Forget the data we accumulated */ + pn_delivery_settle(d); /* Free the delivery so we can receive the next message */ + pn_link_flow(l, 1); /* Replace credit for aborted message */ + } else if (recv < 0 && recv != PN_EOS) { /* Unexpected error */ + pn_condition_format(pn_link_condition(l), "broker", "PN_DELIVERY error: %s", pn_code(recv)); + pn_link_close(l); /* Unexpected error, close the link */ + } else if (!pn_delivery_partial(d)) { /* Message is complete */ + decode_message(*m); + *m = pn_rwbytes_null; + pn_delivery_update(d, PN_ACCEPTED); + pn_delivery_settle(d); /* settle and free d */ + if (app->message_count == 0) { + /* receive forever - see if more credit is needed */ + if (pn_link_credit(l) < BATCH/2) { + pn_link_flow(l, BATCH - pn_link_credit(l)); + } + } else if (++app->received >= app->message_count) { + printf("%d messages received\n", app->received); + close_all(pn_event_connection(event), app); + } + } + } + break; + } + default: + break; + } +} + +/* This function handles events when we are acting as the sender */ +static void handle_send(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_LINK_REMOTE_OPEN: { + pn_link_t* l = pn_event_link(event); + pn_terminus_set_address(pn_link_target(l), app->amqp_address); + 
pn_link_open(l); + } break; + + case PN_LINK_FLOW: { + /* The peer has given us some credit, now we can send messages */ + pn_link_t *sender = pn_event_link(event); + while (pn_link_credit(sender) > 0 && app->sent < app->message_count) { + ++app->sent; + /* Use sent counter as unique delivery tag. */ + pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); + { + pn_bytes_t msgbuf = encode_message(app); + pn_link_send(sender, msgbuf.start, msgbuf.size); + pn_link_advance(sender); + } + } + break; + } + + case PN_DELIVERY: { + /* We received acknowledgement from the peer that a message was delivered. */ + pn_delivery_t* d = pn_event_delivery(event); + if (pn_delivery_remote_state(d) == PN_ACCEPTED) { + if (++app->acknowledged == app->message_count) { + printf("%d messages sent and acknowledged\n", app->acknowledged); + close_all(pn_event_connection(event), app); + } + } + } break; + + default: + break; + } +} + +/* Handle all events, delegate to handle_send or handle_receive depending on link mode. 
+ Return true to continue, false to exit +*/ +static bool handle(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_LISTENER_OPEN: { + char port[256]; /* Get the listening port */ + pn_netaddr_host_port(pn_listener_addr(pn_event_listener(event)), NULL, 0, port, sizeof(port)); + printf("listening on %s\n", port); + fflush(stdout); + break; + } + case PN_LISTENER_ACCEPT: + pn_listener_accept2(pn_event_listener(event), NULL, NULL); + break; + + case PN_CONNECTION_INIT: + pn_connection_set_container(pn_event_connection(event), app->container_id); + break; + + case PN_CONNECTION_BOUND: { + /* Turn off security */ + pn_transport_t *t = pn_event_transport(event); + pn_transport_require_auth(t, false); + pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS"); + break; + } + case PN_CONNECTION_REMOTE_OPEN: { + pn_connection_open(pn_event_connection(event)); /* Complete the open */ + break; + } + + case PN_SESSION_REMOTE_OPEN: { + pn_session_open(pn_event_session(event)); + break; + } + + case PN_TRANSPORT_CLOSED: + check_condition(event, pn_transport_condition(pn_event_transport(event)), app); + break; + + case PN_CONNECTION_REMOTE_CLOSE: + check_condition(event, pn_connection_remote_condition(pn_event_connection(event)), app); + pn_connection_close(pn_event_connection(event)); /* Return the close */ + break; + + case PN_SESSION_REMOTE_CLOSE: + check_condition(event, pn_session_remote_condition(pn_event_session(event)), app); + pn_session_close(pn_event_session(event)); /* Return the close */ + pn_session_free(pn_event_session(event)); + break; + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: + check_condition(event, pn_link_remote_condition(pn_event_link(event)), app); + pn_link_close(pn_event_link(event)); /* Return the close */ + pn_link_free(pn_event_link(event)); + break; + + case PN_PROACTOR_TIMEOUT: + /* Wake the sender's connection */ + pn_connection_wake(pn_session_connection(pn_link_session(app->sender))); + break; + + case 
PN_LISTENER_CLOSE: + app->listener = NULL; /* Listener is closed */ + check_condition(event, pn_listener_condition(pn_event_listener(event)), app); + break; + + case PN_PROACTOR_INACTIVE: + return false; + break; + + default: { + pn_link_t *l = pn_event_link(event); + if (l) { /* Only delegate link-related events */ + if (pn_link_is_sender(l)) { + handle_send(app, event); + } else { + handle_receive(app, event); + } + } + } + } + return true; +} + +void run(app_data_t *app) { + /* Loop and handle events */ + do { + pn_event_batch_t *events = pn_proactor_wait(app->proactor); + pn_event_t *e; + for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { + if (!handle(app, e)) { + return; + } + } + pn_proactor_done(app->proactor, events); + } while(true); +} + +int main(int argc, char **argv) { + struct app_data_t app = {0}; + char addr[PN_MAX_ADDR]; + app.container_id = argv[0]; /* Should be unique */ + app.host = (argc > 1) ? argv[1] : ""; + app.port = (argc > 2) ? argv[2] : "amqp"; + app.amqp_address = (argc > 3) ? argv[3] : "examples"; + app.message_count = (argc > 4) ? atoi(argv[4]) : 10; + + /* Create the proactor and connect */ + app.proactor = pn_proactor(); + app.listener = pn_listener(); + pn_proactor_addr(addr, sizeof(addr), app.host, app.port); + pn_proactor_listen(app.proactor, app.listener, addr, 16); + run(&app); + pn_proactor_free(app.proactor); + free(app.msgout.start); + free(app.msgin.start); + return exit_code; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/example_test.py ---------------------------------------------------------------------- diff --git a/c/examples/example_test.py b/c/examples/example_test.py new file mode 100644 index 0000000..b6a5a4a --- /dev/null +++ b/c/examples/example_test.py @@ -0,0 +1,125 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License +# + +# This is a test script to run the examples and verify that they behave as expected. + +import unittest, sys, time +from proctest import * + +def python_cmd(name): + dir = os.path.dirname(__file__) + return [sys.executable, os.path.join(dir, "..", "..", "python", name)] + +MESSAGES=10 + +def receive_expect_messages(n=MESSAGES): return ''.join(['{"sequence"=%s}\n'%i for i in range(1, n+1)]) +def receive_expect_total(n=MESSAGES): return "%s messages received\n"%n +def receive_expect(n=MESSAGES): return receive_expect_messages(n)+receive_expect_total(n) + +def send_expect(n=MESSAGES): return "%s messages sent and acknowledged\n" % n +def send_abort_expect(n=MESSAGES): return "%s messages started and aborted\n" % n + +def wait_listening(proc): + m = proc.wait_re("listening on ([0-9]+)$") + return m.group(1), m.group(0)+"\n" # Return (port, line) + +class Broker(object): + def __init__(self, test): + self.test = test + + def __enter__(self): + self.proc = self.test.proc(["broker", "", "0"]) + self.port, _ = wait_listening(self.proc) + return self + + def __exit__(self, *args): + b = getattr(self, "proc") + if b: + if b.poll() != None: # Broker crashed + raise ProcError(b, "broker crash") + b.kill() + +class CExampleTest(ProcTestCase): + + def runex(self, name, port, messages=MESSAGES): + 
"""Run an example with standard arguments, return output""" + return self.proc([name, "", str(port), "xtest", str(messages)]).wait_exit() + + def test_send_receive(self): + """Send first then receive""" + with Broker(self) as b: + self.assertEqual(send_expect(), self.runex("send", b.port)) + self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port)) + + def test_receive_send(self): + """Start receiving first, then send.""" + with Broker(self) as b: + self.assertEqual(send_expect(), self.runex("send", b.port)) + self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port)) + + def test_send_direct(self): + """Send to direct server""" + d = self.proc(["direct", "", "0"]) + port, line = wait_listening(d) + self.assertEqual(send_expect(), self.runex("send", port)) + self.assertMultiLineEqual(line+receive_expect(), d.wait_exit()) + + def test_receive_direct(self): + """Receive from direct server""" + d = self.proc(["direct", "", "0"]) + port, line = wait_listening(d) + self.assertMultiLineEqual(receive_expect(), self.runex("receive", port)) + self.assertEqual(line+"10 messages sent and acknowledged\n", d.wait_exit()) + + def test_send_abort_broker(self): + """Sending aborted messages to a broker""" + with Broker(self) as b: + self.assertEqual(send_expect(), self.runex("send", b.port)) + self.assertEqual(send_abort_expect(), self.runex("send-abort", b.port)) + b.proc.wait_re("Message aborted\n"*MESSAGES) + self.assertEqual(send_expect(), self.runex("send", b.port)) + expect = receive_expect_messages(MESSAGES)+receive_expect_messages(MESSAGES)+receive_expect_total(20) + self.assertMultiLineEqual(expect, self.runex("receive", b.port, "20")) + + def test_send_abort_direct(self): + """Send aborted messages to the direct server""" + d = self.proc(["direct", "", "0", "examples", "20"]) + port, line = wait_listening(d) + expect = line + self.assertEqual(send_expect(), self.runex("send", port)) + expect += receive_expect_messages() + 
d.wait_re(expect) + self.assertEqual(send_abort_expect(), self.runex("send-abort", port)) + expect += "Message aborted\n"*MESSAGES + d.wait_re(expect) + self.assertEqual(send_expect(), self.runex("send", port)) + expect += receive_expect_messages()+receive_expect_total(20) + self.maxDiff = None + self.assertMultiLineEqual(expect, d.wait_exit()) + + def test_send_ssl_receive(self): + """Send first then receive""" + with Broker(self) as b: + got = self.runex("send-ssl", b.port) + self.assertIn("secure connection:", got) + self.assertIn(send_expect(), got) + self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port)) + +if __name__ == "__main__": + unittest.main() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/receive.c ---------------------------------------------------------------------- diff --git a/c/examples/receive.c b/c/examples/receive.c new file mode 100644 index 0000000..0d0c988 --- /dev/null +++ b/c/examples/receive.c @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include <proton/connection.h> +#include <proton/condition.h> +#include <proton/delivery.h> +#include <proton/link.h> +#include <proton/message.h> +#include <proton/proactor.h> +#include <proton/session.h> +#include <proton/transport.h> + +#include <stdio.h> +#include <stdlib.h> + +typedef struct app_data_t { + const char *host, *port; + const char *amqp_address; + const char *container_id; + int message_count; + + pn_proactor_t *proactor; + int received; + bool finished; + pn_rwbytes_t msgin; /* Partially received message */ +} app_data_t; + +static const int BATCH = 1000; /* Batch size for unlimited receive */ + +static int exit_code = 0; + +static void check_condition(pn_event_t *e, pn_condition_t *cond) { + if (pn_condition_is_set(cond)) { + fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); + pn_connection_close(pn_event_connection(e)); + exit_code = 1; + } +} + +static void decode_message(pn_rwbytes_t data) { + pn_message_t *m = pn_message(); + int err = pn_message_decode(m, data.start, data.size); + if (!err) { + /* Print the decoded message */ + pn_string_t *s = pn_string(NULL); + pn_inspect(pn_message_body(m), s); + printf("%s\n", pn_string_get(s)); + pn_free(s); + pn_message_free(m); + free(data.start); + } else { + fprintf(stderr, "decode_message: %s\n", pn_code(err)); + exit_code = 1; + } +} + +/* Return true to continue, false to exit */ +static bool handle(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_CONNECTION_INIT: { + pn_connection_t* c = pn_event_connection(event); + pn_session_t* s = pn_session(c); + pn_connection_set_container(c, app->container_id); + pn_connection_open(c); + pn_session_open(s); + { + pn_link_t* l = pn_receiver(s, "my_receiver"); + pn_terminus_set_address(pn_link_source(l), app->amqp_address); + pn_link_open(l); + /* cannot receive without granting credit: */ + pn_link_flow(l, 
app->message_count ? app->message_count : BATCH); + } + } break; + + case PN_DELIVERY: { + /* A message has been received */ + pn_delivery_t *d = pn_event_delivery(event); + if (pn_delivery_readable(d)) { + pn_link_t *l = pn_delivery_link(d); + size_t size = pn_delivery_pending(d); + pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */ + int recv; + size_t oldsize = m->size; + m->size += size; + m->start = (char*)realloc(m->start, m->size); + recv = pn_link_recv(l, m->start + oldsize, m->size); + if (recv == PN_ABORTED) { + fprintf(stderr, "Message aborted\n"); + m->size = 0; /* Forget the data we accumulated */ + pn_delivery_settle(d); /* Free the delivery so we can receive the next message */ + pn_link_flow(l, 1); /* Replace credit for aborted message */ + } else if (recv < 0 && recv != PN_EOS) { /* Unexpected error */ + pn_condition_format(pn_link_condition(l), "broker", "PN_DELIVERY error: %s", pn_code(recv)); + pn_link_close(l); /* Unexpected error, close the link */ + } else if (!pn_delivery_partial(d)) { /* Message is complete */ + decode_message(*m); + *m = pn_rwbytes_null; /* Reset the buffer for the next message*/ + /* Accept the delivery */ + pn_delivery_update(d, PN_ACCEPTED); + pn_delivery_settle(d); /* settle and free d */ + if (app->message_count == 0) { + /* receive forever - see if more credit is needed */ + if (pn_link_credit(l) < BATCH/2) { + /* Grant enough credit to bring it up to BATCH: */ + pn_link_flow(l, BATCH - pn_link_credit(l)); + } + } else if (++app->received >= app->message_count) { + pn_session_t *ssn = pn_link_session(l); + printf("%d messages received\n", app->received); + pn_link_close(l); + pn_session_close(ssn); + pn_connection_close(pn_session_connection(ssn)); + } + } + } + break; + } + + case PN_TRANSPORT_CLOSED: + check_condition(event, pn_transport_condition(pn_event_transport(event))); + break; + + case PN_CONNECTION_REMOTE_CLOSE: + check_condition(event, 
pn_connection_remote_condition(pn_event_connection(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_SESSION_REMOTE_CLOSE: + check_condition(event, pn_session_remote_condition(pn_event_session(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: + check_condition(event, pn_link_remote_condition(pn_event_link(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_PROACTOR_INACTIVE: + return false; + break; + + default: + break; + } + return true; +} + +void run(app_data_t *app) { + /* Loop and handle events */ + do { + pn_event_batch_t *events = pn_proactor_wait(app->proactor); + pn_event_t *e; + for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { + if (!handle(app, e) || exit_code != 0) { + return; + } + } + pn_proactor_done(app->proactor, events); + } while(true); +} + +int main(int argc, char **argv) { + struct app_data_t app = {0}; + char addr[PN_MAX_ADDR]; + + app.container_id = argv[0]; /* Should be unique */ + app.host = (argc > 1) ? argv[1] : ""; + app.port = (argc > 2) ? argv[2] : "amqp"; + app.amqp_address = (argc > 3) ? argv[3] : "examples"; + app.message_count = (argc > 4) ? atoi(argv[4]) : 10; + + /* Create the proactor and connect */ + app.proactor = pn_proactor(); + pn_proactor_addr(addr, sizeof(addr), app.host, app.port); + pn_proactor_connect2(app.proactor, NULL, NULL, addr); + run(&app); + pn_proactor_free(app.proactor); + return exit_code; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/send-abort.c ---------------------------------------------------------------------- diff --git a/c/examples/send-abort.c b/c/examples/send-abort.c new file mode 100644 index 0000000..cc88ff0 --- /dev/null +++ b/c/examples/send-abort.c @@ -0,0 +1,226 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/connection.h> +#include <proton/condition.h> +#include <proton/delivery.h> +#include <proton/link.h> +#include <proton/message.h> +#include <proton/proactor.h> +#include <proton/session.h> +#include <proton/transport.h> + +#include <stdio.h> +#include <stdlib.h> + +/* + * Send aborted messages. + * + * An 80 Kbyte (MSG_SIZE) message is generated. + * The size is chosen to be large enough so that some of the message + * will go out on the wire before the abort is sent. + * + * 79 Kbytes (MSG_SIZE - HOLDBACK) are sent on one link flow event. + * The message is aborted on the next link flow event. 
+ */ + +#define MSG_SIZE 80000 +#define HOLDBACK 1000 + +#if MSG_SIZE <= HOLDBACK +#error "MSG_SIZE must be greater than HOLDBACK" +#endif + +typedef struct app_data_t { + const char *host, *port; + const char *amqp_address; + const char *container_id; + int message_count; + + pn_proactor_t *proactor; + pn_rwbytes_t message_buffer; + int sent; + int aborted; + bool in_progress; +} app_data_t; + +static int exit_code = 0; + +static void check_condition(pn_event_t *e, pn_condition_t *cond) { + if (pn_condition_is_set(cond)) { + fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); + pn_connection_close(pn_event_connection(e)); + exit_code = 1; + } +} + +/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */ +static pn_bytes_t encode_message(app_data_t* app) { + /* Construct a message with the map { "sequence": app.sent } */ + pn_message_t* message = pn_message(); + char data[MSG_SIZE + 11] = {0}; + pn_data_t* body; + pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */ + body = pn_message_body(message); + pn_data_enter(body); + pn_data_put_string(body, pn_bytes(MSG_SIZE, data)); + pn_data_exit(body); + + /* encode the message, expanding the encode buffer as needed */ + if (app->message_buffer.start == NULL) { + static const size_t initial_size = MSG_SIZE + 1000; + app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size)); + } + /* app->message_buffer is the total buffer space available. 
*/ + /* mbuf wil point at just the portion used by the encoded message */ + { + pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); + int status = 0; + while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { + app->message_buffer.size *= 2; + app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size); + mbuf.size = app->message_buffer.size; + } + if (status != 0) { + fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); + exit(1); + } + pn_message_free(message); + return pn_bytes(mbuf.size, mbuf.start); + } +} + +/* Returns true to continue, false if finished */ +static bool handle(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_CONNECTION_INIT: { + pn_connection_t* c = pn_event_connection(event); + pn_session_t* s = pn_session(pn_event_connection(event)); + pn_connection_set_container(c, app->container_id); + pn_connection_open(c); + pn_session_open(s); + { + pn_link_t* l = pn_sender(s, "my_sender"); + pn_terminus_set_address(pn_link_target(l), app->amqp_address); + pn_link_open(l); + break; + } + } + + case PN_LINK_FLOW: { + /* The peer has given us some credit, now we can send messages */ + pn_link_t *sender = pn_event_link(event); + while (app->in_progress || (pn_link_credit(sender) > 0 && app->sent < app->message_count)) { + if (!app->in_progress) { + pn_bytes_t msgbuf = encode_message(app); + /* Use sent counter as unique delivery tag. */ + pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); + pn_link_send(sender, msgbuf.start, msgbuf.size - HOLDBACK); /* Send some part of message */ + app->in_progress = true; + /* Return from this link flow event and abort the message on next, */ + break; + } else { + pn_delivery_t * pnd = pn_link_current(sender); + pn_delivery_abort(pnd); + /* aborted delivery is presettled and never ack'd. 
*/ + if (++app->aborted == app->message_count) { + printf("%d messages started and aborted\n", app->aborted); + pn_connection_close(pn_event_connection(event)); + /* Continue handling events till we receive TRANSPORT_CLOSED */ + } + ++app->sent; + app->in_progress = false; + } + } + break; + } + + case PN_DELIVERY: { + /* We received acknowledgement from the peer that a message was delivered. */ + pn_delivery_t* d = pn_event_delivery(event); + fprintf(stderr, "Aborted deliveries should not receive delivery events. Delivery state %d\n", (int)pn_delivery_remote_state(d)); + pn_connection_close(pn_event_connection(event)); + exit_code=1; + break; + } + + case PN_TRANSPORT_CLOSED: + check_condition(event, pn_transport_condition(pn_event_transport(event))); + break; + + case PN_CONNECTION_REMOTE_CLOSE: + check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_SESSION_REMOTE_CLOSE: + check_condition(event, pn_session_remote_condition(pn_event_session(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: + check_condition(event, pn_link_remote_condition(pn_event_link(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_PROACTOR_INACTIVE: + return false; + + default: break; + } + return true; +} + +void run(app_data_t *app) { + /* Loop and handle events */ + do { + pn_event_batch_t *events = pn_proactor_wait(app->proactor); + pn_event_t* e; + for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { + if (!handle(app, e)) { + return; + } + } + pn_proactor_done(app->proactor, events); + } while(true); +} + +int main(int argc, char **argv) { + struct app_data_t app = {0}; + char addr[PN_MAX_ADDR]; + + app.container_id = argv[0]; /* Should be unique */ + app.host = (argc > 1) ? argv[1] : ""; + app.port = (argc > 2) ? 
argv[2] : "amqp"; + app.amqp_address = (argc > 3) ? argv[3] : "examples"; + app.message_count = (argc > 4) ? atoi(argv[4]) : 10; + + app.proactor = pn_proactor(); + pn_proactor_addr(addr, sizeof(addr), app.host, app.port); + pn_proactor_connect2(app.proactor, NULL, NULL, addr); + run(&app); + pn_proactor_free(app.proactor); + free(app.message_buffer.start); + return exit_code; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/send-ssl.c ---------------------------------------------------------------------- diff --git a/c/examples/send-ssl.c b/c/examples/send-ssl.c new file mode 100644 index 0000000..76e66a9 --- /dev/null +++ b/c/examples/send-ssl.c @@ -0,0 +1,246 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include <proton/connection.h> +#include <proton/condition.h> +#include <proton/delivery.h> +#include <proton/link.h> +#include <proton/message.h> +#include <proton/proactor.h> +#include <proton/session.h> +#include <proton/ssl.h> +#include <proton/transport.h> + +#include <stdio.h> +#include <stdlib.h> + +typedef struct app_data_t { + const char *host, *port; + const char *amqp_address; + const char *container_id; + int message_count; + + pn_proactor_t *proactor; + pn_rwbytes_t message_buffer; + int sent; + int acknowledged; + pn_ssl_domain_t *ssl_domain; +} app_data_t; + +static int exit_code = 0; + +/* Note must be run in the current directory to find certificate files */ +#define SSL_FILE(NAME) CMAKE_CURRENT_SOURCE_DIR "/ssl-certs/" NAME +#define SSL_PW "tclientpw" +/* Windows vs. OpenSSL certificates */ +#if defined(_WIN32) +# define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.p12") +# define SET_CREDENTIALS(DOMAIN, NAME) \ + pn_ssl_domain_set_credentials(DOMAIN, SSL_FILE(NAME "-full.p12"), "", SSL_PW) +#else +# define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.pem") +# define SET_CREDENTIALS(DOMAIN, NAME) \ + pn_ssl_domain_set_credentials(DOMAIN, CERTIFICATE(NAME), SSL_FILE(NAME "-private-key.pem"), SSL_PW) +#endif + + +static void check_condition(pn_event_t *e, pn_condition_t *cond) { + if (pn_condition_is_set(cond)) { + fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); + pn_connection_close(pn_event_connection(e)); + exit_code = 1; + } +} + +/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. 
*/ +static pn_bytes_t encode_message(app_data_t* app) { + /* Construct a message with the map { "sequence": app.sent } */ + pn_message_t* message = pn_message(); + pn_data_t* body = pn_message_body(message); + pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */ + pn_data_put_map(body); + pn_data_enter(body); + pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); + pn_data_put_int(body, app->sent); /* The sequence number */ + pn_data_exit(body); + + /* encode the message, expanding the encode buffer as needed */ + if (app->message_buffer.start == NULL) { + static const size_t initial_size = 128; + app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size)); + } + /* app->message_buffer is the total buffer space available. */ + /* mbuf wil point at just the portion used by the encoded message */ + { + pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); + int status = 0; + while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { + app->message_buffer.size *= 2; + app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size); + mbuf.size = app->message_buffer.size; + mbuf.start = app->message_buffer.start; + } + if (status != 0) { + fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); + exit(1); + } + pn_message_free(message); + return pn_bytes(mbuf.size, mbuf.start); + } +} + +/* Returns true to continue, false if finished */ +static bool handle(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_CONNECTION_INIT: { + pn_connection_t* c = pn_event_connection(event); + pn_session_t* s = pn_session(pn_event_connection(event)); + pn_connection_set_container(c, app->container_id); + pn_connection_open(c); + pn_session_open(s); + { + pn_link_t* l = pn_sender(s, "my_sender"); + pn_terminus_set_address(pn_link_target(l), app->amqp_address); + 
pn_link_open(l); + break; + } + } + + case PN_CONNECTION_REMOTE_OPEN: { + pn_ssl_t *ssl = pn_ssl(pn_event_transport(event)); + if (ssl) { + char name[1024]; + pn_ssl_get_protocol_name(ssl, name, sizeof(name)); + printf("secure connection: %s\n", name); + fflush(stdout); + } + } + + case PN_LINK_FLOW: { + /* The peer has given us some credit, now we can send messages */ + pn_link_t *sender = pn_event_link(event); + while (pn_link_credit(sender) > 0 && app->sent < app->message_count) { + ++app->sent; + /* Use sent counter as unique delivery tag. */ + pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); + { + pn_bytes_t msgbuf = encode_message(app); + pn_link_send(sender, msgbuf.start, msgbuf.size); + } + pn_link_advance(sender); + } + break; + } + + case PN_DELIVERY: { + /* We received acknowledgement from the peer that a message was delivered. */ + pn_delivery_t* d = pn_event_delivery(event); + if (pn_delivery_remote_state(d) == PN_ACCEPTED) { + if (++app->acknowledged == app->message_count) { + printf("%d messages sent and acknowledged\n", app->acknowledged); + fflush(stdout); + pn_connection_close(pn_event_connection(event)); + /* Continue handling events till we receive TRANSPORT_CLOSED */ + } + } else { + fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d)); + pn_connection_close(pn_event_connection(event)); + exit_code=1; + } + break; + } + + case PN_TRANSPORT_CLOSED: + check_condition(event, pn_transport_condition(pn_event_transport(event))); + break; + + case PN_CONNECTION_REMOTE_CLOSE: + check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_SESSION_REMOTE_CLOSE: + check_condition(event, pn_session_remote_condition(pn_event_session(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: + check_condition(event, 
pn_link_remote_condition(pn_event_link(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_PROACTOR_INACTIVE: + return false; + + default: break; + } + return true; +} + +void run(app_data_t *app) { + /* Loop and handle events */ + do { + pn_event_batch_t *events = pn_proactor_wait(app->proactor); + pn_event_t *e; + for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { + if (!handle(app, e)) { + return; + } + } + pn_proactor_done(app->proactor, events); + } while(true); +} + +int main(int argc, char **argv) { + struct app_data_t app = {0}; + char addr[PN_MAX_ADDR]; + pn_transport_t *t = NULL; + int err; + + app.container_id = argv[0]; /* Should be unique */ + app.host = (argc > 1) ? argv[1] : ""; + app.port = (argc > 2) ? argv[2] : "amqp"; + app.amqp_address = (argc > 3) ? argv[3] : "examples"; + app.message_count = (argc > 4) ? atoi(argv[4]) : 10; + app.ssl_domain = pn_ssl_domain(PN_SSL_MODE_CLIENT); + + app.proactor = pn_proactor(); + pn_proactor_addr(addr, sizeof(addr), app.host, app.port); + + /* Configure a transport for SSL. The transport will be freed by the proactor. */ + t = pn_transport(); + err = pn_ssl_init(pn_ssl(t), app.ssl_domain, NULL); + if (err) { + fprintf(stderr, "error initializing SSL: %s\n", pn_code(err)); + return 1; + } + pn_proactor_connect2(app.proactor, NULL, t, addr); + + run(&app); + + pn_ssl_domain_free(app.ssl_domain); + pn_proactor_free(app.proactor); + free(app.message_buffer.start); + return exit_code; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/send.c ---------------------------------------------------------------------- diff --git a/c/examples/send.c b/c/examples/send.c new file mode 100644 index 0000000..9e8cc4a --- /dev/null +++ b/c/examples/send.c @@ -0,0 +1,204 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/connection.h> +#include <proton/condition.h> +#include <proton/delivery.h> +#include <proton/link.h> +#include <proton/message.h> +#include <proton/proactor.h> +#include <proton/session.h> +#include <proton/transport.h> + +#include <stdio.h> +#include <stdlib.h> + +typedef struct app_data_t { + const char *host, *port; + const char *amqp_address; + const char *container_id; + int message_count; + + pn_proactor_t *proactor; + pn_rwbytes_t message_buffer; + int sent; + int acknowledged; +} app_data_t; + +static int exit_code = 0; + +static void check_condition(pn_event_t *e, pn_condition_t *cond) { + if (pn_condition_is_set(cond)) { + fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); + pn_connection_close(pn_event_connection(e)); + exit_code = 1; + } +} + +/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. 
*/ +static pn_bytes_t encode_message(app_data_t* app) { + /* Construct a message with the map { "sequence": app.sent } */ + pn_message_t* message = pn_message(); + pn_data_t* body = pn_message_body(message); + pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */ + pn_data_put_map(body); + pn_data_enter(body); + pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); + pn_data_put_int(body, app->sent); /* The sequence number */ + pn_data_exit(body); + + /* encode the message, expanding the encode buffer as needed */ + if (app->message_buffer.start == NULL) { + static const size_t initial_size = 128; + app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size)); + } + /* app->message_buffer is the total buffer space available. */ + /* mbuf wil point at just the portion used by the encoded message */ + { + pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); + int status = 0; + while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { + app->message_buffer.size *= 2; + app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size); + mbuf.size = app->message_buffer.size; + mbuf.start = app->message_buffer.start; + } + if (status != 0) { + fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); + exit(1); + } + pn_message_free(message); + return pn_bytes(mbuf.size, mbuf.start); + } +} + +/* Returns true to continue, false if finished */ +static bool handle(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_CONNECTION_INIT: { + pn_connection_t* c = pn_event_connection(event); + pn_session_t* s = pn_session(pn_event_connection(event)); + pn_connection_set_container(c, app->container_id); + pn_connection_open(c); + pn_session_open(s); + { + pn_link_t* l = pn_sender(s, "my_sender"); + pn_terminus_set_address(pn_link_target(l), app->amqp_address); + 
pn_link_open(l); + break; + } + } + + case PN_LINK_FLOW: { + /* The peer has given us some credit, now we can send messages */ + pn_link_t *sender = pn_event_link(event); + while (pn_link_credit(sender) > 0 && app->sent < app->message_count) { + ++app->sent; + /* Use sent counter as unique delivery tag. */ + pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); + { + pn_bytes_t msgbuf = encode_message(app); + pn_link_send(sender, msgbuf.start, msgbuf.size); + } + pn_link_advance(sender); + } + break; + } + + case PN_DELIVERY: { + /* We received acknowledgement from the peer that a message was delivered. */ + pn_delivery_t* d = pn_event_delivery(event); + if (pn_delivery_remote_state(d) == PN_ACCEPTED) { + if (++app->acknowledged == app->message_count) { + printf("%d messages sent and acknowledged\n", app->acknowledged); + pn_connection_close(pn_event_connection(event)); + /* Continue handling events till we receive TRANSPORT_CLOSED */ + } + } else { + fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d)); + pn_connection_close(pn_event_connection(event)); + exit_code=1; + } + break; + } + + case PN_TRANSPORT_CLOSED: + check_condition(event, pn_transport_condition(pn_event_transport(event))); + break; + + case PN_CONNECTION_REMOTE_CLOSE: + check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_SESSION_REMOTE_CLOSE: + check_condition(event, pn_session_remote_condition(pn_event_session(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: + check_condition(event, pn_link_remote_condition(pn_event_link(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_PROACTOR_INACTIVE: + return false; + + default: break; + } + return true; +} + +void run(app_data_t *app) { + /* Loop and handle events */ + do { + pn_event_batch_t 
*events = pn_proactor_wait(app->proactor); + pn_event_t *e; + for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { + if (!handle(app, e)) { + return; + } + } + pn_proactor_done(app->proactor, events); + } while(true); +} + +int main(int argc, char **argv) { + struct app_data_t app = {0}; + char addr[PN_MAX_ADDR]; + + app.container_id = argv[0]; /* Should be unique */ + app.host = (argc > 1) ? argv[1] : ""; + app.port = (argc > 2) ? argv[2] : "amqp"; + app.amqp_address = (argc > 3) ? argv[3] : "examples"; + app.message_count = (argc > 4) ? atoi(argv[4]) : 10; + + app.proactor = pn_proactor(); + pn_proactor_addr(addr, sizeof(addr), app.host, app.port); + pn_proactor_connect2(app.proactor, NULL, NULL, addr); + run(&app); + pn_proactor_free(app.proactor); + free(app.message_buffer.start); + return exit_code; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/ssl-certs/README.txt ---------------------------------------------------------------------- diff --git a/c/examples/ssl-certs/README.txt b/c/examples/ssl-certs/README.txt new file mode 100644 index 0000000..9a8a4f9 --- /dev/null +++ b/c/examples/ssl-certs/README.txt @@ -0,0 +1,24 @@ +This directory contains basic self signed test certificates for use by +proton examples. + +The ".pem" files are in the format expected by proton implementations +using OpenSSL. The ".p12" file are for Windows implementations using +SChannel. + +The commands used to generate the certificates follow. 
+ + +make_pn_cert() +{ + name=$1 + subject=$2 + passwd=$3 + # create the pem files + openssl req -newkey rsa:2048 -keyout $name-private-key.pem -out $name-certificate.pem -subj $subject -passout pass:$passwd -x509 -days 3650 + # create the p12 files + openssl pkcs12 -export -out $name-full.p12 -passin pass:$passwd -passout pass:$passwd -inkey $name-private-key.pem -in $name-certificate.pem -name $name + openssl pkcs12 -export -out $name-certificate.p12 -in $name-certificate.pem -name $name -nokeys -passout pass: +} + +make_pn_cert tserver /CN=test_server/OU=proton_test tserverpw +make_pn_cert tclient /CN=test_client/OU=proton_test tclientpw http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/ssl-certs/tclient-certificate.p12 ---------------------------------------------------------------------- diff --git a/c/examples/ssl-certs/tclient-certificate.p12 b/c/examples/ssl-certs/tclient-certificate.p12 new file mode 100644 index 0000000..4d0e000 Binary files /dev/null and b/c/examples/ssl-certs/tclient-certificate.p12 differ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/ssl-certs/tclient-certificate.pem ---------------------------------------------------------------------- diff --git a/c/examples/ssl-certs/tclient-certificate.pem b/c/examples/ssl-certs/tclient-certificate.pem new file mode 100644 index 0000000..8088e2e --- /dev/null +++ b/c/examples/ssl-certs/tclient-certificate.pem @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDKzCCAhOgAwIBAgIJAIV7frIjftgcMA0GCSqGSIb3DQEBCwUAMCwxFDASBgNV +BAMMC3Rlc3RfY2xpZW50MRQwEgYDVQQLDAtwcm90b25fdGVzdDAeFw0xNTExMjcx +ODEwMzlaFw0yNTExMjQxODEwMzlaMCwxFDASBgNVBAMMC3Rlc3RfY2xpZW50MRQw +EgYDVQQLDAtwcm90b25fdGVzdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC +ggEBAPCIS4qUdOtQplUxZ6WW0LXcvosqFP6qOiCARLSEWpR3B8bq213rzefwwfcM +4TtMr88bP+huLKmlyMfwpl8yB88eXkscPgaAce2zk24urWkFXKSQ6GPitWBLGqBa +V+W0wJ4mfW7MwefVslWfGXI381QEUlBHjkFG30AtzMMTRj2GK2JqUlRXZPljGyB7 
+WcXwxcoS+HkKV7FtHWSkLAzyXwQ9vsCUEYdWTUaGXfCUNRSRV7h1LIANbu03NxV0 +XdEl7WXcr7tuTw3axeUGhRFVhLegrxKLuZTTno4aAJnEr8uaDzjxvXnv3Ne2igvy +gRfZgOMx+XrZEob9OpAoRghQt4cCAwEAAaNQME4wHQYDVR0OBBYEFE4vbyiM0RjG +TLMLLGGhMZE/5x1GMB8GA1UdIwQYMBaAFE4vbyiM0RjGTLMLLGGhMZE/5x1GMAwG +A1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAErr/rvLS9Ig0UCMwh1J1lA9 +/gvXf93iIK/SjrFIAqYRmfZxg4husfoes8t2hFUeuqoH05TuSOoXG8p8DpgTSGmF +jAFe+T90vJZTm0oqZkkkI/hdzjGQoHURRp9/O2Z/lm39KSKGVAN5pUWCUDi/G5iS +P9LZPJN6a5syXMrR6x62IPxAXowlpXkRghKClF3zPOaOBTzT1V27EkI8IEgC+p45 +246EooLnw8ibB+ucNc3KHNzpgKGVd/622+I+Q5eg9AT9PLFttP+R2ECsrVDDPYuA +p0qaSnwgeozj/d6K3FOgKKEKbzBmpWgkv0jdcVk18aPMHypI/RDtZ/+3ET2Ksi8= +-----END CERTIFICATE----- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/ssl-certs/tclient-full.p12 ---------------------------------------------------------------------- diff --git a/c/examples/ssl-certs/tclient-full.p12 b/c/examples/ssl-certs/tclient-full.p12 new file mode 100644 index 0000000..ad2d7d3 Binary files /dev/null and b/c/examples/ssl-certs/tclient-full.p12 differ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/ssl-certs/tclient-private-key.pem ---------------------------------------------------------------------- diff --git a/c/examples/ssl-certs/tclient-private-key.pem b/c/examples/ssl-certs/tclient-private-key.pem new file mode 100644 index 0000000..e5c114d --- /dev/null +++ b/c/examples/ssl-certs/tclient-private-key.pem @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFDjBABgkqhkiG9w0BBQ0wMzAbBgkqhkiG9w0BBQwwDgQICy6ghWp45z4CAggA +MBQGCCqGSIb3DQMHBAiVdDoo4NIghQSCBMixGm1bm/omMxsaKnIPO7zm5dyLexJ+ +yTFpmh2KV7kQqmpzCyIOdoG6K8YqFnie2XdFWm3S8faRHoMq54bDmyEWIxfQPq5f +I1iYFbIZkbnhUvK53RActsEUMf0locS4xylU7VQK3XTAwp0TVip3Lp3ehEMEdcXL +iUWibGsoTPKcY9MIWGXZAJXsEXoeHt6k2hHo1G4E0/Bi6mLW1LY/cxZCjHTGD6qI +Kt54SCCDvinqVa+rixw6yX9F14EA6bhALami8e+Ccd3lqHOyYlXcBaSS1ezCg6ig +oNK97mC+gEGy1KlkZDKWXclFoOCBXRBe4DByre6Rlq3yeI9L42bvAuSBSmf5QT5g 
+73Yl8vjEAKR65awBT09dPuKu7t+Fb6vkwF8/t+uyj9IuL+42UuXhMLK3ohf+6DbU +8/zB4y3GXI80QmWM0+Wx4n6khFhPFLHt2q0Sn6V9PG1vtHyiq50oSCoyrPQLaecp +hefnMCFBYTcT3JUwmoVGGy0boIAwL7T4aGsMt7QhwOx5tU35tKFxyY7m4fX14AKo +2EIy+TPQwCGkGf3Puy/Pc9VA8IAxB5+WwSrjk+NeCv88eIX7gy43k4rCr+OmD9FF +wknr3xoP3KYhNXjdZ4Ep/1UHSK+JAtzzbNLQjDcqN+gQPg/yUX6ih0j5K3Wvh9bK +E/DvzbpJroUZPgzR+8z5O68CfsD+OIdpHBFTKqAFmzvUuqpADpr998LdCjD+lW+V +xZZgZa8KEblwgiH3fdGbYl46Ho1zrZisf439DbqyybAuBIQB4NSZcL/MAgVGO17k +QDpVElWZWYrFm4CFTcvS2HvIzRmbefF5m5oJedsN7Q6WQCp+3gnwYx1xIOknd7pW +N4AHNnqjscSj9yACj/EiBVKAKNnC5H7ZGZTsaAjMETZyjLXfI2AZ3Fviz4zFR+oz +NkAfFB6WUpRpl7H02FzrzYT7XkkLcXd6H6g+mv2iDa9uKWk/PS2QlqnJt8/dHEHD +JKTG331yDK5GHlKAVGF3nP5BwFGgTQMuSoeiOervMXPUwDpQ8OaYkuaRej0cZLgT +kAF9sUjqdsoYNcXDFHALp6y5g8qYkfrxrlIbKs82zIsmB5I+dtZbUaD3a0zAUrmW +5Xm3Pc9dVP0EXKwfHz6zqPReEw2yYLisB5IoHd4M2wa3GzHBdra1ij4QTmvd3o7e +buGFoX8KJQAcig0zpbYkoDP2gPhIh9rY4unVPQNX1Q8/wRsiJAZZsYvZY+A+SmuZ +bwSwk+8ZJRsFzdYYYhQeRytD5cDAIQiClcI5Yj4T9dWQV/gf0N/wIBDNTMp0jJAy +1l7PuXTfGZodNJWZH0oqsrNoWbn/k67NildvvofIKX+h09Nxszr670Pvj0qoHd5/ +CWq30lnxoJBUgbikFOz6ZuuHi/ZiCXL+haH+v8hJKN5ptRKnyYJQHchRB/IOGRoT +5lmWxo8a7K+yXhp0VBDHJfw3685ms0xQX8Xj4X3MEuN64zd0fB1JmhtP12ydK85J +ABawNKlRQPw5weckwtCviXQX+vX25S/xu3xA6IuqlHyqL/1t3DICzuxeOyT2mZxD +tKQxEgNihPvu32vn9m74qA3adEaxuWPRkPZuTeITHOkMTZolvqYX/5olBsSgYwka +7/g= +-----END ENCRYPTED PRIVATE KEY----- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/ssl-certs/tserver-certificate.p12 ---------------------------------------------------------------------- diff --git a/c/examples/ssl-certs/tserver-certificate.p12 b/c/examples/ssl-certs/tserver-certificate.p12 new file mode 100644 index 0000000..f38b67d Binary files /dev/null and b/c/examples/ssl-certs/tserver-certificate.p12 differ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/ssl-certs/tserver-certificate.pem ---------------------------------------------------------------------- diff --git 
a/c/examples/ssl-certs/tserver-certificate.pem b/c/examples/ssl-certs/tserver-certificate.pem new file mode 100644 index 0000000..86231f3 --- /dev/null +++ b/c/examples/ssl-certs/tserver-certificate.pem @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDKzCCAhOgAwIBAgIJAPnYOOQCJ3kDMA0GCSqGSIb3DQEBCwUAMCwxFDASBgNV +BAMMC3Rlc3Rfc2VydmVyMRQwEgYDVQQLDAtwcm90b25fdGVzdDAeFw0xNTExMjcx +ODEwMzlaFw0yNTExMjQxODEwMzlaMCwxFDASBgNVBAMMC3Rlc3Rfc2VydmVyMRQw +EgYDVQQLDAtwcm90b25fdGVzdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC +ggEBAKJNB78lgw4KtXDAvXocTLud6mbn6zgfB6ETIF+kcrukOH9DnPxjLBBM4Lig +sp1+kmeudFK5/X8riDrvIW52b/rlEBLgLB+oDtI74m6OTbBs9L+FUFYOuxApetQF +qoJy2vf9pWfy4uku24vCpeo7eVLi6ypu4lXE3LR+Km3FruHI1NKonHBMhwXSOWqF +pYM6/4IZJ4fbV0+eU0Jrx+05s6XHg5vone2BVJKxeSIBje+zWnNnh8+qG0Z70Jgp +aMetME5KGnLNgD1okpH0vb3lwjvuqkkx4WswGVZGbLLkSqqBpXPyM9fCFVy5aKSL +DBq7IABQtO67O2nBzK3OyigHrUUCAwEAAaNQME4wHQYDVR0OBBYEFGV1PY0FCFbJ +gpcDVKI6JGiRTt3kMB8GA1UdIwQYMBaAFGV1PY0FCFbJgpcDVKI6JGiRTt3kMAwG +A1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAIx1TOTGWnnbpan4bse7wuvH +GYSNDJhoTVS+X1TC63xukJD1JBAsCNTqg/ZV6lN3XEl7vvOXfGoCiyXM6a9XOKUo +gSDtMrIr+wTh6Ss1yRO8QcCJmxH5JDXNu1ojtwsjFW/vneI4IL9kwpDsSlMQEX/E +EkkQwtAx/Cvfe7pecZL4qSeykJOUMTts9H8fCAZqEiRZBA3ugJxqF8jwLP3DoFVQ +6QZzKDY6CSPqfMnVb5i0MAIYVDpau+e3N9dgQpZD22F/zbua0OVbfAPdiRMnYxML +FT4sxLnh+5YVqwpVWbEKp4onHe2Fq6YIvAxUYAJ3SBA2C8O2RAVKWxf1jko3jYI= +-----END CERTIFICATE----- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/ssl-certs/tserver-full.p12 ---------------------------------------------------------------------- diff --git a/c/examples/ssl-certs/tserver-full.p12 b/c/examples/ssl-certs/tserver-full.p12 new file mode 100644 index 0000000..d4a0e40 Binary files /dev/null and b/c/examples/ssl-certs/tserver-full.p12 differ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/ssl-certs/tserver-private-key.pem ---------------------------------------------------------------------- diff --git 
a/c/examples/ssl-certs/tserver-private-key.pem b/c/examples/ssl-certs/tserver-private-key.pem new file mode 100644 index 0000000..91dcf0e --- /dev/null +++ b/c/examples/ssl-certs/tserver-private-key.pem @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFDjBABgkqhkiG9w0BBQ0wMzAbBgkqhkiG9w0BBQwwDgQI1cT0c2J3GcQCAggA +MBQGCCqGSIb3DQMHBAi1hxSX2LJ+EgSCBMheHJ0iXr5A36Natjk/LcAEeKUMT9s+ +sMzoQceCWe8qMlQluWksr9iDdZ4JRIE8cpK8dbmx4dLY/SShUzdlhJHCSa4zZBHq +8cZ/jGUF/RF1rqdgjK589eUq+uOl3/gXKzG/SxBqayy6PSn12kX3qnvmlkXCmtwU +lg+iBm5wRcJ0MyVHaJkyA8sW8gr186C/VAau6Yu0crQXN7NRo9snrd4ewuYMIEhZ +hgaG9XsYQWB1bPhAaKj80CZGxsQbJyTwcbKKkB3IY4WXx8mmhuiNl+vKT3HBJ9Ju +YB6tgIjs8CJ4X2P4aU3yNJwG1QldgHSqmFGQ19bcZAw3s3kzwjdzRf4H2V16XOBd +zQ5AEs/ffVMzMIAfkb1gYwgunZ2CVwwDJ2mi1RcgkX+Og2aFQc+fxXcVOnDcGLxV +6fuCuZ2lsXfoiIyRh9kj3L75N12GtVUvgBdnMuOc1wPw6XnGQtDwt0acJpdexLMG +k0j57r/gcgzTcmF3qNM+y9L/HLssgrJkvVJw2Np5gmtIyfDocsDUWUbClS4dTpYf +oTngUTU+vWtHBuaUnb+f5/WJaRS/S7mmR8usbVG3i9WnEr/vlPJpbJFSjW2S6u/H +7cFxKUmmBZsSuEv/EKt9a+Sh62kprOChm4myqfCI1/gvNKfUZC6m0Vp8zf+2LgAq +2RgbMuqysMjWUtV4kDRZT7oCYckUDwsCHdbLES3nmVrtBk2ShMKHBpDp8/GoRuiV +jdV7/EjKM/M1kXtFYYe3z7Mxv++lKYIJ7bNwVrQ8nrhce/VwHw6D5emWXNCJXhKZ +FW7EM2ZOZ9eaKOlCsIi8sbjV6Yie9IY6HJKKmi3CpO0Tv5kLBdHkru8vGCSFm3O1 +n7wz7Ys5FBSlZ19X0NwQSCQX1Q4w+tido6i1SCRX0qJEdTNGuGwVXMHCf4/1zyHV +hj8vnxh8fzo79LFrwlTTgwLg1Mr8sEUFFDJ/raJ1AhFXi8n24trtNR8EHxRW8wtD +CLCKaqkEqfBiFXK/Yq3RrefCayPHiD+DaNsI8BwefMGpED3vD8YYCjAzXNPh/CSF +sc1i1jWMzbJhzOoFSPNXhlfusbUFMFQ/6olatmH47SY6HBBOL3DDP5uQ0jw8P454 +QBjlMOpEZmZxO6TcEtJwu0vzgog4rQ5g3NWy6SIpjWehNwTynLt7yM3R5WTI6cZs +0GTv/rqo2/SUoNsFmnGIUwj/DrBe4XOAq1nS2ZlEctxKhBsKH0hMFp6D1rXOzrgl +bwcq+oistoB0TLcThShyNgSqzW1znQ1n5SVUk9b5rRhSttJxn3yOMewH0i3v8bPo +HOhP5kaGjblPsCYyhlL/SNVF0OXEGTwLNey7FQdWFOwVwTRRXe7k+uGZ2d5hg+Jn +It/trDZ1RDYbVmB7/Qy73c16J4mvhOUJ2de5ZciFBjkidbiiUKLj9xnjK9k9Sauo +MKhNnDMAEU5VDQM3xNe5BRdX8dFLwfF5H64sU3nROF83aUnDgvfFEowYPnCuPYfm +m4aQHfoBSg4j3v1OeOwktcl+Q2TjxPHfWhbWeRBfxOTqQ/suYhnQChuFSK/qyo9K 
+ccgotqghhunRsWMoZT25H7AZM6yKb1sMz/0oyMRIKeGqoYh+ULM5XLY0xNYd4/xU +WtQ= +-----END ENCRYPTED PRIVATE KEY----- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/examples/thread.h ---------------------------------------------------------------------- diff --git a/c/examples/thread.h b/c/examples/thread.h new file mode 100644 index 0000000..e675ed7 --- /dev/null +++ b/c/examples/thread.h @@ -0,0 +1,70 @@ +#ifndef _PROTON_EXAMPLES_C_THREADS_H +#define _PROTON_EXAMPLES_C_THREADS_H 1 + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* EXAMPLE USE ONLY. 
Simulate the subset of POSIX threads used by examples for windows */ + +#ifdef _WIN32 +#include <windows.h> +#include <process.h> + +typedef struct { + HANDLE handle; + void *(*func)(void *); + void *arg; +} pthread_t; + +static unsigned __stdcall pthread_run(void *thr0) { + pthread_t *t = (pthread_t *) thr0; + t->func(t->arg); + return 0; +} + +static int pthread_create(pthread_t *t, void *unused, void *(*f)(void *), void *arg) { + t->func = f; + t->arg = arg; + t->handle = (HANDLE) _beginthreadex(0, 0, &pthread_run, t, 0, 0); + if (t->handle) { + return 0; + } + return -1; +} + +static int pthread_join(pthread_t t, void **unused) { + if (t.handle) { + WaitForSingleObject(t.handle, INFINITE); + CloseHandle(t.handle); + } + return 0; +} + +typedef CRITICAL_SECTION pthread_mutex_t; +#define pthread_mutex_init(m, unused) InitializeCriticalSectionAndSpinCount(m, 4000) +#define pthread_mutex_destroy(m) DeleteCriticalSection(m) +#define pthread_mutex_lock(m) EnterCriticalSection(m) +#define pthread_mutex_unlock(m) LeaveCriticalSection(m) + +#else + +#include <pthread.h> + +#endif + +#endif /* thread.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/include/proton/cid.h ---------------------------------------------------------------------- diff --git a/c/include/proton/cid.h b/c/include/proton/cid.h new file mode 100644 index 0000000..e0766a0 --- /dev/null +++ b/c/include/proton/cid.h @@ -0,0 +1,76 @@ +#ifndef PROTON_CID_H +#define PROTON_CID_H 1 + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * @cond INTERNAL + */ + +typedef enum { + CID_pn_object = 1, + CID_pn_void, + CID_pn_weakref, + + CID_pn_string, + CID_pn_list, + CID_pn_map, + CID_pn_hash, + CID_pn_record, + + CID_pn_collector, + CID_pn_event, + + CID_pn_encoder, + CID_pn_decoder, + CID_pn_data, + + CID_pn_connection, + CID_pn_session, + CID_pn_link, + CID_pn_delivery, + CID_pn_transport, + + CID_pn_message, + + CID_pn_reactor, + CID_pn_handler, + CID_pn_timer, + CID_pn_task, + + CID_pn_io, + CID_pn_selector, + CID_pn_selectable, + + CID_pn_url, + + CID_pn_listener, + CID_pn_proactor, + + CID_pn_listener_socket +} pn_cid_t; + +/** + * @endcond + */ + +#endif /* cid.h */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org