Repository: qpid-proton Updated Branches: refs/heads/master b2e1acd97 -> 66cb35e1c
PROTON-1470: proactor api - composing and examining addresses Simpler solution than proposed in the JIRA. No change to connect()/listen() signatures, they still take a string address. Added pn_proactor_addr() to format an address string from a separate host and port, so user does not need to know the format. Format is simple "host:port", where the port is the substring after the *last* ":". IPv6 hosts are not decorated with "[]", the last ":" in the address is always the host/port separator, never part of an IPv6 literal. The C examples now take separate host, port arguments and use pn_proactor_addr() to construct the proactor address. Existing bindings do not need to be changed, but it would be good practice to have them use pn_proactor_addr() rather than hard-coding the "host:port"format. Also renamed pn_proactor_addr_t to pn_netaddr_t. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/66cb35e1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/66cb35e1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/66cb35e1 Branch: refs/heads/master Commit: 66cb35e1ce412e792c8557a43ff53b144972113c Parents: b2e1acd Author: Alan Conway <acon...@redhat.com> Authored: Fri Apr 28 17:37:05 2017 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Fri Apr 28 18:47:39 2017 -0400 ---------------------------------------------------------------------- examples/c/proactor/broker.c | 8 +- examples/c/proactor/direct.c | 16 ++-- examples/c/proactor/receive.c | 16 ++-- examples/c/proactor/send.c | 17 ++-- examples/c/proactor/test.py | 21 +++-- proton-c/CMakeLists.txt | 2 +- proton-c/include/proton/netaddr.h | 76 ++++++++++++++++ proton-c/include/proton/proactor.h | 115 ++++++------------------- proton-c/src/proactor/epoll.c | 8 +- proton-c/src/proactor/libuv.c | 46 +++++----- proton-c/src/proactor/proactor-internal.h | 37 ++++++++ proton-c/src/proactor/proactor.c | 45 ++++++++++ proton-c/src/tests/proactor.c | 63 +++++++++----- 13 files changed, 303 insertions(+), 167 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/examples/c/proactor/broker.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c index 6501927..05c5552 100644 --- a/examples/c/proactor/broker.c +++ b/examples/c/proactor/broker.c @@ -401,9 +401,13 @@ int main(int argc, char **argv) { queues_init(&b.queues); b.container_id = argv[0]; b.threads = 4; - const char *addr = (argc > 1) ? argv[1] : "127.0.0.1:amqp"; + int i = 1; + const char *host = (argc > i) ? argv[i++] : ""; + const char *port = (argc > i) ? argv[i++] : "amqp"; - /* Listen on addr */ + /* Listenf on addr */ + char addr[PN_MAX_ADDR]; + pn_proactor_addr(addr, sizeof(addr), host, port); pn_proactor_listen(b.proactor, pn_listener(), addr, 16); /* Start n-1 threads */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/examples/c/proactor/direct.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c index f76895c..c1ac467 100644 --- a/examples/c/proactor/direct.c +++ b/examples/c/proactor/direct.c @@ -34,7 +34,7 @@ #include <stdlib.h> typedef struct app_data_t { - const char *connection_address; + const char *host, *port; const char *amqp_address; const char *container_id; int message_count; @@ -306,15 +306,19 @@ void run(app_data_t *app) { int main(int argc, char **argv) { struct app_data_t app = {0}; - app.container_id = argv[0]; /* Should be unique */ - app.connection_address = (argc > 1) ? argv[1] : "127.0.0.1:amqp"; - app.amqp_address = (argc > 2) ? argv[2] : "example"; - app.message_count = (argc > 3) ? atoi(argv[3]) : 10; + int i = 0; + app.container_id = argv[i++]; /* Should be unique */ + app.host = (argc > 1) ? argv[i++] : ""; + app.port = (argc > 1) ? argv[i++] : "amqp"; + app.amqp_address = (argc > i) ? argv[i++] : "example"; + app.message_count = (argc > i) ? atoi(argv[i++]) : 10; /* Create the proactor and connect */ app.proactor = pn_proactor(); app.listener = pn_listener(); - pn_proactor_listen(app.proactor, app.listener, app.connection_address, 16); + char addr[PN_MAX_ADDR]; + pn_proactor_addr(addr, sizeof(addr), app.host, app.port); + pn_proactor_listen(app.proactor, app.listener, addr, 16); run(&app); pn_proactor_free(app.proactor); free(app.message_buffer.start); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/examples/c/proactor/receive.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c index c8d3363..ddf0a35 100644 --- a/examples/c/proactor/receive.c +++ b/examples/c/proactor/receive.c @@ -32,7 +32,7 @@ #include <stdlib.h> typedef struct app_data_t { - const char *connection_address; + const char *host, *port; const char *amqp_address; const char *container_id; int message_count; @@ -170,14 +170,18 @@ void run(app_data_t *app) { int main(int argc, char **argv) { struct app_data_t app = {0}; - app.container_id = argv[0]; /* Should be unique */ - app.connection_address = (argc > 1) ? argv[1] : "127.0.0.1:amqp"; - app.amqp_address = (argc > 2) ? argv[2] : "example"; - app.message_count = (argc > 3) ? atoi(argv[3]) : 10; + int i = 0; + app.container_id = argv[i++]; /* Should be unique */ + app.host = (argc > 1) ? argv[i++] : ""; + app.port = (argc > 1) ? argv[i++] : "amqp"; + app.amqp_address = (argc > i) ? argv[i++] : "example"; + app.message_count = (argc > i) ? atoi(argv[i++]) : 10; /* Create the proactor and connect */ app.proactor = pn_proactor(); - pn_proactor_connect(app.proactor, pn_connection(), app.connection_address); + char addr[PN_MAX_ADDR]; + pn_proactor_addr(addr, sizeof(addr), app.host, app.port); + pn_proactor_connect(app.proactor, pn_connection(), addr); run(&app); pn_proactor_free(app.proactor); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/examples/c/proactor/send.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c index c21ac68..3324010 100644 --- a/examples/c/proactor/send.c +++ b/examples/c/proactor/send.c @@ -32,7 +32,7 @@ #include <stdlib.h> typedef struct app_data_t { - const char *connection_address; + const char *host, *port; const char *amqp_address; const char *container_id; int message_count; @@ -178,14 +178,17 @@ void run(app_data_t *app) { int main(int argc, char **argv) { struct app_data_t app = {0}; - app.container_id = argv[0]; /* Should be unique */ - app.connection_address = (argc > 1) ? argv[1] : "127.0.0.1:amqp"; - app.amqp_address = (argc > 2) ? argv[2] : "example"; - app.message_count = (argc > 3) ? atoi(argv[3]) : 10; + int i = 0; + app.container_id = argv[i++]; /* Should be unique */ + app.host = (argc > 1) ? argv[i++] : ""; + app.port = (argc > 1) ? argv[i++] : "amqp"; + app.amqp_address = (argc > i) ? argv[i++] : "example"; + app.message_count = (argc > i) ? atoi(argv[i++]) : 10; - /* Create the proactor and connect */ app.proactor = pn_proactor(); - pn_proactor_connect(app.proactor, pn_connection(), app.connection_address); + char addr[PN_MAX_ADDR]; + pn_proactor_addr(addr, sizeof(addr), app.host, app.port); + pn_proactor_connect(app.proactor, pn_connection(), addr); run(&app); pn_proactor_free(app.proactor); free(app.message_buffer.start); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/examples/c/proactor/test.py ---------------------------------------------------------------------- diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py index 4950cef..f62ea4e 100644 --- a/examples/c/proactor/test.py +++ b/examples/c/proactor/test.py @@ -35,8 +35,8 @@ class Broker(object): def __enter__(self): with TestPort() as port: - self.addr = "127.0.0.1:%s" % port - self.proc = self.test.proc(["broker", self.addr]) + self.port = port + self.proc = self.test.proc(["broker", "", self.port]) self.proc.wait_re("listening") return self @@ -52,35 +52,34 @@ class CExampleTest(ExampleTestCase): def test_send_receive(self): """Send first then receive""" with Broker(self) as b: - s = self.proc(["send", b.addr]) + s = self.proc(["send", "", b.port]) self.assertEqual("10 messages sent and acknowledged\n", s.wait_out()) - r = self.proc(["receive", b.addr]) + r = self.proc(["receive", "", b.port]) self.assertEqual(receive_expect(10), r.wait_out()) def test_receive_send(self): """Start receiving first, then send.""" with Broker(self) as b: - r = self.proc(["receive", b.addr]); - s = self.proc(["send", b.addr]); + r = self.proc(["receive", "", b.port]); + s = self.proc(["send", "", b.port]); self.assertEqual("10 messages sent and acknowledged\n", s.wait_out()) self.assertEqual(receive_expect(10), r.wait_out()) def test_send_direct(self): """Send to direct server""" with TestPort() as port: - addr = "127.0.0.1:%s" % port - d = self.proc(["direct", addr]) + d = self.proc(["direct", "", port]) d.wait_re("listening") - self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", addr]).wait_out()) + self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", "", port]).wait_out()) self.assertIn(receive_expect(10), d.wait_out()) def test_receive_direct(self): """Receive from direct server""" with TestPort() as port: addr = "127.0.0.1:%s" % port - d = self.proc(["direct", addr]) + d = self.proc(["direct", "", port]) d.wait_re("listening") - self.assertEqual(receive_expect(10), self.proc(["receive", addr]).wait_out()) + self.assertEqual(receive_expect(10), self.proc(["receive", "", port]).wait_out()) self.assertIn("10 messages sent and acknowledged\n", d.wait_out()) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/proton-c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt index b1de956..03ee538 100644 --- a/proton-c/CMakeLists.txt +++ b/proton-c/CMakeLists.txt @@ -501,7 +501,7 @@ if (PROACTOR STREQUAL "libuv" OR (NOT PROACTOR AND NOT PROACTOR_OK)) find_package(Libuv) if (LIBUV_FOUND) set (PROACTOR_OK libuv) - set (qpid-proton-proactor src/proactor/libuv.c) + set (qpid-proton-proactor src/proactor/libuv.c src/proactor/proactor.c) set (PROACTOR_LIBS ${Libuv_LIBRARIES}) set_source_files_properties (${qpid-proton-proactor} PROPERTIES # Skip COMPILE_LANGUAGE_FLAGS, libuv.h won't compile with --std=c99 http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/proton-c/include/proton/netaddr.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/netaddr.h b/proton-c/include/proton/netaddr.h new file mode 100644 index 0000000..21b79ff --- /dev/null +++ b/proton-c/include/proton/netaddr.h @@ -0,0 +1,76 @@ +#ifndef PROTON_NETADDR_H +#define PROTON_NETADDR_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/import_export.h> +#include <proton/types.h> + +/** + * @file + * Use to examine the network address of proactor connections. + * + * @addtogroup proactor + * + * @{ + */ + +/** + * Network address of a proactor transport. + */ +typedef struct pn_netaddr_t pn_netaddr_t; + +/** + * Format a network address as a human-readable string in buf. + * + * @return the length of the string (excluding trailing '\0'), if >= size then + * the address was truncated. + */ +PNP_EXTERN int pn_netaddr_str(pn_netaddr_t *addr, char *buf, size_t size); + +/** + * Get the local address of a transport. Return NULL if not available. + */ +PNP_EXTERN pn_netaddr_t *pn_netaddr_local(pn_transport_t *t); + +/** + * Get the remote address of a transport. Return NULL if not available. + */ +PNP_EXTERN pn_netaddr_t *pn_netaddr_remote(pn_transport_t *t); + +struct sockaddr; + +/** + * On POSIX or Windows, get the underlying `struct sockaddr`. + * Return NULL if not available. + */ +PNP_EXTERN struct sockaddr *pn_netaddr_sockaddr(pn_netaddr_t *na); + +/** + * On POSIX or Windows, get the size of the underlying `struct sockaddr`. + * Return 0 if not available. + */ +PNP_EXTERN size_t pn_netaddr_socklen(pn_netaddr_t *na); + +/** + * @} + */ + +#endif // PROTON_NETADDR_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/proton-c/include/proton/proactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h index 8b193b5..4a3b3c7 100644 --- a/proton-c/include/proton/proactor.h +++ b/proton-c/include/proton/proactor.h @@ -20,6 +20,7 @@ * under the License. */ +#include <proton/condition.h> #include <proton/event.h> #include <proton/import_export.h> #include <proton/types.h> @@ -65,6 +66,23 @@ extern "C" { */ /** + * Size of buffer that can hold the largest connection or listening address. + */ +#define PN_MAX_ADDR 1060 + +/** + * Format a host:port address string for pn_proactor_connect() or pn_proactor_listen() + * + * @param[out] addr address is copied to this buffer, with trailing '\0' + * @param[in] size size of addr buffer + * @param[in] host network host name, DNS name or IP address + * @param[in] port network service name or decimal port number, e.g. "amqp" or "5672" + * @return the length of network address (excluding trailing '\0'), if >= size + * then the address was truncated + */ +PNP_EXTERN int pn_proactor_addr(char *addr, size_t size, const char *host, const char *port); + +/** * Create a proactor. Must be freed with pn_proactor_free() */ PNP_EXTERN pn_proactor_t *pn_proactor(void); @@ -80,42 +98,20 @@ PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor); * * @note Thread safe. * - * @param connection @p proactor *takes ownership* of @p connection and will + * @param[in] proactor the proactor object + * + * @param[in] connection @p proactor *takes ownership* of @p connection and will * automatically call pn_connection_free() after the final @ref * PN_TRANSPORT_CLOSED event is handled, or when pn_proactor_free() is * called. You can prevent the automatic free with * pn_proactor_release_connection() * - * @param[in] addr the network address in the form "host:port" or as a URL - * For a URL *only* the host and port fields are used, the rest is ignored. - * - * Three special cases are allowed: - * - * - "host": Connect to "host" on the standard AMQP port (5672). - * - ":port": Connect to the local host on "port" using the default protocol. - * - "": Connect to the local host on the AMQP port using the default protocol. - * - * @note The network address @p addr and AMQP address are different things. The - * network address enables connection to a remote host, the AMQP address - * identifies an AMQP node (such as a queue or topic) *after* you have - * established the connection. - * The special case ":port" connects to the local host via the default protocol. - * The special case "" connects to the local host on the AMQP standard port. - * - * It is common to combine the two into a URL like this: - * - * amqp[s]://user:pass@host:port/amqp_address - * - * The proactor will extract the host and port only. If you want to use other - * fields (e.g. to set up security) you must call the relevant functions on @p - * connection before pn_proactor_connect() and handle @ref PN_CONNECTION_BOUND - * to set up the @ref transport. - * - * @note Thread safe. - * - * @param[in] proactor the proactor object + * @param[in] addr the "host:port" network address, constructed by pn_proactor_addr() + * An empty host will connect to the local host via the default protocol. + * An empty port will connect to the standard AMQP port (5672). * * @param[in] connection @ref connection to be connected to @p addr. + * */ PNP_EXTERN void pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection, const char *addr); @@ -131,14 +127,9 @@ PNP_EXTERN void pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *co * automatically call pn_listener_free() after the final PN_LISTENER_CLOSE event * is handled, or when pn_proactor_free() is called. * - * @param[in] addr the network address in the form "host:port" or as a URL - * For a URL *only* the host and port fields are used, the rest is ignored. - * - * Three special cases are allowed: - * - * - "host": Listen on the standard AMQP port (5672) on the interface and protocol identified by "host" - * - ":port": Listen on "port", on all local interfaces, for all protocols. - * - "": Listen on the standard AMQP port, on all local interfaces, for all protocols. + * @param[in] addr the "host:port" network address, constructed by pn_proactor_addr() + * An empty host will listen for all protocols on all local interfaces. + * An empty port will listen on the standard AMQP port (5672). * @param[in] backlog of un-handled connection requests to allow before refusing * connections. If @p addr resolves to multiple interface/protocol combinations, @@ -285,56 +276,6 @@ PNP_EXTERN pn_proactor_t *pn_connection_proactor(pn_connection_t *connection); PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event); /** - * Stores a network address in native format. - */ -typedef struct pn_proactor_addr_t pn_proactor_addr_t; - -/** - * Format a network address as a human-readable string in buf. - * - * @return the length of the full address string (including trailing NUL). The - * string is copied to buf. If the address string is longer than len it is - * truncated to len-1 bytes, but the full length is returned. - * - * If len == 0 the length of the address string is returned, buf is ignored. - * - * If @p addr is not a pointer to a valid address, buf is set to "" and 0 is returned. - * - * @note Thread safe. - */ -PNP_EXTERN size_t pn_proactor_addr_str(const pn_proactor_addr_t* addr, char *buf, size_t len); - -/** - * Get the local address of a transport. - * - * @return NULL if the address is not available. Address is immutable, returned - * pointer is valid until @p transport is closed. - * - * @note Thread safe. - */ -PNP_EXTERN const pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t* c); - -/** - * Get the remote address of a transport. - * - * @return NULL if the address is not available. Address is immutable, returned - * pointer is valid until @p transport is closed. - * - * @note Thread safe. - */ -PNP_EXTERN const pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t* c); - -/** - * If the proactor implementation uses `struct sockaddr` (for example on POSIX - * or Windows sockets) return a pointer to a `struct sockaddr_storage` - * containing the address info, otherwise return NULL. - * - * @note Thread safe. - */ -PNP_EXTERN const struct sockaddr_storage *pn_proactor_addr_sockaddr(const pn_proactor_addr_t *addr); - - -/** * Get the real elapsed time since an arbitrary point in the past in milliseconds. * * This may be used as a portable way to get a timestamp for the current time. It is monotonically http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/proton-c/src/proactor/epoll.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c index 6f9e237..304644f 100644 --- a/proton-c/src/proactor/epoll.c +++ b/proton-c/src/proactor/epoll.c @@ -1727,22 +1727,22 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { wake_notify(&p->context); } -const struct sockaddr_storage *pn_proactor_addr_sockaddr(const pn_proactor_addr_t *addr) { +const struct sockaddr_storage *pn_netaddr_sockaddr(const pn_netaddr_t *addr) { assert(false); return NULL; } -const struct pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t *t) { +const struct pn_netaddr_t *pn_netaddr_local(pn_transport_t *t) { assert(false); return NULL; } -const struct pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t *t) { +const struct pn_netaddr_t *pn_netaddr_remote(pn_transport_t *t) { assert(false); return NULL; } -size_t pn_proactor_addr_str(const struct pn_proactor_addr_t* addr, char *buf, size_t len) { +size_t pn_netaddr_str(const struct pn_netaddr_t* addr, char *buf, size_t len) { struct sockaddr_storage *sa = (struct sockaddr_storage*)addr; char host[NI_MAXHOST]; char port[NI_MAXSERV]; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/proton-c/src/proactor/libuv.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c index 5a7f97d..14682c4 100644 --- a/proton-c/src/proactor/libuv.c +++ b/proton-c/src/proactor/libuv.c @@ -20,13 +20,14 @@ */ #include "../core/log_private.h" -#include "../core/url-internal.h" +#include "proactor-internal.h" #include <proton/condition.h> #include <proton/connection_driver.h> #include <proton/engine.h> #include <proton/listener.h> #include <proton/message.h> +#include <proton/netaddr.h> #include <proton/object.h> #include <proton/proactor.h> #include <proton/transport.h> @@ -132,11 +133,9 @@ static void work_init(work_t* w, pn_proactor_t* p, struct_type type) { /* ================ IO ================ */ -#define MAXADDR (NI_MAXHOST+NI_MAXSERV) - /* A resolvable address */ typedef struct addr_t { - char addr[MAXADDR]; + char host_port[PN_MAX_ADDR]; char *host, *port; /* Point into addr after destructive pni_url_parse */ uv_getaddrinfo_t getaddrinfo; /* UV getaddrinfo request, contains list of addrinfo */ struct addrinfo* addrinfo; /* The current addrinfo being tried */ @@ -153,6 +152,10 @@ PN_STRUCT_CLASSDEF(lsocket, CID_pn_listener_socket) typedef enum { W_NONE, W_PENDING, W_CLOSED } wake_state; +struct pn_netaddr_t { + struct sockaddr_storage ss; +}; + /* An incoming or outgoing connection. */ typedef struct pconnection_t { work_t work; /* Must be first to allow casting */ @@ -170,7 +173,7 @@ typedef struct pconnection_t { lsocket_t *lsocket; /* Incoming connection only */ - struct sockaddr_storage local, remote; /* Actual addresses */ + struct pn_netaddr_t local, remote; /* Actual addresses */ uv_timer_t timer; uv_write_t write; size_t writing; /* size of pending write request, 0 if none pending */ @@ -277,9 +280,9 @@ static void work_start(work_t *w) { } static void parse_addr(addr_t *addr, const char *str) { - strncpy(addr->addr, str, sizeof(addr->addr)); - char *scheme, *user, *pass, *path; - pni_parse_url(addr->addr, &scheme, &user, &pass, &addr->host, &addr->port, &path); + strncpy(addr->host_port, str, sizeof(addr->host_port)); + addr->host = addr->host_port; + addr->port = pni_split_host_port(addr->host_port); } /* Make a pn_class for pconnection_t since it is attached to a pn_connection_t record */ @@ -313,12 +316,10 @@ static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool ser if (server) { pn_transport_set_server(pc->driver.transport); } - pc->addr.host = pc->addr.port = pc->addr.addr; /* Set host/port to "" by default */ pn_record_t *r = pn_connection_attachments(pc->driver.connection); pn_record_def(r, PN_PROACTOR, &pconnection_class); pn_record_set(r, PN_PROACTOR, pc); pn_decref(pc); /* Will be deleted when the connection is */ - pc->addr.host = pc->addr.port = pc->addr.addr; /* Set host/port to "" by default */ return pc; } @@ -388,7 +389,8 @@ static void on_close_pconnection_final(uv_handle_t *h) { } static void uv_safe_close(uv_handle_t *h, uv_close_cb cb) { - if (!uv_is_closing(h)) { + /* Only close if h has been initialized and is not already closing */ + if (h->type && !uv_is_closing(h)) { uv_close(h, cb); } } @@ -1267,25 +1269,29 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { work_notify(&l->work); } -const struct sockaddr_storage *pn_proactor_addr_sockaddr(const pn_proactor_addr_t *addr) { - return (const struct sockaddr_storage*)addr; +struct sockaddr *pn_netaddr_sockaddr(pn_netaddr_t *na) { + return (struct sockaddr*)na; +} + +size_t pn_netaddr_socklen(pn_netaddr_t *na) { + return sizeof(struct sockaddr_storage); } -const struct pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t *t) { +pn_netaddr_t *pn_netaddr_local(pn_transport_t *t) { pconnection_t *pc = get_pconnection(pn_transport_connection(t)); - return pc ? (pn_proactor_addr_t*)&pc->local : NULL; + return pc? &pc->local : NULL; } -const struct pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t *t) { +pn_netaddr_t *pn_netaddr_remote(pn_transport_t *t) { pconnection_t *pc = get_pconnection(pn_transport_connection(t)); - return pc ? (pn_proactor_addr_t*)&pc->remote : NULL; + return pc ? &pc->remote : NULL; } -size_t pn_proactor_addr_str(const struct pn_proactor_addr_t* addr, char *buf, size_t len) { - struct sockaddr_storage *sa = (struct sockaddr_storage*)addr; +int pn_netaddr_str(struct pn_netaddr_t* na, char *buf, size_t len) { char host[NI_MAXHOST]; char port[NI_MAXSERV]; - int err = getnameinfo((struct sockaddr *)sa, sizeof(*sa), host, sizeof(host), port, sizeof(port), + int err = getnameinfo((struct sockaddr *)&na->ss, sizeof(na->ss), + host, sizeof(host), port, sizeof(port), NI_NUMERICHOST | NI_NUMERICSERV); if (!err) { return snprintf(buf, len, "%s:%s", host, port); /* FIXME aconway 2017-03-29: ipv6 format? */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/proton-c/src/proactor/proactor-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/proactor-internal.h b/proton-c/src/proactor/proactor-internal.h new file mode 100644 index 0000000..bf79cd5 --- /dev/null +++ b/proton-c/src/proactor/proactor-internal.h @@ -0,0 +1,37 @@ +#ifndef PROACTOR_NETADDR_INTERNAL_H +#define PROACTOR_NETADDR_INTERNAL_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Given a "host:port" string, split it in-place like so: + * + * char *host = host_port; + * char *port = pn_split_host_port(host_port); + * + * Note this modifies the original host_port string by replacing the last ':' + * character with '\0'. + * + * If there is no ':', the returned pointer is an empty string, not NULL. + * + */ +char* pni_split_host_port(char *host_port); + +#endif // PROACTOR_NETADDR_INTERNAL_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/proton-c/src/proactor/proactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/proactor.c b/proton-c/src/proactor/proactor.c new file mode 100644 index 0000000..a7939e8 --- /dev/null +++ b/proton-c/src/proactor/proactor.c @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +/* Common platform-independent implementation for proactor libraries */ + +#include "proactor-internal.h" +#include <proton/proactor.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + + +int pn_proactor_addr(char *buf, size_t len, const char *host, const char *port) { + return snprintf(buf, len, "%s:%s", host ? host : "", port ? port : ""); +} + +char* pni_split_host_port(char *host_port) { + char *port = strrchr(host_port, ':'); + if (port) { + *port = '\0'; + ++port; + } else { + port = host_port + strlen(host_port); /* Empty string, point to trailing \0 */ + } + return port; +} + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/66cb35e1/proton-c/src/tests/proactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c index 1365745..8cf0d26 100644 --- a/proton-c/src/tests/proactor.c +++ b/proton-c/src/tests/proactor.c @@ -23,6 +23,7 @@ #include <proton/connection.h> #include <proton/event.h> #include <proton/listener.h> +#include <proton/netaddr.h> #include <proton/proactor.h> #include <proton/ssl.h> #include <proton/transport.h> @@ -521,7 +522,7 @@ static void test_ipv4_ipv6(test_t *t) { pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor; /* Listen on all interfaces for IPv6 only. If this fails, skip IPv6 tests */ - test_port_t port6 = test_port("[::]"); + test_port_t port6 = test_port("::"); pn_listener_t *l6 = pn_listener(); pn_proactor_listen(server, l6, port6.host_port, 4); TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); @@ -573,12 +574,12 @@ static void test_ipv4_ipv6(test_t *t) { EXPECT_CONNECT(port, ""); /* local->all */ if (has_ipv6) { - EXPECT_CONNECT(port6, "[::]"); /* v6->v6 */ + EXPECT_CONNECT(port6, "::"); /* v6->v6 */ EXPECT_CONNECT(port6, ""); /* local->v6 */ - EXPECT_CONNECT(port, "[::1]"); /* v6->all */ + EXPECT_CONNECT(port, "::1"); /* v6->all */ EXPECT_FAIL(port6, "127.0.0.1"); /* fail v4->v6 */ - EXPECT_FAIL(port4, "[::1]"); /* fail v6->v4 */ + EXPECT_FAIL(port4, "::1"); /* fail v6->v4 */ } PROACTOR_TEST_DRAIN(pts); @@ -696,22 +697,37 @@ static void test_ssl(test_t *t) { PROACTOR_TEST_FREE(pts); } +static void test_proactor_addr(test_t *t) { + /* Test the address formatter */ + char addr[PN_MAX_ADDR]; + pn_proactor_addr(addr, sizeof(addr), "foo", "bar"); + TEST_STR_EQUAL(t, "foo:bar", addr); + pn_proactor_addr(addr, sizeof(addr), "foo", ""); + TEST_STR_EQUAL(t, "foo:", addr); + pn_proactor_addr(addr, sizeof(addr), "foo", NULL); + TEST_STR_EQUAL(t, "foo:", addr); + pn_proactor_addr(addr, sizeof(addr), "", "bar"); + TEST_STR_EQUAL(t, ":bar", addr); + pn_proactor_addr(addr, sizeof(addr), NULL, "bar"); + TEST_STR_EQUAL(t, ":bar", addr); + pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", "5"); + TEST_STR_EQUAL(t, "1:2:3:4:5", addr); + pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", ""); + TEST_STR_EQUAL(t, "1:2:3:4:", addr); + pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", NULL); + TEST_STR_EQUAL(t, "1:2:3:4:", addr); +} /* Test pn_proactor_addr funtions */ /* FIXME aconway 2017-03-30: windows will need winsock2.h etc. - These headers are *only* needed for test_addr and only for the getnameinfo part. + These headers are *only* needed for test_netaddr and only for the getnameinfo part. This is the only non-portable part of the proactor test suite. */ #include <sys/socket.h> /* For socket_storage */ #include <netdb.h> /* For NI_MAXHOST/NI_MAXSERV */ -static void test_addr(test_t *t) { - /* Make sure NULL addr gives empty string */ - char str[1024] = "not-empty"; - pn_proactor_addr_str(NULL, str, sizeof(str)); - TEST_STR_EQUAL(t, "", str); - +static void test_netaddr(test_t *t) { proactor_test_t pts[] ={ { open_wake_handler }, { listen_handler } }; PROACTOR_TEST_INIT(pts, t); pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor; @@ -727,35 +743,35 @@ static void test_addr(test_t *t) { char cr[1024], cl[1024], sr[1024], sl[1024]; pn_transport_t *ct = pn_connection_transport(c); - pn_proactor_addr_str(pn_proactor_addr_remote(ct), cr, sizeof(cr)); + pn_netaddr_str(pn_netaddr_remote(ct), cr, sizeof(cr)); TEST_STR_IN(t, test_port_use_host(&port, ""), cr); /* remote address has listening port */ pn_connection_t *s = last_accepted; /* server side of the connection */ pn_transport_t *st = pn_connection_transport(s); if (!TEST_CHECK(t, st)) return; - pn_proactor_addr_str(pn_proactor_addr_local(st), sl, sizeof(sl)); + pn_netaddr_str(pn_netaddr_local(st), sl, sizeof(sl)); TEST_STR_EQUAL(t, cr, sl); /* client remote == server local */ - pn_proactor_addr_str(pn_proactor_addr_local(ct), cl, sizeof(cl)); - pn_proactor_addr_str(pn_proactor_addr_remote(st), sr, sizeof(sr)); + pn_netaddr_str(pn_netaddr_local(ct), cl, sizeof(cl)); + pn_netaddr_str(pn_netaddr_remote(st), sr, sizeof(sr)); TEST_STR_EQUAL(t, cl, sr); /* client local == server remote */ /* Examine as sockaddr */ - const struct sockaddr_storage* addr = pn_proactor_addr_sockaddr(pn_proactor_addr_remote(ct)); - TEST_CHECK(t, AF_INET == addr->ss_family); + pn_netaddr_t *na = pn_netaddr_remote(ct); + struct sockaddr *sa = pn_netaddr_sockaddr(na); + TEST_CHECK(t, AF_INET == sa->sa_family); char host[NI_MAXHOST] = ""; char serv[NI_MAXSERV] = ""; - int err = getnameinfo((struct sockaddr*)addr, sizeof(*addr), - host, sizeof(host), - serv, sizeof(serv), + int err = getnameinfo(sa, pn_netaddr_socklen(na), + host, sizeof(host), serv, sizeof(serv), NI_NUMERICHOST | NI_NUMERICSERV); TEST_CHECK(t, 0 == err); TEST_STR_EQUAL(t, "127.0.0.1", host); TEST_STR_EQUAL(t, port.str, serv); /* Make sure you can use NULL, 0 to get length of address string without a crash */ - size_t len = pn_proactor_addr_str(pn_proactor_addr_local(ct), NULL, 0); - TEST_CHECK(t, strlen(cl) == len); + size_t len = pn_netaddr_str(pn_netaddr_local(ct), NULL, 0); + TEST_CHECKF(t, strlen(cl) == len, "%d != %d", strlen(cl), len); sock_close(port.sock); PROACTOR_TEST_DRAIN(pts); @@ -848,7 +864,8 @@ int main(int argc, char **argv) { RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t)); RUN_ARGV_TEST(failed, t, test_release_free(&t)); RUN_ARGV_TEST(failed, t, test_ssl(&t)); - RUN_ARGV_TEST(failed, t, test_addr(&t)); + RUN_ARGV_TEST(failed, t, test_proactor_addr(&t)); + RUN_ARGV_TEST(failed, t, test_netaddr(&t)); RUN_ARGV_TEST(failed, t, test_disconnect(&t)); RUN_ARGV_TEST(failed, t, test_abort(&t)); RUN_ARGV_TEST(failed, t, test_refuse(&t)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org