Repository: trafficserver
Updated Branches:
  refs/heads/master 47a53bec1 -> 93fb1bbfd
TS-2627: mgmt condense duplicated socket read code

Rather than copy the code to read a fixed amount from a socket,
make a helper function and call it.

Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/93fb1bbf
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/93fb1bbf
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/93fb1bbf

Branch: refs/heads/master
Commit: 93fb1bbfd6e693c47db148b43197895e6e0bcf32
Parents: 47a53be
Author: James Peach <jpe...@apache.org>
Authored: Mon Mar 10 17:01:07 2014 -0700
Committer: James Peach <jpe...@apache.org>
Committed: Tue Mar 11 12:53:18 2014 -0700

----------------------------------------------------------------------
 CHANGES                        |   2 +
 mgmt/api/NetworkUtilsRemote.cc | 562 ++++++++++--------------------------
 mgmt/api/NetworkUtilsRemote.h  |   1 -
 3 files changed, 151 insertions(+), 414 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/93fb1bbf/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e59b52d..3a87b9d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,8 @@
 -*- coding: utf-8 -*-
 Changes with Apache Traffic Server 5.0.0
+ *) [TS-2627] Reduce management socket code duplication.
+ *) [TS-2625] trafficserver.in doesn't use TS_BASE *) [TS-2624] Make thread affinity more robust http://git-wip-us.apache.org/repos/asf/trafficserver/blob/93fb1bbf/mgmt/api/NetworkUtilsRemote.cc ---------------------------------------------------------------------- diff --git a/mgmt/api/NetworkUtilsRemote.cc b/mgmt/api/NetworkUtilsRemote.cc index 92a2e55..3bf941a 100644 --- a/mgmt/api/NetworkUtilsRemote.cc +++ b/mgmt/api/NetworkUtilsRemote.cc @@ -61,6 +61,7 @@ extern TSInitOptionT ts_init_options; /********************************************************************** * Socket Helper Functions **********************************************************************/ + void set_socket_paths(const char *path) { @@ -381,6 +382,34 @@ connect_and_send(const char *msg, int msg_len) return TS_ERR_OKAY; } +static TSError +socket_read_conn(int fd, uint8_t * buf, size_t needed) +{ + size_t consumed = 0; + ssize_t ret; + + while (needed > consumed) { + ret = read(fd, buf, needed - consumed); + + if (ret < 0) { + if (errno == EAGAIN) { + continue; + } else { + return TS_ERR_NET_READ; + } + } + + if (ret == 0) { + return TS_ERR_NET_EOF; + } + + buf += ret; + consumed += ret; + } + + return TS_ERR_OKAY; +} + /************************************************************************** * socket_write_conn * @@ -398,7 +427,7 @@ connect_and_send(const char *msg, int msg_len) * 1) if the write returns EPIPE error, then call connect_and_send() * 2) return the value returned from EPIPE *************************************************************************/ -TSError +static TSError socket_write_conn(int fd, const char *msg_buf, int bytes) { int ret, byte_wrote = 0; @@ -407,7 +436,7 @@ socket_write_conn(int fd, const char *msg_buf, int bytes) if (socket_write_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { return TS_ERR_NET_TIMEOUT; } - // read until we fulfill the number + // write until we fulfill the number while (byte_wrote < bytes) { ret = write(fd, msg_buf + byte_wrote, bytes - byte_wrote); 
@@ -1043,30 +1072,17 @@ send_diags_msg(int fd, TSDiagsT mode, const char *diag_msg) TSError parse_reply(int fd) { - int ret, amount_read = 0; + TSError ret; int16_t ret_val; // check to see if anything to read; wait for specified time = 1 sec if (socket_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { // time expires before ready to read return TS_ERR_NET_TIMEOUT; } - // get the return value (TSError type) - while (amount_read < SIZE_ERR_T) { - ret = read(fd, (void *) &ret_val, SIZE_ERR_T - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - return TS_ERR_NET_READ; - } - } - if (ret == 0) { - return TS_ERR_NET_EOF; - } - // all is good here :) - amount_read += ret; + ret = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T); + if (ret != TS_ERR_OKAY) { + return ret; } return (TSError) ret_val; @@ -1085,86 +1101,48 @@ parse_reply(int fd) TSError parse_reply_list(int fd, char **list) { - int ret, amount_read = 0; int16_t ret_val; int32_t list_size; TSError err_t; - if (!list) + if (!list) { return TS_ERR_PARAMS; + } // check to see if anything to read; wait for specified time if (socket_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { return TS_ERR_NET_TIMEOUT; } - // get the return value (TSError type) - while (amount_read < SIZE_ERR_T) { - ret = read(fd, (void *) &ret_val, SIZE_ERR_T - amount_read); - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - return TS_ERR_NET_EOF; - } - // all is good here :) - amount_read += ret; + // get the return value (TSError type) + err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T); + if (err_t != TS_ERR_OKAY) { + return err_t; } + // if !TS_ERR_OKAY, stop reading rest of msg err_t = (TSError) ret_val; if (err_t != TS_ERR_OKAY) { return err_t; } - // now get size of string event list - amount_read = 0; - while (amount_read < SIZE_LEN) { - ret = read(fd, (void *) &list_size, SIZE_LEN - amount_read); - if (ret < 0) { - if (errno == EAGAIN) - 
continue; - else { - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - return TS_ERR_NET_EOF; - } - // all is good here :) - amount_read += ret; + // now get size of string event list + err_t = socket_read_conn(fd, (uint8_t *)&list_size, SIZE_LEN); + if (err_t != TS_ERR_OKAY) { + return err_t; } // get the delimited event list string *list = (char *)ats_malloc(sizeof(char) * (list_size + 1)); - amount_read = 0; - while (amount_read < list_size) { - ret = read(fd, (void *) *list, list_size - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - ats_free(*list); - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - ats_free(*list); - return TS_ERR_NET_EOF; - } - - amount_read += ret; + err_t = socket_read_conn(fd, (uint8_t *)(*list), list_size); + if (err_t != TS_ERR_OKAY) { + ats_free(*list); + *list = NULL; + return err_t; } + // add end of string to end of the record value ((char *) (*list))[list_size] = '\0'; - return err_t; } @@ -1184,7 +1162,6 @@ parse_reply_list(int fd, char **list) TSError parse_file_read_reply(int fd, int *ver, int *size, char **text) { - int ret, amount_read = 0; int32_t f_size; int16_t ret_val, f_ver; TSError err_t; @@ -1196,101 +1173,52 @@ parse_file_read_reply(int fd, int *ver, int *size, char **text) if (socket_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { // time expires before ready to read return TS_ERR_NET_TIMEOUT; } - // get the error return value - while (amount_read < SIZE_ERR_T) { - ret = read(fd, (void *) &ret_val, SIZE_ERR_T - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - return TS_ERR_NET_EOF; - } - amount_read += ret; + // get the error return value + err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T); + if (err_t != TS_ERR_OKAY) { + return err_t; } + // if !TS_ERR_OKAY, stop reading rest of msg err_t = (TSError) ret_val; if (err_t != TS_ERR_OKAY) { return err_t; } - // now get file version - 
amount_read = 0; - while (amount_read < SIZE_VER) { - ret = read(fd, (void *) &f_ver, SIZE_VER - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - return TS_ERR_NET_EOF; - } - amount_read += ret; + // now get file version + err_t = socket_read_conn(fd, (uint8_t *)&f_ver, SIZE_VER); + if (err_t != TS_ERR_OKAY) { + return err_t; } + *ver = (int) f_ver; // now get file size - amount_read = 0; - while (amount_read < SIZE_LEN) { - ret = read(fd, (void *) &f_size, SIZE_LEN - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - return TS_ERR_NET_EOF; - } - // all is good here :) - amount_read += ret; + err_t = socket_read_conn(fd, (uint8_t *)&f_size, SIZE_LEN); + if (err_t != TS_ERR_OKAY) { + return err_t; } + *size = (int) f_size; // check size before reading text if ((*size) <= 0) { *text = ats_strndup("", 1); // set to empty string - } else { - // now we got the size, we can read everything into our msg * then parse it - *text = (char *)ats_malloc(sizeof(char) * (f_size + 1)); - amount_read = 0; - while (amount_read < f_size) { - ret = read(fd, (void *) *text, f_size - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - ats_free(*text); - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - ats_free(*text); - return TS_ERR_NET_EOF; - } + return TS_ERR_OKAY; + } - amount_read += ret; - } - (*text)[f_size] = '\0'; // end the string + // now we got the size, we can read everything into our msg * then parse it + *text = (char *)ats_malloc(sizeof(char) * (f_size + 1)); + err_t = socket_read_conn(fd, (uint8_t *)(*text), f_size); + if (err_t != TS_ERR_OKAY) { + ats_free(*text); + *text = NULL; + return err_t; } - return err_t; + (*text)[f_size] = '\0'; // end the string + return TS_ERR_OKAY; } /********************************************************************** @@ -1310,7 +1238,6 @@ 
parse_file_read_reply(int fd, int *ver, int *size, char **text) TSError parse_record_get_reply(int fd, TSRecordT * rec_type, void **rec_val) { - int ret, amount_read = 0; int16_t ret_val, rec_t; int32_t rec_size; TSError err_t; @@ -1322,100 +1249,52 @@ parse_record_get_reply(int fd, TSRecordT * rec_type, void **rec_val) if (socket_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { //time expired before ready to read return TS_ERR_NET_TIMEOUT; } - // get the return value (TSError type) - while (amount_read < SIZE_ERR_T) { - ret = read(fd, (void *) &ret_val, SIZE_ERR_T - amount_read); - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - return TS_ERR_NET_EOF; - } - // all is good here :) - amount_read += ret; + // get the return value (TSError type) + err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T); + if (err_t != TS_ERR_OKAY) { + return err_t; } + // if !TS_ERR_OKAY, stop reading rest of msg err_t = (TSError) ret_val; if (err_t != TS_ERR_OKAY) { return err_t; } - // now get size of record_value - amount_read = 0; - while (amount_read < SIZE_LEN) { - ret = read(fd, (void *) &rec_size, SIZE_LEN - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - return TS_ERR_NET_READ; - } - } - if (ret == 0) { - return TS_ERR_NET_EOF; - } - // all is good here :) - amount_read += ret; + // now get size of record_value + err_t = socket_read_conn(fd, (uint8_t *)&rec_size, SIZE_LEN); + if (err_t != TS_ERR_OKAY) { + return err_t; } // get the record type - amount_read = 0; - while (amount_read < SIZE_REC_T) { - ret = read(fd, (void *) &rec_t, SIZE_REC_T - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) { - continue; - } else { - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - return TS_ERR_NET_EOF; - } - // all is good here :) - amount_read += ret; + err_t = socket_read_conn(fd, (uint8_t *)&rec_t, SIZE_REC_T); + if (err_t != TS_ERR_OKAY) { + return err_t; } + *rec_type = 
(TSRecordT) rec_t; // get record value // allocate correct amount of memory for record value - if (*rec_type == TS_REC_STRING) + if (*rec_type == TS_REC_STRING) { *rec_val = ats_malloc(sizeof(char) * (rec_size + 1)); - else + } else { *rec_val = ats_malloc(sizeof(char) * (rec_size)); + } - amount_read = 0; - while (amount_read < rec_size) { - ret = read(fd, (void *) *rec_val, rec_size - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - ats_free(*rec_val); - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - ats_free(*rec_val); - return TS_ERR_NET_EOF; - } - - amount_read += ret; + err_t = socket_read_conn(fd, (uint8_t *)(*rec_val), rec_size); + if (err_t != TS_ERR_OKAY) { + ats_free(*rec_val); + *rec_val = NULL; + return err_t; } + // add end of string to end of the record value - if (*rec_type == TS_REC_STRING) + if (*rec_type == TS_REC_STRING) { ((char *) (*rec_val))[rec_size] = '\0'; + } return err_t; } @@ -1434,7 +1313,6 @@ parse_record_get_reply(int fd, TSRecordT * rec_type, void **rec_val) TSError parse_record_set_reply(int fd, TSActionNeedT * action_need) { - int ret, amount_read = 0; int16_t ret_val, action_t; TSError err_t; @@ -1445,50 +1323,26 @@ parse_record_set_reply(int fd, TSActionNeedT * action_need) if (socket_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { return TS_ERR_NET_TIMEOUT; } - // get the return value (TSError type) - while (amount_read < SIZE_ERR_T) { - ret = read(fd, (void *) &ret_val, SIZE_ERR_T - amount_read); - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - return TS_ERR_NET_EOF; - } - // all is good here :) - amount_read += ret; + // get the return value (TSError type) + err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T); + if (err_t != TS_ERR_OKAY) { + return err_t; } + // if !TS_ERR_OKAY, stop reading rest of msg err_t = (TSError) ret_val; if (err_t != TS_ERR_OKAY) { return err_t; } - // now get the action needed - amount_read 
= 0; - while (amount_read < SIZE_ACTION_T) { - ret = read(fd, (void *) &action_t, SIZE_ACTION_T - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - return TS_ERR_NET_READ; - } - } - if (ret == 0) { - return TS_ERR_NET_EOF; - } - // all is good here :) - amount_read += ret; + // now get the action needed + err_t = socket_read_conn(fd, (uint8_t *)&action_t, SIZE_ACTION_T); + if (err_t != TS_ERR_OKAY) { + return err_t; } - *action_need = (TSActionNeedT) action_t; + *action_need = (TSActionNeedT) action_t; return err_t; } @@ -1506,7 +1360,7 @@ parse_record_set_reply(int fd, TSActionNeedT * action_need) TSError parse_proxy_state_get_reply(int fd, TSProxyStateT * state) { - int ret, amount_read = 0; + TSError err_t; int16_t state_t; if (!state) @@ -1516,27 +1370,14 @@ parse_proxy_state_get_reply(int fd, TSProxyStateT * state) if (socket_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { // time expires before ready to read return TS_ERR_NET_TIMEOUT; } - // now get proxy state - amount_read = 0; - while (amount_read < SIZE_PROXY_T) { - ret = read(fd, (void *) &state_t, SIZE_PROXY_T - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - return TS_ERR_NET_READ; - } - } - if (ret == 0) { - return TS_ERR_NET_EOF; - } - // all is good here :) - amount_read += ret; + // now get proxy state + err_t = socket_read_conn(fd, (uint8_t *)&state_t, SIZE_PROXY_T); + if (err_t != TS_ERR_OKAY) { + return err_t; } - *state = (TSProxyStateT) state_t; + *state = (TSProxyStateT) state_t; return TS_ERR_OKAY; } @@ -1555,7 +1396,6 @@ parse_proxy_state_get_reply(int fd, TSProxyStateT * state) TSError parse_event_active_reply(int fd, bool * is_active) { - int ret, amount_read = 0; int16_t ret_val, active; TSError err_t; @@ -1566,50 +1406,26 @@ parse_event_active_reply(int fd, bool * is_active) if (socket_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { return TS_ERR_NET_TIMEOUT; } - // get the return value (TSError type) - while (amount_read < SIZE_ERR_T) { 
- ret = read(fd, (void *) &ret_val, SIZE_ERR_T - amount_read); - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - return TS_ERR_NET_EOF; - } - // all is good here :) - amount_read += ret; + // get the return value (TSError type) + err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T); + if (err_t != TS_ERR_OKAY) { + return err_t; } + // if !TS_ERR_OKAY, stop reading rest of msg err_t = (TSError) ret_val; if (err_t != TS_ERR_OKAY) { return err_t; } - // now get the boolean - amount_read = 0; - while (amount_read < SIZE_BOOL) { - ret = read(fd, (void *) &active, SIZE_BOOL - amount_read); - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - return TS_ERR_NET_READ; - } - } - - if (ret == 0) { - return TS_ERR_NET_EOF; - } - // all is good here :) - amount_read += ret; + // now get the boolean + err_t = socket_read_conn(fd, (uint8_t *)&active, SIZE_BOOL); + if (err_t != TS_ERR_OKAY) { + return err_t; } - *is_active = (bool) active; + *is_active = (bool) active; return err_t; } @@ -1627,130 +1443,55 @@ parse_event_active_reply(int fd, bool * is_active) TSError parse_event_notification(int fd, TSEvent * event) { - int amount_read, ret; OpType msg_type; int16_t type_op; int32_t msg_len; char *event_name = NULL, *desc = NULL; + TSError err_t; if (!event) return TS_ERR_PARAMS; // read the operation type; should be EVENT_NOTIFY - amount_read = 0; - while (amount_read < SIZE_OP_T) { - ret = read(fd, (void *) &type_op, SIZE_OP_T - amount_read); - - // the thread can receive a bad file descriptor error(EBADF) - // if the current socket_fd being used by this thread is invalid; - // this occurs when TM restarts and the client has to reconnect and - // get a new socket_fd; in this case, this thread will return null - // and die; and the client will launch a new event_poll_thread_main - // when it reconnects to TM - - // connection broken or error - if ((ret < 0) && (errno != EAGAIN)) { - 
//fprintf(stderr, "[event_poll_thread_main] ERROR read event socket %d: %s\n", sock_fd, strerror((int)errno)); - goto ERROR_READ; - } - - if (ret == 0) { - goto ERROR_EOF; - } - - // if we read some bytes keep on going. - amount_read += ret; + err_t = socket_read_conn(fd, (uint8_t *)&type_op, SIZE_OP_T); + if (err_t != TS_ERR_OKAY) { + return err_t; } // got the message type; the msg_type should be EVENT_NOTIFY msg_type = (OpType) type_op; - if (msg_type != EVENT_NOTIFY) + if (msg_type != EVENT_NOTIFY) { return TS_ERR_FAIL; - //fprintf(stderr, "[event_poll_thread_main] received EVENT_NOTIFY from TM\n"); + } // read in event name length - amount_read = 0; - while (amount_read < SIZE_LEN) { - ret = read(fd, (void *) &msg_len, SIZE_LEN - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - //fprintf(stderr, "[event_poll_thread_main] EXIT: error reading\n"); - goto ERROR_READ; - } - } - - if (ret == 0) { - goto ERROR_EOF; - } - - amount_read += ret; + err_t = socket_read_conn(fd, (uint8_t *)&msg_len, SIZE_LEN); + if (err_t != TS_ERR_OKAY) { + return err_t; } // read the event name event_name = (char *)ats_malloc(sizeof(char) * (msg_len + 1)); - amount_read = 0; - while (amount_read < msg_len) { - ret = read(fd, (void *) event_name, msg_len - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - goto ERROR_READ; - } - } - - if (ret == 0) { - goto ERROR_EOF; - } - - amount_read += ret; + err_t = socket_read_conn(fd, (uint8_t *)event_name, msg_len); + if (err_t != TS_ERR_OKAY) { + goto fail; } + event_name[msg_len] = '\0'; // end the string // read in event description length - amount_read = 0; - while (amount_read < SIZE_LEN) { - ret = read(fd, (void *) &msg_len, SIZE_LEN - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - goto ERROR_READ; - } - } - - if (ret == 0) { - goto ERROR_EOF; - } - - amount_read += ret; + err_t = socket_read_conn(fd, (uint8_t *)&msg_len, SIZE_LEN); + if (err_t != 
TS_ERR_OKAY) { + goto fail; } // read the event description desc = (char *)ats_malloc(sizeof(char) * (msg_len + 1)); - amount_read = 0; - while (amount_read < msg_len) { - ret = read(fd, (void *) desc, msg_len - amount_read); - - if (ret < 0) { - if (errno == EAGAIN) - continue; - else { - goto ERROR_READ; - } - } - - if (ret == 0) { - goto ERROR_EOF; - } - - amount_read += ret; + err_t = socket_read_conn(fd, (uint8_t *)desc, msg_len); + if (err_t != TS_ERR_OKAY) { + goto fail; } + desc[msg_len] = '\0'; // end the string // fill in event info @@ -1760,13 +1501,8 @@ parse_event_notification(int fd, TSEvent * event) return TS_ERR_OKAY; -ERROR_READ: +fail: ats_free(event_name); ats_free(desc); - return TS_ERR_NET_READ; - -ERROR_EOF: - ats_free(event_name); - ats_free(desc); - return TS_ERR_NET_EOF; + return err_t; } http://git-wip-us.apache.org/repos/asf/trafficserver/blob/93fb1bbf/mgmt/api/NetworkUtilsRemote.h ---------------------------------------------------------------------- diff --git a/mgmt/api/NetworkUtilsRemote.h b/mgmt/api/NetworkUtilsRemote.h index c54d62e..b30ac14 100644 --- a/mgmt/api/NetworkUtilsRemote.h +++ b/mgmt/api/NetworkUtilsRemote.h @@ -64,7 +64,6 @@ TSError disconnect(); TSError reconnect(); TSError reconnect_loop(int num_attempts); TSError connect_and_send(const char *msg, int msg_len); -TSError socket_write_conn(int fd, const char *msg_buf, int bytes); void *socket_test_thread(void *arg); /*****************************************************************************