Deliver local callbacks in response to remote events

 remote_internal.c |  255 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 248 insertions(+), 7 deletions(-)
diff --git a/src/remote_internal.c b/src/remote_internal.c
index 35b7b4b..13537f7 100644
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -34,6 +34,7 @@
 #include <signal.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <sys/poll.h>
 #include <fcntl.h>
 
 #ifdef HAVE_SYS_WAIT_H
@@ -73,6 +74,7 @@
 #include "remote_protocol.h"
 #include "memory.h"
 #include "util.h"
+#include "event.h"
 
 /* Per-connection private data. */
 #define MAGIC 999               /* private_data->magic if OK */
@@ -97,6 +99,10 @@ struct private_data {
     unsigned int saslDecodedLength;
     unsigned int saslDecodedOffset;
 #endif
+
+    virDomainEventQueuePtr domainEvents; /* The queue of events generated
+                                          * during a call / response rpc */
+    int eventFlushTimer;                 /* Timer for flushing domainEvents queue */
 };
 
 #define GET_PRIVATE(conn,retcode)                                       \
@@ -156,7 +162,10 @@ static void make_nonnull_domain (remote_nonnull_domain *dom_dst, virDomainPtr do
 static void make_nonnull_network (remote_nonnull_network *net_dst, virNetworkPtr net_src);
 static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src);
 static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src);
-
+void remoteDomainEventFired(int fd, int event, void *data);
+static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr);
+static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr);
+void remoteDomainEventQueueFlush(int timer, void *opaque);
 /*----------------------------------------------------------------------*/
 
 /* Helper functions for remoteOpen. */
@@ -680,6 +689,26 @@ doRemoteOpen (virConnectPtr conn,
               (xdrproc_t) xdr_void, (char *) NULL) == -1)
         goto failed;
 
+    if(VIR_ALLOC(priv->domainEvents)<0) {
+        error(conn, VIR_ERR_INVALID_ARG, _("Error allocating domainEvents"));
+        goto failed;
+    }
+
+    DEBUG0("Adding Handler for remote events");
+    /* Set up a callback to listen on the socket data */
+    if (virEventAddHandle(priv->sock,
+                          POLLIN | POLLERR | POLLHUP,
+                          remoteDomainEventFired,
+                          conn) < 0) {
+        DEBUG0("virEventAddHandle failed: No addHandleImpl defined. continuing without events.");
+    }
+
+    DEBUG0("Adding Timeout for remote event queue flushing");
+    if ( (priv->eventFlushTimer = virEventAddTimeout(0,
+                                                     remoteDomainEventQueueFlush,
+                                                     conn)) < 0) {
+        DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. continuing without events.");
+    }
     /* Successful. */
     retcode = VIR_DRV_OPEN_SUCCESS;
 
@@ -1101,6 +1130,11 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv)
               (xdrproc_t) xdr_void, (char *) NULL) == -1)
         return -1;
 
+    /* Remove handle for remote events */
+    virEventRemoveHandle(priv->sock);
+    /* Remove timout */
+    virEventRemoveTimeout(priv->eventFlushTimer);
+
     /* Close socket. */
     if (priv->uses_tls && priv->session) {
         gnutls_bye (priv->session, GNUTLS_SHUT_RDWR);
@@ -1132,6 +1166,9 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv)
     /* Free private data. */
     priv->magic = DEAD;
 
+    /* Free queued events */
+    virDomainEventQueueFree(priv->domainEvents);
+
     return 0;
 }
 
@@ -4288,6 +4325,46 @@ remoteAuthPolkit (virConnectPtr conn, struct private_data *priv, int in_open,
     return 0;
 }
 #endif /* HAVE_POLKIT */
+/*----------------------------------------------------------------------*/
+
+static int remoteDomainEventRegister (virConnectPtr conn,
+                               void *callback,
+                               void *opaque)
+{
+    struct private_data *priv = conn->privateData;
+
+    /* dispatch an rpc - so the server sde can track
+       how many callbacks are regstered */
+    remote_domain_events_register_args args;
+    args.callback = (unsigned long)callback;
+    args.user_data = (unsigned long)opaque;
+
+    if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_REGISTER,
+              (xdrproc_t) xdr_remote_domain_events_register_args, (char *) &args,
+              (xdrproc_t) xdr_void, (char *) NULL) == -1)
+        return -1;
+
+
+    return 0;
+}
+
+static int remoteDomainEventDeregister (virConnectPtr conn,
+                                 void *callback)
+{
+    struct private_data *priv = conn->privateData;
+
+    /* dispatch an rpc - so the server sde can track
+    how many callbacks are regstered */
+    remote_domain_events_deregister_args args;
+    args.callback = (unsigned long)callback;
+
+    if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_DEREGISTER,
+              (xdrproc_t) xdr_remote_domain_events_register_args, (char *) &args,
+              (xdrproc_t) xdr_void, (char *) NULL) == -1)
+        return -1;
+
+    return 0;
+}
 
 /*----------------------------------------------------------------------*/
 
@@ -4367,6 +4444,7 @@ call (virConnectPtr conn, struct private_data *priv,
         really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1)
         return -1;
 
+retry_read:
     /* Read and deserialise length word. */
     if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1)
         return -1;
@@ -4418,10 +4496,19 @@ call (virConnectPtr conn, struct private_data *priv,
         return -1;
     }
 
-    /* If we extend the server to actually send asynchronous messages, then
-     * we'll need to change this so that it can recognise an asynch
-     * message being received at this point.
-     */
+    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
+        hdr.direction == REMOTE_MESSAGE) {
+        /* An async message has come in while we were waiting for the
+         * response. Process it to pull it off the wire, and try again
+         */
+        DEBUG0("Encountered an event while waiting for a response");
+
+        remoteDomainQueueEvent(conn, &xdr);
+
+        DEBUG0("Retrying read");
+        xdr_destroy (&xdr);
+        goto retry_read;
+    }
     if (hdr.proc != proc_nr) {
         __virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
                          NULL, NULL, VIR_FROM_REMOTE,
@@ -4872,6 +4959,8 @@ static virDriver driver = {
     .domainMemoryPeek = remoteDomainMemoryPeek,
     .nodeGetCellsFreeMemory = remoteNodeGetCellsFreeMemory,
     .getFreeMemory = remoteNodeGetFreeMemory,
+    .domainEventRegister = remoteDomainEventRegister,
+    .domainEventDeregister = remoteDomainEventDeregister,
 };
 
 static virNetworkDriver network_driver = {
@@ -4957,3 +5046,140 @@ remoteRegister (void)
 
     return 0;
 }
+
+/**
+ * remoteDomainReadEvent
+ *
+ * Read the event data off the wire
+ */
+static int remoteDomainReadEvent(virConnectPtr conn, XDR *xdr,
+                                 virDomainPtr *dom, int *event,
+                                 virConnectDomainEventCallback *cb,
+                                 void **opaque)
+{
+    remote_domain_event_ret ret;
+    memset (&ret, 0, sizeof ret);
+
+    /* unmarshall parameters, and process it*/
+    if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
+        error (conn, VIR_ERR_RPC, _("remoteDomainProcessEvent: unmarshalling ret"));
+        return -1;
+    }
+
+    *dom = get_nonnull_domain(conn,ret.dom);
+    *event = ret.event;
+    *cb = (virConnectDomainEventCallback)ret.callback;
+    *opaque = (void *)ret.user_data;
+
+    return 0;
+}
+
+static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr)
+{
+    virDomainPtr dom;
+    int event;
+    virConnectDomainEventCallback cb;
+    void *opaque;
+    if(!remoteDomainReadEvent(conn, xdr, &dom, &event, &cb, &opaque)) {
+        DEBUG0("Calling domain event callback (no queue)");
+        if(cb)
+            cb(conn,dom,event,opaque);
+    }
+}
+
+static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
+{
+    virDomainPtr dom;
+    int event;
+    virConnectDomainEventCallback cb;
+    void *opaque;
+    struct private_data *priv = conn->privateData;
+
+    if(!remoteDomainReadEvent(conn, xdr, &dom, &event, &cb, &opaque))
+    {
+        if( virDomainEventCallbackQueuePush(priv->domainEvents, dom, event, cb, opaque) < 0 ) {
+            DEBUG("%s", "Error adding event to queue");
+        }
+    }
+}
+
+/** remoteDomainEventFired:
+ *
+ * The callback for monitoring the remote socket
+ * for event data
+ */
+void remoteDomainEventFired(int fd ATTRIBUTE_UNUSED,
+                             int event ATTRIBUTE_UNUSED,
+                             void *opaque)
+{
+    char buffer[REMOTE_MESSAGE_MAX];
+    char buffer2[4];
+    struct remote_message_header hdr;
+    XDR xdr;
+    int len;
+
+    virConnectPtr        conn = opaque;
+    struct private_data *priv = conn->privateData;
+
+    DEBUG("%s : Event fired", __FUNCTION__);
+
+    /* Read and deserialise length word. */
+    if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1)
+        return;
+
+    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
+    if (!xdr_int (&xdr, &len)) {
+        error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)"));
+        return;
+    }
+    xdr_destroy (&xdr);
+
+    /* Length includes length word - adjust to real length to read. */
+    len -= 4;
+
+    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
+        error (conn, VIR_ERR_RPC, _("packet received from server too large"));
+        return;
+    }
+
+    /* Read reply header and what follows (either a ret or an error). */
+    if (really_read (conn, priv, 0, buffer, len) == -1) {
+        error (conn, VIR_ERR_RPC, _("error reading buffer from memory"));
+        return;
+    }
+
+    /* Deserialise reply header. */
+    xdrmem_create (&xdr, buffer, len, XDR_DECODE);
+    if (!xdr_remote_message_header (&xdr, &hdr)) {
+        error (conn, VIR_ERR_RPC, _("invalid header in event firing"));
+        return;
+    }
+
+    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
+        hdr.direction == REMOTE_MESSAGE) {
+        DEBUG0("Encountered an async event");
+        remoteDomainProcessEvent(conn, &xdr);
+    } else {
+        DEBUG0("invalid proc in event firing");
+        error (conn, VIR_ERR_RPC, _("invalid proc in event firing"));
+    }
+}
+
+void remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED,
+                                 void *opaque)
+{
+    virDomainEventPtr domEvent;
+    virConnectPtr        conn = opaque;
+    struct private_data *priv = conn->privateData;
+
+    DEBUG0("Flushing domain events");
+    while( (domEvent = virDomainEventCallbackQueuePop(priv->domainEvents)) ) {
+        DEBUG("   Flushing %p", domEvent);
+        if(domEvent->cb)
+            domEvent->cb(domEvent->dom->conn,
+                         domEvent->dom,
+                         domEvent->event,
+                         domEvent->opaque);
+        VIR_FREE(domEvent);
+    }
+}
--
Libvir-list mailing list
Libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list

Reply via email to