On Tue, 2004-08-10 at 22:42, Dan Sugalski wrote:
> Part of me's tempted to just define our own set of functions, but the 
> problem there is that we then put the onus on the embedding app to 
> conform to us, which I'm not sure is the right way to go about things.

When the standard APIs are all so completely different (kqueue, epoll,
aio, /dev/poll) and seriously broken (select, poll), I think you pretty
much HAVE to define your own API!  If you standardize on kqueue,
implementing epoll becomes incredibly difficult, and vice versa.  The
only solution that I am aware of is to introduce a sane API that works
with all possible underlying implementations.


I had a similar need for another project about a year ago.  I wanted
my application to have a single I/O API, no matter which underlying
implementation was being used.  I looked at a few libraries at the time
that claimed to do exactly this, but they were all very heavyweight and
intrusive.  So, I rolled my own.  Here's a quick description...


The API:

Each file is represented by a file descriptor and a callback routine.
These are grouped into an io_atom structure.  io_atoms are intended to
be embedded into other data structures, so all io_atom
allocation/deallocation is handled by the caller.

    typedef struct io_atom {
        io_proc proc;
        int fd;
    } io_atom;

To use it:

    typedef struct {
        io_atom io;
        int parse_state;
        ... etc.
    } my_connection;


io_add starts monitoring an atom. Specify the events that you are
interested in with the flags argument (IO_EXCEPT appears to be very
platform dependent...  IO_READ is also used to get incoming connections
from a listening socket).

    int io_add(io_atom *atom, int flags);

Change the events an atom is monitoring using io_set.
Stop monitoring an atom with io_del.

    int io_set(io_atom *atom, int flags);
    int io_del(io_atom *atom);


Whenever an event occurs on a filehandle, the atom's io_proc
notification is called.  The flags tell what happened.

    #define IO_READ 0x01
    #define IO_WRITE 0x02
    #define IO_EXCEPT 0x04

   typedef void (*io_proc)(struct io_atom *atom, int flags);



Your application waits for events and dispatches them using io_wait.  A
timeout of 0 means return immediately after dispatching all pending
events; a timeout of MAXINT means no timeout -- block until at least one
event arrives, then dispatch the ready events and return.  Timeout is in
milliseconds.

    int io_wait(int timeout);


Finally, because some of these implementations require dynamic memory
(kqueue), we need:

    void io_init();     // inits internal data structures
    void io_exit();     // releases all dynamic memory used

If you call io_exit without calling io_del on all the added FDs first,
the files are not closed.  They are simply not monitored anymore.


TODO:

Some more work needs to be put into standardizing what error messages
can be returned by what calls and exactly what they mean.  Extensive
documentation.  Generalization.

Write a clearer demo!  :)


Perhaps TODO:

In my application, the io_atom appears first in every struct, so it's
real easy and memory-efficient to convert the atom into its containing
structure:

    typedef struct connection {
        io_atom io;     ///< I/O information (set by rot_connect)
        struct in_addr remote_addr;
        int remote_port;
        ...

    void callback(struct io_atom *io, int flags)
    {
        connection *conn = (connection*)io;
        do_stuff(conn, flags);
    }

This might not be true in the general case...  Maybe the io_atom should
include a void* client-specified refcon?


In summary...

I'm quite happy with this design, but the code is marginal right now. 
Nevertheless, I've banged together a mediocre networking demo.  Baware
the boogs!!  I figure I should have some time in a few weeks to solidify
this and get it into Parrot if nobody solves the IO issue before then. 
My girlfriend is moving to Boston, so that should help a lot.  :)

    - Scott


P.S. This is all licensed under the same terms as Parrot: GPL or
Artistic 2.0, your choice.


// io.h
// Scott Bronson
// 2 Oct 2003

// This is the generic Async I/O API.  It can be implemented using
// select, poll, epoll, kqueue, aio, and /dev/poll (hopefully).
//
// This code is licensed under the same terms as Parrot itself.

#ifndef IO_ATOM_IO_H
#define IO_ATOM_IO_H

/*
 * Event flags.  They are bit flags and may be OR'ed together: callers pass
 * them to io_add/io_set to choose which events to monitor, and io_wait
 * passes them to an io_proc to report which events occurred.
 */
#define IO_READ 0x01    ///< readable (also: a connection is pending on a listening socket)
#define IO_WRITE 0x02   ///< writable
#define IO_EXCEPT 0x04  ///< exceptional condition; meaning is platform dependent


// Tells how many incoming connections we can handle at once
// (the backlog parameter to listen)
#define STD_LISTEN_SIZE 128

struct io_atom;

/**
 * This routine is called whenever there is action on an atom.
 *
 * @param atom The atom that the proc is being called on.
 * @param flags Bitwise OR of IO_READ / IO_WRITE / IO_EXCEPT describing
 *              what happened.
 */
typedef void (*io_proc)(struct io_atom *atom, int flags);

/**
 * A monitored file descriptor plus the callback that handles its events.
 * Atoms are meant to be embedded in caller-owned structures; the library
 * never allocates or frees them.
 */
typedef struct io_atom {
	io_proc proc;   ///< callback invoked by io_wait when fd has events
	int fd;         ///< the file descriptor being monitored
} io_atom;


/// Initializes internal data structures.  Call before any other io_ function.
void io_init(void);
/// Releases internal resources.  File descriptors are NOT closed.
void io_exit(void);

/**
 * Starts monitoring atom->fd for the events in flags.
 * @return 0 on success or a negative errno value.  The select backend
 *         returns -ERANGE for an fd it cannot store and -EALREADY if the
 *         fd is already being monitored.
 */
int io_add(io_atom *atom, int flags);
/// Replaces the set of events monitored for an already-added atom.
/// Returns 0 on success or a negative errno value.
int io_set(io_atom *atom, int flags);
/// Stops monitoring the atom.  Its file descriptor is not closed.
/// Returns 0 on success or a negative errno value.
int io_del(io_atom *atom);
/**
 * Waits for events, then dispatches each one to its atom's io_proc.
 * Stops waiting if the timeout occurs.
 * @param timeout Milliseconds to wait: 0 polls without blocking, and
 *                MAXINT means no timeout (block until an event arrives).
 * @return the number of ready events reported by the backend, 0 on
 *         timeout, or a negative value on error.
 */
int io_wait(int timeout);

#endif /* IO_ATOM_IO_H */

// select.c
// Scott Bronson
// 4 October 2003
//
// Uses select to satisfy gatekeeper's network I/O
// Because of select's internal limitations, MAXFDS is 1024.
//
// This code is licensed under the same terms as Parrot itself.


#include <stdio.h>
#include <errno.h>
#include <values.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include "io.h"


// connections[fd] is the atom registered for fd, or NULL when fd is not
// being monitored.  Indexed 0..FD_SETSIZE-1.
static io_atom* connections[FD_SETSIZE];
// Master interest sets maintained by install(); io_wait hands copies of
// these to select() because select() overwrites its arguments.
static fd_set fd_read, fd_write, fd_except;
// The highest-numbered file descriptor in connections, or -1 when none is
// registered (the same sentinel io_del's trimming loop produces).
static int max_fd = -1;


// Pass the file descriptor that you'll be listening and accepting on.

void io_init()
{
	FD_ZERO(&fd_read);
	FD_ZERO(&fd_write);
	FD_ZERO(&fd_except);
}


void io_exit()
{
	// nothing to do
}


// Makes the master interest sets agree exactly with flags for fd: each of
// the read/write/except sets gets fd added when its IO_ bit is present in
// flags, and removed when it is absent.
static void install(int fd, int flags)
{
	fd_set *const sets[3] = { &fd_read, &fd_write, &fd_except };
	const int bits[3] = { IO_READ, IO_WRITE, IO_EXCEPT };
	int k;

	for(k = 0; k < 3; k++) {
		if(!(flags & bits[k])) {
			FD_CLR(fd, sets[k]);
			continue;
		}
		FD_SET(fd, sets[k]);
	}
}


// Registers atom and starts monitoring atom->fd for the events in flags.
// Returns 0 on success, or:
//   -EINVAL   atom or atom->proc is NULL (io_wait would call through it),
//   -ERANGE   fd is negative or too large to store in an fd_set,
//   -EALREADY an atom is already registered for this fd.
int io_add(io_atom *atom, int flags)
{
	int fd;

	if(!atom || !atom->proc) {
		return -EINVAL;
	}

	fd = atom->fd;

	// Valid fd_set indices are 0..FD_SETSIZE-1.  fd == FD_SETSIZE would
	// index one past the end of connections[] and of every fd_set.
	if(fd < 0 || fd >= FD_SETSIZE) {
		return -ERANGE;
	}
	if(connections[fd]) {
		return -EALREADY;
	}

	connections[fd] = atom;
	install(fd, flags);
	if(fd > max_fd) {
		max_fd = fd;
	}

	return 0;
}


// Replaces the set of events monitored for an already-registered atom.
// Returns 0 on success, -ERANGE if atom->fd is outside 0..FD_SETSIZE-1,
// or -EALREADY if no atom is registered for that fd.
// NOTE(review): -EALREADY is an odd code for "not registered" (-ENOENT
// would be clearer).  It is kept so existing callers see the same value.
int io_set(io_atom *atom, int flags)
{
	int fd = atom->fd;

	// Valid fd_set indices are 0..FD_SETSIZE-1.
	if(fd < 0 || fd >= FD_SETSIZE) {
		return -ERANGE;
	}
	if(!connections[fd]) {
		return -EALREADY;
	}

	install(fd, flags);

	return 0;
}


// Stops monitoring atom->fd and forgets the atom.  The descriptor itself is
// not closed.  Returns 0 on success, -ERANGE if the fd is outside
// 0..FD_SETSIZE-1, or -EALREADY if no atom is registered for it.
int io_del(io_atom *atom)
{
	int fd = atom->fd;

	// Valid fd_set indices are 0..FD_SETSIZE-1.
	if(fd < 0 || fd >= FD_SETSIZE) {
		return -ERANGE;
	}
	if(!connections[fd]) {
		return -EALREADY;
	}

	install(fd, 0);
	connections[fd] = NULL;

	// Shrink max_fd so select() and io_wait's dispatch loop scan only up
	// to the highest fd still registered (-1 when none remain).
	while((max_fd >= 0) && (connections[max_fd] == NULL))  {
		max_fd -= 1;
	}

	return 0;
}


// Wait for events, then dispatch them.
// timeout is in milliseconds: 0 polls without blocking, and MAXINT (or any
// negative value) waits until at least one event arrives.
// Returns the count select() reported (ready bits summed over the three
// sets), 0 on timeout, or a negative errno value if select() fails.  An
// interrupted wait returns -EINTR and is not reported through perror.

int io_wait(int timeout)
{
	struct timeval tv;
	struct timeval *tvp = &tv;
	fd_set rfds, wfds, efds;
	int num, i, flags;

	if(timeout == MAXINT || timeout < 0) {
		// A NULL timeval makes select() block indefinitely.  Negative values
		// would otherwise make select() fail with EINVAL.
		tvp = NULL;
	} else {
		tv.tv_sec = timeout / 1000;
		tv.tv_usec = (timeout % 1000) * 1000;
	}

	// select() overwrites its arguments, so hand it copies of the master sets.
	rfds = fd_read;
	wfds = fd_write;
	efds = fd_except;

	num = select(1+max_fd, &rfds, &wfds, &efds, tvp);
	if(num < 0) {
		int err = errno;	// save before perror() can touch errno
		if(err != EINTR) {
			perror("select");
		}
		return -err;
	}

	for(i=0; i <= max_fd; i++) {
		// A callback earlier in this pass may have deregistered an fd
		// (io_del clears its bits) or narrowed its interest with io_set, so
		// only report events the fd is still monitored for.
		// NOTE(review): if an fd is closed and its number reused by io_add
		// within this same pass, the new atom can still see the old
		// readiness.  Callbacks on non-blocking fds should tolerate EAGAIN.
		flags = 0;
		if(FD_ISSET(i, &rfds) && FD_ISSET(i, &fd_read)) flags |= IO_READ;
		if(FD_ISSET(i, &wfds) && FD_ISSET(i, &fd_write)) flags |= IO_WRITE;
		if(FD_ISSET(i, &efds) && FD_ISSET(i, &fd_except)) flags |= IO_EXCEPT;
		if(flags && connections[i]) {
			(*connections[i]->proc)(connections[i], flags);
		}
	}

	return num;
}

// iotest.c
// Scott Bronson
// 11 Aug 2004
//
// Compile: "cc -Wall iotest.c io_select.c -o iotest"
// Run it, then "telnet localhost 21314" a bunch of times.
//
// This example doesn't really show off asynchronous I/O, the whole point
// of the io_atom library.  And it's 99% obtuse networking code.  So,
// really, it's a pretty darn poor demo.  But hopefully it's better than
// nothing.
//
// This code is licensed under the same terms as parrot itself.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <values.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "io.h"

#define PORT 21314


// Demo-wide state used by the callbacks below.
io_atom g_accepter;     // atom for the listening socket
char g_readbuf[1024];   // scratch buffer shared by every connection's reads
char g_char = 'A';      // not referenced by the demo code in this file


// Per-client state.  The io_atom must stay the first member: callbacks
// receive an io_atom* and cast it straight back to a connection*.
typedef struct connection {
	io_atom io;
	char c;
	int chars_processed;   // bytes read from (and echoed to) the client
} connection;


// Puts descriptor sd into non-blocking mode.  Tries ioctl(FIONBIO) first and
// falls back to fcntl(O_NONBLOCK) if that fails.  Returns 0 on success, or
// -1 if both methods fail (errno is then as set by the failing fcntl call).
int set_nonblock(int sd)
{
    int on = 1;
    int fl;

    if(ioctl(sd, FIONBIO, &on) == 0) {
        return 0;
    }

    // ioctl route unavailable or failed: read-modify-write the file flags.
    fl = fcntl(sd, F_GETFL, 0);
    if(fl < 0 || fcntl(sd, F_SETFL, fl | O_NONBLOCK) < 0) {
        return -1;
    }
    return 0;
}


// Tears down one client: stops monitoring it, closes its socket, and frees
// the connection record.  conn is expected to be heap-allocated
// (accept_proc allocates it).
void connection_close(connection *conn)
{
	io_atom *atom = &conn->io;

	io_del(atom);
	close(atom->fd);
	free(conn);
}


// Event handler for one client connection.  It echoes whatever the client
// sends back to it, and closes the connection on EOF, on a read or write
// error, or on an exceptional condition.
void connection_proc(io_atom *ioa, int flags)
{
	connection *conn = (connection*)ioa;   // io is connection's first member
	int fd = conn->io.fd;
	ssize_t len;

	if(flags & IO_READ) {
		// Retry only when read() itself failed with EINTR.  Testing errno
		// after a successful read sees a stale value and could loop again,
		// silently discarding the data just read.
		do {
			len = read(fd, g_readbuf, sizeof(g_readbuf));
		} while(len < 0 && errno == EINTR);

		if(len > 0) {
			ssize_t off = 0;

			conn->chars_processed += (int)len;

			// Echo everything back.  The socket is non-blocking, so write()
			// may accept only part of the buffer per call.
			while(off < len) {
				ssize_t n = write(fd, g_readbuf + off, (size_t)(len - off));
				if(n > 0) {
					off += n;
				} else if(n < 0 && errno == EINTR) {
					continue;
				} else if(n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
					// Socket send buffer is full.  This demo keeps no
					// per-connection output queue, so the rest is dropped.
					break;
				} else {
					// Write error (or no progress): give up on this client.
					connection_close(conn);
					return;
				}
			}
		} else if(len == 0) {
			// A 0-length read means remote has closed normally
			connection_close(conn);
			return;
		} else if(errno != EAGAIN && errno != EWOULDBLOCK) {
			// A real read error on this stream.  EAGAIN/EWOULDBLOCK only
			// means there was nothing to read after all, which is harmless.
			connection_close(conn);
			return;
		}
	}

	if(flags & IO_WRITE) {
		// This demo never registers for IO_WRITE.  A buffering
		// implementation would continue sending queued output here.
	}

	if(flags & IO_EXCEPT) {
		// I think this is also used for OOB.
		// recv (fd1, &c, 1, MSG_OOB);
		connection_close(conn);
		return;
	}
}


// Event handler for the listening socket.  It accepts one pending connection,
// makes it non-blocking, allocates its connection record, and registers it
// for IO_READ.  Any failure that concerns only the new client drops that
// client; the server keeps running.
void accept_proc(io_atom *ioa, int flags)
{
	connection *conn;
	struct sockaddr_in pin;
	socklen_t plen;
	int sd;
	int rc;

	// since the accepter only has IO_READ anyway, there's no need to
	// check the flags param.
	(void)flags;

	plen = sizeof(pin);
	while((sd = accept(ioa->fd, (struct sockaddr*)&pin, &plen)) < 0) {
		if(errno == EINTR) {
			// Interrupted by a signal: try again.
			plen = sizeof(pin);
			continue;
		}
		// EAGAIN/EWOULDBLOCK on this non-blocking socket means there is no
		// pending connection after all (e.g. the client reset before we got
		// to it).  Other errors are likewise specific to that pending
		// connection, so just ignore this event.
		return;
	}

	if(set_nonblock(sd) < 0) {
		printf("Could not set nonblocking: %s.\n", strerror(errno));
		close(sd);
		return;
	}

	// calloc so chars_processed (and c) start at zero instead of garbage.
	conn = calloc(1, sizeof *conn);
	if(!conn) {
		close(sd);
		return;
	}

	conn->io.fd = sd;
	conn->io.proc = connection_proc;

	// io_add reports failure through its negative return value (the select
	// backend does not set errno), so perror() would print the wrong thing.
	// Failing to register one client (e.g. an fd too large for select)
	// should drop that client, not kill the whole server.
	rc = io_add(&conn->io, IO_READ);
	if(rc < 0) {
		printf("Could not monitor fd %d: %s.\n", sd, strerror(-rc));
		close(sd);
		free(conn);
		return;
	}

	printf("connection opened from %s port %d given fd %d\n",
		inet_ntoa(pin.sin_addr), ntohs(pin.sin_port), conn->io.fd);
}


// Demo entry point.  It listens on TCP port PORT on all interfaces, echoes
// data back to every client, and dispatches all socket events through
// io_wait.  It runs until killed.
int main(int argc, char **argv)
{
	int sd;
	int rc;
	int on = 1;
	struct sockaddr_in sin;

	(void)argc;
	(void)argv;

	io_init();

	printf("Opening listening socket...\n");

	if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
		perror("socket");
		exit(1);
	}

	// Let the demo be restarted right away, without bind() failing with
	// EADDRINUSE while old connections linger in TIME_WAIT.  Not fatal.
	if(setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
		perror("setsockopt(SO_REUSEADDR)");
	}

	if(set_nonblock(sd) < 0) {
		perror("setnonblocking");
		close(sd);
		exit(1);
	}

	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(PORT);
	sin.sin_addr.s_addr = htonl(INADDR_ANY);

	if(bind(sd, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
		perror("bind");
		close(sd);
		exit(1);
	}

	if (listen(sd, STD_LISTEN_SIZE) == -1) {
		perror("listen");
		close(sd);
		exit(1);
	}

	g_accepter.fd = sd;
	g_accepter.proc = accept_proc;

	// io_add reports failure through its negative return value (the select
	// backend does not set errno), so decode that instead of calling perror().
	rc = io_add(&g_accepter, IO_READ);
	if(rc < 0) {
		fprintf(stderr, "io_add: %s\n", strerror(-rc));
		close(sd);
		exit(1);
	}

	printf("Listening on port %d, fd %d.\n", PORT, g_accepter.fd);

	// Dispatch events forever; the demo is stopped by killing it.
	for(;;) {
		io_wait(MAXINT);
	}

	// Not reached: kept to show the intended shutdown sequence.
	io_exit();

	return 0;
}


Reply via email to