http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/io/connection_engine.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp deleted file mode 100644 index 5e6483f..0000000 --- a/proton-c/bindings/cpp/src/io/connection_engine.cpp +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "proton/io/connection_engine.hpp" - -#include "proton/event_loop.hpp" -#include "proton/error.hpp" -#include "proton/messaging_handler.hpp" -#include "proton/uuid.hpp" - -#include "contexts.hpp" -#include "messaging_adapter.hpp" -#include "msg.hpp" -#include "proton_bits.hpp" -#include "proton_event.hpp" - -#include <proton/connection.h> -#include <proton/transport.h> -#include <proton/event.h> - -#include <algorithm> - - -namespace proton { -namespace io { - -void connection_engine::init() { - if (pn_connection_engine_init(&engine_, pn_connection(), pn_transport()) != 0) { - this->~connection_engine(); // Dtor won't be called on throw from ctor. 
- throw proton::error(std::string("connection_engine allocation failed")); - } -} - -connection_engine::connection_engine() : handler_(0), container_(0) { init(); } - -connection_engine::connection_engine(class container& cont, event_loop* loop) : handler_(0), container_(&cont) { - init(); - connection_context& ctx = connection_context::get(connection()); - ctx.container = container_; - ctx.event_loop.reset(loop); -} - -connection_engine::~connection_engine() { - pn_connection_engine_destroy(&engine_); -} - -void connection_engine::configure(const connection_options& opts, bool server) { - proton::connection c(connection()); - opts.apply_unbound(c); - if (server) pn_transport_set_server(engine_.transport); - pn_connection_engine_bind(&engine_); - opts.apply_bound(c); - handler_ = opts.handler(); - connection_context::get(connection()).collector = engine_.collector; -} - -void connection_engine::connect(const connection_options& opts) { - connection_options all; - if (container_) { - all.container_id(container_->id()); - all.update(container_->client_connection_options()); - } - all.update(opts); - configure(all, false); - connection().open(); -} - -void connection_engine::accept(const connection_options& opts) { - connection_options all; - if (container_) { - all.container_id(container_->id()); - all.update(container_->server_connection_options()); - } - all.update(opts); - configure(all, true); -} - -bool connection_engine::dispatch() { - pn_event_t* c_event; - while ((c_event = pn_connection_engine_event(&engine_)) != NULL) { - proton_event cpp_event(c_event, container_); - try { - if (handler_ != 0) { - messaging_adapter adapter(*handler_); - cpp_event.dispatch(adapter); - } - } catch (const std::exception& e) { - pn_condition_t *cond = pn_transport_condition(engine_.transport); - if (!pn_condition_is_set(cond)) { - pn_condition_format(cond, "exception", "%s", e.what()); - } - } - pn_connection_engine_pop_event(&engine_); - } - return 
!pn_connection_engine_finished(&engine_); -} - -mutable_buffer connection_engine::read_buffer() { - pn_rwbytes_t buffer = pn_connection_engine_read_buffer(&engine_); - return mutable_buffer(buffer.start, buffer.size); -} - -void connection_engine::read_done(size_t n) { - return pn_connection_engine_read_done(&engine_, n); -} - -void connection_engine::read_close() { - pn_connection_engine_read_close(&engine_); -} - -const_buffer connection_engine::write_buffer() { - pn_bytes_t buffer = pn_connection_engine_write_buffer(&engine_); - return const_buffer(buffer.start, buffer.size); -} - -void connection_engine::write_done(size_t n) { - return pn_connection_engine_write_done(&engine_, n); -} - -void connection_engine::write_close() { - pn_connection_engine_write_close(&engine_); -} - -void connection_engine::disconnected(const proton::error_condition& err) { - pn_condition_t* condition = pn_transport_condition(engine_.transport); - if (!pn_condition_is_set(condition)) { - set_error_condition(err, condition); - } - pn_connection_engine_close(&engine_); -} - -proton::connection connection_engine::connection() const { - return make_wrapper(engine_.connection); -} - -proton::transport connection_engine::transport() const { - return make_wrapper(engine_.transport); -} - -proton::container* connection_engine::container() const { - return container_; -} - -}}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/receiver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp index b84722c..e5ec55a 100644 --- a/proton-c/bindings/cpp/src/receiver.cpp +++ b/proton-c/bindings/cpp/src/receiver.cpp @@ -74,7 +74,7 @@ void receiver::drain() { // Create dummy flow event where "drain finish" can be detected. pn_connection_t *pnc = pn_session_connection(pn_link_session(pn_object())); connection_context& cctx = connection_context::get(pnc); - // connection_engine collector is per connection. Reactor collector is global. + // connection_driver collector is per connection. Reactor collector is global. pn_collector_t *coll = cctx.collector; if (!coll) coll = pn_reactor_collector(pn_object_reactor(pnc)); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/thread_safe_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/thread_safe_test.cpp b/proton-c/bindings/cpp/src/thread_safe_test.cpp index f8dc3d8..5b5d487 100644 --- a/proton-c/bindings/cpp/src/thread_safe_test.cpp +++ b/proton-c/bindings/cpp/src/thread_safe_test.cpp @@ -24,7 +24,7 @@ #include "proton_bits.hpp" #include "proton/thread_safe.hpp" -#include "proton/io/connection_engine.hpp" +#include "proton/io/connection_driver.hpp" #include <proton/connection.h> @@ -37,7 +37,7 @@ void test_new() { pn_connection_t* c = 0; thread_safe<connection>* p = 0; { - io::connection_engine e; + io::connection_driver e; c = unwrap(e.connection()); int r = pn_refcount(c); ASSERT(r >= 1); // engine may have internal refs (transport, collector). 
@@ -54,7 +54,7 @@ void test_new() { { std::shared_ptr<thread_safe<connection> > sp; { - io::connection_engine e; + io::connection_driver e; c = unwrap(e.connection()); sp = make_shared_thread_safe(e.connection()); } @@ -63,7 +63,7 @@ void test_new() { { std::unique_ptr<thread_safe<connection> > up; { - io::connection_engine e; + io::connection_driver e; c = unwrap(e.connection()); up = make_unique_thread_safe(e.connection()); } @@ -78,7 +78,7 @@ void test_convert() { connection c; pn_connection_t* pc = 0; { - io::connection_engine eng; + io::connection_driver eng; c = eng.connection(); pc = unwrap(c); // Unwrap in separate scope to avoid confusion from temp values. } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/docs/api/index.md ---------------------------------------------------------------------- diff --git a/proton-c/docs/api/index.md b/proton-c/docs/api/index.md index ccd679d..9c6009f 100644 --- a/proton-c/docs/api/index.md +++ b/proton-c/docs/api/index.md @@ -5,35 +5,31 @@ Proton Documentation {#index} The [Engine API](@ref engine) is a "pure AMQP" toolkit, it decodes AMQP bytes into proton [events](@ref event) and generates AMQP bytes from application -calls. +calls. There is no IO or threading code in this part of the library. -The [connection engine](@ref connection_engine) provides a simple bytes in/bytes -out, event-driven interface so you can read AMQP data from any source, process -the resulting [events](@ref event) and write AMQP output to any destination. +## Proactive event-driven programming -There is no IO or threading code in this part of the library, so it can be -embedded in many different environments. The proton project provides language -bindings (Python, Ruby, Go etc.) that embed it into the standard IO and -threading facilities of the bound language. +The [Proactor API](@ref proactor) is a pro-active, asynchronous framework to +build single or multi-threaded Proton C applications. 
It manages the IO +transport layer so you can write portable, event-driven AMQP code using the @ref +engine API. -## Integrating with IO +## IO Integration -The [Proactor API](@ref proactor) is a pro-active, asynchronous framewokr to -build single or multi-threaded Proton C applications. +The [connection driver](@ref connection_driver) provides a simple bytes in/bytes +out, event-driven interface so you can read AMQP data from any source, process +the resulting [events](@ref event) and write AMQP output to any destination. It +lets you use proton in alternate event loops, or for specialized embedded +applications. -For advanced use-cases it is possible to write your own implementation of the -proactor API for an unusual IO or threading framework. Any proton application +It is also possible to write your own implementation of the @ref proactor if you +are dealing with an unusual IO or threading framework. Any proton application written to the proactor API will be able to use your implementation. -## Messenger and Reactor APIs - -The [Messenger](@ref messenger) [Reactor](@ref reactor) APIs were intended -to be simple APIs that included IO support directly out of the box. +## Messenger and Reactor APIs (deprecated) -They both had good points but were both based on the assumption of a single-threaded -environment using a POSIX-like poll() call. This was a problem for performance on some -platforms and did not support multi-threaded applications. +The [Messenger](@ref messenger) and [Reactor](@ref reactor) APIs are older APIs +that were limited to single-threaded applications. -Note however that application code which interacts with the AMQP @ref engine and -processes AMQP @ref "events" event is the same for the proactor and reactor APIs, -so is quite easy to convert. The main difference is in how connections are set up. +Existing @ref reactor applications can be converted easily to use the @ref proactor, +since they share the same @ref engine API and @ref event set.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/connection.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h index 0ed23b0..70fad73 100644 --- a/proton-c/include/proton/connection.h +++ b/proton-c/include/proton/connection.h @@ -38,7 +38,7 @@ extern "C" { /** * @file * - * Connection API for the proton Engine. + * Connection API for the proton @ref engine * * @defgroup connection Connection * @ingroup engine http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/connection_driver.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/connection_driver.h b/proton-c/include/proton/connection_driver.h new file mode 100644 index 0000000..4fa3fb9 --- /dev/null +++ b/proton-c/include/proton/connection_driver.h @@ -0,0 +1,243 @@ +#ifndef PROTON_CONNECTION_DRIVER_H +#define PROTON_CONNECTION_DRIVER_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +/** + * @file + * + * @defgroup connection_driver Connection Driver + * + * **Experimental**: Toolkit for integrating proton with arbitrary network or IO + * transports. Provides a single point of control for an AMQP connection and + * a simple bytes-in/bytes-out interface that lets you: + * + * - process AMQP-encoded bytes from some input byte stream + * - generate ::pn_event_t events for your application to handle + * - encode resulting AMQP output bytes for some output byte stream + * + * The pn_connection_driver_() functions provide a simplified API and extra + * logic to use ::pn_connection_t and ::pn_transport_t as a unit. You can also + * access them directly for features that are not exposed via the @ref + * connection_driver API. + * + * The engine buffers events and data, you should run it until + * pn_connection_driver_finished() is true, to ensure all reading, writing and + * event handling (including ERROR and FINAL events) is finished. + * + * ## Error handling + * + * The pn_connection_driver_*() functions do not return an error code. IO errors set + * the transport condition and are returned as a PN_TRANSPORT_ERROR. The integration + * code can set errors using pn_connection_driver_errorf() + * + * ## IO patterns + * + * This API supports asynchronous, proactive, non-blocking and reactive IO. An + * integration does not have to follow the dispatch-read-write sequence above, + * but note that you should handle all available events before calling + * pn_connection_driver_read_buffer() and check that `size` is non-zero before + * starting a blocking or asynchronous read call. A `read` started while there + * are unprocessed CLOSE events in the buffer may never complete. + * + * AMQP is a full-duplex, asynchronous protocol. 
The "read" and "write" sides of + * an AMQP connection can close separately. + * + * ## Thread safety + * + * The @ref engine types are not thread safe, but each connection and its + * associated types form an independent unit. Different connections can be + * processed concurrently by different threads. + * + * @{ + */ + +#include <proton/import_export.h> +#include <proton/event.h> +#include <proton/types.h> + +#include <stdarg.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Struct containing the 3 elements needed to drive AMQP IO and events, aggregated as a unit. + */ +typedef struct pn_connection_driver_t { + pn_connection_t *connection; + pn_transport_t *transport; + pn_event_batch_t batch; +} pn_connection_driver_t; + +/** + * Set #connection and #transport to the provided values, or create a new + * @ref pn_connection_t or @ref pn_transport_t if either is NULL. + * The provided values belong to the connection driver and will be freed by + * pn_connection_driver_destroy() + * + * The transport is bound automatically after the PN_CONNECTION_INIT has been + * handled by the application. It can be bound earlier with + * pn_connection_driver_bind(). + * + * The following functions must be called before the transport is + * bound to have effect: pn_connection_set_username(), pn_connection_set_password(), + * pn_transport_set_server() + * + * @return PN_OUT_OF_MEMORY if any allocation fails. + */ +PN_EXTERN int pn_connection_driver_init(pn_connection_driver_t*, pn_connection_t*, pn_transport_t*); + +/** Force binding of the transport. + * This happens automatically after the PN_CONNECTION_INIT is processed. + * + * @return PN_STATE_ERR if the transport is already bound. + */ +PN_EXTERN int pn_connection_driver_bind(pn_connection_driver_t *d); + +/** + * Unbind, release and free #connection and #transport. Set all pointers to + * NULL. Does not free the @ref pn_connection_driver_t struct itself.
+ */ +PN_EXTERN void pn_connection_driver_destroy(pn_connection_driver_t *); + +/** + * Get the read buffer. + * + * Copy data from your input byte source to buf.start, up to buf.size. + * Call pn_connection_driver_read_done() when reading is complete. + * + * buf.size==0 means reading is not possible: no buffer space or the read side is closed. + */ +PN_EXTERN pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *); + +/** + * Process the first n bytes of data in pn_connection_driver_read_buffer() and + * reclaim the buffer space. + */ +PN_EXTERN void pn_connection_driver_read_done(pn_connection_driver_t *, size_t n); + +/** + * Close the read side. Call when the IO can no longer be read. + */ +PN_EXTERN void pn_connection_driver_read_close(pn_connection_driver_t *); + +/** + * True if read side is closed. + */ +PN_EXTERN bool pn_connection_driver_read_closed(pn_connection_driver_t *); + +/** + * Get the write buffer. + * + * Write data from buf.start to your IO destination, up to a max of buf.size. + * Call pn_connection_driver_write_done() when writing is complete. + * + * buf.size==0 means there is nothing to write. + */ + PN_EXTERN pn_bytes_t pn_connection_driver_write_buffer(pn_connection_driver_t *); + +/** + * Call when the first n bytes of pn_connection_driver_write_buffer() have been + * written to IO. Reclaims the buffer space and resets the write buffer. + */ +PN_EXTERN void pn_connection_driver_write_done(pn_connection_driver_t *, size_t n); + +/** + * Close the write side. Call when IO can no longer be written to. + */ +PN_EXTERN void pn_connection_driver_write_close(pn_connection_driver_t *); + +/** + * True if write side is closed. + */ +PN_EXTERN bool pn_connection_driver_write_closed(pn_connection_driver_t *); + +/** + * Close both the read and write sides. + */ +PN_EXTERN void pn_connection_driver_close(pn_connection_driver_t * c); + +/** + * Get the next event to handle.
+ * + * @return pointer is valid till the next call of + * pn_connection_driver_next_event(). NULL if there are no more events available now, + * reading/writing may produce more. + */ +PN_EXTERN pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *); + +/** + * True if pn_connection_driver_next_event() will return a non-NULL event. + */ +PN_EXTERN bool pn_connection_driver_has_event(pn_connection_driver_t *); + +/** + * Return true if the engine is closed for reading and writing and there are + * no more events. + * + * Call pn_connection_driver_destroy() to free all related memory. + */ +PN_EXTERN bool pn_connection_driver_finished(pn_connection_driver_t *); + +/** + * Set IO error information. + * + * The name and formatted description are set on the transport condition, and + * returned as a PN_TRANSPORT_ERROR event from pn_connection_driver_next_event(). + * + * You must call this *before* pn_connection_driver_read_close() or + * pn_connection_driver_write_close() to ensure the error is processed. + */ +PN_EXTERN void pn_connection_driver_errorf(pn_connection_driver_t *d, const char *name, const char *fmt, ...); + +/** + * Set IO error information via a va_list, see pn_connection_driver_errorf() + */ +PN_EXTERN void pn_connection_driver_verrorf(pn_connection_driver_t *d, const char *name, const char *fmt, va_list); + +/** + * Log a string message using the connection's transport log. + */ +PN_EXTERN void pn_connection_driver_log(pn_connection_driver_t *d, const char *msg); + +/** + * Log a printf formatted message using the connection's transport log. + */ +PN_EXTERN void pn_connection_driver_logf(pn_connection_driver_t *d, char *fmt, ...); + +/** + * Log a printf formatted message using the connection's transport log.
+ */ +PN_EXTERN void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap); + +/** + * If batch is part of a connection_driver, return the connection_driver address, + * else return NULL + */ +PN_EXTERN pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch); +///@} + +#ifdef __cplusplus +} +#endif + +#endif // PROTON_CONNECTION_DRIVER_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/connection_engine.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/connection_engine.h b/proton-c/include/proton/connection_engine.h deleted file mode 100644 index b7022a9..0000000 --- a/proton-c/include/proton/connection_engine.h +++ /dev/null @@ -1,313 +0,0 @@ -#ifndef PROTON_CONNECTION_ENGINE_H -#define PROTON_CONNECTION_ENGINE_H - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * @file - * - * **Experimental** The connection IO API is a set of functions to simplify - * integrating proton with different IO and concurrency platforms. The portable - * parts of a Proton application should use the @ref engine types. 
We will - * use "application" to mean the portable part of the application and - * "integration" to mean code that integrates with a particular IO platform. - * - * The connection_engine functions take a @ref pn_connection_t\*, and perform common - * tasks involving the @ref pn_connection_t and it's @ref pn_transport_t and - * @ref pn_collector_t so you can treat them as a unit. You can also work with - * these types directly for features not available via @ref connection_engine API. - * - * @defgroup connection_engine Connection Engine - * - * **Experimental**: Toolkit for integrating proton with arbitrary network or IO - * transports. Provides a single point of control for an AMQP connection and - * a simple bytes-in/bytes-out interface that lets you: - * - * - process AMQP-encoded bytes from some input byte stream - * - generate @ref pn_event_t events for your application to handle - * - encode resulting AMQP output bytes for some output byte stream - * - * The engine contains a @ref pn_connection_t, @ref pn_transport_t and @ref - * pn_collector_t and provides functions to operate on all three as a unit for - * IO integration. You can also use them directly for anything not covered by - * this API - * - * For example a simple blocking IO integration with the imaginary "my_io" library: - * - * pn_connection_engine_t ce; - * pn_connection_engine_init(&ce); - * while (!pn_connection_engine_finished(&ce) { - * // Dispatch events to be handled by the application. - * pn_event_t *e; - * while ((e = pn_connection_engine_event(&ce))!= NULL) { - * my_app_handle(e); // Pass to the application handler - * switch (pn_event_type(e)) { - * case PN_CONNECTION_INIT: pn_connection_engine_bind(&ce); - * // Only for full-duplex IO where read/write can shutdown separately. 
- * case PN_TRANSPORT_CLOSE_READ: my_io_shutdown_read(...); break; - * case PN_TRANSPORT_CLOSE_WRITE: my_io_shutdown_write(...); break; - * default: break; - * }; - * e = pn_connection_engine_pop_event(&ce); - * } - * // Read from my_io into the connection buffer - * pn_rwbytes_t readbuf = pn_connection_engine_read_buffer(&ce); - * if (readbuf.size) { - * size_t n = my_io_read(readbuf.start, readbuf.size, ...); - * if (n > 0) { - * pn_connection_engine_read_done(&ce, n); - * } else if (n < 0) { - * pn_connection_engine_errorf(&ce, "read-err", "something-bad (%d): %s", n, ...); - * pn_connection_engine_read_close(&ce); - * } - * } - * // Write from connection buffer to my_io - * pn_bytes_t writebuf = pn_connection_engine_write_buffer(&ce); - * if (writebuf.size) { - * size_t n = my_io_write_data(writebuf.start, writebuf.size, ...); - * if (n < 0) { - * pn_connection_engine_errorf(&ce, "write-err", "something-bad (%d): %s", d, ...); - * pn_connection_engine_write_close(&ce); - * } else { - * pn_connection_engine_write_done(&ce, n); - * } - * } - * } - * // If my_io doesn't have separate read/write shutdown, then we should close it now. - * my_io_close(...); - * - * AMQP is a full-duplex, asynchronous protocol. The "read" and "write" sides of - * an AMQP connection can close separately, the example shows how to handle this - * for full-duplex IO or IO with a simple close. - * - * The engine buffers events, you must keep processing till - * pn_connection_engine_finished() is true, to ensure all reading, writing and event - * handling (including ERROR and FINAL events) is completely finished. - * - * ## Error handling - * - * The pn_connection_engine_*() functions do not return an error code. IO errors set - * the transport condition and are returned as a PN_TRANSPORT_ERROR. The integration - * code can set errors using pn_connection_engine_errorf() - * - * ## Other IO patterns - * - * This API supports asynchronous, proactive, non-blocking and reactive IO. 
An - * integration does not have to follow the dispatch-read-write sequence above, - * but note that you should handle all available events before calling - * pn_connection_engine_read_buffer() and check that `size` is non-zero before - * starting a blocking or asynchronous read call. A `read` started while there - * are unprocessed CLOSE events in the buffer may never complete. - * - * ## Thread safety - * - * The @ref engine types are not thread safe, but each connection and its - * associated types forms an independent unit. Different connections can be - * processed concurrently by different threads. - * - * @defgroup connection_engine Connection IO - * @{ - */ - -#include <proton/import_export.h> -#include <proton/event.h> -#include <proton/types.h> - -#include <stdarg.h> - -#ifdef __cplusplus -extern "C" { -#endif - -/** - * Struct containing a connection, transport and collector. See - * pn_connection_engine_init(), pn_connection_engine_destroy() and pn_connection_engine() - */ -typedef struct pn_connection_engine_t { - pn_connection_t *connection; - pn_transport_t *transport; - pn_collector_t *collector; -} pn_connection_engine_t; - -/** - * Set #connection and #transport to the provided values, or create a new - * @ref pn_connection_t or @ref pn_transport_t if either is NULL. - * The provided values belong to the connection engine and will be freed by - * pn_connection_engine_destroy() - * - * Create a new @ref pn_collector_t and set as #collector. - * - * The transport and connection are *not* bound at this point. You should - * configure them as needed and let the application handle the - * PN_CONNECTION_INIT from pn_connection_engine_event() before calling - * pn_connection_engine_bind(). 
- * - * @return if any allocation fails call pn_connection_engine_destroy() and return PN_OUT_OF_MEMORY - */ -PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t*, pn_connection_t*, pn_transport_t*); - -/** - * Bind the connection to the transport when the external IO is ready. - * - * The following functions (if called at all) must be called *before* bind: - * pn_connection_set_username(), pn_connection_set_password(), pn_transport_set_server() - * - * If there is an external IO error during setup, set a transport error, close - * the transport and then bind. The error events are reported to the application - * via pn_connection_engine_event(). - * - * @return an error code if the bind fails. - */ -PN_EXTERN int pn_connection_engine_bind(pn_connection_engine_t *); - -/** - * Unbind, release and free #connection, #transpot and #collector. Set all pointers to NULL. - * Does not free the @ref pn_connection_engine_t struct itself. - */ -PN_EXTERN void pn_connection_engine_destroy(pn_connection_engine_t *); - -/** - * Get the read buffer. - * - * Copy data from your input byte source to buf.start, up to buf.size. - * Call pn_connection_engine_read_done() when reading is complete. - * - * buf.size==0 means reading is not possible: no buffer space or the read side is closed. - */ -PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *); - -/** - * Process the first n bytes of data in pn_connection_engine_read_buffer() and - * reclaim the buffer space. - */ -PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t *, size_t n); - -/** - * Close the read side. Call when the IO can no longer be read. - */ -PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t *); - -/** - * True if read side is closed. - */ -PN_EXTERN bool pn_connection_engine_read_closed(pn_connection_engine_t *); - -/** - * Get the write buffer. - * - * Write data from buf.start to your IO destination, up to a max of buf.size. 
- * Call pn_connection_engine_write_done() when writing is complete. - * - * buf.size==0 means there is nothing to write. - */ - PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *); - -/** - * Call when the first n bytes of pn_connection_engine_write_buffer() have been - * written to IO. Reclaims the buffer space and reset the write buffer. - */ -PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t *, size_t n); - -/** - * Close the write side. Call when IO can no longer be written to. - */ -PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t *); - -/** - * True if write side is closed. - */ -PN_EXTERN bool pn_connection_engine_write_closed(pn_connection_engine_t *); - -/** - * Close both sides side. - */ -PN_EXTERN void pn_connection_engine_close(pn_connection_engine_t * c); - -/** - * Get the current event. Call pn_connection_engine_done() when done handling it. - * Note that if PN_TRACE_EVT is enabled this will log the event, so you should - * avoid calling it more than once per event. Use pn_connection_engine_has_event() - * to silently test if any events are available. - * - * @return NULL if there are no more events ready. Reading/writing data may produce more. - */ -PN_EXTERN pn_event_t* pn_connection_engine_event(pn_connection_engine_t *); - -/** - * True if pn_connection_engine_event() will return a non-NULL event. - */ -PN_EXTERN bool pn_connection_engine_has_event(pn_connection_engine_t *); - -/** - * Drop the current event, advance pn_connection_engine_event() to the next event. - */ -PN_EXTERN void pn_connection_engine_pop_event(pn_connection_engine_t *); - -/** - * Return true if the the engine is closed for reading and writing and there are - * no more events. - * - * Call pn_connection_engine_free() to free all related memory. - */ -PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t *); - -/** - * Set IO error information. 
- * - * The name and formatted description are set on the transport condition, and - * returned as a PN_TRANSPORT_ERROR event from pn_connection_engine_event(). - * - * You must call this *before* pn_connection_engine_read_close() or - * pn_connection_engine_write_close() to ensure the error is processed. - * - * If there is already a transport condition set, this call does nothing. For - * more complex cases, you can work with the transport condition directly using: - * - * pn_condition_t *cond = pn_transport_condition(pn_connection_transport(conn)); - */ -PN_EXTERN void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...); - -/** - * Set IO error information via a va_list, see pn_connection_engine_errorf() - */ -PN_EXTERN void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list); - -/** - * Log a string message using the connection's transport log. - */ -PN_EXTERN void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg); - -/** - * Log a printf formatted message using the connection's transport log. - */ -PN_EXTERN void pn_connection_engine_logf(pn_connection_engine_t *ce, char *fmt, ...); - -/** - * Log a printf formatted message using the connection's transport log. 
- */ -PN_EXTERN void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap); - -///@} - -#ifdef __cplusplus -} -#endif - -#endif // PROTON_CONNECTION_ENGINE_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/cproton.i ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i index ffcf830..931437e 100644 --- a/proton-c/include/proton/cproton.i +++ b/proton-c/include/proton/cproton.i @@ -39,6 +39,9 @@ typedef unsigned long int uintptr_t; %ignore pn_bytes_t; %ignore pn_rwbytes_t; +/* pn_event_batch_t is not used directly by bindings */ +%ignore pn_event_batch_t; + /* There is no need to wrap pn_class_t aa it is an internal implementation detail and cannot be used outside the library */ %ignore pn_class_t; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/event.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h index 4dca2d5..31d4bdd 100644 --- a/proton-c/include/proton/event.h +++ b/proton-c/include/proton/event.h @@ -428,6 +428,32 @@ PN_EXTERN pn_event_t *pn_collector_peek(pn_collector_t *collector); PN_EXTERN bool pn_collector_pop(pn_collector_t *collector); /** + * Return the next event to be handled. + * + * Returns the head event if it has not previously been returned by + * pn_collector_next(), otherwise does pn_collector_pop() and returns + * the new head event. + * + * The returned pointer is valid till the next call of pn_collector_pop(), + * pn_collector_next(), pn_collector_release() or pn_collector_free() + * + * @param[in] collector a collector object + * @return the next event. 
+ */ +PN_EXTERN pn_event_t *pn_collector_next(pn_collector_t *collector); + +/** + * Return the same event as the previous call to pn_collector_next() + * + * The returned pointer is valid till the next call of pn_collector_pop(), + * pn_collector_next(), pn_collector_release() or pn_collector_free() + * + * @param[in] collector a collector object + * @return a pointer to the event returned by previous call to pn_collector_next() + */ +PN_EXTERN pn_event_t *pn_collector_prev(pn_collector_t *collector); + +/** * Check if there are more events after the current event. If this * returns true, then pn_collector_peek() will return an event even * after pn_collector_pop() is called. @@ -506,6 +532,36 @@ PN_EXTERN pn_transport_t *pn_event_transport(pn_event_t *event); */ PN_EXTERN pn_record_t *pn_event_attachments(pn_event_t *event); +/** + * **Experimental**: A batch of events to handle. Call pn_event_batch_next() in + * a loop until it returns NULL to handle them. + */ +typedef struct pn_event_batch_t pn_event_batch_t; + +/* NOTE: there is deliberately no peek(), more() or other look-ahead on an event + * batch. We want to know exactly which events have been handled, next() only + * allows the user to get each event exactly once, in order. + */ + +/** + * **Experimental**: Remove the next event from the batch and return it. NULL + * means the batch is empty. The returned event pointer is valid until + * pn_event_batch_next() is called again on the same batch. + */ +PN_EXTERN pn_event_t *pn_event_batch_next(pn_event_batch_t *batch); + +/** + *@cond INTERNAL + * pn_event_batch_next() can be re-implemented for different behaviors in different contexts.
+ */ +struct pn_event_batch_t { + pn_event_t *(*next_event)(pn_event_batch_t *batch); +}; + +/** + *@endcond + */ + #ifdef __cplusplus } #endif http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/proactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h index 49d7b6a..e23a24f 100644 --- a/proton-c/include/proton/proactor.h +++ b/proton-c/include/proton/proactor.h @@ -33,30 +33,27 @@ typedef struct pn_condition_t pn_condition_t; /** * @file * - * **Experimental**: Proactor API for the the proton @ref engine. + * **Experimental**: Proactor API for the proton @ref engine * * @defgroup proactor Proactor * * **Experimental**: Proactor API for portable, multi-threaded, asynchronous applications. * - * The proactor establishes and listens for connections. It creates the @ref - * "transport" transport that sends and receives data over the network and - * delivers @ref "events" event to application threads for processing. + * The proactor establishes and listens for connections. It creates + * the @ref transport that sends and receives data over the network and + * delivers @ref event to application threads for handling. * - * ## Multi-threading - * - * The @ref proactor is thread-safe, but the @ref "protocol engine" is not. The - * proactor ensures that each @ref "connection" connection and its associated - * values (@ref session, @ref link etc.) is processed sequentially, even if there - * are multiple application threads. See pn_proactor_wait() + * **Multi-threading**: + * The @ref proactor is thread-safe, but the @ref engine is not. The proactor + * ensures that each @ref connection and its associated values (@ref session, + * @ref link etc.) is handled sequentially, even if there are multiple + * application threads.
See pn_proactor_wait() * * @{ */ /** - * The proactor creates and manage @ref "transports" transport and delivers @ref - * "event" events to the application. - * + * The proactor. */ typedef struct pn_proactor_t pn_proactor_t; @@ -70,13 +67,6 @@ pn_proactor_t *pn_proactor(void); */ void pn_proactor_free(pn_proactor_t*); -/* FIXME aconway 2016-11-12: connect and listen need options to enable - things like websockets, alternate encryption or other features. - The "extra" parameter will be replaced by an "options" parameter - that will include providing extra data and other manipulators - to affect how the connection is processed. -*/ - /** * Asynchronous connect: a connection and transport will be created, the * relevant events will be returned by pn_proactor_wait() @@ -104,13 +94,27 @@ int pn_proactor_connect(pn_proactor_t*, const char *host, const char *port, pn_b pn_listener_t *pn_proactor_listen(pn_proactor_t *, const char *host, const char *port, int backlog, pn_bytes_t extra); /** - * Wait for an event. Can be called in multiple threads concurrently. - * You must call pn_event_done() when the event has been handled. + * Wait for events to handle. Call pn_proactor_done() after handling events. + * + * Thread safe: pn_proactor_wait() can be called concurrently, but the events in + * the returned ::pn_event_batch_t must be handled sequentially. + * + * The proactor always returns events that must be handled sequentially in the + * same batch or sequentially in a later batch after pn_proactor_done(). Any + * events returned concurrently by pn_proactor_wait() are safe to handle + * concurrently. + */ +pn_event_batch_t *pn_proactor_wait(pn_proactor_t* d); + +/** + * Call when done handling events. * - * The proactor ensures that events that cannot be handled concurrently - * (e.g. events for for the same connection) are never returned concurrently. 
+ * It is generally most efficient to handle the entire batch in the thread + * that calls pn_proactor_wait(), then call pn_proactor_done(). If you call + * pn_proactor_done() earlier, the remaining events will be returned again by + * pn_proactor_wait(), possibly to another thread. */ -pn_event_t *pn_proactor_wait(pn_proactor_t* d); +void pn_proactor_done(pn_proactor_t* d, pn_event_batch_t *events); /** * Cause PN_PROACTOR_INTERRUPT to be returned to exactly one thread calling wait() @@ -146,14 +150,6 @@ void pn_connection_wake(pn_connection_t *c); pn_proactor_t *pn_connection_proactor(pn_connection_t *c); /** - * Call when a proactor event has been handled. Does nothing if not a proactor event. - * - * Thread safe: May be called from any thread but must be called exactly once - * for each event returned by pn_proactor_wait() - */ -void pn_event_done(pn_event_t *); - -/** * Get the proactor that created the event or NULL. */ pn_proactor_t *pn_event_proactor(pn_event_t *); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/src/core/connection_driver.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c index f31ddb0..3393e64 100644 --- a/proton-c/src/core/connection_driver.c +++ b/proton-c/src/core/connection_driver.c @@ -20,144 +20,149 @@ #include "engine-internal.h" #include <proton/condition.h> #include <proton/connection.h> -#include <proton/connection_engine.h> +#include <proton/connection_driver.h> #include <proton/transport.h> #include <string.h> -int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) { - ce->connection = c ? c : pn_connection(); - ce->transport = t ? 
t : pn_transport(); - ce->collector = pn_collector(); - if (!ce->connection || !ce->transport || !ce->collector) { - pn_connection_engine_destroy(ce); +struct driver_batch { + pn_event_batch_t batch; +}; + +static pn_event_t *batch_next(pn_event_batch_t *batch) { + pn_connection_driver_t *d = + (pn_connection_driver_t*)((char*)batch - offsetof(pn_connection_driver_t, batch)); + pn_collector_t *collector = pn_connection_collector(d->connection); + pn_event_t *handled = pn_collector_prev(collector); + if (handled && pn_event_type(handled) == PN_CONNECTION_INIT) { + pn_transport_bind(d->transport, d->connection); /* Init event handled, auto-bind */ + } + pn_event_t *next = pn_collector_next(collector); + if (next && d->transport->trace & PN_TRACE_EVT) { + pn_string_clear(d->transport->scratch); + pn_inspect(next, d->transport->scratch); + pn_transport_log(d->transport, pn_string_get(d->transport->scratch)); + } + return next; +} + +int pn_connection_driver_init(pn_connection_driver_t* d, pn_connection_t *c, pn_transport_t *t) { + memset(d, 0, sizeof(*d)); + d->batch.next_event = &batch_next; + d->connection = c ? c : pn_connection(); + d->transport = t ? 
t : pn_transport(); + pn_collector_t *collector = pn_collector(); + if (!d->connection || !d->transport || !collector) { + if (collector) pn_collector_free(collector); + pn_connection_driver_destroy(d); return PN_OUT_OF_MEMORY; } - pn_connection_collect(ce->connection, ce->collector); + pn_connection_collect(d->connection, collector); return 0; } -int pn_connection_engine_bind(pn_connection_engine_t *ce) { - return pn_transport_bind(ce->transport, ce->connection); +int pn_connection_driver_bind(pn_connection_driver_t *d) { + return pn_transport_bind(d->transport, d->connection); } -void pn_connection_engine_destroy(pn_connection_engine_t *ce) { - if (ce->transport) { - pn_transport_unbind(ce->transport); - pn_transport_free(ce->transport); +void pn_connection_driver_destroy(pn_connection_driver_t *d) { + if (d->transport) { + pn_transport_unbind(d->transport); + pn_transport_free(d->transport); + } + if (d->connection) { + pn_collector_t *collector = pn_connection_collector(d->connection); + pn_connection_free(d->connection); + pn_collector_free(collector); } - if (ce->collector) pn_collector_free(ce->collector); - if (ce->connection) pn_connection_free(ce->connection); - memset(ce, 0, sizeof(*ce)); + memset(d, 0, sizeof(*d)); } -pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) { - ssize_t cap = pn_transport_capacity(ce->transport); - return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0); +pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *d) { + ssize_t cap = pn_transport_capacity(d->transport); + return (cap > 0) ? 
pn_rwbytes(cap, pn_transport_tail(d->transport)) : pn_rwbytes(0, 0); } -void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) { - if (n > 0) pn_transport_process(ce->transport, n); +void pn_connection_driver_read_done(pn_connection_driver_t *d, size_t n) { + if (n > 0) pn_transport_process(d->transport, n); } -bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) { - return pn_transport_capacity(ce->transport) < 0; +bool pn_connection_driver_read_closed(pn_connection_driver_t *d) { + return pn_transport_capacity(d->transport) < 0; } -void pn_connection_engine_read_close(pn_connection_engine_t *ce) { - if (!pn_connection_engine_read_closed(ce)) { - pn_transport_close_tail(ce->transport); +void pn_connection_driver_read_close(pn_connection_driver_t *d) { + if (!pn_connection_driver_read_closed(d)) { + pn_transport_close_tail(d->transport); } } -pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) { - ssize_t pending = pn_transport_pending(ce->transport); +pn_bytes_t pn_connection_driver_write_buffer(pn_connection_driver_t *d) { + ssize_t pending = pn_transport_pending(d->transport); return (pending > 0) ? 
- pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null; + pn_bytes(pending, pn_transport_head(d->transport)) : pn_bytes_null; } -void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) { +void pn_connection_driver_write_done(pn_connection_driver_t *d, size_t n) { if (n > 0) - pn_transport_pop(ce->transport, n); + pn_transport_pop(d->transport, n); } -bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) { - return pn_transport_pending(ce->transport) < 0; +bool pn_connection_driver_write_closed(pn_connection_driver_t *d) { + return pn_transport_pending(d->transport) < 0; } -void pn_connection_engine_write_close(pn_connection_engine_t *ce) { - if (!pn_connection_engine_write_closed(ce)) { - pn_transport_close_head(ce->transport); +void pn_connection_driver_write_close(pn_connection_driver_t *d) { + if (!pn_connection_driver_write_closed(d)) { + pn_transport_close_head(d->transport); } } -void pn_connection_engine_close(pn_connection_engine_t *ce) { - pn_connection_engine_read_close(ce); - pn_connection_engine_write_close(ce); -} - -pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) { - pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL; - if (e) { - pn_transport_t *t = ce->transport; - if (t && t->trace & PN_TRACE_EVT) { - /* This can log the same event twice if pn_connection_engine_event is called - * twice but for debugging it is much better to log before handling than after. 
- */ - pn_string_clear(t->scratch); - pn_inspect(e, t->scratch); - pn_transport_log(t, pn_string_get(t->scratch)); - } - } - return e; +void pn_connection_driver_close(pn_connection_driver_t *d) { + pn_connection_driver_read_close(d); + pn_connection_driver_write_close(d); } -bool pn_connection_engine_has_event(pn_connection_engine_t *ce) { - return ce->collector && pn_collector_peek(ce->collector); +pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *d) { + return pn_event_batch_next(&d->batch); } -void pn_connection_engine_pop_event(pn_connection_engine_t *ce) { - if (ce->collector) { - pn_event_t *e = pn_collector_peek(ce->collector); - if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */ - /* Events can accumulate behind the TRANSPORT_CLOSED before the - * PN_TRANSPORT_CLOSED event is handled. They can never be processed - * so release them. - */ - pn_collector_release(ce->collector); - } else { - pn_collector_pop(ce->collector); - } - - } +bool pn_connection_driver_has_event(pn_connection_driver_t *d) { + return pn_collector_peek(pn_connection_collector(d->connection)); } -bool pn_connection_engine_finished(pn_connection_engine_t *ce) { - return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce); +bool pn_connection_driver_finished(pn_connection_driver_t *d) { + return pn_transport_closed(d->transport) && !pn_connection_driver_has_event(d); } -void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) { - pn_transport_t *t = ce->transport; +void pn_connection_driver_verrorf(pn_connection_driver_t *d, const char *name, const char *fmt, va_list ap) { + pn_transport_t *t = d->transport; pn_condition_t *cond = pn_transport_condition(t); pn_string_vformat(t->scratch, fmt, ap); pn_condition_set_name(cond, name); pn_condition_set_description(cond, pn_string_get(t->scratch)); } -void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, 
const char *fmt, ...) { +void pn_connection_driver_errorf(pn_connection_driver_t *d, const char *name, const char *fmt, ...) { va_list ap; va_start(ap, fmt); - pn_connection_engine_verrorf(ce, name, fmt, ap); + pn_connection_driver_verrorf(d, name, fmt, ap); va_end(ap); } -void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) { - pn_transport_log(ce->transport, msg); +void pn_connection_driver_log(pn_connection_driver_t *d, const char *msg) { + pn_transport_log(d->transport, msg); +} + +void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap) { + pn_transport_vlogf(d->transport, fmt, ap); } -void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) { - pn_transport_vlogf(ce->transport, fmt, ap); +void pn_connection_driver_vlog(pn_connection_driver_t *d, const char *msg) { + pn_transport_log(d->transport, msg); } -void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) { - pn_transport_log(ce->transport, msg); +pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch) { + return (batch->next_event == batch_next) ? + (pn_connection_driver_t*)((char*)batch - offsetof(pn_connection_driver_t, batch)) : + NULL; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/src/core/connection_engine.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/connection_engine.c b/proton-c/src/core/connection_engine.c deleted file mode 100644 index f31ddb0..0000000 --- a/proton-c/src/core/connection_engine.c +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "engine-internal.h" -#include <proton/condition.h> -#include <proton/connection.h> -#include <proton/connection_engine.h> -#include <proton/transport.h> -#include <string.h> - -int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) { - ce->connection = c ? c : pn_connection(); - ce->transport = t ? t : pn_transport(); - ce->collector = pn_collector(); - if (!ce->connection || !ce->transport || !ce->collector) { - pn_connection_engine_destroy(ce); - return PN_OUT_OF_MEMORY; - } - pn_connection_collect(ce->connection, ce->collector); - return 0; -} - -int pn_connection_engine_bind(pn_connection_engine_t *ce) { - return pn_transport_bind(ce->transport, ce->connection); -} - -void pn_connection_engine_destroy(pn_connection_engine_t *ce) { - if (ce->transport) { - pn_transport_unbind(ce->transport); - pn_transport_free(ce->transport); - } - if (ce->collector) pn_collector_free(ce->collector); - if (ce->connection) pn_connection_free(ce->connection); - memset(ce, 0, sizeof(*ce)); -} - -pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) { - ssize_t cap = pn_transport_capacity(ce->transport); - return (cap > 0) ? 
pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0); -} - -void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) { - if (n > 0) pn_transport_process(ce->transport, n); -} - -bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) { - return pn_transport_capacity(ce->transport) < 0; -} - -void pn_connection_engine_read_close(pn_connection_engine_t *ce) { - if (!pn_connection_engine_read_closed(ce)) { - pn_transport_close_tail(ce->transport); - } -} - -pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) { - ssize_t pending = pn_transport_pending(ce->transport); - return (pending > 0) ? - pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null; -} - -void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) { - if (n > 0) - pn_transport_pop(ce->transport, n); -} - -bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) { - return pn_transport_pending(ce->transport) < 0; -} - -void pn_connection_engine_write_close(pn_connection_engine_t *ce) { - if (!pn_connection_engine_write_closed(ce)) { - pn_transport_close_head(ce->transport); - } -} - -void pn_connection_engine_close(pn_connection_engine_t *ce) { - pn_connection_engine_read_close(ce); - pn_connection_engine_write_close(ce); -} - -pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) { - pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL; - if (e) { - pn_transport_t *t = ce->transport; - if (t && t->trace & PN_TRACE_EVT) { - /* This can log the same event twice if pn_connection_engine_event is called - * twice but for debugging it is much better to log before handling than after. 
- */ - pn_string_clear(t->scratch); - pn_inspect(e, t->scratch); - pn_transport_log(t, pn_string_get(t->scratch)); - } - } - return e; -} - -bool pn_connection_engine_has_event(pn_connection_engine_t *ce) { - return ce->collector && pn_collector_peek(ce->collector); -} - -void pn_connection_engine_pop_event(pn_connection_engine_t *ce) { - if (ce->collector) { - pn_event_t *e = pn_collector_peek(ce->collector); - if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */ - /* Events can accumulate behind the TRANSPORT_CLOSED before the - * PN_TRANSPORT_CLOSED event is handled. They can never be processed - * so release them. - */ - pn_collector_release(ce->collector); - } else { - pn_collector_pop(ce->collector); - } - - } -} - -bool pn_connection_engine_finished(pn_connection_engine_t *ce) { - return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce); -} - -void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) { - pn_transport_t *t = ce->transport; - pn_condition_t *cond = pn_transport_condition(t); - pn_string_vformat(t->scratch, fmt, ap); - pn_condition_set_name(cond, name); - pn_condition_set_description(cond, pn_string_get(t->scratch)); -} - -void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) 
{ - va_list ap; - va_start(ap, fmt); - pn_connection_engine_verrorf(ce, name, fmt, ap); - va_end(ap); -} - -void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) { - pn_transport_log(ce->transport, msg); -} - -void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) { - pn_transport_vlogf(ce->transport, fmt, ap); -} - -void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) { - pn_transport_log(ce->transport, msg); -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/src/core/event.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c index 7882327..2a0a5cf 100644 --- a/proton-c/src/core/event.c +++ b/proton-c/src/core/event.c @@ -28,7 +28,8 @@ struct pn_collector_t { pn_list_t *pool; pn_event_t *head; pn_event_t *tail; - bool freed; + bool freed:1; + bool head_returned:1; /* Head has been returned by pn_collector_next() */ }; struct pn_event_t { @@ -51,11 +52,8 @@ static void pn_collector_initialize(pn_collector_t *collector) static void pn_collector_drain(pn_collector_t *collector) { assert(collector); - - while (pn_collector_peek(collector)) { - pn_collector_pop(collector); - } - + while (pn_collector_next(collector)) + ; assert(!collector->head); assert(!collector->tail); } @@ -175,6 +173,7 @@ pn_event_t *pn_collector_peek(pn_collector_t *collector) bool pn_collector_pop(pn_collector_t *collector) { + collector->head_returned = false; pn_event_t *event = collector->head; if (event) { collector->head = event->next; @@ -190,6 +189,19 @@ bool pn_collector_pop(pn_collector_t *collector) return true; } +pn_event_t *pn_collector_next(pn_collector_t *collector) +{ + if (collector->head_returned) { + pn_collector_pop(collector); + } + collector->head_returned = collector->head; + return collector->head; +} + +pn_event_t *pn_collector_prev(pn_collector_t *collector) { + return 
collector->head_returned ? collector->head : NULL; +} + bool pn_collector_more(pn_collector_t *collector) { assert(collector); @@ -386,3 +398,7 @@ const char *pn_event_type_name(pn_event_type_t type) } return NULL; } + +pn_event_t *pn_event_batch_next(pn_event_batch_t *batch) { + return batch->next_event(batch); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/src/tests/refcount.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/refcount.c b/proton-c/src/tests/refcount.c index a36d01c..267c861 100644 --- a/proton-c/src/tests/refcount.c +++ b/proton-c/src/tests/refcount.c @@ -313,7 +313,8 @@ static void test_transport_connection(void) { } static void drain(pn_collector_t *collector) { - while (pn_collector_peek(collector)) { pn_collector_pop(collector); } + while (pn_collector_next(collector)) + ; } static void test_collector_connection_transport(void) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/qpid-proton-cpp.syms ---------------------------------------------------------------------- diff --git a/qpid-proton-cpp.syms b/qpid-proton-cpp.syms index 7a25651..c24e898 100644 --- a/qpid-proton-cpp.syms +++ b/qpid-proton-cpp.syms @@ -30,24 +30,24 @@ proton::connection::transport() const proton::connection::user(std::string const&) proton::connection::~connection() -proton::connection_engine::can_read() const -proton::connection_engine::can_write() const -proton::connection_engine::closed() const -proton::connection_engine::connection() const -proton::connection_engine::connection_engine(proton::handler&, proton::connection_options const&) -proton::connection_engine::container::container(std::string const&) -proton::connection_engine::container::id() const -proton::connection_engine::container::make_options() -proton::connection_engine::container::options(proton::connection_options const&) -proton::connection_engine::container::~container() 
-proton::connection_engine::dispatch() -proton::connection_engine::io_error::io_error(std::string const&) -proton::connection_engine::io_error::~io_error() -proton::connection_engine::no_opts -proton::connection_engine::process(int) -proton::connection_engine::try_read() -proton::connection_engine::try_write() -proton::connection_engine::~connection_engine() +proton::connection_driver::can_read() const +proton::connection_driver::can_write() const +proton::connection_driver::closed() const +proton::connection_driver::connection() const +proton::connection_driver::connection_driver(proton::handler&, proton::connection_options const&) +proton::connection_driver::container::container(std::string const&) +proton::connection_driver::container::id() const +proton::connection_driver::container::make_options() +proton::connection_driver::container::options(proton::connection_options const&) +proton::connection_driver::container::~container() +proton::connection_driver::dispatch() +proton::connection_driver::io_error::io_error(std::string const&) +proton::connection_driver::io_error::~io_error() +proton::connection_driver::no_opts +proton::connection_driver::process(int) +proton::connection_driver::try_read() +proton::connection_driver::try_write() +proton::connection_driver::~connection_driver() proton::connection_options::connection_options() proton::connection_options::connection_options(proton::connection_options const&) @@ -587,8 +587,8 @@ proton::value::value(proton::value const&) # Only types with the following info can be thrown across shared abject boundary # Or correctly dynamically cast by user typeinfo for proton::connection -typeinfo for proton::connection_engine -typeinfo for proton::connection_engine::io_error +typeinfo for proton::connection_driver +typeinfo for proton::connection_driver::io_error typeinfo for proton::conversion_error typeinfo for proton::endpoint typeinfo for proton::error @@ -600,8 +600,8 @@ typeinfo for proton::session typeinfo for 
proton::timeout_error typeinfo for proton::url_error typeinfo name for proton::connection -typeinfo name for proton::connection_engine -typeinfo name for proton::connection_engine::io_error +typeinfo name for proton::connection_driver +typeinfo name for proton::connection_driver::io_error typeinfo name for proton::conversion_error typeinfo name for proton::endpoint typeinfo name for proton::error @@ -613,8 +613,8 @@ typeinfo name for proton::session typeinfo name for proton::timeout_error typeinfo name for proton::url_error vtable for proton::connection -vtable for proton::connection_engine -vtable for proton::connection_engine::io_error +vtable for proton::connection_driver +vtable for proton::connection_driver::io_error vtable for proton::conversion_error vtable for proton::endpoint vtable for proton::error --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
