Author: adkulkar
Date: 2011-01-13 15:13:49 EST (Thu, 13 Jan 2011)
New Revision: 24251
URL: https://svn.open-mpi.org/trac/ompi/changeset/24251
Log:
A few fault tolerance updates related to the CIFTS project
(http://www.mcs.anl.gov/research/cifts/)
* Improve the FTB notifier to publish checkpoint/restart (C/R) and
process/communication failure events to the FTB, with the OMPI jobid as
the associated payload.
* Add notifier calls for C/R events and process status events in SnapC and
ErrMgr components.
* Fix a bug where the SnapC checkpoint states and the ORTE process states
collided when they were reported through the notifier.
Text files modified:
trunk/orte/mca/errmgr/base/base.h | 3
trunk/orte/mca/errmgr/base/errmgr_base_fns.c | 47 ++++++++++
trunk/orte/mca/errmgr/hnp/errmgr_hnp.c | 4
trunk/orte/mca/notifier/ftb/notifier_ftb_component.c | 5 -
trunk/orte/mca/notifier/ftb/notifier_ftb_module.c | 162
++++++++++++++++++++++++++++-----------
trunk/orte/mca/oob/tcp/oob_tcp_peer.c | 3
trunk/orte/mca/snapc/base/snapc_base_fns.c | 26 ++++--
trunk/orte/mca/snapc/full/snapc_full_global.c | 4
trunk/orte/mca/snapc/snapc.h | 19 ++++
9 files changed, 208 insertions(+), 65 deletions(-)
Modified: trunk/orte/mca/errmgr/base/base.h
==============================================================================
--- trunk/orte/mca/errmgr/base/base.h (original)
+++ trunk/orte/mca/errmgr/base/base.h 2011-01-13 15:13:49 EST (Thu, 13 Jan
2011)
@@ -97,6 +97,9 @@
ORTE_DECLSPEC int orte_errmgr_base_restart_job(orte_jobid_t jobid, char *
global_handle, int seq_num);
ORTE_DECLSPEC int orte_errmgr_base_migrate_job(orte_jobid_t jobid,
orte_snapc_base_request_op_t *datum);
+/* Interface to report process state to the notifier */
+ORTE_DECLSPEC void orte_errmgr_base_proc_state_notify(orte_proc_state_t state,
orte_process_name_t *proc);
+
#endif /* OPAL_ENABLE_FT_CR */
/*
Modified: trunk/orte/mca/errmgr/base/errmgr_base_fns.c
==============================================================================
--- trunk/orte/mca/errmgr/base/errmgr_base_fns.c (original)
+++ trunk/orte/mca/errmgr/base/errmgr_base_fns.c 2011-01-13 15:13:49 EST
(Thu, 13 Jan 2011)
@@ -248,12 +248,13 @@
case ORTE_ERRMGR_MIGRATE_STATE_ERROR:
case ORTE_ERRMGR_MIGRATE_STATE_ERR_INPROGRESS:
orte_notifier.log(ORTE_NOTIFIER_ERROR, state,
- "base:migrate_state_notify: Migration failed (PID =
%d)", true,
- orte_process_info.pid);
+ "%d: Migration failed for process %s.",
+ orte_process_info.pid,
ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
break;
case ORTE_ERRMGR_MIGRATE_STATE_FINISH:
- orte_notifier.show_help(ORTE_NOTIFIER_INFO, state,
- "help-orte-errmgr-hnp.txt",
"crmig_migrated_job", true);
+ orte_notifier.log(ORTE_NOTIFIER_INFO, state,
+ "%d: Migration successful for process %s.",
+ orte_process_info.pid,
ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
break;
case ORTE_ERRMGR_MIGRATE_STATE_NONE:
@@ -267,6 +268,44 @@
}
}
+/*
+ * Report a process state change to the notifier framework.
+ *
+ * state  ORTE process state to report; it is also handed to
+ *        orte_notifier.log() as the event code.
+ * proc   Name of the affected process.  Nothing is logged when NULL.
+ *
+ * Each message is prefixed with orte_process_info.pid and embeds
+ * ORTE_JOBID_PRINT(proc->jobid).  States without an explicit case
+ * below are silently ignored.
+ */
+void orte_errmgr_base_proc_state_notify(orte_proc_state_t state,
+                                        orte_process_name_t *proc)
+{
+    if (NULL == proc) {
+        return;
+    }
+
+    switch (state) {
+    case ORTE_PROC_STATE_ABORTED:
+    case ORTE_PROC_STATE_ABORTED_BY_SIG:
+    case ORTE_PROC_STATE_TERM_WO_SYNC:
+    case ORTE_PROC_STATE_TERMINATED:
+    case ORTE_PROC_STATE_KILLED_BY_CMD:
+    case ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED:
+        orte_notifier.log(ORTE_NOTIFIER_ERROR, state,
+                          "%d: Process %s is dead.",
+                          orte_process_info.pid,
+                          ORTE_JOBID_PRINT(proc->jobid));
+        break;
+
+    case ORTE_PROC_STATE_HEARTBEAT_FAILED:
+        orte_notifier.log(ORTE_NOTIFIER_ERROR, state,
+                          "%d: Process %s is unreachable.",
+                          orte_process_info.pid,
+                          ORTE_JOBID_PRINT(proc->jobid));
+        /* Stop here: without this break the heartbeat failure would
+         * fall through and additionally be reported as a
+         * communication failure. */
+        break;
+
+    case ORTE_PROC_STATE_COMM_FAILED:
+        orte_notifier.log(ORTE_NOTIFIER_WARN, state,
+                          "%d: Failed to communicate with process %s.",
+                          orte_process_info.pid,
+                          ORTE_JOBID_PRINT(proc->jobid));
+        break;
+
+    case ORTE_PROC_STATE_CALLED_ABORT:
+    case ORTE_PROC_STATE_FAILED_TO_START:
+        /* NOTE(review): FAILED_TO_START shares the "called abort"
+         * wording with CALLED_ABORT; confirm that this is intended. */
+        orte_notifier.log(ORTE_NOTIFIER_ERROR, state,
+                          "%d: Process %s has called abort.",
+                          orte_process_info.pid,
+                          ORTE_JOBID_PRINT(proc->jobid));
+        break;
+
+    case ORTE_PROC_STATE_MIGRATING:
+    default:
+        /* No notification is sent for these states. */
+        break;
+    }
+}
+
int orte_errmgr_base_migrate_state_str(char ** state_str, int state)
{
switch(state) {
Modified: trunk/orte/mca/errmgr/hnp/errmgr_hnp.c
==============================================================================
--- trunk/orte/mca/errmgr/hnp/errmgr_hnp.c (original)
+++ trunk/orte/mca/errmgr/hnp/errmgr_hnp.c 2011-01-13 15:13:49 EST (Thu,
13 Jan 2011)
@@ -543,6 +543,10 @@
}
}
+ /* Notify the process state to the notifier framework if it is
+ active and selected. */
+ orte_errmgr_base_proc_state_notify(state, proc);
+
/* update is for a specific proc */
switch (state) {
case ORTE_PROC_STATE_ABORTED:
Modified: trunk/orte/mca/notifier/ftb/notifier_ftb_component.c
==============================================================================
--- trunk/orte/mca/notifier/ftb/notifier_ftb_component.c (original)
+++ trunk/orte/mca/notifier/ftb/notifier_ftb_component.c 2011-01-13
15:13:49 EST (Thu, 13 Jan 2011)
@@ -80,11 +80,6 @@
free(mca_notifier_ftb_component.subscription_style);
}
- /* If the FTB client handle is valid, disconnect the client */
- if (1 == ftb_client_handle.valid) {
- FTB_Disconnect(ftb_client_handle);
- }
-
return ORTE_SUCCESS;
}
Modified: trunk/orte/mca/notifier/ftb/notifier_ftb_module.c
==============================================================================
--- trunk/orte/mca/notifier/ftb/notifier_ftb_module.c (original)
+++ trunk/orte/mca/notifier/ftb/notifier_ftb_module.c 2011-01-13 15:13:49 EST
(Thu, 13 Jan 2011)
@@ -34,11 +34,17 @@
#include "opal/util/show_help.h"
#include "opal/util/os_path.h"
+#include "orte/mca/plm/base/plm_private.h"
+#include "orte/mca/plm/plm.h"
+#include "orte/mca/sensor/sensor.h"
#include "orte/mca/ess/ess.h"
#include "orte/util/show_help.h"
#include "orte/mca/snapc/snapc.h"
#include "orte/mca/snapc/base/base.h"
+#include "orte/mca/errmgr/errmgr.h"
+#include "orte/mca/errmgr/base/base.h"
+
#include "orte/mca/notifier/base/base.h"
#include "notifier_ftb.h"
@@ -95,7 +101,10 @@
}
static void finalize(void) {
- FTB_Disconnect(ftb_client_handle);
+ /* If the FTB client handle is valid, disconnect the client from FTB. */
+ if (1 == ftb_client_handle.valid) {
+ FTB_Disconnect(ftb_client_handle);
+ }
}
static const char* get_ftb_event_severity(orte_notifier_base_severity_t
severity)
@@ -121,53 +130,106 @@
static const char* get_ftb_event_name(int errnum)
{
- switch (errnum) {
-
- case ORTE_SNAPC_CKPT_STATE_ESTABLISHED:
- return FTB_EVENT(FTB_MPI_PROCS_CKPTED);
-
- case ORTE_SNAPC_CKPT_STATE_NO_CKPT:
- case ORTE_SNAPC_CKPT_STATE_ERROR:
- return FTB_EVENT(FTB_MPI_PROCS_CKPT_FAIL);
-
- case ORTE_ERR_CONNECTION_REFUSED:
- case ORTE_ERR_CONNECTION_FAILED:
- case ORTE_ERR_UNREACH:
- return FTB_EVENT(FTB_MPI_PROCS_UNREACHABLE);
+ /* Handle checkpoint/restart and migration events */
+ if ( CHECK_ORTE_SNAPC_CKPT_STATE(errnum) ) {
+ errnum = ORTE_SNAPC_CKPT_STATE(errnum);
+ switch (errnum) {
+ case ORTE_SNAPC_CKPT_STATE_ESTABLISHED:
+ return FTB_EVENT(FTB_MPI_PROCS_CKPTED);
+
+ case ORTE_SNAPC_CKPT_STATE_NO_CKPT:
+ case ORTE_SNAPC_CKPT_STATE_ERROR:
+ return FTB_EVENT(FTB_MPI_PROCS_CKPT_FAIL);
+
+ /* Restart events */
+ case ORTE_SNAPC_CKPT_STATE_RECOVERED:
+ return FTB_EVENT(FTB_MPI_PROCS_RESTARTED);
+
+ case ORTE_SNAPC_CKPT_STATE_NO_RESTART:
+ return FTB_EVENT(FTB_MPI_PROCS_RESTART_FAIL);
+
+ /* Process migration events */
+ case ORTE_ERRMGR_MIGRATE_STATE_FINISH:
+ return FTB_EVENT(FTB_MPI_PROCS_MIGRATED);
+
+ case ORTE_ERRMGR_MIGRATE_STATE_ERROR:
+ case ORTE_ERRMGR_MIGRATE_STATE_ERR_INPROGRESS:
+ return FTB_EVENT(FTB_MPI_PROCS_MIGRATE_FAIL);
- case ORTE_ERR_COMM_FAILURE:
- return FTB_EVENT(FTB_MPI_PROCS_COMM_ERROR);
+ default:
+ return NULL;
+ }
+ } else {
+ /* Handle process and communication failure events */
+ switch (errnum) {
+ case ORTE_ERR_CONNECTION_REFUSED:
+ case ORTE_ERR_CONNECTION_FAILED:
+ case ORTE_ERR_UNREACH:
+ case ORTE_PROC_STATE_HEARTBEAT_FAILED:
+ return FTB_EVENT(FTB_MPI_PROCS_UNREACHABLE);
+
+ case ORTE_ERR_COMM_FAILURE:
+ case ORTE_PROC_STATE_COMM_FAILED:
+ return FTB_EVENT(FTB_MPI_PROCS_COMM_ERROR);
+
+ case ORTE_PROC_STATE_FAILED_TO_START:
+ case ORTE_PROC_STATE_CALLED_ABORT:
+ return FTB_EVENT(FTB_MPI_PROCS_ABORTED);
+
+ case ORTE_PROC_STATE_ABORTED:
+ case ORTE_PROC_STATE_ABORTED_BY_SIG:
+ case ORTE_PROC_STATE_TERM_WO_SYNC:
+ case ORTE_PROC_STATE_TERMINATED:
+ case ORTE_PROC_STATE_KILLED_BY_CMD:
+ return FTB_EVENT(FTB_MPI_PROCS_DEAD);
- default:
- return NULL;
+ default:
+ return NULL;
+ }
}
return NULL;
}
-static void publish_ftb_event(orte_notifier_base_severity_t severity, int
errcode, char *payload)
+/*
+ * Extract the FTB payload from a notifier message.
+ *
+ * The payload is the span from the first '[' up to and including the
+ * last ']' of src; the brackets themselves are kept.
+ * For instance: "<FTB message [payload]>" yields "[payload]".
+ * When src holds no such ordered bracket pair, the whole message is
+ * used instead.
+ *
+ * dest  Output buffer of at least `size` bytes.  It is always
+ *       NUL-terminated on return (when size > 0), truncating the
+ *       payload to size - 1 characters if necessary.
+ * src   NUL-terminated notifier message; NULL yields an empty payload.
+ * size  Capacity of dest in bytes, terminator included.
+ *       NOTE(review): callers pass FTB_MAX_PAYLOAD_DATA; this assumes
+ *       event_payload is at least that large -- confirm against ftb.h.
+ *
+ * Returns the number of characters stored in dest, not counting the
+ * terminating NUL.
+ */
+static unsigned int extract_payload(char *dest, char *src, unsigned int size)
+{
+    const char *start;
+    size_t len;
+    char *lbrace, *rbrace;
+
+    if (NULL == dest || 0 == size) {
+        return 0;
+    }
+    if (NULL == src) {
+        dest[0] = '\0';
+        return 0;
+    }
+
+    lbrace = strchr(src, '[');
+    rbrace = strrchr(src, ']');
+
+    if (NULL != lbrace && NULL != rbrace && rbrace > lbrace) {
+        start = lbrace;
+        len = (size_t) (rbrace - lbrace) + 1;
+    } else {
+        /* No bracket pair, or the last ']' precedes the first '[':
+         * fall back to the complete message. */
+        start = src;
+        len = strlen(src);
+    }
+
+    /* Always leave room for the terminating NUL. */
+    if (len > (size_t) size - 1) {
+        len = (size_t) size - 1;
+    }
+    memcpy(dest, start, len);
+    dest[len] = '\0';
+
+    return (unsigned int) len;
+}
+
+static void publish_ftb_event(orte_notifier_base_severity_t severity, int
errcode,
+ FTB_event_properties_t *eprop)
{
int ret;
const char *event_name;
FTB_event_handle_t ehandle;
- FTB_event_properties_t eprop;
-
- /* Only normal FTB events are supported currently. */
- eprop.event_type = (int) FTB_EVENT_NORMAL;
-
- /* Copy the event payload, if we have one */
- if (NULL != payload) {
- strncpy(eprop.event_payload, payload, FTB_MAX_PAYLOAD_DATA);
- }
/* Publish the event to the Fault Tolerant Backplane */
event_name = get_ftb_event_name(errcode);
if (NULL != event_name) {
- ret = FTB_Publish(ftb_client_handle, event_name, &eprop, &ehandle);
+ ret = FTB_Publish(ftb_client_handle, event_name, eprop, &ehandle);
if (FTB_SUCCESS != ret) {
orte_show_help("help-orte-notifier-ftb.txt", "publish failed",
true,
"FTB_Publish() failed", ret,
get_ftb_event_severity(severity),
- event_name, payload, errcode);
+ event_name, eprop->event_payload, errcode);
}
}
}
@@ -176,11 +238,17 @@
va_list ap)
{
char *payload;
+ FTB_event_properties_t ev_prop;
+ /* Only normal FTB events are supported currently. */
+ ev_prop.event_type = (int) FTB_EVENT_NORMAL;
+
+ /* Copy the event payload, if we have one */
vasprintf(&payload, msg, ap);
if (NULL != payload) {
- publish_ftb_event(severity, errcode, payload);
+ extract_payload(ev_prop.event_payload, payload, FTB_MAX_PAYLOAD_DATA);
free(payload);
+ publish_ftb_event(severity, errcode, &ev_prop);
}
}
@@ -188,11 +256,16 @@
const char *filename, const char *topic, va_list ap)
{
char *payload;
+ FTB_event_properties_t ev_prop;
+
+ /* Only normal FTB events are supported currently. */
+ ev_prop.event_type = (int) FTB_EVENT_NORMAL;
payload = opal_show_help_vstring(filename, topic, false, ap);
if (NULL != payload) {
- publish_ftb_event(severity, errcode, payload);
+ extract_payload(ev_prop.event_payload, payload, FTB_MAX_PAYLOAD_DATA);
free(payload);
+ publish_ftb_event(severity, errcode, &ev_prop);
}
}
@@ -200,23 +273,22 @@
orte_process_name_t *peer_proc, const char *msg,
va_list ap)
{
- char payload[FTB_MAX_PAYLOAD_DATA + 1];
- char *peer_host = NULL;
- char *pos = payload;
- int len, space = FTB_MAX_PAYLOAD_DATA;
+ char *payload, *peer_host;
+ FTB_event_properties_t ev_prop;
+ /* Only normal FTB events are supported currently. */
+ ev_prop.event_type = (int) FTB_EVENT_NORMAL;
+
+ peer_host = NULL;
if (peer_proc) {
peer_host = orte_ess.proc_get_hostname(peer_proc);
+ /* Ignore the peer_host for now. */
}
- len = snprintf(pos, space, "%s:", peer_host ? peer_host : "UNKNOWN");
- space -= len;
- pos += len;
- /* If there was a message, and space left, output it */
- if (0 < space) {
- vsnprintf(pos, space, msg, ap);
+ vasprintf(&payload, msg, ap);
+ if (NULL != payload) {
+ extract_payload(ev_prop.event_payload, payload, FTB_MAX_PAYLOAD_DATA);
+ free(payload);
+ publish_ftb_event(severity, errcode, &ev_prop);
}
-
- payload[FTB_MAX_PAYLOAD_DATA] = '\0';
- publish_ftb_event(severity, errcode, payload);
}
Modified: trunk/orte/mca/oob/tcp/oob_tcp_peer.c
==============================================================================
--- trunk/orte/mca/oob/tcp/oob_tcp_peer.c (original)
+++ trunk/orte/mca/oob/tcp/oob_tcp_peer.c 2011-01-13 15:13:49 EST (Thu,
13 Jan 2011)
@@ -635,7 +635,8 @@
(NULL == host) ? "NULL" : host);
/* provide a notifier message */
orte_notifier.log_peer(ORTE_NOTIFIER_CRIT, ORTE_ERR_COMM_FAILURE,
&(peer->peer_name),
- "OOB Connection retries exceeded. Can not communicate
with peer");
+ "OOB connection retries exceeded. Cannot communicate
with peer %s.",
+ ORTE_JOBID_PRINT(peer->peer_name.jobid));
/* There are cases during the initial connection setup where
the peer_send_msg is NULL but there are things in the queue
Modified: trunk/orte/mca/snapc/base/snapc_base_fns.c
==============================================================================
--- trunk/orte/mca/snapc/base/snapc_base_fns.c (original)
+++ trunk/orte/mca/snapc/base/snapc_base_fns.c 2011-01-13 15:13:49 EST (Thu,
13 Jan 2011)
@@ -423,24 +423,32 @@
{
switch(state) {
case ORTE_SNAPC_CKPT_STATE_ESTABLISHED:
- orte_notifier.log(ORTE_NOTIFIER_INFO, state,
- "base:ckpt_state_notify: Checkpoint established for PID =
%d {%s}.",
- orte_process_info.pid,
ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
+ orte_notifier.log(ORTE_NOTIFIER_INFO, ORTE_SNAPC_CKPT_NOTIFY(state),
+ "%d: Checkpoint established for process %s.",
+ orte_process_info.pid,
ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
break;
case ORTE_SNAPC_CKPT_STATE_NO_CKPT:
- orte_notifier.log(ORTE_NOTIFIER_WARN, state,
- "base:ckpt_state_notify: PID = %d is not checkpointable
{%s}.",
+ orte_notifier.log(ORTE_NOTIFIER_WARN, ORTE_SNAPC_CKPT_NOTIFY(state),
+ "%d: Process %s is not checkpointable.",
orte_process_info.pid,
ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
break;
case ORTE_SNAPC_CKPT_STATE_ERROR:
- orte_notifier.log(ORTE_NOTIFIER_WARN, state,
- "base:ckpt_state_notify: Failed to checkpoint PID = %d
{%s}.",
+ orte_notifier.log(ORTE_NOTIFIER_WARN, ORTE_SNAPC_CKPT_NOTIFY(state),
+ "%d: Failed to checkpoint process %s.",
+ orte_process_info.pid,
ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
+ break;
+ case ORTE_SNAPC_CKPT_STATE_RECOVERED:
+ orte_notifier.log(ORTE_NOTIFIER_INFO, ORTE_SNAPC_CKPT_NOTIFY(state),
+ "%d: Successfully restarted process %s.",
+ orte_process_info.pid,
ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
+ break;
+ case ORTE_SNAPC_CKPT_STATE_NO_RESTART:
+ orte_notifier.log(ORTE_NOTIFIER_WARN, ORTE_SNAPC_CKPT_NOTIFY(state),
+ "%d: Failed to restart process %s.",
orte_process_info.pid,
ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
break;
-
/* ADK: We currently do not notify for these states, but good to
* have them around anyways. */
- case ORTE_SNAPC_CKPT_STATE_RECOVERED:
case ORTE_SNAPC_CKPT_STATE_NONE:
case ORTE_SNAPC_CKPT_STATE_REQUEST:
case ORTE_SNAPC_CKPT_STATE_PENDING:
Modified: trunk/orte/mca/snapc/full/snapc_full_global.c
==============================================================================
--- trunk/orte/mca/snapc/full/snapc_full_global.c (original)
+++ trunk/orte/mca/snapc/full/snapc_full_global.c 2011-01-13 15:13:49 EST
(Thu, 13 Jan 2011)
@@ -1358,12 +1358,14 @@
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &seq_num, &count,
OPAL_INT))) {
ORTE_ERROR_LOG(ret);
+ orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &global_handle,
&count, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
+ orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
goto cleanup;
}
@@ -1372,6 +1374,7 @@
*/
if( ORTE_SUCCESS != (ret =
orte_errmgr_base_restart_job(current_global_jobid, global_handle, seq_num) ) ) {
ORTE_ERROR_LOG(ret);
+ orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
goto cleanup;
}
}
@@ -1757,6 +1760,7 @@
"Global) Job has been successfully
restarted"));
/*current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_RECOVERED;*/
+ orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_RECOVERED);
for(item =
opal_list_get_first(&(global_snapshot.local_snapshots));
item != opal_list_get_end(&(global_snapshot.local_snapshots));
Modified: trunk/orte/mca/snapc/snapc.h
==============================================================================
--- trunk/orte/mca/snapc/snapc.h (original)
+++ trunk/orte/mca/snapc/snapc.h 2011-01-13 15:13:49 EST (Thu, 13 Jan
2011)
@@ -117,7 +117,24 @@
#define ORTE_SNAPC_CKPT_STATE_RECOVERED 10
/* Unable to checkpoint this job */
#define ORTE_SNAPC_CKPT_STATE_NO_CKPT 11
-#define ORTE_SNAPC_CKPT_MAX 12
+/* Unable to restart this job */
+#define ORTE_SNAPC_CKPT_STATE_NO_RESTART 12
+#define ORTE_SNAPC_CKPT_MAX 13
+
+/**
+ * Sufficiently high shift value (131072 == 2^17) to avoid colliding the
+ * process checkpointing states above with the ORTE process states
+ */
+#define ORTE_SNAPC_CKPT_SHIFT 131072
+
+/* Uniquely encode the SNAPC state by adding the shift.  The argument is
+ * parenthesized so that any expression may be passed safely. */
+#define ORTE_SNAPC_CKPT_NOTIFY(state) (ORTE_SNAPC_CKPT_SHIFT + (state))
+
+/* Decode the SNAPC state: the inverse of ORTE_SNAPC_CKPT_NOTIFY() */
+#define ORTE_SNAPC_CKPT_STATE(state) ((state) - ORTE_SNAPC_CKPT_SHIFT)
+
+/* Check whether a state is an encoded SNAPC state or not, i.e. whether
+ * it is at or above the shift value. */
+#define CHECK_ORTE_SNAPC_CKPT_STATE(state) ((state) >= ORTE_SNAPC_CKPT_SHIFT)
/**
* Definition of a orte local snapshot.
_______________________________________________
svn mailing list
s...@open-mpi.org
http://www.open-mpi.org/mailman/listinfo.cgi/svn