In order to fix the latency issues, a first step is to make things
more asynchronous. This patch contains the basic building block for
that: making sockets (mostly) non-blocking. Unfortunately it requires
patching the base X server, but it has a polling fallback (for libvnc
and such).

I've only updated xserver15.patch as that is the server I have here.
Please see if you can do the same modifications to the other versions
(that you have easy access to).

Rgds
-- 
Pierre Ossman            OpenSource-based Thin Client Technology
System Developer         Telephone: +46-13-21 46 00
Cendio AB                Web: http://www.cendio.com

A: Because it messes up the order in which people normally read text.
Q: Why is top-posting such a bad thing?
Index: unix/xserver/hw/vnc/vncExtInit.cc
===================================================================
--- unix/xserver/hw/vnc/vncExtInit.cc	(revision 4718)
+++ unix/xserver/hw/vnc/vncExtInit.cc	(working copy)
@@ -21,6 +21,7 @@
 #endif
 
 #include <stdio.h>
+#include <errno.h>
 
 extern "C" {
 #define class c_class
@@ -28,6 +29,7 @@
 #define NEED_EVENTS
 #include <X11/X.h>
 #include <X11/Xproto.h>
+#include <X11/Xpoll.h>
 #include "misc.h"
 #include "os.h"
 #include "dixstruct.h"
@@ -63,6 +65,8 @@
   static void vncResetProc(ExtensionEntry* extEntry);
   static void vncBlockHandler(pointer data, OSTimePtr t, pointer readmask);
   static void vncWakeupHandler(pointer data, int nfds, pointer readmask);
+  void vncWriteBlockHandler(fd_set *fds);
+  void vncWriteWakeupHandler(int nfds, fd_set *fds);
   static void vncClientStateChange(CallbackListPtr*, pointer, pointer);
   static void SendSelectionChangeEvent(Atom selection);
   static int ProcVncExtDispatch(ClientPtr client);
@@ -287,6 +291,9 @@
   SendSelectionChangeEvent(selection->selection);
 }
 
+static void vncWriteBlockHandlerFallback(OSTimePtr timeout);
+static void vncWriteWakeupHandlerFallback();
+
 //
 // vncBlockHandler - called just before the X server goes into select().  Call
 // on to the block handler for each desktop.  Then check whether any of the
@@ -297,6 +304,8 @@
 {
   fd_set* fds = (fd_set*)readmask;
 
+  vncWriteBlockHandlerFallback(timeout);
+
   for (int scr = 0; scr < screenInfo.numScreens; scr++)
     if (desktop[scr])
       desktop[scr]->blockHandler(fds);
@@ -311,8 +320,87 @@
       desktop[scr]->wakeupHandler(fds, nfds);
     }
   }
+
+  vncWriteWakeupHandlerFallback();
 }
 
+//
+// vncWriteBlockHandler - extra hack to be able to get the main select loop
+// to monitor writeable fds and not just readable. This requires a modified
+// Xorg and might therefore not be called. When it is called though, it will
+// be called before vncBlockHandler (and vncWriteWakeupHandler will be called
+// after vncWakeupHandler).
+//
+
+static bool needFallback = true;
+static fd_set fallbackFds;
+static struct timeval tw;
+
+void vncWriteBlockHandler(fd_set *fds)
+{
+  needFallback = false;
+
+  for (int scr = 0; scr < screenInfo.numScreens; scr++)
+    if (desktop[scr])
+      desktop[scr]->writeBlockHandler(fds);
+}
+
+void vncWriteWakeupHandler(int nfds, fd_set *fds)
+{
+  for (int scr = 0; scr < screenInfo.numScreens; scr++) {
+    if (desktop[scr]) {
+      desktop[scr]->writeWakeupHandler(fds, nfds);
+    }
+  }
+}
+
+static void vncWriteBlockHandlerFallback(OSTimePtr timeout)
+{
+  if (!needFallback)
+    return;
+
+  FD_ZERO(&fallbackFds);
+  vncWriteBlockHandler(&fallbackFds);
+  needFallback = true;
+
+  if (!XFD_ANYSET(&fallbackFds))
+    return;
+
+  if ((*timeout == NULL) ||
+      ((*timeout)->tv_sec > 0) || ((*timeout)->tv_usec > 10000)) {
+    tw.tv_sec = 0;
+    tw.tv_usec = 10000;
+    *timeout = &tw;
+  }
+}
+
+static void vncWriteWakeupHandlerFallback()
+{
+  int ret;
+  struct timeval timeout;
+
+  if (!needFallback)
+    return;
+
+  if (!XFD_ANYSET(&fallbackFds))
+    return;
+
+  timeout.tv_sec = 0;
+  timeout.tv_usec = 0;
+
+  ret = select(XFD_SETSIZE, NULL, &fallbackFds, NULL, &timeout);
+  if (ret < 0) {
+    ErrorF("vncWriteWakeupHandlerFallback(): select: %s\n",
+           strerror(errno));
+    return;
+  }
+
+  if (ret == 0)
+    return;
+
+  vncWriteWakeupHandler(ret, &fallbackFds);
+}
+
 static void vncClientStateChange(CallbackListPtr*, pointer, pointer p)
 {
   ClientPtr client = ((NewClientInfoRec*)p)->client;
Index: unix/xserver/hw/vnc/XserverDesktop.cc
===================================================================
--- unix/xserver/hw/vnc/XserverDesktop.cc	(revision 4718)
+++ unix/xserver/hw/vnc/XserverDesktop.cc	(working copy)
@@ -581,6 +581,7 @@
         if (FD_ISSET(listener->getFd(), fds)) {
           FD_CLR(listener->getFd(), fds);
           Socket* sock = listener->accept();
+          sock->outStream().setBlocking(false);
           server->addSocket(sock);
           vlog.debug("new client, sock %d",sock->getFd());
         }
@@ -590,6 +591,7 @@
         if (FD_ISSET(httpListener->getFd(), fds)) {
           FD_CLR(httpListener->getFd(), fds);
           Socket* sock = httpListener->accept();
+          sock->outStream().setBlocking(false);
           httpServer->addSocket(sock);
           vlog.debug("new http client, sock %d",sock->getFd());
         }
@@ -632,6 +634,78 @@
   }
 }
 
+void XserverDesktop::writeBlockHandler(fd_set* fds)
+{
+  try {
+    std::list<Socket*> sockets;
+    std::list<Socket*>::iterator i;
+
+    server->getSockets(&sockets);
+    for (i = sockets.begin(); i != sockets.end(); i++) {
+      int fd = (*i)->getFd();
+      if ((*i)->isShutdown()) {
+        vlog.debug("client gone, sock %d",fd);
+        server->removeSocket(*i);
+        vncClientGone(fd);
+        delete (*i);
+      } else {
+        if ((*i)->outStream().bufferUsage() > 0)
+          FD_SET(fd, fds);
+      }
+    }
+
+    if (httpServer) {
+      httpServer->getSockets(&sockets);
+      for (i = sockets.begin(); i != sockets.end(); i++) {
+        int fd = (*i)->getFd();
+        if ((*i)->isShutdown()) {
+          vlog.debug("http client gone, sock %d",fd);
+          httpServer->removeSocket(*i);
+          delete (*i);
+        } else {
+          if ((*i)->outStream().bufferUsage() > 0)
+            FD_SET(fd, fds);
+        }
+      }
+    }
+  } catch (rdr::Exception& e) {
+    vlog.error("XserverDesktop::writeBlockHandler: %s",e.str());
+  }
+}
+
+void XserverDesktop::writeWakeupHandler(fd_set* fds, int nfds)
+{
+  if (nfds < 1)
+    return;
+
+  try {
+    std::list<Socket*> sockets;
+    std::list<Socket*>::iterator i;
+
+    server->getSockets(&sockets);
+    for (i = sockets.begin(); i != sockets.end(); i++) {
+      int fd = (*i)->getFd();
+      if (FD_ISSET(fd, fds)) {
+        FD_CLR(fd, fds);
+        (*i)->outStream().flush();
+      }
+    }
+
+    if (httpServer) {
+      httpServer->getSockets(&sockets);
+      for (i = sockets.begin(); i != sockets.end(); i++) {
+        int fd = (*i)->getFd();
+        if (FD_ISSET(fd, fds)) {
+          FD_CLR(fd, fds);
+          (*i)->outStream().flush();
+        }
+      }
+    }
+  } catch (rdr::Exception& e) {
+    vlog.error("XserverDesktop::writeWakeupHandler: %s",e.str());
+  }
+}
+
 void XserverDesktop::addClient(Socket* sock, bool reverse)
 {
   vlog.debug("new client, sock %d reverse %d",sock->getFd(),reverse);
Index: unix/xserver/hw/vnc/XserverDesktop.h
===================================================================
--- unix/xserver/hw/vnc/XserverDesktop.h	(revision 4718)
+++ unix/xserver/hw/vnc/XserverDesktop.h	(working copy)
@@ -72,6 +72,8 @@
   void ignoreHooks(bool b) { ignoreHooks_ = b; }
   void blockHandler(fd_set* fds);
   void wakeupHandler(fd_set* fds, int nfds);
+  void writeBlockHandler(fd_set* fds);
+  void writeWakeupHandler(fd_set* fds, int nfds);
   void addClient(network::Socket* sock, bool reverse);
   void disconnectClients();
 
Index: unix/xserver15.patch
===================================================================
--- unix/xserver15.patch	(revision 4718)
+++ unix/xserver15.patch	(working copy)
@@ -98,3 +98,61 @@
  #ifdef XIDLE
      if (!noXIdleExtension) XIdleExtensionInit();
  #endif
+--- xserver/os/WaitFor.c.orig	2011-10-07 12:57:57.000000000 +0200
++++ xserver/os/WaitFor.c	2011-10-07 13:21:11.000000000 +0200
+@@ -125,6 +125,9 @@
+ static void CheckAllTimers(void);
+ static OsTimerPtr timers = NULL;
+ 
++extern void vncWriteBlockHandler(fd_set *fds);
++extern void vncWriteWakeupHandler(int nfds, fd_set *fds);
++
+ /*****************
+  * WaitForSomething:
+  *     Make the server suspend until there is
+@@ -150,6 +153,7 @@
+     INT32 timeout = 0;
+     fd_set clientsReadable;
+     fd_set clientsWritable;
++    fd_set socketsWritable;
+     int curclient;
+     int selecterr;
+     int nready;
+@@ -220,23 +224,29 @@
+ 	SmartScheduleStopTimer ();
+ 
+ #endif
++	FD_ZERO(&socketsWritable);
++	vncWriteBlockHandler(&socketsWritable);
+ 	BlockHandler((pointer)&wt, (pointer)&LastSelectMask);
+ 	if (NewOutputPending)
+ 	    FlushAllOutput();
+ 	/* keep this check close to select() call to minimize race */
+ 	if (dispatchException)
+ 	    i = -1;
+-	else if (AnyClientsWriteBlocked)
+-	{
+-	    XFD_COPYSET(&ClientsWriteBlocked, &clientsWritable);
+-	    i = Select (MaxClients, &LastSelectMask, &clientsWritable, NULL, wt);
+-	}
+-	else 
+-	{
+-	    i = Select (MaxClients, &LastSelectMask, NULL, NULL, wt);
++	else {
++	    if (AnyClientsWriteBlocked)
++		XFD_ORSET(&socketsWritable, &ClientsWriteBlocked, &socketsWritable);
++
++	    if (XFD_ANYSET(&socketsWritable)) {
++		i = Select (MaxClients, &LastSelectMask, &socketsWritable, NULL, wt);
++		if (AnyClientsWriteBlocked)
++		    XFD_ANDSET(&clientsWritable, &socketsWritable, &ClientsWriteBlocked);
++	    } else {
++		i = Select (MaxClients, &LastSelectMask, NULL, NULL, wt);
++	    }
+ 	}
+ 	selecterr = GetErrno();
+ 	WakeupHandler(i, (pointer)&LastSelectMask);
++	vncWriteWakeupHandler(i, &socketsWritable);
+ #ifdef SMART_SCHEDULE
+ 	SmartScheduleStartTimer ();
+ #endif
Index: common/rdr/FdOutStream.cxx
===================================================================
--- common/rdr/FdOutStream.cxx	(revision 4718)
+++ common/rdr/FdOutStream.cxx	(working copy)
@@ -50,17 +50,18 @@
 
 enum { DEFAULT_BUF_SIZE = 16384 };
 
-FdOutStream::FdOutStream(int fd_, int timeoutms_, int bufSize_)
-  : fd(fd_), timeoutms(timeoutms_),
+FdOutStream::FdOutStream(int fd_, bool blocking_, int timeoutms_, int bufSize_)
+  : fd(fd_), blocking(blocking_), timeoutms(timeoutms_),
     bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0)
 {
-  ptr = start = new U8[bufSize];
+  ptr = start = sentUpTo = new U8[bufSize];
   end = start + bufSize;
 }
 
 FdOutStream::~FdOutStream()
 {
   try {
+    blocking = true;
     flush();
   } catch (Exception&) {
   }
@@ -71,21 +72,56 @@
   timeoutms = timeoutms_;
 }
 
+void FdOutStream::setBlocking(bool blocking_) {
+  blocking = blocking_;
+}
+
 int FdOutStream::length()
 {
-  return offset + ptr - start;
+  return offset + ptr - sentUpTo;
 }
 
+int FdOutStream::bufferUsage()
+{
+  return ptr - sentUpTo;
+}
+
 void FdOutStream::flush()
 {
-  U8* sentUpTo = start;
+  int timeoutms_;
+
+  if (blocking)
+    timeoutms_ = timeoutms;
+  else
+    timeoutms_ = 0;
+
   while (sentUpTo < ptr) {
-    int n = writeWithTimeout((const void*) sentUpTo, ptr - sentUpTo);
+    int n = writeWithTimeout((const void*) sentUpTo,
+                             ptr - sentUpTo, timeoutms_);
+
+    // Timeout?
+    if (n == 0) {
+      // If non-blocking then we're done here
+      if (!blocking)
+        break;
+
+      // Otherwise try blocking (with possible timeout)
+      if ((timeoutms_ == 0) && (timeoutms != 0)) {
+        timeoutms_ = timeoutms;
+        break;
+      }
+
+      // Proper timeout
+      throw TimedOut();
+    }
+
     sentUpTo += n;
     offset += n;
   }
 
-  ptr = start;
+   // Managed to flush everything?
+  if (sentUpTo == ptr)
+    ptr = sentUpTo = start;
 }
 
 
@@ -94,8 +130,31 @@
   if (itemSize > bufSize)
     throw Exception("FdOutStream overrun: max itemSize exceeded");
 
+  // First try to get rid of the data we have
   flush();
 
+  // Still not enough space?
+  if (itemSize > end - ptr) {
+    // Can we shuffle things around?
+    // (don't do this if it gains us less than 25%)
+    if ((sentUpTo - start > bufSize / 4) &&
+        (itemSize < bufSize - (ptr - sentUpTo))) {
+      memmove(start, sentUpTo, ptr - sentUpTo);
+      ptr = start + (ptr - sentUpTo);
+      sentUpTo = start;
+    } else {
+      // Have to get rid of more data, so turn off non-blocking
+      // for a bit...
+      bool realBlocking;
+
+      realBlocking = blocking;
+      blocking = true;
+      flush();
+      blocking = realBlocking;
+    }
+  }
+
+  // Can we fit all the items asked for?
   if (itemSize * nItems > end - ptr)
     nItems = (end - ptr) / itemSize;
 
@@ -112,7 +171,7 @@
 // select() and write() returning EINTR.
 //
 
-int FdOutStream::writeWithTimeout(const void* data, int length)
+int FdOutStream::writeWithTimeout(const void* data, int length, int timeoutms)
 {
   int n;
 
@@ -146,7 +205,7 @@
 
     if (n < 0) throw SystemException("select",errno);
 
-    if (n == 0) throw TimedOut();
+    if (n == 0) return 0;
 
     do {
       n = ::write(fd, data, length);
Index: common/rdr/FdOutStream.h
===================================================================
--- common/rdr/FdOutStream.h	(revision 4718)
+++ common/rdr/FdOutStream.h	(working copy)
@@ -31,23 +31,28 @@
 
   public:
 
-    FdOutStream(int fd, int timeoutms=-1, int bufSize=0);
+    FdOutStream(int fd, bool blocking=true, int timeoutms=-1, int bufSize=0);
     virtual ~FdOutStream();
 
     void setTimeout(int timeoutms);
+    void setBlocking(bool blocking);
     int getFd() { return fd; }
 
     void flush();
     int length();
 
+    int bufferUsage();
+
   private:
     int overrun(int itemSize, int nItems);
-    int writeWithTimeout(const void* data, int length);
+    int writeWithTimeout(const void* data, int length, int timeoutms);
     int fd;
+    bool blocking;
     int timeoutms;
     int bufSize;
     int offset;
     U8* start;
+    U8* sentUpTo;
   };
 
 }

Attachment: signature.asc
Description: PGP signature

------------------------------------------------------------------------------
All the data continuously generated in your IT infrastructure contains a
definitive record of customers, application performance, security
threats, fraudulent activity and more. Splunk takes this data and makes
sense of it. Business sense. IT sense. Common sense.
http://p.sf.net/sfu/splunk-d2dcopy1
_______________________________________________
Tigervnc-devel mailing list
Tigervnc-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tigervnc-devel

Reply via email to