This is an automated email from the ASF dual-hosted git repository. gsim pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit c0fb276ecc3a193e9f03790cd3b4f910677cbe3a Author: Gordon Sim <g...@redhat.com> AuthorDate: Tue Aug 11 11:32:33 2020 +0100 DISPATCH-1654: Initial TCP adaptor --- python/qpid_dispatch/management/qdrouter.json | 61 ++ python/qpid_dispatch_internal/dispatch.py | 2 + python/qpid_dispatch_internal/management/agent.py | 28 + python/qpid_dispatch_internal/management/config.py | 2 +- src/CMakeLists.txt | 1 + src/adaptors/tcp_adaptor.c | 917 +++++++++++++++++++++ src/adaptors/tcp_adaptor.h | 79 ++ 7 files changed, 1089 insertions(+), 1 deletion(-) diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 421fa0a..0bb2bec 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -1151,6 +1151,66 @@ } }, + "tcpListener": { + "description": "Ingress TCP bridge.", + "extends": "configurationEntity", + "operations": ["CREATE", "DELETE"], + "attributes": { + "address": { + "description":"Address of this bridge", + "type": "string", + "create": true + }, + "host": { + "description":"A host name, IPV4 or IPV6 literal, or the empty string. The empty string listens on all local addresses. A host name listens on all addresses associated with the name. An IPV6 literal address (or wildcard '[::]') listens only for IPV6. An IPV4 literal address (or wildcard '0.0.0.0') listens only for IPV4.", + "type": "string", + "default": "0.0.0.0", + "create": true + }, + "port": { + "description": "Port number or symbolic service name. If '0', the router shall assign an ephemeral port to the listener and log the port number with a log of the form 'SERVER (notice) Listening on <host>:<assigned-port> (<listener-name>)'", + "type": "string", + "create": true + }, + "siteId": { + "type": "string", + "required": false, + "description": "Used to identify where connection is handled.", + "create": true + } + } + }, + + "tcpConnector": { + "description": "Egress TCP bridge.", + "extends": "configurationEntity", + "operations": ["CREATE", "DELETE"], + "attributes": { + "address": { + "description":"Address of this bridge", + "type": "string", + "create": true + }, + "host": { + "description":"IP address: ipv4 or ipv6 literal or a host name", + "type": "string", + "create": true + }, + "port": { + "description": "Port number or symbolic service name.", + "type": "string", + "create": true + + }, + "siteId": { + "type": "string", + "required": false, + "description": "Used to identify origin of connections.", + "create": true + } + } + }, + "log": { "description": "Configure logging for a particular module. You can use the `UPDATE` operation to change log settings while the router is running.", "extends": "configurationEntity", @@ -1174,6 +1234,7 @@ "CONN_MGR", "PYTHON", "PROTOCOL", + "TCP_ADAPTOR", "HTTP_ADAPTOR", "DEFAULT" ], diff --git a/python/qpid_dispatch_internal/dispatch.py b/python/qpid_dispatch_internal/dispatch.py index ad99749..f5bd91a 100644 --- a/python/qpid_dispatch_internal/dispatch.py +++ b/python/qpid_dispatch_internal/dispatch.py @@ -67,6 +67,8 @@ class QdDll(ctypes.PyDLL): self._prototype(self.qd_dispatch_configure_connector, ctypes.c_void_p, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_dispatch_configure_ssl_profile, ctypes.c_void_p, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_dispatch_configure_sasl_plugin, ctypes.c_void_p, [self.qd_dispatch_p, py_object]) + self._prototype(self.qd_dispatch_configure_tcp_listener, ctypes.c_void_p, [self.qd_dispatch_p, py_object]) + self._prototype(self.qd_dispatch_configure_tcp_connector, ctypes.c_void_p, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_dispatch_configure_http_lsnr, ctypes.c_void_p, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_dispatch_configure_http_connector, ctypes.c_void_p, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_connection_manager_delete_listener, None, [self.qd_dispatch_p, ctypes.c_void_p]) diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py index af6d8a2..ec5696c 100644 --- a/python/qpid_dispatch_internal/management/agent.py +++ b/python/qpid_dispatch_internal/management/agent.py @@ -586,6 +586,34 @@ class HttpListenerEntity(EntityAdapter): def _delete(self): self._qd.qd_dispatch_delete_http_listener(self._dispatch, self._implementations[0].key) +class TcpListenerEntity(EntityAdapter): + def create(self): + config_listener = self._qd.qd_dispatch_configure_tcp_listener(self._dispatch, self) + return config_listener + + def _identifier(self): + return _host_port_name_identifier(self) + + def __str__(self): + return super(TcpListenerEntity, self).__str__().replace("Entity(", "TcpListenerEntity(") + + def _delete(self): + self._qd.qd_dispatch_delete_tcp_listener(self._dispatch, self._implementations[0].key) + + +class TcpConnectorEntity(EntityAdapter): + def create(self): + config_connector = self._qd.qd_dispatch_configure_tcp_connector(self._dispatch, self) + return config_connector + + def _identifier(self): + return _host_port_name_identifier(self) + + def __str__(self): + return super(TcpConnectorEntity, self).__str__().replace("Entity(", "TcpConnectorEntity(") + + def _delete(self): + self._qd.qd_dispatch_delete_tcp_connector(self._dispatch, self._implementations[0].key) class HttpConnectorEntity(EntityAdapter): def create(self): diff --git a/python/qpid_dispatch_internal/management/config.py b/python/qpid_dispatch_internal/management/config.py index 0edfebb..21016f7 100644 --- a/python/qpid_dispatch_internal/management/config.py +++ b/python/qpid_dispatch_internal/management/config.py @@ -313,7 +313,7 @@ def configure_dispatch(dispatch, lib_handle, filename): for t in "sslProfile", "authServicePlugin", "listener", "connector", \ "router.config.address", "router.config.linkRoute", "router.config.autoLink", \ "router.config.exchange", "router.config.binding", \ - "vhost", "httpListener", "httpConnector": + "vhost", "httpListener", "httpConnector", "tcpListener", "tcpConnector": for a in config.by_type(t): configure(a) if t == "sslProfile": diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f7ac67e..b780011 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -43,6 +43,7 @@ set(qpid_dispatch_SOURCES adaptors/http_adaptor.c adaptors/http1/http1_lib.c adaptors/http1/http1_adaptor.c + adaptors/tcp_adaptor.c alloc_pool.c amqp.c bitmask.c diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c new file mode 100644 index 0000000..27dcb5c --- /dev/null +++ b/src/adaptors/tcp_adaptor.c @@ -0,0 +1,917 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/condition.h> +#include <proton/listener.h> +#include <proton/netaddr.h> +#include <proton/proactor.h> +#include <proton/raw_connection.h> +#include "qpid/dispatch/ctools.h" +#include "qpid/dispatch/protocol_adaptor.h" +#include "delivery.h" +#include "tcp_adaptor.h" +#include <stdio.h> +#include <inttypes.h> + +ALLOC_DEFINE(qd_tcp_listener_t); +ALLOC_DEFINE(qd_tcp_connector_t); + +typedef struct qdr_tcp_adaptor_t { + qdr_core_t *core; + qdr_protocol_adaptor_t *adaptor; + qd_tcp_listener_list_t listeners; + qd_tcp_connector_list_t connectors; + qd_log_source_t *log_source; +} qdr_tcp_adaptor_t; + +static qdr_tcp_adaptor_t *tcp_adaptor; + +#define READ_BUFFERS 4 +#define WRITE_BUFFERS 4 + +typedef struct qdr_tcp_connection_t { + qd_handler_context_t context; + char *reply_to; + qdr_connection_t *conn; + uint64_t conn_id; + qdr_link_t *incoming; + uint64_t incoming_id; + qdr_link_t *outgoing; + uint64_t outgoing_id; + pn_raw_connection_t *socket; + qdr_delivery_t *instream; + qdr_delivery_t *outstream; + bool ingress; + bool egress_dispatcher; + bool connector_closed;//only used if egress_dispatcher=true + qd_timer_t *activate_timer; + qd_bridge_config_t config; + qd_server_t *server; + char *remote_address; +} qdr_tcp_connection_t; + +static void handle_disconnected(qdr_tcp_connection_t* conn); +static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn); + +static void on_activate(void *context) +{ + qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context; + + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i] on_activate", conn->conn_id); + while (qdr_connection_process(conn->conn)) {} + if (conn->egress_dispatcher && conn->connector_closed) { + qdr_connection_closed(conn->conn); + qdr_connection_set_context(conn->conn, 0); + free_qdr_tcp_connection(conn); + } +} + +static void grant_read_buffers(qdr_tcp_connection_t *conn) +{ + pn_raw_buffer_t raw_buffers[READ_BUFFERS]; + // Give proactor more read buffers for the socket + if (!pn_raw_connection_is_read_closed(conn->socket)) { + size_t desired = pn_raw_connection_read_buffers_capacity(conn->socket); + while (desired) { + size_t i; + for (i = 0; i < desired && i < READ_BUFFERS; ++i) { + qd_buffer_t *buf = qd_buffer(); + raw_buffers[i].bytes = (char*) qd_buffer_base(buf); + raw_buffers[i].capacity = qd_buffer_capacity(buf); + raw_buffers[i].size = 0; + raw_buffers[i].offset = 0; + raw_buffers[i].context = (uintptr_t) buf; + } + desired -= i; + pn_raw_connection_give_read_buffers(conn->socket, raw_buffers, i); + } + } +} + +static int handle_incoming(qdr_tcp_connection_t *conn) +{ + qd_buffer_list_t buffers; + DEQ_INIT(buffers); + pn_raw_buffer_t raw_buffers[READ_BUFFERS]; + size_t n; + int count = 0; + while ( (n = pn_raw_connection_take_read_buffers(conn->socket, raw_buffers, READ_BUFFERS)) ) { + for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) { + qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context; + qd_buffer_insert(buf, raw_buffers[i].size); + count += raw_buffers[i].size; + DEQ_INSERT_TAIL(buffers, buf); + } + } + + grant_read_buffers(conn); + + if (conn->instream) { + qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); + qd_compose_insert_binary_buffers(field, &buffers); + qd_message_extend(qdr_delivery_message(conn->instream), field); + qd_compose_free(field); + qdr_delivery_continue(tcp_adaptor->core, conn->instream, false); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i][L%i] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count); + } else { + qd_message_t *msg = qd_message(); + + qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0); + qd_compose_start_list(props); + qd_compose_insert_null(props); // message-id + qd_compose_insert_null(props); // user-id + qd_compose_insert_null(props); // to + qd_compose_insert_null(props); // subject + qd_compose_insert_string(props, conn->reply_to); // reply-to + //qd_compose_insert_null(props); // correlation-id + //qd_compose_insert_null(props); // content-type + //qd_compose_insert_null(props); // content-encoding + //qd_compose_insert_timestamp(props, 0); // absolute-expiry-time + //qd_compose_insert_timestamp(props, 0); // creation-time + //qd_compose_insert_null(props); // group-id + //qd_compose_insert_uint(props, 0); // group-sequence + //qd_compose_insert_null(props); // reply-to-group-id + qd_compose_end_list(props); + + if (count > 0) { + props = qd_compose(QD_PERFORMATIVE_BODY_DATA, props); + qd_compose_insert_binary_buffers(props, &buffers); + } + + qd_message_compose_2(msg, props, false); + qd_compose_free(props); + + conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0, 0, 0); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i][L%i] Initiating message with %i bytes", conn->conn_id, conn->incoming_id, count); + } + return count; +} + +static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) +{ + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "Freeing %p", (void*) tc); + if (tc->reply_to) { + free(tc->reply_to); + } + if(tc->remote_address) { + free(tc->remote_address); + } + if (tc->activate_timer) { + qd_timer_free(tc->activate_timer); + } + //proactor will free the socket + free(tc); +} + +static void handle_disconnected(qdr_tcp_connection_t* conn) +{ + if (conn->instream) { + qd_message_set_receive_complete(qdr_delivery_message(conn->instream)); + qdr_delivery_continue(tcp_adaptor->core, conn->instream, true); + } + qdr_connection_closed(conn->conn); + qdr_connection_set_context(conn->conn, 0); + free_qdr_tcp_connection(conn); +} + +static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_raw_buffer_t *buffers, int count) +{ + int used = 0; + qd_message_body_data_t *body_data; + while (used < count) { + qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &body_data); + if (body_data_result == QD_MESSAGE_BODY_DATA_OK) { + used += qd_message_body_data_buffers(body_data, buffers + used, used, count - used); + if (used > 0) { + buffers[used-1].context = (uintptr_t) body_data; + } + } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) { + return used; + } else { + switch (body_data_result) { + case QD_MESSAGE_BODY_DATA_NO_MORE: + qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%i] EOS", conn->conn_id); break; + case QD_MESSAGE_BODY_DATA_INVALID: + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%i] Invalid body data for streaming message", conn->conn_id); break; + case QD_MESSAGE_BODY_DATA_NOT_DATA: + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%i] Invalid body; expected data section", conn->conn_id); break; + default: + break; + } + qd_message_set_send_complete(msg); + return -1; + } + } + return used; +} + +static void handle_outgoing(qdr_tcp_connection_t *conn) +{ + if (conn->outstream) { + qd_message_t *msg = qdr_delivery_message(conn->outstream); + pn_raw_buffer_t buffs[WRITE_BUFFERS]; + for (int i = 0; i < WRITE_BUFFERS; i++) { + buffs[i].context = 0; + buffs[i].bytes = 0; + buffs[i].capacity = 0; + buffs[i].size = 0; + buffs[i].offset = 0; + } + int n = read_message_body(conn, msg, buffs, WRITE_BUFFERS); + if (n > 0) { + size_t used = pn_raw_connection_write_buffers(conn->socket, buffs, n); + int bytes_written = 0; + for (size_t i = 0; i < used; i++) { + if (buffs[i].bytes) { + bytes_written += buffs[i].size; + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%i] empty buffer can't be written (%i of %i)", conn->conn_id, i+1, used); + } + } + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i] Writing %i bytes", conn->conn_id, bytes_written); + } + if (qd_message_receive_complete(msg) || qd_message_send_complete(msg)) { + pn_raw_connection_close(conn->socket); + } + } +} + +static char *get_address_string(pn_raw_connection_t *socket) +{ + const pn_netaddr_t *netaddr = pn_raw_connection_remote_addr(socket); + char buffer[1024]; + int len = pn_netaddr_str(netaddr, buffer, 1024); + if (len <= 1024) { + return strdup(buffer); + } else { + return strndup(buffer, 1024); + } +} + +static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) +{ + tc->remote_address = get_address_string(tc->socket); + qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted, + false, //bool is_authenticated, + true, //bool opened, + "", //char *sasl_mechanisms, + QD_INCOMING, //qd_direction_t dir, + tc->remote_address, //const char *host, + "", //const char *ssl_proto, + "", //const char *ssl_cipher, + "", //const char *user, + "TcpAdaptor", //const char *container, + pn_data(0), //pn_data_t *connection_properties, + 0, //int ssl_ssf, + false, //bool ssl, + // set if remote is a qdrouter + 0); //const qdr_router_version_t *version) + + tc->conn_id = qd_server_allocate_connection_id(tc->server); + qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core, + tcp_adaptor->adaptor, + true, + QDR_ROLE_NORMAL, + 1, + tc->conn_id, + 0, + 0, + false, + false, + false, + false, + 250, + 0, + info, + 0, + 0); + tc->conn = conn; + qdr_connection_set_context(conn, tc); + + qdr_terminus_t *dynamic_source = qdr_terminus(0); + qdr_terminus_set_dynamic(dynamic_source); + qdr_terminus_t *target = qdr_terminus(0); + qdr_terminus_set_address(target, tc->config.address); + + tc->outgoing = qdr_link_first_attach(conn, + QD_OUTGOING, + dynamic_source, //qdr_terminus_t *source, + qdr_terminus(0), //qdr_terminus_t *target, + "tcp.ingress.out", //const char *name, + 0, //const char *terminus_addr, + false, + NULL, + &(tc->outgoing_id)); + qdr_link_set_context(tc->outgoing, tc); + tc->incoming = qdr_link_first_attach(conn, + QD_INCOMING, + qdr_terminus(0), //qdr_terminus_t *source, + target, //qdr_terminus_t *target, + "tcp.ingress.in", //const char *name, + 0, //const char *terminus_addr, + false, + NULL, + &(tc->incoming_id)); + qdr_link_set_context(tc->incoming, tc); + + grant_read_buffers(tc); +} + +static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context) +{ + qdr_tcp_connection_t *conn = (qdr_tcp_connection_t*) context; + qd_log_source_t *log = tcp_adaptor->log_source; + switch (pn_event_type(e)) { + case PN_RAW_CONNECTION_CONNECTED: { + if (conn->ingress) { + qdr_tcp_connection_ingress_accept(conn); + qd_log(log, QD_LOG_INFO, "[C%i] Accepted from %s", conn->conn_id, conn->remote_address); + break; + } else { + qd_log(log, QD_LOG_INFO, "[C%i] Connected", conn->conn_id); + qdr_connection_process(conn->conn); + break; + } + } + case PN_RAW_CONNECTION_CLOSED_READ: { + qd_log(log, QD_LOG_DEBUG, "[C%i] Closed for reading", conn->conn_id); + pn_raw_connection_close(conn->socket); + break; + } + case PN_RAW_CONNECTION_CLOSED_WRITE: { + qd_log(log, QD_LOG_DEBUG, "[C%i] Closed for writing", conn->conn_id); + pn_raw_connection_close(conn->socket); + break; + } + case PN_RAW_CONNECTION_DISCONNECTED: { + qd_log(log, QD_LOG_INFO, "[C%i] Disconnected", conn->conn_id); + handle_disconnected(conn); + break; + } + case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: { + qd_log(log, QD_LOG_DEBUG, "[C%i] Need write buffers", conn->conn_id); + while (qdr_connection_process(conn->conn)) {} + break; + } + case PN_RAW_CONNECTION_NEED_READ_BUFFERS: { + qd_log(log, QD_LOG_DEBUG, "[C%i] Need read buffers", conn->conn_id); + while (qdr_connection_process(conn->conn)) {} + break; + } + case PN_RAW_CONNECTION_WAKE: { + qd_log(log, QD_LOG_DEBUG, "[C%i] Wake-up", conn->conn_id); + while (qdr_connection_process(conn->conn)) {} + break; + } + case PN_RAW_CONNECTION_READ: { + int read = handle_incoming(conn); + qd_log(log, QD_LOG_DEBUG, "[C%i] Read %i bytes", conn->conn_id, read); + while (qdr_connection_process(conn->conn)) {} + break; + } + case PN_RAW_CONNECTION_WRITTEN: { + pn_raw_buffer_t buffs[WRITE_BUFFERS]; + size_t pn_raw_connection_take_written_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t *buffers, size_t num); + size_t n; + size_t written = 0; + while ( (n = pn_raw_connection_take_written_buffers(conn->socket, buffs, WRITE_BUFFERS)) ) { + for (size_t i = 0; i < n; ++i) { + written += buffs[i].size; + if (buffs[i].context) { + qd_message_body_data_release((qd_message_body_data_t*) buffs[i].context); + } + } + } + qd_log(log, QD_LOG_DEBUG, "[C%i] Wrote %i bytes", conn->conn_id, written); + while (qdr_connection_process(conn->conn)) {} + break; + } + default: + break; + } +} + +static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* listener) +{ + qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t); + ZERO(tc); + tc->ingress = true; + tc->context.context = tc; + tc->context.handler = &handle_connection_event; + tc->config = listener->config; + tc->server = listener->server; + tc->socket = pn_raw_connection(); + pn_raw_connection_set_context(tc->socket, tc); + //the following call will cause a PN_RAW_CONNECTION_CONNECTED + //event on another thread, which is where the rest of the + //initialisation will happen, through a call to + //qdr_tcp_connection_ingress_accept + pn_listener_raw_accept(listener->pn_listener, tc->socket); + return tc; +} + +static void tcp_connector_establish(qdr_tcp_connection_t *conn) +{ + qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%i] Connecting to: %s", conn->conn_id, conn->config.host_port); + conn->socket = pn_raw_connection(); + pn_raw_connection_set_context(conn->socket, conn); + pn_proactor_raw_connect(qd_server_proactor(conn->server), conn->socket, conn->config.host_port); +} + +static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *config, qd_server_t *server, qdr_delivery_t *initial_delivery) +{ + qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t); + ZERO(tc); + if (initial_delivery) { + tc->egress_dispatcher = false; + } else { + tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc); + tc->egress_dispatcher = true; + } + tc->ingress = false; + tc->context.context = tc; + tc->context.handler = &handle_connection_event; + tc->config = *config; + tc->server = server; + qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted, + false, //bool is_authenticated, + true, //bool opened, + "", //char *sasl_mechanisms, + QD_OUTGOING, //qd_direction_t dir, + tc->egress_dispatcher ? "egress-dispatch" : tc->config.host_port, //const char *host, + "", //const char *ssl_proto, + "", //const char *ssl_cipher, + "", //const char *user, + "TcpAdaptor", //const char *container, + pn_data(0), //pn_data_t *connection_properties, + 0, //int ssl_ssf, + false, //bool ssl, + // set if remote is a qdrouter + 0); //const qdr_router_version_t *version) + + tc->conn_id = qd_server_allocate_connection_id(tc->server); + qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core, + tcp_adaptor->adaptor, + false, + QDR_ROLE_NORMAL, + 1, + tc->conn_id, + 0, + 0, + false, + false, + false, + false, + 250, + 0, + info, + 0, + 0); + tc->conn = conn; + qdr_connection_set_context(conn, tc); + + qdr_terminus_t *source = qdr_terminus(0); + qdr_terminus_set_address(source, tc->config.address); + + tc->outgoing = qdr_link_first_attach(conn, + QD_OUTGOING, + source, //qdr_terminus_t *source, + qdr_terminus(0), //qdr_terminus_t *target, + "tcp.egress.out", //const char *name, + 0, //const char *terminus_addr, + !(tc->egress_dispatcher), + initial_delivery, + &(tc->outgoing_id)); + qdr_link_set_context(tc->outgoing, tc); + //the incoming link for egress is created once we receive the + //message which has the reply to address (and read buffers are + //granted at that point) + if (!tc->egress_dispatcher) { + tcp_connector_establish(tc); + } + + return tc; +} + +static void free_bridge_config(qd_bridge_config_t *config) +{ + if (!config) return; + free(config->host); + free(config->port); + free(config->name); + free(config->site_id); + free(config->host_port); +} + +#define CHECK() if (qd_error_code()) goto error + +static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_bridge_config_t *config, qd_entity_t* entity, bool is_listener) +{ + qd_error_clear(); + ZERO(config); + + config->name = qd_entity_get_string(entity, "name"); CHECK(); + config->address = qd_entity_get_string(entity, "address"); CHECK(); + config->host = qd_entity_get_string(entity, "host"); CHECK(); + config->port = qd_entity_get_string(entity, "port"); CHECK(); + config->site_id = qd_entity_opt_string(entity, "site-id", 0); CHECK(); + + int hplen = strlen(config->host) + strlen(config->port) + 2; + config->host_port = malloc(hplen); + snprintf(config->host_port, hplen, "%s:%s", config->host, config->port); + + return QD_ERROR_NONE; + + error: + free_bridge_config(config); + return qd_error_code(); +} + +static void log_tcp_bridge_config(qd_log_source_t *log, qd_bridge_config_t *c, const char *what) { + qd_log(log, QD_LOG_INFO, "Configured %s for %s, %s:%s", what, c->address, c->host, c->port); +} + +void qd_tcp_listener_decref(qd_tcp_listener_t* li) +{ + if (li && sys_atomic_dec(&li->ref_count) == 1) { + free_bridge_config(&li->config); + free_qd_tcp_listener_t(li); + } +} + +static void handle_listener_event(pn_event_t *e, qd_server_t *qd_server, void *context) { + qd_log_source_t *log = tcp_adaptor->log_source; + + qd_tcp_listener_t *li = (qd_tcp_listener_t*) context; + const char *host_port = li->config.host_port; + + switch (pn_event_type(e)) { + + case PN_LISTENER_OPEN: { + qd_log(log, QD_LOG_NOTICE, "Listening on %s", host_port); + break; + } + + case PN_LISTENER_ACCEPT: { + qd_log(log, QD_LOG_INFO, "Accepting TCP connection on %s", host_port); + qdr_tcp_connection_ingress(li); + break; + } + + case PN_LISTENER_CLOSE: + if (li->pn_listener) { + pn_condition_t *cond = pn_listener_condition(li->pn_listener); + if (pn_condition_is_set(cond)) { + qd_log(log, QD_LOG_ERROR, "Listener error on %s: %s (%s)", host_port, + pn_condition_get_description(cond), + pn_condition_get_name(cond)); + } else { + qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port); + } + pn_listener_set_context(li->pn_listener, 0); + li->pn_listener = 0; + qd_tcp_listener_decref(li); + } + break; + + default: + break; + } +} + +static qd_tcp_listener_t *qd_tcp_listener(qd_server_t *server) +{ + qd_tcp_listener_t *li = new_qd_tcp_listener_t(); + if (!li) return 0; + ZERO(li); + sys_atomic_init(&li->ref_count, 1); + li->server = server; + li->context.context = li; + li->context.handler = &handle_listener_event; + return li; +} + +static const int BACKLOG = 50; /* Listening backlog */ + +static bool tcp_listener_listen(qd_tcp_listener_t *li) { + li->pn_listener = pn_listener(); + if (li->pn_listener) { + pn_listener_set_context(li->pn_listener, &li->context); + pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, BACKLOG); + sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */ + /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN event */ + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_CRITICAL, "Failed to create listener for %s", + li->config.host_port); + } + return li->pn_listener; +} + +qd_tcp_listener_t *qd_dispatch_configure_tcp_listener(qd_dispatch_t *qd, qd_entity_t *entity) +{ + qd_tcp_listener_t *li = qd_tcp_listener(qd->server); + if (!li || load_bridge_config(qd, &li->config, entity, true) != QD_ERROR_NONE) { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp listener: %s", qd_error_message()); + qd_tcp_listener_decref(li); + return 0; + } + DEQ_ITEM_INIT(li); + DEQ_INSERT_TAIL(tcp_adaptor->listeners, li); + log_tcp_bridge_config(tcp_adaptor->log_source, &li->config, "TcpListener"); + tcp_listener_listen(li); + return li; +} + +void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl) +{ + qd_tcp_listener_t *li = (qd_tcp_listener_t*) impl; + if (li) { + if (li->pn_listener) { + pn_listener_close(li->pn_listener); + } + DEQ_REMOVE(tcp_adaptor->listeners, li); + qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port); + qd_tcp_listener_decref(li); + } +} + +qd_error_t qd_entity_refresh_tcpListener(qd_entity_t* entity, void *impl) +{ + return QD_ERROR_NONE; +} + +static qd_tcp_connector_t *qd_tcp_connector(qd_server_t *server) +{ + qd_tcp_connector_t *c = new_qd_tcp_connector_t(); + if (!c) return 0; + ZERO(c); + sys_atomic_init(&c->ref_count, 1); + c->server = server; + return c; +} + +void qd_tcp_connector_decref(qd_tcp_connector_t* c) +{ + if (c && sys_atomic_dec(&c->ref_count) == 1) { + free_bridge_config(&c->config); + free_qd_tcp_connector_t(c); + } +} + +qd_tcp_connector_t *qd_dispatch_configure_tcp_connector(qd_dispatch_t *qd, qd_entity_t *entity) +{ + qd_tcp_connector_t *c = qd_tcp_connector(qd->server); + if (!c || load_bridge_config(qd, &c->config, entity, true) != QD_ERROR_NONE) { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp connector: %s", qd_error_message()); + qd_tcp_connector_decref(c); + return 0; + } + DEQ_ITEM_INIT(c); + DEQ_INSERT_TAIL(tcp_adaptor->connectors, c); + log_tcp_bridge_config(tcp_adaptor->log_source, &c->config, "TcpConnector"); + c->dispatcher = qdr_tcp_connection_egress(&(c->config), c->server, NULL); + return c; +} + +static void close_egress_dispatcher(qdr_tcp_connection_t *context) +{ + //actual close needs to happen on connection thread + context->connector_closed = true; + qd_timer_schedule(context->activate_timer, 0); +} + +void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl) +{ + qd_tcp_connector_t *ct = (qd_tcp_connector_t*) impl; + if (ct) { + //need to close the pseudo-connection used for dispatching + //deliveries out to live connnections: + qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpConnector for %s, %s:%s", ct->config.address, ct->config.host, ct->config.port); + close_egress_dispatcher((qdr_tcp_connection_t*) ct->dispatcher); + DEQ_REMOVE(tcp_adaptor->connectors, ct); + qd_tcp_connector_decref(ct); + } +} + +qd_error_t qd_entity_refresh_tcpConnector(qd_entity_t* entity, void *impl) +{ + return QD_ERROR_NONE; +} + +static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link, + qdr_terminus_t *source, qdr_terminus_t *target, + qd_session_class_t session_class) +{ +} + +static void qdr_tcp_connection_copy_reply_to(qdr_tcp_connection_t* tc, qd_iterator_t* reply_to) +{ + int length = qd_iterator_length(reply_to); + tc->reply_to = malloc(length + 1); + qd_iterator_strncpy(reply_to, tc->reply_to, length + 1); +} + +static void qdr_tcp_second_attach(void *context, qdr_link_t *link, + qdr_terminus_t *source, qdr_terminus_t *target) +{ + void* link_context = qdr_link_get_context(link); + if (link_context) { + qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; + if (qdr_link_direction(link) == QD_OUTGOING) { + if (tc->ingress) { + qdr_tcp_connection_copy_reply_to(tc, qdr_terminus_get_address(source)); + // for ingress, can start reading from socket once we have + // a reply to address, as that is when we are able to send + // out a message + grant_read_buffers(tc); + handle_incoming(tc); + } + qdr_link_flow(tcp_adaptor->core, link, 10, false); + } else if (!tc->ingress) { + //for egress we can start reading from the socket once we + //have the link to send messages over + grant_read_buffers(tc); + } + } +} + + +static void qdr_tcp_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close) +{ +} + + +static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit) +{ +} + + +static void qdr_tcp_offer(void *context, qdr_link_t *link, int delivery_count) +{ +} + + +static void qdr_tcp_drained(void *context, qdr_link_t *link) +{ +} + + +static void qdr_tcp_drain(void *context, qdr_link_t *link, bool mode) +{ +} + + +static int qdr_tcp_push(void *context, qdr_link_t *link, int limit) +{ + return qdr_link_process_deliveries(tcp_adaptor->core, link, limit); +} + + +static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled) +{ + void* link_context = qdr_link_get_context(link); + if (link_context) { + qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i][L%i] Delivery event", tc->conn_id, tc->outgoing_id); + if (tc->egress_dispatcher) { + qdr_tcp_connection_egress(&(tc->config), tc->server, delivery); + } else if (!tc->outstream) { + tc->outstream = delivery; + if (!tc->ingress) { + //on egress, can only set up link for the reverse + //direction once we receive the first part of the + //message from client to server + qd_message_t *msg = qdr_delivery_message(delivery); + qdr_tcp_connection_copy_reply_to(tc, qd_message_field_iterator(msg, QD_FIELD_REPLY_TO)); + qdr_terminus_t *target = qdr_terminus(0); + qdr_terminus_set_address(target, tc->reply_to); + tc->incoming = qdr_link_first_attach(tc->conn, + QD_INCOMING, + qdr_terminus(0), //qdr_terminus_t *source, + target, //qdr_terminus_t *target, + "tcp.egress.in", //const char *name, + 0, //const char *terminus_addr, + false, + NULL, + &(tc->incoming_id)); + qdr_link_set_context(tc->incoming, tc); + handle_incoming(tc); + } + } + handle_outgoing(tc); + } + return 0; +} + + +static int qdr_tcp_get_credit(void *context, qdr_link_t *link) +{ + return 10; +} + + +static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled) +{ + void* link_context = qdr_link_get_context(qdr_delivery_link(dlv)); + if (link_context) { + qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i] Delivery update", tc->conn_id); + } +} + + +static void qdr_tcp_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error) +{ +} + + +static void qdr_tcp_conn_trace(void *context, qdr_connection_t *conn, bool trace) +{ +} + +static void qdr_tcp_activate(void *notused, qdr_connection_t *c) +{ + void *context = qdr_connection_get_context(c); + if (context) { + qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context; + if (conn->socket) { + pn_raw_connection_wake(conn->socket); + } else if (conn->activate_timer) { + // On egress, the raw connection is only created once the + // first part of the message encapsulating the + // client->server half of the stream has been + // received. Prior to that however a subscribing link (and + // its associated connection must be setup), for which we + // fake wakeup by using a timer. + qd_timer_schedule(conn->activate_timer, 0); + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%i] Cannot activate", conn->conn_id); + } + } +} + +/** + * This initialization function will be invoked when the router core is ready for the protocol + * adaptor to be created. This function must: + * + * 1) Register the protocol adaptor with the router-core. + * 2) Prepare the protocol adaptor to be configured. + */ +static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context) +{ + qdr_tcp_adaptor_t *adaptor = NEW(qdr_tcp_adaptor_t); + adaptor->core = core; + adaptor->adaptor = qdr_protocol_adaptor(core, + "tcp", // name + adaptor, // context + qdr_tcp_activate, // activate + qdr_tcp_first_attach, + qdr_tcp_second_attach, + qdr_tcp_detach, + qdr_tcp_flow, + qdr_tcp_offer, + qdr_tcp_drained, + qdr_tcp_drain, + qdr_tcp_push, + qdr_tcp_deliver, + qdr_tcp_get_credit, + qdr_tcp_delivery_update, + qdr_tcp_conn_close, + qdr_tcp_conn_trace); + adaptor->log_source = qd_log_source("TCP_ADAPTOR"); + DEQ_INIT(adaptor->listeners); + DEQ_INIT(adaptor->connectors); + *adaptor_context = adaptor; + + tcp_adaptor = adaptor; +} + + +static void qdr_tcp_adaptor_final(void *adaptor_context) +{ + qdr_tcp_adaptor_t *adaptor = (qdr_tcp_adaptor_t*) adaptor_context; + qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor); + free(adaptor); + tcp_adaptor = NULL; +} + +/** + * Declare the adaptor so that it will self-register on process startup. + */ +QDR_CORE_ADAPTOR_DECLARE("tcp-adaptor", qdr_tcp_adaptor_init, qdr_tcp_adaptor_final) diff --git a/src/adaptors/tcp_adaptor.h b/src/adaptors/tcp_adaptor.h new file mode 100644 index 0000000..17bd7f9 --- /dev/null +++ b/src/adaptors/tcp_adaptor.h @@ -0,0 +1,79 @@ +#ifndef __tcp_adaptor_h__ +#define __tcp_adaptor_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/atomic.h> +#include <qpid/dispatch/enum.h> +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/log.h> +#include <proton/engine.h> +#include <proton/event.h> +#include <proton/ssl.h> + +#include "dispatch_private.h" +#include "timer_private.h" + +typedef struct qd_tcp_listener_t qd_tcp_listener_t; +typedef struct qd_tcp_connector_t qd_tcp_connector_t; +typedef struct qd_bridge_config_t qd_bridge_config_t; + +struct qd_bridge_config_t +{ + char *name; + char *address; + char *host; + char *port; + char *site_id; + char *host_port; +}; + +struct qd_tcp_listener_t +{ + qd_handler_context_t context; + /* May be referenced by connection_manager and pn_listener_t */ + sys_atomic_t ref_count; + qd_server_t *server; + qd_bridge_config_t config; + pn_listener_t *pn_listener; + + DEQ_LINKS(qd_tcp_listener_t); +}; + +DEQ_DECLARE(qd_tcp_listener_t, qd_tcp_listener_list_t); +ALLOC_DECLARE(qd_tcp_listener_t); + +struct qd_tcp_connector_t +{ + /* May be referenced by connection_manager, timer and pn_connection_t */ + sys_atomic_t ref_count; + qd_server_t *server; + qd_bridge_config_t config; + void *dispatcher; + + DEQ_LINKS(qd_tcp_connector_t); +}; + +DEQ_DECLARE(qd_tcp_connector_t, qd_tcp_connector_list_t); +ALLOC_DECLARE(qd_tcp_connector_t); + +#endif --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org