On Tue, 2004-08-10 at 22:42, Dan Sugalski wrote:
> Part of me's tempted to just define our own set of functions, but the 
> problem there is that we then put the onus on the embedding app to 
> conform to us, which I'm not sure is the right way to go about things.

When the standard APIs are all so completely different (kqueue, epoll,
aio, /dev/poll) and seriously broken (select, poll), I think you pretty
much HAVE to define your own API!  If you standardize on kqueue,
implementing epoll becomes incredibly difficult, and vice versa.  The
only solution that I am aware of is to introduce a sane API that works
with all possible underlying implementations.

I had a similar need for another project about a year ago.  I wanted my
application to have a single I/O API, no matter which underlying
implementation was being used.  I looked at a few libraries at the time
that claimed to do exactly this, but they were all very heavyweight and
intrusive.  So, I rolled my own.  Here's a quick description...

The API:

Each file is represented by a filedescriptor and a callback routine. 
These are grouped into an io_atom structure.  io_atoms are intended to
be embedded into other data structures, so all io_atom
allocation/deallocation is handled by the caller.

    typedef struct io_atom {
        io_proc proc;
        int fd;
    } io_atom;

To use it:

    typedef struct my_connection {
        io_atom io;
        int parse_state;
        ... etc.
    } my_connection;

io_add starts monitoring an atom.  Specify the events that you are
interested in with the flags argument (IO_EXCEPT appears to be very
platform dependent...  IO_READ is also used to get incoming connections
from a listening socket).

    int io_add(io_atom *atom, int flags);

Change the events an atom is monitoring using io_set.
Stop monitoring an atom with io_del.

    int io_set(io_atom *atom, int flags);
    int io_del(io_atom *atom);
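
As an illustration of where io_set comes in handy, here is a rough sketch
that asks for IO_WRITE only while output is queued.  The out_conn struct,
the queue_output/output_written helpers, and the stand-in io_set (which
merely records the flags it was given) are all made up for this example;
none of them are part of the library.

```c
#include <stddef.h>

#define IO_READ  0x01
#define IO_WRITE 0x02

struct io_atom;
typedef void (*io_proc)(struct io_atom *atom, int flags);
typedef struct io_atom { io_proc proc; int fd; } io_atom;

/* Stand-in for the real io_set: remembers the last flags requested. */
int last_flags;
int io_set(io_atom *atom, int flags)
{
    (void)atom;
    last_flags = flags;
    return 0;
}

/* Hypothetical connection with some amount of buffered output. */
typedef struct out_conn {
    io_atom io;
    size_t pending;     /* bytes still waiting to be written */
} out_conn;

/* Queue n bytes; start watching for writability if we were idle. */
int queue_output(out_conn *c, size_t n)
{
    int was_idle = (c->pending == 0);
    c->pending += n;
    return (was_idle && n > 0) ? io_set(&c->io, IO_READ | IO_WRITE) : 0;
}

/* Record that n bytes were written; drop IO_WRITE once drained. */
int output_written(out_conn *c, size_t n)
{
    c->pending -= (n > c->pending) ? c->pending : n;
    return (c->pending == 0) ? io_set(&c->io, IO_READ) : 0;
}
```

The reason to bother: with select or poll a socket is writable almost all
of the time, so leaving IO_WRITE switched on would make io_wait return
immediately on every call.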

Whenever an event occurs on a filehandle, the atom's io_proc
notification is called.  The flags tell what happened.

    #define IO_READ 0x01
    #define IO_WRITE 0x02
    #define IO_EXCEPT 0x04

    typedef void (*io_proc)(struct io_atom *atom, int flags);

Your application waits for events and dispatches them using io_wait.  A
timeout of 0 means return immediately after dispatching all pending
events; a timeout of MAXINT means no timeout -- block until an event
arrives.  Timeout is in milliseconds.

    int io_wait(int timeout);
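
The timeout convention maps onto a select-style backend roughly as
follows.  This is only a sketch: the helper name io_timeout_to_timeval is
invented here, and INT_MAX from <limits.h> stands in for MAXINT.

```c
#include <limits.h>
#include <stddef.h>
#include <sys/time.h>

/* Hypothetical helper: convert an io_wait timeout (milliseconds) into
 * the struct timeval pointer that select() expects.  Returns NULL,
 * which select() treats as "block until an event", when the timeout
 * is INT_MAX. */
struct timeval *io_timeout_to_timeval(int timeout, struct timeval *tv)
{
    if (timeout == INT_MAX) {
        return NULL;
    }
    tv->tv_sec  = timeout / 1000;           /* whole seconds */
    tv->tv_usec = (timeout % 1000) * 1000;  /* leftover ms as microseconds */
    return tv;
}
```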

Finally, because some of these implementations require dynamic memory
(kqueue), we need:

    void io_init();     // inits internal data structures
    void io_exit();     // releases all dynamic memory used

If you call io_exit without calling io_del on all the added FDs first,
the files are not closed.  They are simply not monitored anymore.
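
To show why io_init and io_exit earn their keep, here is a rough sketch of
the same API on top of poll, which needs a dynamically allocated pollfd
array.  Everything in it is an assumption made for illustration: the fixed
IO_MAX_ATOMS capacity, io_init returning an int so it can report
allocation failure, mapping IO_EXCEPT to POLLPRI, -ENOENT for an unknown
atom, INT_MAX standing in for MAXINT, and the record_event callback.
io_set is left out to keep it short.

```c
#include <errno.h>
#include <limits.h>
#include <poll.h>
#include <stdlib.h>
#include <unistd.h>

#define IO_READ   0x01
#define IO_WRITE  0x02
#define IO_EXCEPT 0x04
#define IO_MAX_ATOMS 64    /* assumed fixed capacity, to keep this short */

struct io_atom;
typedef void (*io_proc)(struct io_atom *atom, int flags);
typedef struct io_atom { io_proc proc; int fd; } io_atom;

struct pollfd *pfds;       /* allocated by io_init, freed by io_exit */
io_atom **atoms;           /* atoms[i] is the owner of pfds[i] */
int natoms;

int io_init(void)
{
    pfds = calloc(IO_MAX_ATOMS, sizeof(*pfds));
    atoms = calloc(IO_MAX_ATOMS, sizeof(*atoms));
    natoms = 0;
    if (!pfds || !atoms) {
        free(pfds);
        free(atoms);
        pfds = NULL;
        atoms = NULL;
        return -ENOMEM;
    }
    return 0;
}

void io_exit(void)
{
    /* Releases our bookkeeping only; the fds themselves stay open. */
    free(pfds);
    free(atoms);
    pfds = NULL;
    atoms = NULL;
    natoms = 0;
}

int io_add(io_atom *atom, int flags)
{
    short ev = 0;
    if (natoms >= IO_MAX_ATOMS) return -ERANGE;
    if (flags & IO_READ)   ev |= POLLIN;
    if (flags & IO_WRITE)  ev |= POLLOUT;
    if (flags & IO_EXCEPT) ev |= POLLPRI;
    pfds[natoms].fd = atom->fd;
    pfds[natoms].events = ev;
    pfds[natoms].revents = 0;
    atoms[natoms++] = atom;
    return 0;
}

int io_del(io_atom *atom)
{
    for (int i = 0; i < natoms; i++) {
        if (atoms[i] == atom) {
            natoms--;                   /* swap the last entry into the hole */
            pfds[i] = pfds[natoms];
            atoms[i] = atoms[natoms];
            return 0;
        }
    }
    return -ENOENT;
}

int io_wait(int timeout)
{
    int num = poll(pfds, natoms, timeout == INT_MAX ? -1 : timeout);
    if (num <= 0) return num;
    /* Caveat: a callback that calls io_del reorders the arrays, so an
     * entry may be skipped until the next io_wait. */
    for (int i = 0; i < natoms; i++) {
        short re = pfds[i].revents;
        int flags = 0;
        if (re & (POLLIN | POLLHUP))  flags |= IO_READ;
        if (re & POLLOUT)             flags |= IO_WRITE;
        if (re & (POLLPRI | POLLERR)) flags |= IO_EXCEPT;
        if (flags) atoms[i]->proc(atoms[i], flags);
    }
    return num;
}

/* Example callback for trying the sketch: remembers the flags and
 * consumes one byte when the fd is readable. */
int last_event_flags;
int bytes_seen;
void record_event(io_atom *atom, int flags)
{
    char c;
    last_event_flags = flags;
    if ((flags & IO_READ) && read(atom->fd, &c, 1) == 1) {
        bytes_seen++;
    }
}
```

The caller-visible interface is unchanged from the select version, which
is the whole point: only the bookkeeping behind io_init, io_add and
io_wait differs.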


Some more work needs to be put into standardizing what error messages
can be returned by what calls and exactly what they mean.  Extensive
documentation.  Generalization.

Write a clearer demo!  :)

Perhaps TODO:

In my application, the io_atom appears first in every struct, so it's
real easy and memory-efficient to convert the atom into its containing
structure:
    typedef struct connection {
        io_atom io;     ///< I/O information (set by rot_connect)
        struct in_addr remote_addr;
        int remote_port;
    } connection;

    void callback(struct io_atom *io, int flags)
    {
        // valid only because io is the first member of connection
        connection *conn = (connection*)io;
        do_stuff(conn, flags);
    }

This might not be true in the general case...  Maybe the io_atom should
include a void* client-specified refcon?
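
One way to lift the "atom must come first" restriction without adding a
pointer to every atom is the offsetof trick (the container_of idiom known
from the Linux kernel).  The sketch below is just an illustration; the
io_container macro and the session struct are names invented here.

```c
#include <stddef.h>

struct io_atom;
typedef void (*io_proc)(struct io_atom *atom, int flags);
typedef struct io_atom { io_proc proc; int fd; } io_atom;

/* Hypothetical macro: given a pointer to an embedded member, recover a
 * pointer to the enclosing struct by subtracting the member's offset. */
#define io_container(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Hypothetical struct in which the atom is NOT the first member. */
typedef struct session {
    int parse_state;
    io_atom io;
    int events_seen;
} session;

void session_proc(io_atom *atom, int flags)
{
    session *s = io_container(atom, session, io);
    (void)flags;
    s->events_seen++;
}
```

A void* refcon costs one extra pointer per atom, but it would also work
when the atom is not embedded in the client's structure at all.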

In summary...

I'm quite happy with this design, but the code is marginal right now. 
Nevertheless, I've banged together a mediocre networking demo.  Baware
the boogs!!  I figure I should have some time in a few weeks to solidify
this and get it into Parrot if nobody solves the IO issue before then. 
My girlfriend is moving to Boston, so that should help a lot.  :)

    - Scott

P.S. This is all licensed under the same terms as Parrot: GPL or
Artistic 2.0, your choice.

// io.h
// Scott Bronson
// 2 Oct 2003

// This is the generic Async I/O API.  It can be implemented using
// select, poll, epoll, kqueue, aio, and /dev/poll (hopefully).
// This code is licensed under the same terms as Parrot itself.

#define IO_READ 0x01
#define IO_WRITE 0x02
#define IO_EXCEPT 0x04

// Tells how many incoming connections we can handle at once
// (the backlog parameter to listen)
#define STD_LISTEN_SIZE 128

struct io_atom;

/**
 * This routine is called whenever there is action on an atom.
 * @param atom The atom that the proc is being called on.
 * @param flags What sort of action is happening.
 */
typedef void (*io_proc)(struct io_atom *atom, int flags);

typedef struct io_atom {
	io_proc proc;
	int fd;
} io_atom;

void io_init();
void io_exit();

int io_add(io_atom *atom, int flags);
int io_set(io_atom *atom, int flags);
int io_del(io_atom *atom);
/// Waits for an event, then handles it.  Stops waiting if timeout occurs.
/// Specify MAXINT for no timeout.
int io_wait(int timeout);

// select.c
// Scott Bronson
// 4 October 2003
// Uses select to satisfy gatekeeper's network I/O
// Because of select's internal limitations, MAXFDS is 1024.
// This code is licensed under the same terms as Parrot itself.

#include <stdio.h>
#include <errno.h>
#include <values.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include "io.h"

static io_atom* connections[FD_SETSIZE];
static fd_set fd_read, fd_write, fd_except;
static int max_fd;	// the highest-numbered filedescriptor in connections.

// io_init's body did not survive in the post; clearing the sets is presumed.
void io_init()
{
	FD_ZERO(&fd_read);
	FD_ZERO(&fd_write);
	FD_ZERO(&fd_except);
}

void io_exit()
{
	// nothing to do
}

static void install(int fd, int flags)
{
	if(flags & IO_READ) {
		FD_SET(fd, &fd_read);
	} else {
		FD_CLR(fd, &fd_read);
	}

	if(flags & IO_WRITE) {
		FD_SET(fd, &fd_write);
	} else {
		FD_CLR(fd, &fd_write);
	}

	if(flags & IO_EXCEPT) {
		FD_SET(fd, &fd_except);
	} else {
		FD_CLR(fd, &fd_except);
	}
}

int io_add(io_atom *atom, int flags)
{
	int fd = atom->fd;

	// valid indexes into connections[] are 0..FD_SETSIZE-1
	if(fd < 0 || fd >= FD_SETSIZE) {
		return -ERANGE;
	}
	if(connections[fd]) {
		return -EALREADY;
	}

	connections[fd] = atom;
	install(fd, flags);
	if(fd > max_fd) max_fd = fd;
	return 0;
}

int io_set(io_atom *atom, int flags)
{
	int fd = atom->fd;

	if(fd < 0 || fd >= FD_SETSIZE) {
		return -ERANGE;
	}
	if(!connections[fd]) {
		return -EALREADY;
	}

	install(fd, flags);

	return 0;
}

int io_del(io_atom *atom)
{
	int fd = atom->fd;

	if(fd < 0 || fd >= FD_SETSIZE) {
		return -ERANGE;
	}
	if(!connections[fd]) {
		return -EALREADY;
	}

	install(fd, 0);
	connections[fd] = NULL;

	// shrink max_fd down to the highest fd still being monitored
	while((max_fd >= 0) && (connections[max_fd] == NULL))  {
		max_fd -= 1;
	}

	return 0;
}

// Wait for events, then dispatch them.
// timeout is in milliseconds.  MAXINT == forever.

int io_wait(int timeout)
{
	struct timeval tv;
	struct timeval *tvp = &tv;
	int num, i, flags;

	if(timeout == MAXINT) {
		tvp = NULL;
	} else {
		tv.tv_sec = timeout / 1000;
		tv.tv_usec = (timeout % 1000) * 1000;
	}

	// select modifies the sets it is given, so work on copies
	fd_set rfds = fd_read;
	fd_set wfds = fd_write;
	fd_set efds = fd_except;

	num = select(1+max_fd, &rfds, &wfds, &efds, tvp);
	if(num < 0) {
		return num;
	}

	for(i=0; i <= max_fd; i++) {
		flags = 0;
		if(FD_ISSET(i, &rfds)) flags |= IO_READ;
		if(FD_ISSET(i, &wfds)) flags |= IO_WRITE;
		if(FD_ISSET(i, &efds)) flags |= IO_EXCEPT;
		if(flags) {
			if(connections[i]) {
				(*connections[i]->proc)(connections[i], flags);
			} else {
				// what do we do -- event on an unknown connection?
				printf("Got an event on an unknown connection %d!\n", i);
			}
		}
	}

	return num;
}

// iotest.c
// Scott Bronson
// 11 Aug 2004
// Compile: "cc -Wall iotest.c io_select.c -o iotest"
// Run it, then "telnet localhost 21314" a bunch of times.
// This example doesn't really show off asynchronous I/O, the whole point
// of the io_atom library.  And it's 99% obtuse networking code.  So,
// really, it's a pretty darn poor demo.  But hopefully it's better than
// nothing.
// This code is licensed under the same terms as parrot itself.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <values.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "io.h"

#define PORT 21314

io_atom g_accepter;   // the listening socket
char g_char = 'A';
char g_readbuf[1024];

typedef struct {
	io_atom io;
	char c;
	int chars_processed;
} connection;

int set_nonblock(int sd)
{
    int flags = 1;

    // set nonblocking IO on the socket
    if(ioctl(sd, FIONBIO, &flags)) {
        // ioctl failed -- try alternative
        flags = fcntl(sd, F_GETFL, 0);
        if(flags < 0) {
            return -1;
        }
        if(fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) {
            return -1;
        }
    }

    return 0;
}

void connection_close(connection *conn)
{
    // The body is missing from the post; this cleanup is presumed.
    io_del(&conn->io);
    close(conn->io.fd);
    free(conn);
}

void connection_proc(io_atom *ioa, int flags)
{
    connection *conn = (connection*)ioa;
    int fd = conn->io.fd;
    int len;

    if(flags & IO_READ) {
        do {
            len = read(fd, g_readbuf, sizeof(g_readbuf));
        } while (len < 0 && errno == EINTR);   // stupid posix

        if(len > 0) {
            write(fd, g_readbuf, len);
            conn->chars_processed += len;
        } else if(len == 0) {
            // A 0-length read means remote has closed normally
            connection_close(conn);     // presumed; missing from the post
            return;
        } else {
            // handle an error on the socket
            if(errno == EAGAIN) {
                // nothing to read?  weird.
            } else if(errno == EWOULDBLOCK) {
                // with glibc EAGAIN==EWOULDBLOCK so this is probably dead code
            } else {
                // there's some sort of read error on this stream.
                connection_close(conn); // presumed; missing from the post
                return;
            }
        }
    }

    if(flags & IO_WRITE) {
        // there's more space in the write buffer
        // so continue writing.
    }

    if(flags & IO_EXCEPT) {
        // I think this is also used for OOB.
        // recv (fd1, &c, 1, MSG_OOB);
    }
}

void accept_proc(io_atom *ioa, int flags)
{
    connection *conn;
    struct sockaddr_in pin;
    socklen_t plen;
    int sd;

    // since the accepter only has IO_READ anyway, there's no need to
    // check the flags param.

    // NOTE: the bodies of the error branches are missing from the post;
    // the continue/return/cleanup statements below are presumed.

    plen = sizeof(pin);
    while((sd = accept(ioa->fd, (struct sockaddr*)&pin, &plen)) < 0) {
        if(errno == EINTR) {
            // This call was interrupted by a signal.  Try again and
            // see if we receive a connection.
            continue;
        }
        if(errno == EAGAIN || errno == EWOULDBLOCK) {
            // socket is marked nonblocking but no pending connections
            // are present.  Weird.  I guess we should succeed but do nothing.
            return;
        }

        // Probably there is a network error pending for this
        // connection (already!).  Should probably just ignore it...?
        return;
    }

    if(set_nonblock(sd) < 0) {
        printf("Could not set nonblocking: %s.\n", strerror(errno));
        close(sd);
        return;
    }

    conn = malloc(sizeof(connection));
    if(!conn) {
        close(sd);
        return;
    }

    conn->io.fd = sd;
    conn->io.proc = connection_proc;
    conn->chars_processed = 0;  // malloc does not zero the struct

    if(io_add(&conn->io, IO_READ) < 0) {
        close(sd);
        free(conn);
        return;
    }

    printf("connection opened from %s port %d given fd %d\n",
        inet_ntoa(pin.sin_addr), ntohs(pin.sin_port), conn->io.fd);
}

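int main(int argc, char **argv)
{
	int sd;
	struct sockaddr_in sin;

	// NOTE: the error-branch bodies, the loop body and the io_init/io_exit
	// calls are missing from the post; what is here is a presumed fill-in.
	io_init();

	printf("Opening listening socket...\n");

	if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
		perror("socket");
		exit(1);
	}

	if(set_nonblock(sd) < 0) {
		perror("set_nonblock");
		exit(1);
	}

	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(PORT);
	sin.sin_addr.s_addr = htonl(INADDR_ANY);

	if(bind(sd, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
		perror("bind");
		exit(1);
	}

	if (listen(sd, STD_LISTEN_SIZE) == -1) {
		perror("listen");
		exit(1);
	}

	g_accepter.fd = sd;
	g_accepter.proc = accept_proc;

	if(io_add(&g_accepter, IO_READ) < 0) {
		fprintf(stderr, "io_add failed\n");
		exit(1);
	}

	printf("Listening on port %d, fd %d.\n", PORT, g_accepter.fd);

	for(;;) {
		io_wait(MAXINT);
	}

	io_exit();
	return 0;
}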

