Author: bouteill
Date: 2008-09-27 09:22:32 EDT (Sat, 27 Sep 2008)
New Revision: 19653
URL: https://svn.open-mpi.org/trac/ompi/changeset/19653
Log:
Add functions to access the opaque port_string and to add routes
to a remote port. This is useful for FT, but could also prove
useful when considering MPI3 extensions to the MPI2 dynamics.
Text files modified:
trunk/ompi/mca/dpm/base/base.h | 3 +
trunk/ompi/mca/dpm/base/dpm_base_null_fns.c | 12 ++++
trunk/ompi/mca/dpm/base/dpm_base_open.c | 2
trunk/ompi/mca/dpm/dpm.h | 20 +++++++
trunk/ompi/mca/dpm/orte/dpm_orte.c | 114 +++++++++++++++
++++++------------------
5 files changed, 99 insertions(+), 52 deletions(-)
Modified: trunk/ompi/mca/dpm/base/base.h
=
=
=
=
=
=
=
=
=
=
====================================================================
--- trunk/ompi/mca/dpm/base/base.h (original)
+++ trunk/ompi/mca/dpm/base/base.h 2008-09-27 09:22:32 EDT (Sat,
27 Sep 2008)
@@ -92,6 +92,9 @@
int ompi_dpm_base_null_dyn_finalize (void);
void ompi_dpm_base_null_mark_dyncomm (ompi_communicator_t *comm);
int ompi_dpm_base_null_open_port(char *port_name, orte_rml_tag_t
given_tag);
+int ompi_dpm_base_null_parse_port(char *port_name,
+ orte_process_name_t *rproc,
orte_rml_tag_t *tag);
+int ompi_dpm_base_null_route_to_port(char *rml_uri,
orte_process_name_t *rproc);
int ompi_dpm_base_null_close_port(char *port_name);
/* useful globals */
Modified: trunk/ompi/mca/dpm/base/dpm_base_null_fns.c
=
=
=
=
=
=
=
=
=
=
====================================================================
--- trunk/ompi/mca/dpm/base/dpm_base_null_fns.c (original)
+++ trunk/ompi/mca/dpm/base/dpm_base_null_fns.c 2008-09-27
09:22:32 EDT (Sat, 27 Sep 2008)
@@ -36,6 +36,7 @@
{
return OMPI_ERR_NOT_SUPPORTED;
}
+
void ompi_dpm_base_null_disconnect(ompi_communicator_t *comm)
{
return;
@@ -70,6 +71,17 @@
return OMPI_ERR_NOT_SUPPORTED;
}
+int ompi_dpm_base_null_parse_port(char *port_name,
+ orte_process_name_t *rproc,
orte_rml_tag_t *tag)
+{
+ return OMPI_ERR_NOT_SUPPORTED;
+}
+
+int ompi_dpm_base_null_route_to_port(char *rml_uri,
orte_process_name_t *rproc)
+{
+ return OMPI_ERR_NOT_SUPPORTED;
+}
+
int ompi_dpm_base_null_close_port(char *port_name)
{
return OMPI_ERR_NOT_SUPPORTED;
Modified: trunk/ompi/mca/dpm/base/dpm_base_open.c
=
=
=
=
=
=
=
=
=
=
====================================================================
--- trunk/ompi/mca/dpm/base/dpm_base_open.c (original)
+++ trunk/ompi/mca/dpm/base/dpm_base_open.c 2008-09-27 09:22:32
EDT (Sat, 27 Sep 2008)
@@ -42,6 +42,8 @@
ompi_dpm_base_null_dyn_finalize,
ompi_dpm_base_null_mark_dyncomm,
ompi_dpm_base_null_open_port,
+ ompi_dpm_base_null_parse_port,
+ ompi_dpm_base_null_route_to_port,
ompi_dpm_base_null_close_port,
NULL
};
Modified: trunk/ompi/mca/dpm/dpm.h
=
=
=
=
=
=
=
=
=
=
====================================================================
--- trunk/ompi/mca/dpm/dpm.h (original)
+++ trunk/ompi/mca/dpm/dpm.h 2008-09-27 09:22:32 EDT (Sat, 27 Sep
2008)
@@ -58,6 +58,8 @@
#define OMPI_RML_TAG_DYNAMIC
OMPI_RML_TAG_BASE+200
+
+
/*
* Initialize a module
*/
@@ -116,6 +118,20 @@
typedef int (*ompi_dpm_base_module_open_port_fn_t)(char
*port_name, orte_rml_tag_t tag);
/*
+ * Converts an opaque port string to a RML process nane and tag.
+ */
+typedef int (*ompi_dpm_base_module_parse_port_name_t)(char
*port_name,
+
orte_process_name_t *rproc,
+
orte_rml_tag_t *tag);
+
+/*
+ * Update the routed component to make sure that the RML can send
messages to
+ * the remote port
+ */
+typedef int (*ompi_dpm_base_module_route_to_port_t)(char
*rml_uri, orte_process_name_t *rproc);
+
+
+/*
* Close a port
*/
typedef int (*ompi_dpm_base_module_close_port_fn_t)(char
*port_name);
@@ -145,6 +161,10 @@
ompi_dpm_base_module_mark_dyncomm_fn_t mark_dyncomm;
/* open port */
ompi_dpm_base_module_open_port_fn_t open_port;
+ /* parse port string */
+ ompi_dpm_base_module_parse_port_name_t parse_port;
+ /* update route to a port */
+ ompi_dpm_base_module_route_to_port_t route_to_port;
/* close port */
ompi_dpm_base_module_close_port_fn_t close_port;
/* finalize */
Modified: trunk/ompi/mca/dpm/orte/dpm_orte.c
=
=
=
=
=
=
=
=
=
=
====================================================================
--- trunk/ompi/mca/dpm/orte/dpm_orte.c (original)
+++ trunk/ompi/mca/dpm/orte/dpm_orte.c 2008-09-27 09:22:32 EDT
(Sat, 27 Sep 2008)
@@ -61,9 +61,6 @@
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata);
static void process_cb(int fd, short event, void *data);
-static int parse_port_name(char *port_name,
- orte_process_name_t *rproc,
- orte_rml_tag_t *tag);
/* API functions */
static int init(void);
@@ -78,6 +75,9 @@
char *port_name);
static int dyn_init(void);
static int open_port(char *port_name, orte_rml_tag_t given_tag);
+static int parse_port_name(char *port_name, orte_process_name_t
*rproc,
+ orte_rml_tag_t *tag);
+static int route_to_port(char *rml_uri, orte_process_name_t
*rproc);
static int close_port(char *port_name);
static int finalize(void);
@@ -93,6 +93,8 @@
ompi_dpm_base_dyn_finalize,
ompi_dpm_base_mark_dyncomm,
open_port,
+ parse_port_name,
+ route_to_port,
close_port,
finalize
};
@@ -145,14 +147,17 @@
* set us up to communicate with it
*/
if (NULL != port_string && 0 < strlen(port_string)) {
- /* separate the string into the RML URI and tag - this
function performs
- * whatever route initialization is required by the
selected routed module
- */
+ /* separate the string into the RML URI and tag */
if (ORTE_SUCCESS != (rc = parse_port_name(port_string,
&port, &tag))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
+ /* make sure we can route rml messages to the destination */
+ if (ORTE_SUCCESS != (rc = route_to_port(port_string, &port))) {
+ ORTE_ERROR_LOG(rc);
+ return rc;
+ }
/* tell the progress engine to tick the event library more
often, to make sure that the OOB messages get sent */
@@ -783,48 +788,11 @@
ORTE_MESSAGE_EVENT(sender, buffer, tag, release_ack);
}
-
-static int parse_port_name(char *port_name,
- orte_process_name_t *rproc,
- orte_rml_tag_t *tag)
+static int route_to_port(char *uri, orte_process_name_t *rproc)
{
- char *tmpstring=NULL, *ptr, *rml_uri=NULL;
- orte_rml_cmd_flag_t cmd = ORTE_RML_UPDATE_CMD;
- int rc;
opal_buffer_t route;
-
- /* don't mangle the port name */
- tmpstring = strdup(port_name);
-
- /* find the ':' demarking the RML tag we added to the end */
- if (NULL == (ptr = strrchr(tmpstring, ':'))) {
- rc = ORTE_ERR_NOT_FOUND;
- goto cleanup;
- }
-
- /* terminate the port_name at that location */
- *ptr = '\0';
- ptr++;
-
- /* convert the RML tag */
- sscanf(ptr,"%d", (int*)tag);
-
- /* now split out the second field - the uri of the remote
proc */
- if (NULL == (ptr = strchr(tmpstring, '+'))) {
- rc = ORTE_ERR_NOT_FOUND;
- goto cleanup;
- }
- *ptr = '\0';
- ptr++;
-
- /* save that info */
- rml_uri = strdup(ptr);
-
- /* extract the originating proc's name */
- if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(ptr,
rproc, NULL))) {
- ORTE_ERROR_LOG(rc);
- goto cleanup;
- }
+ orte_rml_cmd_flag_t cmd = ORTE_RML_UPDATE_CMD;
+ int rc;
/* if this proc is part of my job family, then I need to
* update my RML contact hash table and my routes
@@ -835,14 +803,14 @@
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* set the contact info into the hash table */
- if (ORTE_SUCCESS != (rc =
orte_rml.set_contact_info(rml_uri))) {
+ if (ORTE_SUCCESS != (rc =
orte_rml.set_contact_info(uri))) {
ORTE_ERROR_LOG(rc);
- goto cleanup;
+ return rc;
}
if (ORTE_SUCCESS != (rc = orte_routed.update_route(rproc,
rproc))) {
ORTE_ERROR_LOG(rc);
}
- goto cleanup;
+ return ORTE_SUCCESS;
}
/* the proc must be part of another job family. In this case, we
@@ -855,7 +823,7 @@
opal_dss.pack(&route, &cmd, 1, ORTE_RML_CMD);
/* pack the HNP uri */
- opal_dss.pack(&route, &tmpstring, 1, OPAL_STRING);
+ opal_dss.pack(&route, &uri, 1, OPAL_STRING);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
"%s dpm_parse_port: %s in diff job family -
sending update to %s",
@@ -867,7 +835,7 @@
ORTE_RML_TAG_RML_INFO_UPDATE,
0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&route);
- goto cleanup;
+ return rc;
}
/* wait right here until the HNP acks the update to ensure that
@@ -886,7 +854,49 @@
/* our get_route function automatically routes all messages for
* other job families via the HNP, so nothing more to do here
- */
+ */
+ return rc;
+}
+
+static int parse_port_name(char *port_name,
+ orte_process_name_t *rproc,
+ orte_rml_tag_t *tag)
+{
+ char *tmpstring=NULL, *ptr, *rml_uri=NULL;
+ int rc;
+
+ /* don't mangle the port name */
+ tmpstring = strdup(port_name);
+
+ /* find the ':' demarking the RML tag we added to the end */
+ if (NULL == (ptr = strrchr(tmpstring, ':'))) {
+ rc = ORTE_ERR_NOT_FOUND;
+ goto cleanup;
+ }
+
+ /* terminate the port_name at that location */
+ *ptr = '\0';
+ ptr++;
+
+ /* convert the RML tag */
+ sscanf(ptr,"%d", (int*)tag);
+
+ /* now split out the second field - the uri of the remote
proc */
+ if (NULL == (ptr = strchr(tmpstring, '+'))) {
+ rc = ORTE_ERR_NOT_FOUND;
+ goto cleanup;
+ }
+ *ptr = '\0';
+ ptr++;
+
+ /* save that info */
+ rml_uri = strdup(ptr);
+
+ /* extract the originating proc's name */
+ if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(ptr,
rproc, NULL))) {
+ ORTE_ERROR_LOG(rc);
+ goto cleanup;
+ }
rc = ORTE_SUCCESS;
cleanup:
_______________________________________________
svn mailing list
s...@open-mpi.org
http://www.open-mpi.org/mailman/listinfo.cgi/svn