Below is the current CLD replication patch, which takes CLD from being a single-node service to a fully replicated, highly available service.
The server implementation should be complete. The current merge blocker is missing code in libcldc, which does not yet properly "hunt" for the master among the group of peer CLD replicas in a CLD cell. Merging this patch will be a big milestone for CLD. The next milestone will be adding the needed strict-cache-coherence caching semantics to the CLD server and client library. This patch was generated against git commit 511b8dafb233ee85e60ddf7eda212f87963e150c. --- server/cld.h | 20 +++ server/cldb.c | 69 +++++++++++-- server/cldb.h | 9 + server/cldbadm.c | 8 - server/server.c | 286 +++++++++++++++++++++++++++++++++++++++++++++++++++--- test/pid-exists | 13 +- test/prep-db | 19 ++- test/start-daemon | 26 ++++ test/stop-daemon | 32 ++++-- 9 files changed, 428 insertions(+), 54 deletions(-) diff --git a/server/cld.h b/server/cld.h index 21f103d..08e6b12 100644 --- a/server/cld.h +++ b/server/cld.h @@ -91,6 +91,15 @@ struct msg_params { size_t msg_len; }; +enum st_cldb { + ST_CLDB_INIT, + ST_CLDB_OPEN, + ST_CLDB_ACTIVE, + ST_CLDB_MASTER, + ST_CLDB_SLAVE, + ST_CLDBNUM +}; + struct server_stats { unsigned long poll; /* number polls */ unsigned long event; /* events dispatched */ @@ -114,6 +123,17 @@ struct server { int pid_fd; char *port; /* bind port */ + unsigned short rep_port; /* db4 replication port */ + + char *myhost; + char *force_myhost; + GList *rep_remotes; + + unsigned int n_peers; /* total peers in cell */ + + int rep_pipe[2]; + + enum st_cldb state_cldb, state_cldb_new; struct cldb cldb; /* database info */ diff --git a/server/cldb.c b/server/cldb.c index 3e7c95c..254decd 100644 --- a/server/cldb.c +++ b/server/cldb.c @@ -25,8 +25,6 @@ #include <glib.h> #include "cld.h" -static int cldb_up(struct cldb *cldb, unsigned int flags); - /* * db4 page sizes for our various databases. Filesystem block size * is recommended, so 4096 was chosen (default ext3 block size). 
@@ -202,6 +200,30 @@ err_out: return -EIO; } +static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites) +{ + int rc; + struct db_remote *rp; + GList *tmp; + + *nsites = 0; + for (tmp = remotes; tmp; tmp = tmp->next) { + rp = tmp->data; + + rc = dbenv->repmgr_add_remote_site(dbenv, rp->host, rp->port, + NULL, 0); + if (rc) { + dbenv->err(dbenv, rc, + "dbenv->add.remote.site host %s port %u", + rp->host, rp->port); + return rc; + } + (*nsites)++; + } + + return 0; +} + static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info) { struct cldb *cldb = dbenv->app_private; @@ -229,12 +251,13 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info) int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password, unsigned int env_flags, const char *errpfx, bool do_syslog, - unsigned int flags, void (*cb)(enum db_event)) + GList *remotes, char *rep_host, unsigned short rep_port, + int n_peers, void (*cb)(enum db_event)) { - int rc; + int rc, nsites = 0; DB_ENV *dbenv; - cldb->is_master = true; + cldb->is_master = false; cldb->home = db_home; cldb->state_cb = cb; @@ -281,25 +304,55 @@ int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password, cldb->keyed = true; } + rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0); + if (rc) { + dbenv->err(dbenv, rc, "dbenv->set_local_site"); + goto err_out; + } + rc = dbenv->set_event_notify(dbenv, db4_event); if (rc) { dbenv->err(dbenv, rc, "dbenv->set_event_notify"); goto err_out; } + rc = dbenv->rep_set_priority(dbenv, 100); + if (rc) { + dbenv->err(dbenv, rc, "dbenv->rep_set_priority"); + goto err_out; + } + + rc = dbenv->rep_set_nsites(dbenv, n_peers); + if (rc) { + dbenv->err(dbenv, rc, "dbenv->rep_set_nsites"); + goto err_out; + } + + rc = dbenv->repmgr_set_ack_policy(dbenv, DB_REPMGR_ACKS_QUORUM); + if (rc) { + dbenv->err(dbenv, rc, "dbenv->rep_ack_policy"); + goto err_out; + } + /* init DB transactional environment, stored in directory 
db_home */ env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL; - env_flags |= DB_INIT_TXN; + env_flags |= DB_INIT_TXN | DB_INIT_REP; rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR); if (rc) { dbenv->err(dbenv, rc, "dbenv->open"); goto err_out; } - rc = cldb_up(cldb, flags); + rc = add_remote_sites(dbenv, remotes, &nsites); if (rc) goto err_out; + rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION); + if (rc) { + dbenv->err(dbenv, rc, "dbenv->repmgr_start"); + goto err_out; + } + return 0; err_out: @@ -310,7 +363,7 @@ err_out: /* * open databases */ -static int cldb_up(struct cldb *cldb, unsigned int flags) +int cldb_up(struct cldb *cldb, unsigned int flags) { DB_ENV *dbenv = cldb->env; int rc; diff --git a/server/cldb.h b/server/cldb.h index d28f732..f8f26db 100644 --- a/server/cldb.h +++ b/server/cldb.h @@ -107,6 +107,11 @@ enum db_event { CLDB_EV_NONE, CLDB_EV_CLIENT, CLDB_EV_MASTER, CLDB_EV_ELECTED }; +struct db_remote { /* remotes for cldb_init */ + char *host; + unsigned short port; +}; + struct cldb { bool is_master; bool keyed; /* using encryption? 
*/ @@ -133,7 +138,9 @@ struct cldb { extern int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password, unsigned int env_flags, const char *errpfx, bool do_syslog, - unsigned int flags, void (*cb)(enum db_event)); + GList *remotes, char *rep_host, unsigned short rep_port, + int n_peers, void (*cb)(enum db_event)); +extern int cldb_up(struct cldb *cldb, unsigned int flags); extern void cldb_down(struct cldb *cldb); extern void cldb_fini(struct cldb *cldb); diff --git a/server/cldbadm.c b/server/cldbadm.c index 37e8e36..9342f66 100644 --- a/server/cldbadm.c +++ b/server/cldbadm.c @@ -78,7 +78,8 @@ int main(int argc, char *argv[]) } if (cldb_init(&cld_adm.cldb, cld_adm.data_dir, NULL, - DB_RECOVER, "cldbadm", false, 0, NULL)) + DB_RECOVER, "cldbadm", false, + NULL, NULL, 0, 0, NULL)) goto err_dbopen; switch (cld_adm.mode) { @@ -142,8 +143,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) * Stubs for contents of cldb.c */ int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password, - unsigned int env_flags, const char *errpfx, bool do_syslog, - unsigned int flags, void (*cb)(enum db_event)) + unsigned int env_flags, const char *errpfx, bool do_syslog, + GList *remotes, char *rep_host, unsigned short rep_port, + int n_peers, void (*cb)(enum db_event)) { return 0; diff --git a/server/server.c b/server/server.c index 02e6231..fb51a42 100644 --- a/server/server.c +++ b/server/server.c @@ -29,6 +29,7 @@ #include <errno.h> #include <syslog.h> #include <locale.h> +#include <ctype.h> #include <argp.h> #include <netdb.h> #include <signal.h> @@ -46,6 +47,12 @@ const char *argp_program_version = PACKAGE_VERSION; enum { CLD_RAW_MSG_SZ = 4096, + + CLD_DEF_REP_PORT = 9081, + + CLD_DEF_PEERS = 5, + CLD_MIN_PEERS = 3, + CLD_MAX_PEERS = 400, /* arbitrary "sanity" limit */ }; static struct argp_option options[] = { @@ -58,10 +65,18 @@ static struct argp_option options[] = { "Switch the log to standard error" }, { 
"foreground", 'F', NULL, 0, "Run in foreground, do not fork" }, + { "myhost", 'm', "HOST", 0, + "Force local hostname to HOST (def: autodetect)" }, { "port", 'p', "PORT", 0, "bind to UDP port PORT. Default: " CLD_DEF_PORT }, { "pid", 'P', "FILE", 0, "Write daemon process id to FILE. Default: " CLD_DEF_PIDFN }, + { "rep-port", 'r', "PORT", 0, + "bind replication engine to port PORT (def: 9081)" }, + { "remote", 'R', "HOST:PORT", 0, + "Add a HOST:PORT pair to list of remote hosts. Use this argument multiple times to build cell's peer list." }, + { "cell-size", 'S', "PEERS", 0, + "Total number of PEERS in cell. (PEERS/2)+1 required for quorum. Must be an odd number (def: 5)" }, { } }; @@ -79,10 +94,15 @@ static bool use_syslog = true; int debugging = 0; struct timeval current_time; +static const char *state_name_cldb[ST_CLDBNUM] = { + "Init", "Open", "Active", "Master", "Slave" +}; struct server cld_srv = { - .data_dir = CLD_DEF_DATADIR, - .pid_file = CLD_DEF_PIDFN, + .data_dir = "/spare/tmp/cld/lib", + .pid_file = "/var/run/cld.pid", .port = CLD_DEF_PORT, + .rep_port = CLD_DEF_REP_PORT, + .n_peers = CLD_DEF_PEERS, }; static void ensure_root(void); @@ -108,6 +128,33 @@ void cldlog(int prio, const char *fmt, ...) va_end(ap); } +/* + * Find out own hostname. + * This is needed for: + * - finding the local domain and its SRV records + * Do this before our state machines start ticking, so we can quit with + * a meaningful message easily. 
+ */ +static char *get_hostname(void) +{ + enum { hostsz = 64 }; + char hostb[hostsz]; + char *ret; + + if (gethostname(hostb, hostsz-1) < 0) { + cldlog(LOG_ERR, "get_hostname: gethostname error (%d): %s", + errno, strerror(errno)); + exit(1); + } + hostb[hostsz-1] = 0; + if ((ret = strdup(hostb)) == NULL) { + cldlog(LOG_ERR, "get_hostname: no core (%ld)", + (long)strlen(hostb)); + exit(1); + } + return ret; +} + int udp_tx(struct server_socket *sock, struct sockaddr *addr, socklen_t addr_len, const void *data, size_t data_len) { @@ -484,6 +531,55 @@ static void cldb_checkpoint(struct timer *timer) add_chkpt_timer(); } +static void cldb_state_cb(enum db_event event) +{ + + switch (event) { + case CLDB_EV_ELECTED: + /* + * Safe to stop ignoring bogus client indication, + * so unmute us by advancing the state. + */ + if (cld_srv.state_cldb == ST_CLDB_OPEN) + cld_srv.state_cldb = ST_CLDB_ACTIVE; + break; + case CLDB_EV_CLIENT: + case CLDB_EV_MASTER: + /* + * This callback runs on the context of the replication + * manager thread, and calling any of our functions thus + * turns our program into a multi-threaded one. Instead + * we do a loopbreak and postpone the processing. 
+ */ + if (cld_srv.state_cldb != ST_CLDB_INIT && + cld_srv.state_cldb != ST_CLDB_OPEN) { + char c = 0x42; + + if (event == CLDB_EV_MASTER) + cld_srv.state_cldb_new = ST_CLDB_MASTER; + else + cld_srv.state_cldb_new = ST_CLDB_SLAVE; + if (debugging) { + cldlog(LOG_DEBUG, "CLDB state > %s", + state_name_cldb[cld_srv.state_cldb_new]); + } + + /* wake up main loop */ + write(cld_srv.rep_pipe[1], &c, 1); + } + break; + default: + cldlog(LOG_WARNING, "API confusion with CLDB, event 0x%x", event); + cld_srv.state_cldb = ST_CLDB_OPEN; /* wrong, stub for now */ + cld_srv.state_cldb_new = ST_CLDB_INIT; + } +} + +static bool noop_event(int fd, short events, void *userdata) +{ + return true; /* continue main loop; do NOT terminate server */ +} + static int net_open(void) { int ipv6_found; @@ -575,6 +671,32 @@ err_addr: return rc; } +static void cldb_state_process(enum st_cldb new_state) +{ + unsigned int db_flags; + + if ((new_state == ST_CLDB_MASTER || new_state == ST_CLDB_SLAVE) && + cld_srv.state_cldb == ST_CLDB_ACTIVE) { + + db_flags = DB_CREATE | DB_THREAD; + if (cldb_up(&cld_srv.cldb, db_flags)) + return; + + ensure_root(); + + if (sess_load(cld_srv.sessions) != 0) { + cldlog(LOG_ERR, "session load failed. 
FIXME: I want error handling"); + return; + } + + add_chkpt_timer(); + } else { + if (debugging) + cldlog(LOG_DEBUG, "unhandled state transition %d -> %d", + cld_srv.state_cldb, new_state); + } +} + static void segv_signal(int signal) { cldlog(LOG_ERR, "SIGSEGV"); @@ -598,10 +720,59 @@ static void stats_dump(void) { X(poll); X(event); + cldlog(LOG_INFO, "State: CLDB %s", + state_name_cldb[cld_srv.state_cldb]); } #undef X +static bool add_remote(const char *arg) +{ + size_t arg_len = strlen(arg); + int i, port; + struct db_remote *rp; + char *s_port, *colon; + + if (!arg_len) + return false; + + /* verify no whitespace in input */ + for (i = 0; i < arg_len; i++) + if (isspace(arg[i])) + return false; + + /* find colon delimiter */ + colon = strchr(arg, ':'); + if (!colon || (colon == arg)) + return false; + s_port = colon + 1; + + /* parse replication port number */ + port = atoi(s_port); + if (port < 1 || port > 65535) + return false; + + /* alloc and fill in remote-host record */ + rp = malloc(sizeof(*rp)); + if (!rp) + return false; + + rp->port = port; + rp->host = strdup(arg); + if (!rp->host) { + free(rp); + return false; + } + + /* truncate string down to simply hostname portion */ + rp->host[colon - arg] = 0; + + /* add remote host to global list */ + cld_srv.rep_remotes = g_list_append(cld_srv.rep_remotes, rp); + + return true; +} + static error_t parse_opt (int key, char *arg, struct argp_state *state) { switch(key) { @@ -622,6 +793,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state) case 'F': cld_srv.flags |= SFL_FOREGROUND; break; + case 'm': + if ((strlen(arg) > 3) && (strlen(arg) < 64) && + (strchr(arg, '.'))) + cld_srv.force_myhost = arg; + else { + fprintf(stderr, "invalid myhost: '%s'\n", arg); + argp_usage(state); + } + break; case 'p': if (atoi(arg) > 0 && atoi(arg) < 65536) cld_srv.port = arg; @@ -633,6 +813,31 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state) case 'P': cld_srv.pid_file = arg; 
break; + case 'r': + if (atoi(arg) > 0 && atoi(arg) < 65536) + cld_srv.rep_port = atoi(arg); + else { + fprintf(stderr, "invalid rep-port: '%s'\n", arg); + argp_usage(state); + } + break; + case 'R': + if (!add_remote(arg)) { + fprintf(stderr, "invalid remote host:port: '%s'\n", arg); + argp_usage(state); + } + break; + case 'S': { + int n_peers = atoi(arg); + if ((n_peers >= CLD_MIN_PEERS) && (n_peers < CLD_MAX_PEERS) && + (n_peers & 0x01)) + cld_srv.n_peers = atoi(arg); + else { + fprintf(stderr, "invalid peer count: '%s'\n", arg); + argp_usage(state); + } + break; + } case ARGP_KEY_ARG: argp_usage(state); /* too many args */ break; @@ -648,9 +853,12 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state) int main (int argc, char *argv[]) { error_t aprc; - int rc = 1; + int rc = 1, env_flags; time_t next_timeout; + cld_srv.state_cldb = + cld_srv.state_cldb_new = ST_CLDB_INIT; + /* isspace() and strcasecmp() consistency requires this */ setlocale(LC_ALL, "C"); @@ -674,6 +882,20 @@ int main (int argc, char *argv[]) if (use_syslog) openlog(PROGRAM_NAME, LOG_PID, LOG_LOCAL3); + if (cld_srv.force_myhost) + cld_srv.myhost = strdup(cld_srv.force_myhost); + else + cld_srv.myhost = get_hostname(); + + if (debugging) + cldlog(LOG_DEBUG, "our hostname: %s", cld_srv.myhost); + + /* remotes file should list all in peer group, except for us */ + if ((cld_srv.n_peers - 1) != g_list_length(cld_srv.rep_remotes)) { + cldlog(LOG_ERR, "n_peers does not match remotes file loaded"); + goto err_out; + } + if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) { syslogerr("daemon"); goto err_out; @@ -694,16 +916,7 @@ int main (int argc, char *argv[]) signal(SIGTERM, term_signal); signal(SIGUSR1, stats_signal); - if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL, - DB_CREATE | DB_THREAD | DB_RECOVER, - "cld", use_syslog, - DB_CREATE | DB_THREAD, NULL)) - exit(1); - - ensure_root(); - timer_init(&cld_srv.chkpt_timer, cldb_checkpoint, NULL); - 
add_chkpt_timer(); rc = 1; @@ -716,17 +929,53 @@ int main (int argc, char *argv[]) !cld_srv.polls) goto err_out_pid; - if (sess_load(cld_srv.sessions) != 0) - goto err_out_pid; + /* init pipe for replication manager notifications to us */ + if (pipe(cld_srv.rep_pipe) < 0) { + syslogerr("pipe"); + goto err_out; + } /* set up server networking */ rc = net_open(); if (rc) goto err_out_pid; + { + struct pollfd pfd; + struct server_poll sp; + + /* + * add pipe to poll list, after doing so with our net sockets + */ + sp.fd = cld_srv.rep_pipe[0]; + sp.cb = noop_event; + sp.userdata = NULL; + g_array_append_val(cld_srv.poll_data, sp); + + pfd.fd = cld_srv.rep_pipe[0]; + pfd.events = POLLIN; + pfd.revents = 0; + g_array_append_val(cld_srv.polls, pfd); + } + + env_flags = DB_RECOVER | DB_CREATE | DB_THREAD; + if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL, + env_flags, "cld", true, + cld_srv.rep_remotes, + cld_srv.myhost, cld_srv.rep_port, + cld_srv.n_peers, cldb_state_cb)) { + cldlog(LOG_ERR, "Failed to open CLDB, limping"); + } else { + cld_srv.state_cldb = + cld_srv.state_cldb_new = ST_CLDB_OPEN; + } + cldlog(LOG_INFO, "initialized: cport %s, dbg %u", cld_srv.port, debugging); + cldlog(LOG_INFO, "replication: %s:%u", + cld_srv.myhost, + cld_srv.rep_port); next_timeout = timers_run(); @@ -789,13 +1038,20 @@ int main (int argc, char *argv[]) } next_timeout = timers_run(); + + if (cld_srv.state_cldb_new != ST_CLDB_INIT && + cld_srv.state_cldb_new != cld_srv.state_cldb) { + cldb_state_process(cld_srv.state_cldb_new); + cld_srv.state_cldb = cld_srv.state_cldb_new; + } } cldlog(LOG_INFO, "shutting down"); if (cld_srv.cldb.up) cldb_down(&cld_srv.cldb); - cldb_fini(&cld_srv.cldb); + if (cld_srv.state_cldb >= ST_CLDB_OPEN) + cldb_fini(&cld_srv.cldb); rc = 0; diff --git a/test/pid-exists b/test/pid-exists index 351b4f1..4fa2275 100755 --- a/test/pid-exists +++ b/test/pid-exists @@ -1,9 +1,12 @@ #!/bin/sh -if [ ! -f cld.pid ] -then - echo "pid file not found." 
- exit 1 -fi +for n in 1 2 3 +do + if [ ! -f cld$n.pid ] + then + echo "cld$n.pid not found." + exit 1 + fi +done exit 0 diff --git a/test/prep-db b/test/prep-db index 353ca4a..3e4fb60 100755 --- a/test/prep-db +++ b/test/prep-db @@ -2,13 +2,16 @@ DATADIR=data -mkdir -p $DATADIR - -if [ ! -d $DATADIR ] -then - rm -rf $DATADIR - echo "test database dir not found." - exit 1 -fi +for n in 1 2 3 +do + mkdir -p $DATADIR/n$n/data + + if [ ! -d $DATADIR/n$n/data ] + then + rm -rf $DATADIR + echo "test database dir for node $n not found." + exit 1 + fi +done exit 0 diff --git a/test/start-daemon b/test/start-daemon index 4cb9fd7..06b3250 100755 --- a/test/start-daemon +++ b/test/start-daemon @@ -1,13 +1,31 @@ #!/bin/sh -if [ -f cld.pid ] +if [ -f cld1.pid -o -f cld2.pid -o -f cld3.pid ] then - echo "pid file found. daemon still running?" + echo "pid file found. daemons still running?" exit 1 fi -../server/cld -P cld.pid -d "$PWD/data" -p 18181 -E +../server/cld -d "$PWD/data/n1/data" -p 18181 -r 19181 -P cld1.pid -E \ + -D 2 -S 3 \ + -m localhost.localdomain \ + -R localhost.localdomain:19182 \ + -R localhost.localdomain:19183 -sleep 3 +../server/cld -d "$PWD/data/n2/data" -p 18182 -r 19182 -P cld2.pid -E \ + -D 2 -S 3 \ + -m localhost.localdomain \ + -R localhost.localdomain:19181 \ + -R localhost.localdomain:19183 + +../server/cld -d "$PWD/data/n3/data" -p 18183 -r 19183 -P cld3.pid -E \ + -D 2 -S 3 \ + -m localhost.localdomain \ + -R localhost.localdomain:19181 \ + -R localhost.localdomain:19182 +sleep 1 + +echo " start-daemon: Waiting 20s, for daemons to start up..." +sleep 20 exit 0 diff --git a/test/stop-daemon b/test/stop-daemon index 221dc46..d00fda6 100755 --- a/test/stop-daemon +++ b/test/stop-daemon @@ -1,23 +1,35 @@ #!/bin/sh -if [ ! -f cld.pid ] -then - echo no daemon pid file found. - exit 1 -fi +for n in 1 2 3 +do + if [ ! -f cld$n.pid ] + then + echo " stop-daemon: cld$n.pid not found." 
+ exit 1 + fi +done + -kill `cat cld.pid` +kill `cat cld1.pid cld2.pid cld3.pid` for ((n = 0; n < 10; n++)) do - if [ ! -f cld.pid ] + if [ -f cld1.pid -o -f cld2.pid -o -f cld3.pid ] then + sleep 1 + else exit 0 fi - sleep 1 done -echo "PID file not removed, after signal sent." -rm -f cld.pid +for n in 1 2 3 +do + if [ -f cld$n.pid ] + then + echo " stop-daemon: cld$n.pid found, after signal sent." + fi +done + +rm -f cld?.pid exit 1 -- To unsubscribe from this list: send the line "unsubscribe hail-devel" in the body of a message to majord...@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html