As we discussed at the cluster summit, increasing the message size
inside corosync itself is not only dangerous but also needed for only a
very few corner cases, all of which involve CPG.
So, to allow large CPG messages (which are needed), I have added an extra
facility to libcpg that fragments messages that are too large for
corosync's internal buffers. This is transparent to the
application. Zero-copy sends are NOT supported for this feature.
I've also included a test program 'cpghum' that can test this facility
with message sequence numbers and checksums.
Signed-Off-By: Christine Caulfield <[email protected]>
diff --git a/exec/cpg.c b/exec/cpg.c
index 1c6fbb9..3d83982 100644
--- a/exec/cpg.c
+++ b/exec/cpg.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
*
* All rights reserved.
*
@@ -83,7 +83,8 @@ enum cpg_message_req_types {
MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
MESSAGE_REQ_EXEC_CPG_MCAST = 3,
MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
- MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5
+ MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5,
+ MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST = 6,
};
struct zcb_mapped {
@@ -224,6 +225,10 @@ static void message_handler_req_exec_cpg_mcast (
const void *message,
unsigned int nodeid);
+static void message_handler_req_exec_cpg_partial_mcast (
+ const void *message,
+ unsigned int nodeid);
+
static void message_handler_req_exec_cpg_downlist_old (
const void *message,
unsigned int nodeid);
@@ -238,6 +243,8 @@ static void exec_cpg_joinlist_endian_convert (void *msg);
static void exec_cpg_mcast_endian_convert (void *msg);
+static void exec_cpg_partial_mcast_endian_convert (void *msg);
+
static void exec_cpg_downlist_endian_convert_old (void *msg);
static void exec_cpg_downlist_endian_convert (void *msg);
@@ -250,6 +257,8 @@ static void message_handler_req_lib_cpg_finalize (void *conn, const void *messag
static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
+
static void message_handler_req_lib_cpg_membership (void *conn,
const void *message);
@@ -383,7 +392,10 @@ static struct corosync_lib_handler cpg_lib_engine[] =
.lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
-
+ { /* 12 */
+ .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
+ .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
+ },
};
@@ -413,6 +425,10 @@ static struct corosync_exec_handler cpg_exec_engine[] =
.exec_handler_fn = message_handler_req_exec_cpg_downlist,
.exec_endian_convert_fn = exec_cpg_downlist_endian_convert
},
+ { /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
+ .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
+ .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
+ },
};
struct corosync_service_engine cpg_service_engine = {
@@ -457,6 +473,17 @@ struct req_exec_cpg_mcast {
mar_uint8_t message[] __attribute__((aligned(8)));
};
+/*
+ * Totem message carrying one fragment of a large CPG multicast; built by
+ * message_handler_req_lib_cpg_partial_mcast and multicast with totem_mcast.
+ * It has an endian-convert function, so the layout is exchanged between
+ * nodes: do not reorder or resize fields.
+ */
+struct req_exec_cpg_partial_mcast {
+ struct qb_ipc_request_header header __attribute__((aligned(8)));
+ mar_cpg_name_t group_name __attribute__((aligned(8)));
+ mar_uint32_t msglen __attribute__((aligned(8))); /* size of the complete, reassembled message */
+ mar_uint32_t fraglen __attribute__((aligned(8))); /* bytes of payload in message[] for this fragment */
+ mar_uint32_t pid __attribute__((aligned(8)));
+ mar_uint32_t type __attribute__((aligned(8))); /* LIBCPG_PARTIAL_FIRST / _CONTINUED / _LAST */
+ mar_message_source_t source __attribute__((aligned(8)));
+ mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
struct req_exec_cpg_downlist_old {
struct qb_ipc_request_header header __attribute__((aligned(8)));
mar_uint32_t left_nodes __attribute__((aligned(8)));
@@ -1186,6 +1213,19 @@ static void exec_cpg_mcast_endian_convert (void *msg)
swab_mar_message_source_t (&req_exec_cpg_mcast->source);
}
+/*
+ * exec_endian_convert_fn for MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST: byte-swap
+ * every multi-byte header field in place.  The payload in message[] is
+ * opaque and left untouched.
+ */
+static void exec_cpg_partial_mcast_endian_convert (void *msg)
+{
+	struct req_exec_cpg_partial_mcast *req = msg;
+
+	swab_coroipc_request_header_t (&req->header);
+	swab_mar_cpg_name_t (&req->group_name);
+	req->msglen = swab32 (req->msglen);
+	req->fraglen = swab32 (req->fraglen);
+	req->pid = swab32 (req->pid);
+	req->type = swab32 (req->type);
+	swab_mar_message_source_t (&req->source);
+}
+
static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
struct list_head *iter;
@@ -1452,6 +1492,67 @@ static void message_handler_req_exec_cpg_mcast (
}
}
}
+
+/*
+ * Forward one fragment of a large CPG multicast, received via totem from
+ * 'nodeid', to every local connection whose group matches and whose state
+ * is JOIN_COMPLETED or LEAVE_STARTED.  The fragment is passed on unchanged
+ * as a MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK; libcpg reassembles it.
+ * Nothing is delivered if the sending node is not a known member of the
+ * group.
+ */
+static void message_handler_req_exec_cpg_partial_mcast (
+	const void *message,
+	unsigned int nodeid)
+{
+	const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
+	struct res_lib_cpg_partial_deliver_callback res_lib_cpg_mcast;
+	int msglen = req_exec_cpg_mcast->fraglen;
+	struct list_head *iter, *pi_iter;
+	struct cpg_pd *cpd;
+	struct iovec iovec[2];
+	int known_node = 0;
+
+	/*
+	 * The payload follows the fixed header.  Drop fragments whose fraglen
+	 * does not fit inside header.size, otherwise iovec[1] below would read
+	 * past the end of the message.
+	 */
+	if (req_exec_cpg_mcast->header.size < (int)sizeof (*req_exec_cpg_mcast) ||
+	    req_exec_cpg_mcast->fraglen >
+	    (unsigned int)(req_exec_cpg_mcast->header.size - (int)sizeof (*req_exec_cpg_mcast))) {
+		log_printf(LOGSYS_LEVEL_WARNING, "Dropping malformed fragmented message from node %u", nodeid);
+		return ;
+	}
+
+	log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
+
+	/* Zero first so struct padding and header.error never reach clients uninitialised. */
+	memset(&res_lib_cpg_mcast, 0, sizeof(res_lib_cpg_mcast));
+	res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK;
+	res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
+	res_lib_cpg_mcast.header.error = CS_OK;
+	res_lib_cpg_mcast.fraglen = msglen;
+	res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
+	res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
+	res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
+	res_lib_cpg_mcast.nodeid = nodeid;
+
+	memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
+		sizeof(mar_cpg_name_t));
+	iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
+	iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
+
+	iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
+	iovec[1].iov_len = msglen;
+
+	for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
+		cpd = list_entry(iter, struct cpg_pd, list);
+		iter = iter->next;
+
+		if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
+			&& (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
+
+			if (!known_node) {
+				/* Try to find, if we know the node */
+				for (pi_iter = process_info_list_head.next;
+					pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
+
+					struct process_info *pi = list_entry (pi_iter, struct process_info, list);
+
+					if (pi->nodeid == nodeid &&
+						mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
+						known_node = 1;
+						break;
+					}
+				}
+			}
+
+			if (!known_node) {
+				log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
+				return ;
+			}
+
+			api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
+		}
+	}
+}
static int cpg_exec_send_downlist(void)
@@ -1864,6 +1965,62 @@ static void message_handler_req_lib_cpg_zc_free (
res_header.size);
}
+/*
+ * Fragmented mcast message from the library.
+ *
+ * libcpg sends one of these per fragment of a message that is too large
+ * for a single IPC request.  If the connection is joining or joined, the
+ * fragment is wrapped in a req_exec_cpg_partial_mcast and multicast with
+ * TOTEM_AGREED ordering.  No reply is sent to the library, and the
+ * request's 'guarantee' field is not used.
+ */
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
+{
+	const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
+	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
+	mar_cpg_name_t group_name = cpd->group_name;
+
+	struct iovec req_exec_cpg_iovec[2];
+	struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
+	int msglen = req_lib_cpg_mcast->fraglen;
+	int result;
+	cs_error_t error = CS_ERR_NOT_EXIST;
+
+	log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
+	log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
+
+	switch (cpd->cpd_state) {
+	case CPD_STATE_UNJOINED:
+		error = CS_ERR_NOT_EXIST;
+		break;
+	case CPD_STATE_LEAVE_STARTED:
+		error = CS_ERR_NOT_EXIST;
+		break;
+	case CPD_STATE_JOIN_STARTED:
+		error = CS_OK;
+		break;
+	case CPD_STATE_JOIN_COMPLETED:
+		error = CS_OK;
+		break;
+	}
+
+	/*
+	 * fraglen is supplied by the IPC client.  Make sure the payload it
+	 * describes lies inside header.size, otherwise totem_mcast() below
+	 * would read past the end of the request.
+	 * NOTE(review): assumes the IPC layer has already matched header.size
+	 * against the bytes actually received - confirm.
+	 */
+	if (error == CS_OK &&
+	    (req_lib_cpg_mcast->header.size < (int)sizeof (*req_lib_cpg_mcast) ||
+	     req_lib_cpg_mcast->fraglen >
+	     (unsigned int)(req_lib_cpg_mcast->header.size - (int)sizeof (*req_lib_cpg_mcast)))) {
+		error = CS_ERR_INVALID_PARAM;
+	}
+
+	if (error == CS_OK) {
+		/* Zero first so struct padding is not multicast uninitialised. */
+		memset(&req_exec_cpg_mcast, 0, sizeof(req_exec_cpg_mcast));
+		req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
+		req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
+			MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST);
+		req_exec_cpg_mcast.pid = cpd->pid;
+		req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
+		req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
+		req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
+		api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
+		memcpy(&req_exec_cpg_mcast.group_name, &group_name,
+			sizeof(mar_cpg_name_t));
+
+		req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
+		req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
+		req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
+		req_exec_cpg_iovec[1].iov_len = msglen;
+
+		result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
+		assert(result == 0);
+	} else {
+		log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
+			conn, group_name.value, cpd->cpd_state, error);
+	}
+}
+
/* Mcast message from the library */
static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
{
diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
index a95335a..d7000a5 100644
--- a/include/corosync/ipc_cpg.h
+++ b/include/corosync/ipc_cpg.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2006-2011 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
*
* All rights reserved.
*
@@ -55,6 +55,7 @@ enum req_cpg_types {
MESSAGE_REQ_CPG_ZC_ALLOC = 9,
MESSAGE_REQ_CPG_ZC_FREE = 10,
MESSAGE_REQ_CPG_ZC_EXECUTE = 11,
+ MESSAGE_REQ_CPG_PARTIAL_MCAST = 12,
};
enum res_cpg_types {
@@ -75,6 +76,7 @@ enum res_cpg_types {
MESSAGE_RES_CPG_ZC_ALLOC = 14,
MESSAGE_RES_CPG_ZC_FREE = 15,
MESSAGE_RES_CPG_ZC_EXECUTE = 16,
+ MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK = 17,
};
enum lib_cpg_confchg_reason {
@@ -85,6 +87,12 @@ enum lib_cpg_confchg_reason {
CONFCHG_CPG_REASON_PROCDOWN = 5
};
+/*
+ * Position of a fragment within a message that libcpg split because it
+ * exceeded max_msg_size.  Carried in the 'type' field of the partial
+ * mcast/deliver structures, so the numeric values are part of the protocol.
+ */
+enum lib_cpg_partial_types {
+ LIBCPG_PARTIAL_FIRST = 1, /* first fragment: receiver allocates msglen bytes */
+ LIBCPG_PARTIAL_CONTINUED = 2, /* a fragment between the first and the last */
+ LIBCPG_PARTIAL_LAST = 3, /* final fragment: the reassembled message is delivered */
+};
+
typedef struct {
uint32_t length __attribute__((aligned(8)));
char value[CPG_MAX_NAME_LENGTH] __attribute__((aligned(8)));
@@ -207,6 +215,15 @@ struct req_lib_cpg_mcast {
mar_uint8_t message[] __attribute__((aligned(8)));
};
+/*
+ * Library -> daemon request carrying one fragment of a large multicast
+ * (MESSAGE_REQ_CPG_PARTIAL_MCAST).  As a request it must begin with
+ * qb_ipc_request_header (id, size); a response header would add an
+ * 'error' field that the sender never sets.
+ */
+struct req_lib_cpg_partial_mcast {
+	struct qb_ipc_request_header header __attribute__((aligned(8)));
+	mar_uint32_t guarantee __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));	/* size of the complete message */
+	mar_uint32_t fraglen __attribute__((aligned(8)));	/* bytes of payload in message[] */
+	mar_uint32_t type __attribute__((aligned(8)));	/* enum lib_cpg_partial_types */
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
struct res_lib_cpg_mcast {
struct qb_ipc_response_header header __attribute__((aligned(8)));
};
@@ -223,6 +240,17 @@ struct res_lib_cpg_deliver_callback {
mar_uint8_t message[] __attribute__((aligned(8)));
};
+/*
+ * Daemon -> library callback carrying one fragment of a large multicast.
+ * Built by the exec partial-mcast handler and reassembled in cpg_dispatch().
+ */
+struct res_lib_cpg_partial_deliver_callback {
+ struct qb_ipc_response_header header __attribute__((aligned(8)));
+ mar_cpg_name_t group_name __attribute__((aligned(8)));
+ mar_uint32_t msglen __attribute__((aligned(8))); /* size of the complete message (reassembly buffer size) */
+ mar_uint32_t fraglen __attribute__((aligned(8))); /* bytes of payload in message[] */
+ mar_uint32_t nodeid __attribute__((aligned(8))); /* node that sent the message */
+ mar_uint32_t pid __attribute__((aligned(8))); /* sending process */
+ mar_uint32_t type __attribute__((aligned(8))); /* enum lib_cpg_partial_types */
+ mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
struct res_lib_cpg_flowcontrol_callback {
struct qb_ipc_response_header header __attribute__((aligned(8)));
mar_uint32_t flow_control_state __attribute__((aligned(8)));
diff --git a/lib/cpg.c b/lib/cpg.c
index 4b92f44..376914c 100644
--- a/lib/cpg.c
+++ b/lib/cpg.c
@@ -1,7 +1,7 @@
/*
* vi: set autoindent tabstop=4 shiftwidth=4 :
*
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
*
* All rights reserved.
*
@@ -83,6 +83,14 @@ struct cpg_inst {
cpg_model_v1_data_t model_v1_data;
};
struct list_head iteration_list_head;
+ uint32_t max_msg_size;
+ char *assembly_buf;
+ uint32_t assembly_buf_ptr;
+ int assembling; /* Flag that says we have started assembling a message.
+ * It's here to catch the situation where a node joins
+ * the cluster/group in the middle of a CPG message send
+ * so we don't pass on a partial message to the client.
+ */
};
static void cpg_inst_free (void *inst);
@@ -210,6 +218,8 @@ cs_error_t cpg_model_initialize (
}
}
+ /* Allow space for corosync internal headers */
+ cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
cpg_inst->model_data.model = model;
cpg_inst->context = context;
@@ -339,6 +349,7 @@ cs_error_t cpg_dispatch (
struct cpg_inst *cpg_inst;
struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
+ struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
struct cpg_inst cpg_inst_copy;
struct qb_ipc_response_header *dispatch_data;
@@ -361,7 +372,7 @@ cs_error_t cpg_dispatch (
/*
* Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
- * wait indefinately for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
+ * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
*/
if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
timeout = 0;
@@ -428,6 +439,43 @@ cs_error_t cpg_dispatch (
res_cpg_deliver_callback->msglen);
break;
+ case MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK:
+ res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
+
+ marshall_from_mar_cpg_name_t (
+ &group_name,
+ &res_cpg_partial_deliver_callback->group_name);
+
+ if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
+ /*
+ * Allocate a buffer to contain a full message.
+ */
+ cpg_inst->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
+ if (!cpg_inst->assembly_buf) {
+ error = CS_ERR_NO_MEMORY;
+ goto error_put;
+ }
+ cpg_inst->assembling = 1;
+ cpg_inst->assembly_buf_ptr = 0;
+ }
+ if (cpg_inst->assembling) {
+ memcpy(cpg_inst->assembly_buf + cpg_inst->assembly_buf_ptr,
+ res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
+ cpg_inst->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
+
+ if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
+ cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
+ &group_name,
+ res_cpg_partial_deliver_callback->nodeid,
+ res_cpg_partial_deliver_callback->pid,
+ cpg_inst->assembly_buf,
+ res_cpg_partial_deliver_callback->msglen);
+ free(cpg_inst->assembly_buf);
+ cpg_inst->assembling = 0;
+ }
+ }
+ break;
+
case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
break;
@@ -921,6 +969,12 @@ cs_error_t cpg_zcb_mcast_joined (
if (error != CS_OK) {
return (error);
}
+
+ if (msg_len > IPC_REQUEST_SIZE) {
+ error = CS_ERR_TOO_BIG;
+ goto error_exit;
+ }
+
req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
msg_len;
@@ -957,6 +1011,76 @@ error_exit:
return (error);
}
+/*
+ * Send a message larger than cpg_inst->max_msg_size as a series of
+ * MESSAGE_REQ_CPG_PARTIAL_MCAST requests.  Each request carries at most
+ * max_msg_size bytes taken from a single iovec entry, so fragments never
+ * span two iovecs.  The first fragment is tagged LIBCPG_PARTIAL_FIRST, the
+ * one that completes the message LIBCPG_PARTIAL_LAST, and all others
+ * LIBCPG_PARTIAL_CONTINUED.  Empty iovec entries are skipped.
+ *
+ * msg_len must be the sum of iovec[0..iov_len-1].iov_len.
+ * CS_ERR_TRY_AGAIN from IPC is retried every 10ms.  Any other error stops
+ * the send and is returned.  CS_ERR_TOO_BIG is returned if msg_len does
+ * not fit the 32-bit wire field.
+ */
+static cs_error_t send_fragments (
+	struct cpg_inst *cpg_inst,
+	cpg_guarantee_t guarantee,
+	size_t msg_len,
+	const struct iovec *iovec,
+	unsigned int iov_len)
+{
+	unsigned int i = 0;
+	cs_error_t error = CS_OK;
+	struct iovec iov[2];
+	struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
+	size_t sent = 0;
+	size_t iov_sent = 0;
+
+	/* msglen is 32 bits on the wire: refuse rather than silently truncate. */
+	if ((size_t)(mar_uint32_t)msg_len != msg_len) {
+		return CS_ERR_TOO_BIG;
+	}
+
+	/* Zero first so no stack bytes (padding) are sent to the daemon. */
+	memset(&req_lib_cpg_mcast, 0, sizeof(req_lib_cpg_mcast));
+	req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
+	req_lib_cpg_mcast.guarantee = guarantee;
+	req_lib_cpg_mcast.msglen = msg_len;
+
+	iov[0].iov_base = (void *)&req_lib_cpg_mcast;
+	iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
+
+	qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
+
+	while (error == CS_OK && sent < msg_len && i < iov_len) {
+		size_t remaining = iovec[i].iov_len - iov_sent;
+
+		/*
+		 * An empty entry would produce an empty fragment.  If it came
+		 * first, the next fragment would be tagged FIRST again.
+		 */
+		if (remaining == 0) {
+			i++;
+			iov_sent = 0;
+			continue;
+		}
+
+		iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
+		iov[1].iov_len = (remaining > cpg_inst->max_msg_size) ?
+			cpg_inst->max_msg_size : remaining;
+
+		if (sent == 0) {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
+		} else if (sent + iov[1].iov_len == msg_len) {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
+		} else {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
+		}
+
+		req_lib_cpg_mcast.fraglen = iov[1].iov_len;
+		req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
+
+		do {
+			error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, 2));
+			if (error == CS_ERR_TRY_AGAIN) {
+				usleep(10000);
+			}
+		} while (error == CS_ERR_TRY_AGAIN);
+
+		if (error != CS_OK) {
+			break;
+		}
+
+		iov_sent += iov[1].iov_len;
+		sent += iov[1].iov_len;
+
+		/* Move on to the next iovec once this one is exhausted. */
+		if (iov_sent >= iovec[i].iov_len) {
+			i++;
+			iov_sent = 0;
+		}
+	}
+	qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
+
+	if (error == CS_OK && sent < msg_len) {
+		/* msg_len claimed more data than the iovec array holds. */
+		error = CS_ERR_INVALID_PARAM;
+	}
+
+	return error;
+}
+
+
cs_error_t cpg_mcast_joined (
cpg_handle_t handle,
cpg_guarantee_t guarantee,
@@ -979,6 +1103,11 @@ cs_error_t cpg_mcast_joined (
msg_len += iovec[i].iov_len;
}
+ if (msg_len > cpg_inst->max_msg_size) {
+ error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
+ goto error_exit;
+ }
+
req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
msg_len;
@@ -994,6 +1123,7 @@ cs_error_t cpg_mcast_joined (
error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
+error_exit:
hdb_handle_put (&cpg_handle_t_db, handle);
return (error);
diff --git a/test/Makefile.am b/test/Makefile.am
index c19e506..bb11518 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -34,7 +34,7 @@ MAINTAINERCLEANFILES = Makefile.in
EXTRA_DIST = ploadstart.sh
-noinst_PROGRAMS = cpgverify testcpg testcpg2 cpgbench \
+noinst_PROGRAMS = cpgverify testcpg testcpg2 cpgbench cpghum \
testquorum testvotequorum1 testvotequorum2 \
stress_cpgfdget stress_cpgcontext cpgbound testsam \
testcpgzc cpgbenchzc testzcgc stress_cpgzc
@@ -48,6 +48,7 @@ testzcgc_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
stress_cpgzc_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
stress_cpgfdget_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
stress_cpgcontext_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
+cpghum_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la -lz
testquorum_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libquorum.la
testvotequorum1_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
testvotequorum2_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
diff --git a/test/cpghum.c b/test/cpghum.c
new file mode 100644
index 0000000..0a096db
--- /dev/null
+++ b/test/cpghum.c
@@ -0,0 +1,432 @@
+/*
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Christine Caulfield <[email protected]>
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <unistd.h>
+#include <errno.h>
+#include <unistd.h>
+#include <time.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+#include <zlib.h>
+#include <libgen.h>
+
+#include <qb/qblog.h>
+#include <qb/qbutil.h>
+
+#include <corosync/corotypes.h>
+#include <corosync/cpg.h>
+
+static cpg_handle_t handle;
+
+static pthread_t thread;
+
+#ifndef timersub
+#define timersub(a, b, result) \
+	do { \
+		(result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
+		(result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
+		if ((result)->tv_usec < 0) { \
+			--(result)->tv_sec; \
+			(result)->tv_usec += 1000000; \
+		} \
+	} while (0)
+#endif /* timersub */
+
+/* Set from signal handlers, so must be volatile sig_atomic_t. */
+static volatile sig_atomic_t alarm_notice;
+#define ONE_MEG 1048576
+#define DATASIZE (ONE_MEG*20)
+/*
+ * Send buffer.  It is filled through an unsigned int pointer, so declare it
+ * as unsigned int to guarantee alignment and avoid aliasing a char array.
+ */
+static unsigned int data[DATASIZE / sizeof(unsigned int)];
+static int send_counter = 0;
+static int do_syslog = 0;
+static int quiet = 0;
+static volatile sig_atomic_t stopped;
+
+// stats
+static unsigned int length_errors=0;
+static unsigned int crc_errors=0;
+static unsigned int sequence_errors=0;
+static unsigned int packets_sent=0;
+static unsigned int packets_recvd=0;
+static unsigned int send_retries=0;
+static unsigned int send_fails=0;
+
+/* Membership-change callback: this test ignores configuration changes. */
+static void cpg_bm_confchg_fn (
+	cpg_handle_t handle_in,
+	const struct cpg_name *group_name,
+	const struct cpg_address *member_list, size_t member_list_entries,
+	const struct cpg_address *left_list, size_t left_list_entries,
+	const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+	(void)handle_in;
+	(void)group_name;
+	(void)member_list;
+	(void)member_list_entries;
+	(void)left_list;
+	(void)left_list_entries;
+	(void)joined_list;
+	(void)joined_list_entries;
+}
+
+/* Receive-side state, updated by the deliver callback. */
+static unsigned int g_recv_count;
+static unsigned int g_recv_length;
+static unsigned int g_write_size;
+static int g_recv_counter = 0;
+
+/*
+ * Check each delivered message:
+ *  - its length, when g_write_size is non-zero;
+ *  - the sequence number in word 0;
+ *  - the CRC32 in word 1, computed over everything after the first two words.
+ * Failures are counted and reported on stderr, and also to syslog with -s.
+ */
+static void cpg_bm_deliver_fn (
+	cpg_handle_t handle_in,
+	const struct cpg_name *group_name,
+	uint32_t nodeid,
+	uint32_t pid,
+	void *msg,
+	size_t msg_len)
+{
+	int *value = msg;
+	uLong crc=0;
+	uLong recv_crc;
+
+	packets_recvd++;
+	g_recv_length = msg_len;
+
+	// Must at least hold the sequence number and the CRC, or value[1] is out of bounds
+	if (msg_len < sizeof(int)*2) {
+		length_errors++;
+		fprintf(stderr, "%s: message too short. got %zu bytes\n", group_name->value, msg_len);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: message too short. got %zu bytes\n", group_name->value, msg_len);
+		}
+		return;
+	}
+	recv_crc = value[1] & 0xFFFFFFFF;
+
+	// Basic check, packets should all be the right size
+	if (g_write_size && (msg_len != g_write_size)) {
+		length_errors++;
+		fprintf(stderr, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, g_write_size);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, g_write_size);
+		}
+	}
+
+	// Sequence counters are incrementing in step?
+	if (*value != g_recv_counter) {
+		sequence_errors++;
+		fprintf(stderr, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
+		}
+		// Catch up or we'll be printing errors for ever
+		g_recv_counter = *value +1;
+	} else {
+		g_recv_counter++;
+	}
+
+	// Check crc
+	crc = crc32(0, NULL, 0);
+	crc = crc32(crc, (Bytef *)&value[2], msg_len-sizeof(int)*2) & 0xFFFFFFFF;
+	if (crc != recv_crc) {
+		crc_errors++;
+		fprintf(stderr, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
+		}
+	}
+
+	g_recv_count++;
+
+}
+
+/* Callbacks for cpg_model_initialize() (-m 1, the default). */
+static cpg_model_v1_data_t model1_data = {
+	.cpg_confchg_fn = cpg_bm_confchg_fn,
+	.cpg_deliver_fn = cpg_bm_deliver_fn,
+};
+
+/* Callbacks for the legacy cpg_initialize() (-m 0). */
+static cpg_callbacks_t callbacks = {
+	.cpg_confchg_fn = cpg_bm_confchg_fn,
+	.cpg_deliver_fn = cpg_bm_deliver_fn,
+};
+
+/*
+ * Default group.  NOTE(review): the length of 7 includes the trailing NUL,
+ * while -n sets length = strlen().  So "-n cpghum" may not name the same
+ * group as the default - confirm.
+ */
+static struct cpg_name group_name = {
+	.length = 7,
+	.value = "cpghum",
+};
+
+/*
+ * Send write_size-byte messages to the group, pausing delay_time ms
+ * between sends.  Stops when the print_time alarm fires, a send fails, or
+ * SIGINT sets 'stopped'.
+ *
+ * Message layout:
+ *  - word 0: sequence number;
+ *  - word 1: CRC32 of the bytes after the first two words;
+ *  - remainder: random data.
+ * CS_ERR_TRY_AGAIN is retried every 10ms.
+ */
+static void cpg_test (
+	cpg_handle_t handle_in,
+	int write_size,
+	int delay_time,
+	int print_time)
+{
+	struct timeval tv1, tv2, tv_elapsed;
+	struct iovec iov;
+	cs_error_t res;
+	unsigned int *dataint = (unsigned int *)data;
+	size_t nwords;
+	size_t i;
+	uLong crc;
+
+	/* Need room for the two header words, and must not read past data[]. */
+	if (write_size < (int)(sizeof(int) * 2) || write_size > DATASIZE) {
+		fprintf(stderr, "cpg_test: invalid write size %d\n", write_size);
+		stopped = 1;
+		return;
+	}
+	/* Only the words actually sent need fresh random data (was: all 20MB every send). */
+	nwords = ((size_t)write_size + sizeof(int) - 1) / sizeof(int);
+
+	alarm_notice = 0;
+	iov.iov_base = data;
+	iov.iov_len = write_size;
+
+	g_recv_count = 0;
+	alarm (print_time);
+
+	gettimeofday (&tv1, NULL);
+	do {
+		dataint[0] = send_counter++;
+		for (i = 2; i < nwords; i++) {
+			dataint[i] = rand();
+		}
+		crc = crc32(0, NULL, 0);
+		dataint[1] = crc32(crc, (Bytef*)&dataint[2], write_size-sizeof(int)*2);
+
+		do {
+			res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
+			if (res == CS_ERR_TRY_AGAIN) {
+				usleep(10000);
+				send_retries++;
+			}
+		} while (res == CS_ERR_TRY_AGAIN);
+
+		if (res != CS_OK) {
+			fprintf(stderr, "send failed: %d\n", res);
+			send_fails++;
+		}
+		else {
+			packets_sent++;
+		}
+		usleep(delay_time*1000);
+	} while (alarm_notice == 0 && res == CS_OK && stopped == 0);
+	gettimeofday (&tv2, NULL);
+	timersub (&tv2, &tv1, &tv_elapsed);
+
+	if (!quiet) {
+		printf ("%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
+		printf ("%5d bytes per write\n", write_size);
+	}
+
+}
+
+/* SIGALRM: end the current cpg_test() round so its summary is printed. */
+static void sigalrm_handler (int num)
+{
+	(void)num;
+	alarm_notice = 1;
+}
+
+/* SIGINT: tell both the sending and the listening loops to stop. */
+static void sigint_handler (int num)
+{
+	(void)num;
+	stopped = 1;
+}
+
+/*
+ * Callback thread: blocks in cpg_dispatch(CS_DISPATCH_BLOCKING), which runs
+ * the deliver/confchg callbacks (presumably until the connection is closed).
+ */
+static void* dispatch_thread (void *arg)
+{
+	(void)arg;
+	cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
+	return NULL;
+}
+
+/* Print command-line help to stderr; cmd is the program name to show. */
+static void usage(char *cmd)
+{
+	fprintf(stderr, "%s [OPTIONS]\n", cmd);
+	fputs("\n", stderr);
+	fprintf(stderr, "%s sends CPG messages to all registered users of the CPG.\n", cmd);
+	fputs("The messages have a sequence number and a CRC so that missing or\n"
+	      "corrupted messages will be detected and reported.\n"
+	      "\n", stderr);
+	fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd);
+	fputs("so that there is another node in the cluster connected to the CPG.\n"
+	      "\n"
+	      "When -l is present, packet size is only checked if specified by -w or -W\n"
+	      "and it, obviously, must match that of the sender.\n"
+	      "\n"
+	      "Multiple copies, in different CPGs, can also be run on the same or\n"
+	      "different nodes by using the -n option.\n"
+	      "\n", stderr);
+	fprintf(stderr, "%s can't handle more than 1 sender in the same CPG as it messes with the\n", cmd);
+	fputs("sequence numbers.\n"
+	      "\n"
+	      " -w Write size in Kbytes, default 4\n"
+	      " -W Write size in bytes, default 4096\n"
+	      " -n CPG name to use, default 'cpghum'\n"
+	      " -d Delay between sending packets (mS), default 1000\n"
+	      " -r Number of repetitions, default 100\n"
+	      " -p Delay between printing output(S), default 10s\n"
+	      " -l Listen and check CRCs only, don't send (^C to quit)\n"
+	      " -m cpg_initialise() model. Default 1.\n"
+	      " -s Also send errors to syslog (for daemon log correlation).\n"
+	      " -q Quiet. Don't print messages every 10 seconds (see also -p)\n"
+	      "\n", stderr);
+}
+
+/*
+ * Parse options, connect and join the CPG, then either:
+ *  - listen (-l) and print periodic receive counts; or
+ *  - run 'repetitions' rounds of cpg_test().
+ * Statistics are printed after cpg_finalize().
+ */
+int main (int argc, char *argv[]) {
+	int i;
+	unsigned int res;
+	int opt;
+	int bs;
+	int write_size = 4096;
+	int delay_time = 1000;
+	int repetitions = 100;
+	int print_time = 10;
+	int have_size = 0;
+	int listen_only = 0;
+	int model = 1;
+
+	while ( (opt = getopt(argc, argv, "qlsn:d:r:p:m:w:W:")) != -1 ) {
+		switch (opt) {
+		case 'w': // Write size in K
+			bs = atoi(optarg);
+			if (bs > 0) {
+				// Clamp to an out-of-range size (rejected below) so bs*1024 can't overflow
+				write_size = (bs > DATASIZE / 1024) ? DATASIZE + 1 : bs * 1024;
+				have_size = 1;
+			}
+			break;
+		case 'W': // Write size in bytes
+			bs = atoi(optarg);
+			if (bs > 0) {
+				write_size = bs;
+				have_size = 1;
+			}
+			break;
+		case 'n':
+			// group_name.value is a fixed-size array: don't overrun it
+			if (strlen(optarg) >= sizeof(group_name.value)) {
+				fprintf(stderr, "%s: CPG name is too long (max %zu characters)\n",
+					argv[0], sizeof(group_name.value) - 1);
+				exit(1);
+			}
+			strcpy(group_name.value, optarg);
+			group_name.length = strlen(group_name.value);
+			break;
+		case 'd':
+			delay_time = atoi(optarg);
+			break;
+		case 'r':
+			repetitions = atoi(optarg);
+			break;
+		case 'p':
+			print_time = atoi(optarg);
+			break;
+		case 'l':
+			listen_only = 1;
+			break;
+		case 's':
+			do_syslog = 1;
+			break;
+		case 'q':
+			quiet = 1;
+			break;
+		case 'm':
+			model = atoi(optarg);
+			if (model < 0 || model > 1) {
+				fprintf(stderr, "%s: Model must be 0-1\n", argv[0]);
+				exit(1);
+			}
+			break;
+		case '?':
+			usage(basename(argv[0]));
+			exit(0);
+		}
+	}
+
+	// Each message carries a sequence number and a CRC, and must fit in data[]
+	if (write_size < (int)(sizeof(int) * 2) || write_size > DATASIZE) {
+		fprintf(stderr, "%s: write size must be between %d and %d bytes\n",
+			argv[0], (int)(sizeof(int) * 2), DATASIZE);
+		exit(1);
+	}
+
+	qb_log_init("cpghum", LOG_USER, LOG_EMERG);
+	qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
+	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
+			  QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
+	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
+
+	g_write_size = write_size;
+
+	signal (SIGALRM, sigalrm_handler);
+	signal (SIGINT, sigint_handler);
+	switch (model) {
+	case 0:
+		res = cpg_initialize (&handle, &callbacks);
+		break;
+	case 1:
+		res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL);
+		break;
+	default:
+		res=999; // can't get here but it keeps the compiler happy
+		break;
+	}
+
+	if (res != CS_OK) {
+		printf ("cpg_initialize failed with result %d\n", res);
+		exit (1);
+	}
+	if (pthread_create (&thread, NULL, dispatch_thread, NULL) != 0) {
+		fprintf (stderr, "pthread_create failed\n");
+		exit (1);
+	}
+
+	res = cpg_join (handle, &group_name);
+	if (res != CS_OK) {
+		printf ("cpg_join failed with result %d\n", res);
+		exit (1);
+	}
+
+	if (listen_only) {
+		int secs = 0;
+		if (!quiet) {
+			printf("-- Listening on CPG %s\n", group_name.value);
+			printf("-- Ignore any starting \"counters don't match\" error while we catch up\n");
+		}
+
+		/* Only check packet size if specified on the command-line */
+		if (!have_size) {
+			g_write_size = 0;
+		}
+
+		while (!stopped) {
+			sleep(1);
+			if (++secs > print_time && !quiet) {
+				printf ("%s: %5d message%s received. %d bytes\n", group_name.value, g_recv_count, g_recv_count==1?"":"s", g_recv_length);
+				secs = 0;
+				g_recv_count = 0;
+			}
+		}
+	}
+	else {
+		/* We're a test app .. we 'know' that IPC_SIZE is 1 Meg. Don't do this at home. */
+		if ( write_size > ONE_MEG - 1024) {
+			fprintf(stderr, "INFO: packet size (%d) is larger than IPC_REQUEST_SIZE-1K (%d), libcpg will fragment\n",
+				write_size, ONE_MEG-1024);
+		}
+		for (i = 0; i < repetitions && !stopped; i++) {
+			cpg_test (handle, write_size, delay_time, print_time);
+			signal (SIGALRM, sigalrm_handler);
+		}
+	}
+
+	res = cpg_finalize (handle);
+	if (res != CS_OK) {
+		printf ("cpg_finalize failed with result %d\n", res);
+		exit (1);
+	}
+
+	printf("\n");
+	printf("Stats:\n");
+	if (!listen_only) {
+		printf("   packets sent:    %u\n", packets_sent);
+		printf("   send failures:   %u\n", send_fails);
+		printf("   send retries:    %u\n", send_retries);
+	}
+	if (have_size) {
+		printf("   length errors:   %u\n", length_errors);
+	}
+	printf("   packets recvd:   %u\n", packets_recvd);
+	printf("   sequence errors: %u\n", sequence_errors);
+	printf("   crc errors:      %u\n", crc_errors);
+	printf("\n");
+	return (0);
+}
_______________________________________________
discuss mailing list
[email protected]
http://lists.corosync.org/mailman/listinfo/discuss