Author: bpintea Date: 2008-08-25 17:59:24 +0200 (Mon, 25 Aug 2008) New Revision: 1076
Added: trunk/core/plug-in/binrpcctrl/ConnPool.cpp trunk/core/plug-in/binrpcctrl/ConnPool.h trunk/core/plug-in/binrpcctrl/CtrlServer.cpp trunk/core/plug-in/binrpcctrl/CtrlServer.h Modified: trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.cpp trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.h Log: - Content-Type header is now always set (see http://lists.iptel.org/pipermail/semsdev/2008-March/002379.html) - implemented a connection pool, used for SEMS->SER message xchange initiated by SEMS; this pairs with SEMS' one-thread-per-session design and should boost throughput, as it only used one main socket before - added multi-threaded receiver, for SER->SEMS xchange, pairing with SER's multiprocess design and moving from the single-listener 1st implementation. Modified: trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.cpp =================================================================== --- trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.cpp 2008-08-24 17:07:26 UTC (rev 1075) +++ trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.cpp 2008-08-25 15:59:24 UTC (rev 1076) @@ -17,24 +17,22 @@ #define LISTEN_ADDR_PARAM "sems_address" #define SER_ADDR_PARAM "ser_address" +#define CT_TIMEOUT_PARAM "connect_timeout" +#define RX_TIMEOUT_PARAM "receive_timeout" +#define TX_TIMEOUT_PARAM "transmit_timeout" +#define RX_WORKERS_PARAM "receive_workers" +#define TX_WORKERS_PARAM "transmit_workers" #define LISTEN_ADDR_DEFAULT "brpcnd://127.0.0.1:3334" #define SER_ADDR_DEFAULT "brpcnd://127.0.0.1:1089" #define BRPC_CB_HASH_SIZE 16 -#define ASI_VERSION 0x2 -#define SND_USOCK_TEMPLATE "/tmp/sems_send_sock_XXXXXX" //TODO: configurable -#define MAX_RETRY_ON_ERR 5 -//TODO: configurable -#define CT_TIMEOUT 500000 -#define RX_TIMEOUT 500000 /* 50000 */ -#define TX_TIMEOUT 200000 +#define CT_TIMEOUT 500 // ms +#define RX_TIMEOUT 500 // ms +#define TX_TIMEOUT 200 // ms +#define RX_WORKERS 8 +#define TX_WORKERS 8 -#ifndef UNIX_PATH_MAX -#include <sys/un.h> -#define UNIX_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path) -#endif - #define STX 0x02 #define ETX 0x03 #define SUB 0x21 @@ -82,13 +80,13 @@ const BRPC_STR_STATIC_INIT(SER_DFMT_TO_TAG, "@to.tag"); const BRPC_STR_STATIC_INIT(SER_DFMT_CSEQ_NUM, "@cseq.num"); const BRPC_STR_STATIC_INIT(SER_DFMT_RR_ALL, "@hf_value.record_route"); -const BRPC_STR_STATIC_INIT(SER_DFMT_RR_1ST, "@hf_value.record_route[1]"); const BRPC_STR_STATIC_INIT(SER_DFMT_BODY, "@msg.body"); const BRPC_STR_STATIC_INIT(SER_DFMT_CMD, "$sems_cmd"); const BRPC_STR_STATIC_INIT(SER_DFMT_HDRS, "$sems_hdrs"); //aditionals, for replies const BRPC_STR_STATIC_INIT(SER_DFMT_CODE, "@code"); const BRPC_STR_STATIC_INIT(SER_DFMT_REASON, "@reason"); +const BRPC_STR_STATIC_INIT(SER_DFMT_CONTTYPE, "@hf_value.content_type"); static const brpc_str_t *SIP_CORE_METHODS[] = { @@ -117,7 +115,7 @@ &SER_DFMT_TO_TAG, &SER_DFMT_CSEQ_NUM, &SER_DFMT_RR_ALL, - &SER_DFMT_RR_1ST, + &SER_DFMT_CONTTYPE, &SER_DFMT_BODY, &SER_DFMT_CMD, &SER_DFMT_HDRS @@ -127,11 +125,11 @@ &SER_DFMT_CODE, &SER_DFMT_REASON, &SER_DFMT_CONTACT_URI, - &SER_DFMT_RR_1ST, &SER_DFMT_RR_ALL, &SER_DFMT_FROM_TAG, &SER_DFMT_TO_TAG, &SER_DFMT_CSEQ_NUM, + &SER_DFMT_CONTTYPE, &SER_DFMT_HDRS, &SER_DFMT_BODY }; @@ -151,10 +149,11 @@ /* WARN: must remain sync'ed with SER's enum ASI_REQ_FLAGS! */ enum SIP_REQ_FLAGS { - SIP_REQ_ACK_FLG = 1 << 0, - SIP_REQ_FIN_FLG = 1 << 1, - SIP_REQ_PRV_FLG = 1 << 2, - SIP_REQ_RUN_ORR = 1 << 3, + SIPREQ_GET_ACK_FLG = 1 << 0, + SIPREQ_GET_FIN_FLG = 1 << 1, + SIPREQ_GET_PRV_FLG = 1 << 2, + SIPREQ_RUN_ORR_FLG = 1 << 3, + SIPREQ_DEL_1ST_FLG = 1 << 4, }; @@ -183,87 +182,63 @@ #define CONFIRM_RECEPTION 0 + +static brpc_tv_t ct_timeout = CT_TIMEOUT * 1000; +static brpc_tv_t rx_timeout = RX_TIMEOUT * 1000; +static brpc_tv_t tx_timeout = TX_TIMEOUT * 1000; +static unsigned rx_workers; +static unsigned tx_workers; + // time_t BrpcCtrlInterface::serial = -1; // brpc_int_t BrpcCtrlInterface::as_id = -1; BrpcCtrlInterfaceFactory::BrpcCtrlInterfaceFactory(const string &name) : AmCtrlInterfaceFactory(name) -{ -} +{} BrpcCtrlInterfaceFactory::~BrpcCtrlInterfaceFactory() -{ -} +{} AmCtrlInterface* BrpcCtrlInterfaceFactory::instance() { BrpcCtrlInterface* ctrl = new BrpcCtrlInterface(); - + if(ctrl->init(semsUri,serUri) < 0){ - delete ctrl; - return NULL; + delete ctrl; + return NULL; } return ctrl; } -BrpcCtrlInterface::BrpcCtrlInterface() - : semsFd(-1), - serFd(-1), - serial(-1), - as_id(-1) -{ - memset(&sndAddr, 0, sizeof(sndAddr)); -} +BrpcCtrlInterface::BrpcCtrlInterface() : + serial(-1), + as_id(-1) +{} + BrpcCtrlInterface::~BrpcCtrlInterface() { - closeSock(&serFd, &sndAddr); - closeSock(&semsFd, &semsAddr); + delete serConn; + delete ctrlSrv; } -static int init_listener(const string &semsUri, brpc_addr_t *semsAddr) -{ - brpc_addr_t *addr; - int sockfd; - - if (! (addr = brpc_parse_uri(semsUri.c_str()))) { - ERROR("failed to parse BINRPC URI `%s': %s [%d].\n", semsUri.c_str(), - brpc_strerror(), brpc_errno); - return -1; - } else if (BRPC_ADDR_TYPE(addr) != SOCK_DGRAM) { - //b/c we'd have to do connection management otherwise (listen for - //connections, poll for activity on each descriptor etc); for now, not - //relly needed. - ERROR("only datagram listeners supported.\n"); - return -1; - } - - if ((sockfd = brpc_socket(addr, /*blocking*/false, /*bind*/true)) < 0) { - ERROR("failed to get listen socket for URI `%s': %s [%d].\n", - semsUri.c_str(), brpc_strerror(), brpc_errno); - return -1; - } - *semsAddr = *addr; - return sockfd; -} - int BrpcCtrlInterface::init(const string& semsUri, const string& serUri) { - brpc_addr_t *addr; - - if ((semsFd = init_listener(semsUri, &semsAddr)) < 0) { - ERROR("failed to initialize BINRPC listening socket.\n"); + try { + serConn = new ConnPool(serUri, tx_workers, ct_timeout); + } catch (string errmsg) { + ERROR("failed to initialize SER connection pool: %s.\n", errmsg.c_str()); return -1; } - if (! (addr = brpc_parse_uri(serUri.c_str()))) { - ERROR("failed to parse BINRPC URI `%s': %s [%d].\n", serUri.c_str(), - brpc_strerror(), brpc_errno); + try { + ctrlSrv = new CtrlServer(semsUri, rx_workers, rx_timeout, tx_timeout); + } catch (string errmsg) { + ERROR("failed to initialize binRPC server: %s.\n", errmsg.c_str()); + delete serConn; return -1; - } else { - serAddr = *addr; } sipDispatcher = AmSipDispatcher::instance(); @@ -274,6 +249,7 @@ int BrpcCtrlInterfaceFactory::onLoad() { AmConfigReader cfg; + unsigned int ct_to, rx_to, tx_to; if (cfg.loadFile(AmConfig::ModConfigPath + string(MOD_NAME ".conf"))) { WARN("failed to read/parse config file `%s' - assuming defaults\n", @@ -283,99 +259,53 @@ } else { semsUri = cfg.getParameter(LISTEN_ADDR_PARAM, LISTEN_ADDR_DEFAULT); serUri = cfg.getParameter(SER_ADDR_PARAM, SER_ADDR_DEFAULT); - } - INFO(LISTEN_ADDR_PARAM ": %s.\n", semsUri.c_str()); - INFO(SER_ADDR_PARAM ": %s.\n", serUri.c_str()); - return 0; -} + if (str2i(cfg.getParameter(CT_TIMEOUT_PARAM, int2str(CT_TIMEOUT)), + ct_to)) { + ERROR("failed to read `%s' param from config file.\n", CT_TIMEOUT_PARAM); + return -1; + } else { + ct_timeout = ct_to * 1000; + } + if (str2i(cfg.getParameter(RX_TIMEOUT_PARAM, int2str(RX_TIMEOUT)), + rx_to)) { + ERROR("failed to read `%s' param from config file.\n", RX_TIMEOUT_PARAM); + return -1; + } else { + rx_timeout = rx_to * 1000; + } + if (str2i(cfg.getParameter(TX_TIMEOUT_PARAM, int2str(TX_TIMEOUT)), + tx_to)) { + ERROR("failed to read `%s' param from config file.\n", TX_TIMEOUT_PARAM); + return -1; + } else { + tx_timeout = tx_to * 1000; + } -int BrpcCtrlInterface::getSerFd() -{ - if (0 <= serFd) - return serFd; - - //if using local sockets, we need to bind to a local socket when sending - //out requests, so that SER knows where to send back replies for our - //requests (unlike INET layers PF_LOCAL+DGRAM based communication needs to - //bind.the sending socket in order to have this socket receive replies) - if (BRPC_ADDR_DOMAIN(&serAddr) == PF_LOCAL) { - //There's a race the loop tries to work around: - //1. mkstemp creates & opens a temp file. brpc_socket() (called - //later), removes the temporary file (unlink) and opens a unix - //socket with the same name. - //So, between unlink() and socket(), some other mkstemp could - //"ocupy" the name. - int errcnt = -1; - do { - if (MAX_RETRY_ON_ERR < ++errcnt) { - ERROR("%dth consecutive failed attempt to create a local domain " - "socket for sending req - giving up.\n", errcnt); - return -1; - } else if (1 < errcnt) { //the previous brpc_socket() attempt failed - ERROR("failed to create BINRPC socket: %s [%d].\n", brpc_strerror(), - brpc_errno); - } - char buff[UNIX_PATH_MAX]; - assert(sizeof(SND_USOCK_TEMPLATE) <= sizeof(buff)); - memcpy(buff, SND_USOCK_TEMPLATE, sizeof(SND_USOCK_TEMPLATE)); - int tmpfd = mkstemp(buff); - if (tmpfd < 0) { - ERROR("failed to create temporary file with template `%s': %s [%d].\n", - SND_USOCK_TEMPLATE, strerror(errno), errno); - continue; - } else { - //close the FD - only the modified buff is of worth - close(tmpfd); - } - sndAddr = serAddr; //copy domain, socket type - memcpy(BRPC_ADDR_UN(&sndAddr)->sun_path, buff, strlen(buff) + - /*0-term*/1); - BRPC_ADDR_LEN(&sndAddr) = SUN_LEN(BRPC_ADDR_UN(&sndAddr)); - DBG("creating temporary send socket bound to `%s'.\n", buff); - } while ((serFd = brpc_socket(&sndAddr, /*blocking*/false, - /*named*/true)) < 0); - /* TODO: permission, UID/GID of the created socket */ - } else { - if ((serFd = brpc_socket(&serAddr, /*blk*/false, /*named*/false)) < 0) { - ERROR("failed to create BINRPC socket: %s [%d].\n", brpc_strerror(), - brpc_errno); + if (str2i(cfg.getParameter(RX_WORKERS_PARAM, int2str(RX_WORKERS)), + rx_workers)) { + ERROR("failed to read `%s' param from config file.\n", RX_WORKERS_PARAM); return -1; } + if (str2i(cfg.getParameter(TX_WORKERS_PARAM, int2str(TX_WORKERS)), + tx_workers)) { + ERROR("failed to read `%s' param from config file.\n", TX_WORKERS_PARAM); + return -1; + } } - if (! brpc_connect(&serAddr, &serFd, CT_TIMEOUT)) { - ERROR("failed to connect to SER: %s [%d].\n", brpc_strerror(), brpc_errno); - close(serFd); - serFd = -1; - } - return serFd; + INFO(LISTEN_ADDR_PARAM ": %s.\n", semsUri.c_str()); + INFO(SER_ADDR_PARAM ": %s.\n", serUri.c_str()); + INFO(CT_TIMEOUT_PARAM ": %uus.\n", (unsigned)ct_timeout); + INFO(RX_TIMEOUT_PARAM ": %uus.\n", (unsigned)rx_timeout); + INFO(TX_TIMEOUT_PARAM ": %uus.\n", (unsigned)tx_timeout); + INFO(RX_WORKERS_PARAM ": %u.\n", rx_workers); + INFO(TX_WORKERS_PARAM ": %u.\n", tx_workers); + + return 0; } -void BrpcCtrlInterface::closeSock(int *sock, brpc_addr_t *addr) -{ - DBG("closing FD#%d for %s.\n", *sock, - BRPC_ADDR_DOMAIN(addr) ? brpc_print_addr(addr) : "[INET/INET6 sender]"); - if (*sock < 0) { - WARN("connection not oppened, so can not be closed.\n"); - return; - } - - if (close(*sock) < 0) - WARN("FD closed uncleanly: %s [%d].\n", strerror(errno), errno); - - if (BRPC_ADDR_DOMAIN(addr) == PF_LOCAL) { - if (unlink(BRPC_ADDR_UN(addr)->sun_path) < 0) { - ERROR("failed to remove unix socket file '%s': %s [%d].\n", - BRPC_ADDR_UN(addr)->sun_path, strerror(errno), errno); - } - } - - DBG("socket %d closed.\n", *sock); - *sock = -1; -} - bool BrpcCtrlInterface::initCallbacks() { if (! brpc_cb_init(BRPC_CB_HASH_SIZE, /*no reply handling*/0)) { @@ -577,7 +507,7 @@ GOTOERR(CODE_RPC_INVALID); // check if not a bogus call - for (i = 0; i < sizeof(SIP_CORE_METHODS)/sizeof(brpc_str_t *); i ++) { + for (i = 0; i < sizeof(SIP_CORE_METHODS)/sizeof(brpc_str_t *); i ++) { if ((mname->len == SIP_CORE_METHODS[i]->len) && (strncmp(mname->val, SIP_CORE_METHODS[i]->val, mname->len) == 0)) { break; @@ -643,19 +573,19 @@ brpc_t *rpl = NULL; brpc_str_t *reason; brpc_int_t *code; - brpc_addr_t from = serAddr; //avoid a syscall to find out socket type + brpc_addr_t from = serConn->txAddr; //avoid a syscall to find socket type brpc_id_t req_id; + int serFd; - if (getSerFd() < 0) { + if ((serFd = serConn->get()) < 0) { ERROR("no connection to SER available.\n"); goto end; } - assert(0 <= serFd); - if (! brpc_sendto(serFd, &serAddr, req, TX_TIMEOUT)) { + if (! brpc_sendto(serFd, &serConn->txAddr, req, tx_timeout)) { ERROR("failed to send msg to SER: %s [%d].\n", brpc_strerror(), brpc_errno); - closeSock(&serFd, &sndAddr); + serConn->destroy(serFd); goto end; } else { req_id = req->id; @@ -664,16 +594,17 @@ } /* receive from queue until empty, if IDs do not match */ - while ((rpl = brpc_recvfrom(serFd, &from, RX_TIMEOUT))) { + while ((rpl = brpc_recvfrom(serFd, &from, rx_timeout))) { if (req_id == rpl->id) break; - ERROR("received reply's ID (#%d) doesn't match request's - discarded (%d)", - brpc_id(rpl), req_id); + ERROR("received reply's ID (#%d) doesn't match request's - " + "discarded (%d).\n", brpc_id(rpl), req_id); brpc_finish(rpl); } + serConn->release(serFd); if (! rpl) { - ERROR("failed to get reply: %s [%d].\n", brpc_strerror(), brpc_errno); - closeSock(&serFd, &sndAddr); + ERROR("failed to get reply (waited max %uus): %s [%d].\n", + (unsigned)rx_timeout, brpc_strerror(), brpc_errno); goto end; } if (brpc_is_fault(rpl)) { @@ -733,7 +664,7 @@ brpc_str_t listen, *reason; int *retcode; - listen.val = brpc_print_addr(&semsAddr); + listen.val = brpc_print_addr(&ctrlSrv->rxAddr); listen.len = strlen(listen.val); if (! ((req = brpc_req(METH_SER_RESYNC, random())) && @@ -781,29 +712,6 @@ brpc_finish(rpl); } -void BrpcCtrlInterface::_run() -{ - brpc_addr_t from; - brpc_t *req, *rpl; - - DBG("Running BrpcCtrlInterface thread.\n"); - while (! is_stopped()) { - from = semsAddr; // avoid a syscall to find out socket type - if (! (req = brpc_recvfrom(semsFd, &from, RX_TIMEOUT))) - continue; - //unsafe - DBG("received BINRPC request `%.*s'.\n", BRPC_STR_FMT(brpc_method(req))); - if ((rpl = brpc_cb_run(req))) { - if (! brpc_sendto(semsFd, &from, rpl, TX_TIMEOUT)) { - ERROR("failed to send reply to BINRPC request: %s [%d].\n", - brpc_strerror(), brpc_errno); - } - brpc_finish(rpl); - } - brpc_finish(req); - } -} - void BrpcCtrlInterface::run() { if (! sipDispatcher) { @@ -819,9 +727,15 @@ if(rpcCheck()) serResync(); - _run(); + ctrlSrv->start(); + ctrlSrv->join(); } +void BrpcCtrlInterface::on_stop() +{ + ctrlSrv->stop(); +} + static inline enum RPC_ERR_CODE read_unsigned(string &u_str, unsigned int &u_int) { @@ -873,17 +787,15 @@ &amReq.to_tag, &cseq_str, &amReq.route, - &amReq.next_hop, + &amReq.content_type, &amReq.body, &amReq.cmd, &amReq.hdrs }; brpc_str_t *cstr_refs[sizeof(strRef)/sizeof(string *)]; -#ifndef NDEBUG assert(sizeof(strRef)/sizeof(string *) - /*implicit TID*/1 == sizeof(REQ_FMTS)/sizeof(brpc_str_t *)); -#endif memset(fmt, 's', sizeof(fmt)/sizeof(char) - 1); fmt[0] = '!'; /* lay the refs in array */ @@ -926,11 +838,11 @@ &code_str, &amRpl.reason, &amRpl.next_request_uri, - &amRpl.next_hop, &amRpl.route, &amRpl.local_tag, &amRpl.remote_tag, &cseq_str, + &amRpl.content_type, &amRpl.hdrs, &amRpl.body }; @@ -1002,18 +914,20 @@ int mtype; AmSipRequest amReq; AmSipReply amRpl; - BrpcCtrlInterface *iface = (BrpcCtrlInterface *)_iface; + //BrpcCtrlInterface *iface = (BrpcCtrlInterface *)_iface; switch ((mtype = get_sipmeth_type(req))) { case SIP_METH_REQ: if ((errcode = sip_req_handler(req, amReq)) == CODE_RPC_SUCCESS) - iface->handleSipMsg(amReq); + //iface->handleSipMsg(amReq); + AmSipDispatcher::instance()->handleSipMsg(amReq); break; case SIP_METH_FIN: case SIP_METH_PRV: if ((errcode = sip_fin_handler(req, amRpl)) == CODE_RPC_SUCCESS) - iface->handleSipMsg(amRpl); + //iface->handleSipMsg(amRpl); + AmSipDispatcher::instance()->handleSipMsg(amRpl); break; case SIP_METH_NONE: @@ -1047,13 +961,69 @@ return req; } +/** + * Extract the bodies of multiple Route headers. + */ +static inline string rtset_body(const string &rthdr) +{ + string rtset; + const char *pos, *end; + const char *rt_start; + int eoh_len; + + for (pos = rthdr.c_str(), end = pos + rthdr.length(); pos < end; ) { + // if starts with `Route:', skip it (can start with WS, if multiline body + if (((signed)SIP_HDR_LEN(SIP_HDR_COL(SIP_HDR_ROUTE)) < end - pos) || + (memcmp(pos, SIP_HDR_COL(SIP_HDR_ROUTE), + SIP_HDR_LEN(SIP_HDR_COL(SIP_HDR_ROUTE))) == 0)) + pos += SIP_HDR_LEN(SIP_HDR_COL(SIP_HDR_ROUTE)); + + // skip leading WS + while (pos < end) { + switch (*pos) { + case ' ': + case '\n': + pos ++; + continue; + } + break; + } + // mark begining of route body + rt_start = pos; + + /* find end of route body */ + eoh_len = 0; + for ( ; pos < end; pos ++) + if (*pos == '\r') { + eoh_len ++; + break; + } + if (pos < end) + pos ++; + if (pos < end && *pos == '\n') { + eoh_len ++; + pos ++; + } + + // roll back over the existing `,' of a multiline route set + if (eoh_len && pos[-eoh_len] == ',') + eoh_len ++; + + if (rt_start < pos - eoh_len) { + if (! rtset.empty()) + rtset += ", "; + rtset += string(rt_start, &pos[-eoh_len] - rt_start); + } + } + + return rtset; +} + #define XTRA_HDRS(_xhdrs, _msg) \ string _xhdrs; \ - if (_msg.route.length()) \ - _xhdrs += _msg.route; \ - if (_msg.contact.length()) \ + if (! _msg.contact.empty()) \ _xhdrs += _msg.contact; \ - if (_msg.content_type.length()) \ + if (! _msg.content_type.empty()) \ _xhdrs += SIP_HDR_COLSP(SIP_HDR_CONTENT_TYPE) + _msg.content_type + CRLF;\ _xhdrs += _msg.hdrs; @@ -1075,11 +1045,34 @@ STR2BSTR(_from, amReq.from); STR2BSTR(_to, amReq.to); STR2BSTR(_callid, amReq.callid); - STR2BSTR(_next_hop, amReq.next_hop); STR2BSTR(_hdrs, xtraHdrs); STR2BSTR(_body, amReq.body); STR2BSTR(_empty, string("")); + string rtset; + try { + rtset = rtset_body(amReq.route); + } catch (string emsg) { + ERROR("failed to parse route set headers: %s.\n", rtset.c_str()); + return NULL; + } + + int rtflag; + if (! amReq.next_hop.empty()) { + string nhop; + if (amReq.next_hop.c_str()[0] != '<') + nhop = "<" + amReq.next_hop + ">"; + else + nhop = amReq.next_hop; + + rtset = nhop + rtset; + rtflag = SIPREQ_DEL_1ST_FLG; + } else { + rtflag = 0; + } + + STR2BSTR(_rtset, rtset); + #define STRIP_HF_NAME(_bstr_, _hf_name, _hf_name_len) \ do { \ if ((_hf_name_len < (_bstr_)->len) && \ @@ -1100,14 +1093,15 @@ if (! brpc_asm(req, REQUEST_FMT_REQ, as_id, - SIP_REQ_FIN_FLG|SIP_REQ_PRV_FLG|SIP_REQ_RUN_ORR, // FIXME: parameterized + // FIXME: parameterized + SIPREQ_GET_FIN_FLG|SIPREQ_GET_PRV_FLG|SIPREQ_RUN_ORR_FLG|rtflag, &_method, &_r_uri, &_from, // FIXME: only HF value; MUST have tag (check) &_to, // FIXME: only HF value (no "To: " included) (check) amReq.cseq, &_callid, - &_next_hop, + &_rtset, &_hdrs, &_body, &_empty // FIXME: "use the power!" @@ -1229,8 +1223,14 @@ goto end; } if (300 <= *retcode) { - ERROR("RPC request failed with code: %d, status: '%.*s'.\n", *retcode, - /*misleading var. name!*/BRPC_STR_FMT(ser_opaque)); +#if 0 + ERROR("RPC request failed (code: %d, status: '%.*s') for reply: %s\n", + *retcode, /*misleading var. name!*/BRPC_STR_FMT(ser_opaque), + ((AmSipReply)amRpl).print().c_str()); +#else + ERROR("RPC request failed (code: %d, status: '%.*s') for reply.\n", + *retcode, /*misleading var. name!*/BRPC_STR_FMT(ser_opaque)); +#endif goto end; } @@ -1250,7 +1250,7 @@ { string localUri; - if (displayName.length()) { + if (! displayName.empty()) { // quoting is safer (the check for quote need doesn't really pay off) if (displayName.c_str()[0] == '"') { assert(displayName.c_str()[displayName.length() - 1] == '"'); @@ -1265,17 +1265,17 @@ // angular brackets not always needed (unless contact) localUri += "<"; - if (hostName.length()) { + if (! hostName.empty()) { localUri += SIP_SCHEME_SIP; //TODO: sips|tel|tels localUri += ":"; - if (userName.length()) { + if (! userName.empty()) { localUri += userName; localUri += "@"; } localUri += hostName; } else { // SER will substituite the markers below - if (userName.length()) { + if (! userName.empty()) { localUri += char(STX); localUri += userName; localUri += char(ETX); @@ -1284,14 +1284,14 @@ } } - if (uriParams.length()) { + if (! uriParams.empty()) { if (uriParams.c_str()[0] != ';') localUri += ';'; localUri += uriParams; } localUri += ">"; - if (hdrParams.length()) { + if (! hdrParams.empty()) { if (hdrParams.c_str()[0] != ';') localUri += ';'; localUri += hdrParams; Modified: trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.h =================================================================== --- trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.h 2008-08-24 17:07:26 UTC (rev 1075) +++ trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.h 2008-08-25 15:59:24 UTC (rev 1076) @@ -7,7 +7,13 @@ #include "AmApi.h" #include "AmSipDispatcher.h" +#include "ConnPool.h" +#include "CtrlServer.h" + +#define ASI_VERSION 0x2 + + class BrpcCtrlInterfaceFactory : public AmCtrlInterfaceFactory { string semsUri, serUri; @@ -30,22 +36,17 @@ // handler of requests (SIP request | reply) received from SER AmSipDispatcher *sipDispatcher; - //addresses for: - //- SEMS listening for SER requests - //- SER listening for SEMS requests - //- SEMS SeNDing requests to SER, when using PF_LOCAL sockets (might not - //be used) - brpc_addr_t semsAddr, serAddr, sndAddr; - int semsFd, serFd; + ConnPool *serConn; + CtrlServer *ctrlSrv; inline void handleSipMsg(AmSipRequest &req) { - AmSipDispatcher::instance()->handleSipMsg(req); + AmSipDispatcher::instance()->handleSipMsg(req); } inline void handleSipMsg(AmSipReply &rpl) { - AmSipDispatcher::instance()->handleSipMsg(rpl); + AmSipDispatcher::instance()->handleSipMsg(rpl); } brpc_t *rpcExecute(brpc_t *req); @@ -57,12 +58,7 @@ static brpc_t *digests(brpc_t *req, void *iface); static brpc_t *req_handler(brpc_t *req, void *iface); - int getSerFd(); - void closeSerConn(); - static void closeSock(int *sock, brpc_addr_t *addr); - bool initCallbacks(); - void _run(); public: BrpcCtrlInterface(); @@ -72,7 +68,7 @@ // AmThread void run(); - void on_stop() {} + void on_stop(); // AmCtrlInterface int send(const AmSipRequest &, char *, unsigned int &); Added: trunk/core/plug-in/binrpcctrl/ConnPool.cpp =================================================================== --- trunk/core/plug-in/binrpcctrl/ConnPool.cpp 2008-08-24 17:07:26 UTC (rev 1075) +++ trunk/core/plug-in/binrpcctrl/ConnPool.cpp 2008-08-25 15:59:24 UTC (rev 1076) @@ -0,0 +1,232 @@ + +#include <stdlib.h> +#include <errno.h> +#include <assert.h> +#include <binrpc.h> + +#include "log.h" +#include "ConnPool.h" + + +#define SND_USOCK_TEMPLATE "/tmp/sems_send_sock_XXXXXX" //TODO: configurable +#define MAX_RETRY_ON_ERR 5 + +#ifndef UNIX_PATH_MAX +#include <sys/un.h> +#define UNIX_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path) +#endif + + +// The locks are of type 'DEFAULT', non recursive -> should not fail. +#define LOCK \ + if (pthread_mutex_lock(&mutex) != 0) { \ + ERROR("CRITICAL: failed to lock mutex: %s [%d].\n", strerror(errno), \ + errno); \ + abort(); \ + } +#define UNLOCK \ + if (pthread_mutex_unlock(&mutex) != 0) { \ + ERROR("CRITICAL: failed to unlock mutex: %s [%d].\n", strerror(errno), \ + errno); \ + abort(); \ + } +#define WAIT \ + if (pthread_cond_wait(&cond, &mutex) != 0) { \ + ERROR("CRITICAL: failed to wait on condition: %s [%d].\n", \ + strerror(errno), errno); \ + abort(); \ + } +#define WAKEUP \ + if (pthread_cond_signal(&cond) != 0) { \ + ERROR("CRITICAL: failed to signal on cond"); \ + abort(); \ + } +#define SAFE(instr_set) \ + do { \ + LOCK; \ + instr_set; \ + UNLOCK; \ + } while (0) + +ConnPool::ConnPool(const string &target, unsigned size, brpc_tv_t ct_to) : + cap(size), + size(0), + ct_timeout(ct_to), + onwait(0) +{ + brpc_addr_t *addr; + if (! (addr = brpc_parse_uri(target.c_str()))) + throw "failed to parse BINRPC URI `" + target + "': " + + string(brpc_strerror()) + "."; + else + txAddr = *addr; + + if (pthread_mutex_init(&mutex, 0) != 0) + throw "failed to init mutex"; + if (pthread_cond_init(&cond, 0) != 0) + throw "failed to init wait condition"; +} + +ConnPool::~ConnPool() +{ + int fd; + + cap = 0; // prevent making new connections; + while (size) { + if (0 <= (fd = get())) { + destroy(fd); + } else { + ERROR("failed to destroy all connections (%s [%d]).\n", + strerror(errno), errno); + break; + } + } + + pthread_mutex_destroy(&mutex); + pthread_cond_destroy(&cond); +} + +int ConnPool::get() +{ + int fd; + + LOCK; + + while (fdStack.empty()) { + if (size < cap) { + size ++; // inc it now, so that the cap is enforced + UNLOCK; + return new_conn(); + } else { + onwait ++; + INFO("%dth worker asking for connectio, put on wait. " + "Need more capacity? (current: %d)\n", onwait, cap); + WAIT; + onwait --; + } + } + fd = fdStack.top(); + fdStack.pop(); + + UNLOCK; + + DBG("connection FD#%d aquired.\n", fd); + return fd; +} + +void ConnPool::release(int fd) +{ + assert(0 <= fd); + + LOCK; + if (onwait && fdStack.empty()) + WAKEUP; + fdStack.push(fd); + UNLOCK; + DBG("connection FD#%d released.\n", fd); +} + +void ConnPool::destroy(int fd) +{ + brpc_addr_t addr; + + assert(0 <= fd); + + LOCK; + if (BRPC_ADDR_DOMAIN(&txAddr) == PF_LOCAL) { + addr = locAddrMap[fd]; + locAddrMap.erase(fd); + } else { + BRPC_ADDR_DOMAIN(&addr) = 0; + } + size --; + UNLOCK; + + if (BRPC_ADDR_DOMAIN(&addr)) { + INFO("closing FD#%d for %s.\n", fd, brpc_print_addr(&addr)); + if (unlink(BRPC_ADDR_UN(&addr)->sun_path) < 0) { + ERROR("failed to remove unix socket file '%s': %s [%d].\n", + BRPC_ADDR_UN(&addr)->sun_path, strerror(errno), errno); + } + } else { + INFO("closing FD#%d for %s.\n", fd, brpc_print_addr(&txAddr)); + } + + if (close(fd) < 0) + ERROR("FD %d closed uncleanly: %s [%d].\n", fd, strerror(errno), errno); + + DBG("connection FD#%d destroyied.\n", fd); +} + +int ConnPool::new_conn() +{ + int fd; + brpc_addr_t locAddr; + + //if using local sockets, we need to bind to a local socket when sending + //out requests, so that SER knows where to send back replies for our + //requests (unlike INET layers PF_LOCAL+DGRAM based communication needs to + //bind.the sending socket in order to have this socket receive replies) + if (BRPC_ADDR_DOMAIN(&txAddr) == PF_LOCAL) { + //There's a race the loop tries to work around: + //1. mkstemp creates & opens a temp file. brpc_socket() (called + //later), removes the temporary file (unlink) and opens a unix + //socket with the same name. + //So, between unlink() and socket(), some other mkstemp could + //"ocupy" the name. + int errcnt = -1; + do { + if (MAX_RETRY_ON_ERR < ++errcnt) { + ERROR("%dth consecutive failed attempt to create a local domain " + "socket for sending req - giving up.\n", errcnt); + fd = -1; + goto end; + } else if (1 < errcnt) { //the previous brpc_socket() attempt failed + ERROR("failed to create BINRPC socket: %s [%d].\n", brpc_strerror(), + brpc_errno); + } + + char buff[UNIX_PATH_MAX]; + assert(sizeof(SND_USOCK_TEMPLATE) <= sizeof(buff)); + memcpy(buff, SND_USOCK_TEMPLATE, sizeof(SND_USOCK_TEMPLATE)); + int tmpFd = mkstemp(buff); + if (tmpFd < 0) { + ERROR("failed to create temporary file with template `%s': %s [%d].\n", + SND_USOCK_TEMPLATE, strerror(errno), errno); + continue; + } else { + close(tmpFd); // close the FD - only the modified buff is of worth + } + locAddr = txAddr; //copy domain, socket type + memcpy(BRPC_ADDR_UN(&locAddr)->sun_path, buff, strlen(buff)+/*0-term*/1); + BRPC_ADDR_LEN(&locAddr) = SUN_LEN(BRPC_ADDR_UN(&locAddr)); + DBG("creating temporary send socket bound to `%s'.\n", buff); + } while ((fd = brpc_socket(&locAddr, /*blocking*/false, + /*named*/true)) < 0); + /* TODO: permission, UID/GID of the created socket */ + } else { + if ((fd = brpc_socket(&txAddr, /*blk*/false, /*named*/false)) < 0) { + ERROR("failed to create BINRPC socket: %s [%d].\n", brpc_strerror(), + brpc_errno); + fd = -1; + goto end; + } + } + if (! brpc_connect(&txAddr, &fd, ct_timeout)) { + ERROR("failed to connect to SER: %s [%d].\n", brpc_strerror(), brpc_errno); + close(fd); + fd = -1; + } + +end: + if (fd < 0) { + SAFE(size --); + } else { + if (BRPC_ADDR_DOMAIN(&txAddr) == PF_LOCAL) { + // connect succeeded -> do the mapping + SAFE(locAddrMap[fd] = locAddr); + } + DBG("connection FD#%d created.\n", fd); + } + return fd; +} Added: trunk/core/plug-in/binrpcctrl/ConnPool.h =================================================================== --- trunk/core/plug-in/binrpcctrl/ConnPool.h 2008-08-24 17:07:26 UTC (rev 1075) +++ trunk/core/plug-in/binrpcctrl/ConnPool.h 2008-08-25 15:59:24 UTC (rev 1076) @@ -0,0 +1,38 @@ +#ifndef __CONNPOOL_H__ +#define __CONNPOOL_H__ + +#include <pthread.h> +#include <string> +#include <map> +#include <stack> +#include <binrpc.h> + +using namespace std; +using std::map; +using std::stack; + +class ConnPool +{ + int cap; // ..acity: maximum size of the pool + stack<int> fdStack; // array with socket file descriptors + int size; // active connections count; =fdStack.size() + N x get()'s + map<int, brpc_addr_t> locAddrMap; // used for PF_LOCAL connections + pthread_mutex_t mutex; // protects access to stack with FDs + pthread_cond_t cond; // wait when pool empty and capacity reached + int onwait; // how many workers are waiting on cond. + brpc_tv_t ct_timeout; + + int new_conn(); + + public: + ConnPool(const string &target, unsigned size, brpc_tv_t ct_timeout); + ~ConnPool(); + + brpc_addr_t txAddr; // binRPC address of SER; + + int get(); + void release(int fd); + void destroy(int fd); +}; + +#endif /* __CONNPOOL_H__ */ Added: trunk/core/plug-in/binrpcctrl/CtrlServer.cpp =================================================================== --- trunk/core/plug-in/binrpcctrl/CtrlServer.cpp 2008-08-24 17:07:26 UTC (rev 1075) +++ trunk/core/plug-in/binrpcctrl/CtrlServer.cpp 2008-08-25 15:59:24 UTC (rev 1076) @@ -0,0 +1,117 @@ + +#include "AmUtils.h" +#include "log.h" +#include "CtrlServer.h" + +CtrlServer::CtrlServer(const string &listen, unsigned listeners, + brpc_tv_t rx_timeout, brpc_tv_t tx_timeout) : + wcnt(listeners) +{ + brpc_addr_t *addr; + + if (! (addr = brpc_parse_uri(listen.c_str()))) { + throw "failed to parse BINRPC URI `" + listen + "': " + + string(brpc_strerror()) + " [" + int2str(brpc_errno) + "]"; + } else if (BRPC_ADDR_TYPE(addr) != SOCK_DGRAM) { + //b/c we'd have to do connection management otherwise (listen for + //connections, poll for activity on each descriptor etc); for now, not + //relly needed and the impl. is much easier. + throw "only datagram listeners supported"; + } else { + rxAddr = *addr; + } + + if ((rxFd = brpc_socket(addr, /*blocking*/false, /*bind*/true)) < 0) + throw "failed to get listen socket for URI `" + listen + "': " + + string(brpc_strerror()) + " [" + int2str(brpc_errno) + "].\n"; + + workers = new CtrlWorker[wcnt](); + + for (unsigned i = 0; i < wcnt; i ++) + workers[i].init(rxFd, rxAddr, rx_timeout, tx_timeout); +} + +CtrlServer::~CtrlServer() +{ + INFO("closing SEMS listener FD#%d for %s.\n", rxFd, + brpc_print_addr(&rxAddr)); + + if (close(rxFd) < 0) + ERROR("CtrlServer server socket#%d closed uncleanly: %s [%d].\n", rxFd, + strerror(errno), errno); + + if (BRPC_ADDR_DOMAIN(&rxAddr) == PF_LOCAL) { + if (unlink(BRPC_ADDR_UN(&rxAddr)->sun_path) < 0) { + ERROR("failed to remove unix socket file '%s': %s [%d].\n", + BRPC_ADDR_UN(&rxAddr)->sun_path, strerror(errno), errno); + } + } + delete []workers; +} + +void CtrlServer::start() +{ + for (unsigned i = 0; i < wcnt; i ++) + workers[i].start(); + INFO("CtrlServer started.\n"); +} + +void CtrlServer::stop() +{ + INFO("CtrlServer stopping.\n"); + for (unsigned i = 0; i < wcnt; i ++) + workers[i].stop(); +} + +void CtrlServer::join() +{ + for (unsigned i = 0; i < wcnt; i ++) + workers[i].join(); + INFO("CtrlServer stopped.\n"); +} + + +CtrlWorker::CtrlWorker() : + rxFd(-1) +{} + +void CtrlWorker::init(int rxFd, brpc_addr_t rxAddr, + brpc_tv_t rx_timeout, brpc_tv_t tx_timeout) +{ + this->rxFd = rxFd; + this->rxAddr = rxAddr; + this->rx_timeout = rx_timeout; + this->tx_timeout = tx_timeout; +} + +void CtrlWorker::run() +{ + brpc_addr_t from; + brpc_t *req, *rpl; + + INFO("CtrlServer worker #%lx started.\n", pthread_self()); + running = 1; + + do { + from = rxAddr; // avoid a syscall to find out socket type + if (! (req = brpc_recvfrom(rxFd, &from, rx_timeout))) + continue; + //unsafe + DBG("received BINRPC request `%.*s'.\n", BRPC_STR_FMT(brpc_method(req))); + if ((rpl = brpc_cb_run(req))) { + if (! brpc_sendto(rxFd, &from, rpl, tx_timeout)) { + ERROR("failed to send reply to BINRPC request: %s [%d].\n", + brpc_strerror(), brpc_errno); + } + brpc_finish(rpl); + } + brpc_finish(req); + } while (running); + + INFO("CtrlServer worker #%lx stopped.\n", pthread_self()); +} + +void CtrlWorker::on_stop() +{ + running = 0; +} Added: trunk/core/plug-in/binrpcctrl/CtrlServer.h =================================================================== --- trunk/core/plug-in/binrpcctrl/CtrlServer.h 2008-08-24 17:07:26 UTC (rev 1075) +++ trunk/core/plug-in/binrpcctrl/CtrlServer.h 2008-08-25 15:59:24 UTC (rev 1076) @@ -0,0 +1,48 @@ +#ifndef __CTRLSERVER_H__ +#define __CTRLSERVER_H__ + +#include <string> +#include <binrpc.h> +#include "AmThread.h" + +using namespace std; + +class CtrlWorker : public AmThread +{ + volatile int running; //don't like the shared var of AmThread (locking) + brpc_tv_t tx_timeout, rx_timeout; + int rxFd; + brpc_addr_t rxAddr; + + void run(); + void on_stop(); + + public: + CtrlWorker(); + + void init(int rxFd, brpc_addr_t rxAddr, + brpc_tv_t rx_timeout, brpc_tv_t tx_timeout); +}; + +/** + * This is just a proxy class, handling multiple receiver threads + */ +class CtrlServer +{ + int rxFd; + CtrlWorker *workers; + unsigned wcnt; + + public: + CtrlServer(const string &listen, unsigned listeners, + brpc_tv_t rx_timeout, brpc_tv_t tx_timeout); + ~CtrlServer(); + + brpc_addr_t rxAddr; + + void start(); + void stop(); + void join(); +}; + +#endif /* __CTRLSERVER_H__ */ _______________________________________________ Semsdev mailing list [email protected] http://lists.iptel.org/mailman/listinfo/semsdev
