On Tue, 2004-08-10 at 22:42, Dan Sugalski wrote:
> Part of me's tempted to just define our own set of functions, but the
> problem there is that we then put the onus on the embedding app to
> conform to us, which I'm not sure is the right way to go about things.

When the standard APIs are all so completely different (kqueue, epoll,
aio, /dev/poll) and seriously broken (select, poll), I think you pretty
much HAVE to define your own API!  If you standardize on kqueue,
implementing epoll becomes incredibly difficult, and vice versa.  The
only solution that I am aware of is to introduce a sane API that works
with all possible underlying implementations.

I had a similar need for another project about a year ago.  I wanted my
application to have a single I/O API, no matter which underlying
implementation was being used.  I looked at a few libraries at the time
that claimed to do exactly this, but they were all very heavyweight and
intrusive.  So, I rolled my own.  Here's a quick description...


The API:

Each file is represented by a file descriptor and a callback routine.
These are grouped into an io_atom structure.  io_atoms are intended to
be embedded into other data structures, so all io_atom
allocation/deallocation is handled by the caller.

    typedef struct io_atom {
        io_proc proc;
        int fd;
    } io_atom;

To use it:

    typedef struct {
        io_atom io;
        int parse_state;
        ... etc.
    } my_connection;

io_add starts monitoring an atom.  Specify the events that you are
interested in with flags.  (IO_EXCEPT appears to be very platform
dependent.  IO_READ is also used to get incoming connections from a
listening socket.)

    int io_add(io_atom *atom, int flags);

Change the events an atom is monitoring using io_set.  Stop monitoring
an atom with io_del.

    int io_set(io_atom *atom, int flags);
    int io_del(io_atom *atom);

Whenever an event occurs on a filehandle, the atom's io_proc
notification is called.  The flags tell what happened.

    #define IO_READ 0x01
    #define IO_WRITE 0x02
    #define IO_EXCEPT 0x04

    typedef void (*io_proc)(struct io_atom *atom, int flags);

Your application waits for events and dispatches them using io_wait.
A timeout of 0 means return immediately after dispatching all pending
events.  A timeout of MAXINT means no timeout: io_wait blocks until an
event arrives.  Timeout is in milliseconds.

    int io_wait(int timeout);

Finally, because some of these implementations require dynamic memory
(kqueue), we need:

    void io_init();  // inits internal data structures
    void io_exit();  // releases all dynamic memory used

If you call io_exit without calling io_del on all the added FDs first,
the files are not closed.  They are simply not monitored anymore.


TODO:

Some more work needs to be put into standardizing what error codes
can be returned by what calls and exactly what they mean.

Extensive documentation.

Generalization.

Write a clearer demo!  :)


Perhaps TODO:

In my application, the io_atom appears first in every struct, so it's
real easy and memory-efficient to convert the atom into its containing
structure:

    typedef struct connection {
        io_atom io;     ///< I/O information (set by rot_connect)
        struct in_addr remote_addr;
        int remote_port;
        ...
    } connection;

    void callback(struct io_atom *io, int flags)
    {
        connection *conn = (connection*)io;
        do_stuff(conn, flags);
    }

This might not be true in the general case...  Maybe the io_atom should
include a void* client-specified refcon?


In summary...

I'm quite happy with this design, but the code is marginal right now.
Nevertheless, I've banged together a mediocre networking demo.
Baware the boogs!!

I figure I should have some time in a few weeks to solidify this and
get it into Parrot if nobody solves the IO issue before then.  My
girlfriend is moving to Boston, so that should help a lot.  :)

    - Scott

P.S. This is all licensed under the same terms as Parrot: GPL or
Artistic 2.0, your choice.

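As an alternative to a refcon pointer, the containing structure can be
recovered with offsetof even when the io_atom is not the first member.
The following is only a sketch of that idea: late_connection,
late_connection_from_atom and late_connection_proc are hypothetical
names invented for illustration, and the io_atom definition is repeated
so the fragment compiles on its own.

```c
#include <assert.h>
#include <stddef.h>

struct io_atom;
typedef void (*io_proc)(struct io_atom *atom, int flags);

typedef struct io_atom {
    io_proc proc;
    int fd;
} io_atom;

/* Hypothetical connection type in which the atom is NOT the first member. */
typedef struct {
    int parse_state;
    io_atom io;
} late_connection;

/* Recover the enclosing late_connection from a pointer to its io member. */
late_connection *late_connection_from_atom(io_atom *atom)
{
    assert(atom != NULL);
    return (late_connection *)((char *)atom - offsetof(late_connection, io));
}

/* Example callback: stores the reported flags in the enclosing struct. */
void late_connection_proc(io_atom *atom, int flags)
{
    late_connection *conn = late_connection_from_atom(atom);
    conn->parse_state = flags;
}
```

This costs no extra memory per atom, but each struct type needs its own
conversion helper, whereas a refcon field would cost one pointer per
atom and work uniformly.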
// io.h
// Scott Bronson
// 2 Oct 2003

// This is the generic Async I/O API.  It can be implemented using
// select, poll, epoll, kqueue, aio, and /dev/poll (hopefully).
//
// This code is licensed under the same terms as Parrot itself.

#ifndef IO_H
#define IO_H

#define IO_READ 0x01
#define IO_WRITE 0x02
#define IO_EXCEPT 0x04

// Tells how many incoming connections we can handle at once
// (the backlog parameter to listen)
#define STD_LISTEN_SIZE 128


struct io_atom;

/**
 * This routine is called whenever there is action on an atom.
 *
 * @param atom The atom that the proc is being called on.
 * @param flags What sort of action is happening.
 */
typedef void (*io_proc)(struct io_atom *atom, int flags);

typedef struct io_atom {
    io_proc proc;
    int fd;
} io_atom;


void io_init(void);
void io_exit(void);

int io_add(io_atom *atom, int flags);
int io_set(io_atom *atom, int flags);
int io_del(io_atom *atom);

/// Waits for an event, then handles it.  Stops waiting if timeout occurs.
/// Timeout is in milliseconds.  Specify MAXINT for no timeout.
int io_wait(int timeout);

#endif // IO_H

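The header says the API can also sit on top of poll.  A minimal sketch
of the flag translation such a backend would need is shown here.  The
function names io_flags_to_poll and poll_to_io_flags are hypothetical,
and two choices are assumptions rather than anything stated above:
IO_EXCEPT is mapped to POLLPRI, and POLLHUP/POLLERR are reported as
IO_READ so that the read handler notices the closed or failed stream.

```c
#include <assert.h>
#include <poll.h>

#define IO_READ   0x01
#define IO_WRITE  0x02
#define IO_EXCEPT 0x04

/* Translate IO_* interest flags into a poll(2) events mask. */
short io_flags_to_poll(int flags)
{
    short events = 0;

    assert((flags & ~(IO_READ | IO_WRITE | IO_EXCEPT)) == 0);
    if (flags & IO_READ)   events |= POLLIN;
    if (flags & IO_WRITE)  events |= POLLOUT;
    if (flags & IO_EXCEPT) events |= POLLPRI;   /* assumption */
    return events;
}

/* Translate poll(2) revents back into IO_* flags for the io_proc. */
int poll_to_io_flags(short revents)
{
    int flags = 0;

    /* Assumption: hangup and error are delivered to the read handler. */
    if (revents & (POLLIN | POLLHUP | POLLERR)) flags |= IO_READ;
    if (revents & POLLOUT) flags |= IO_WRITE;
    if (revents & POLLPRI) flags |= IO_EXCEPT;
    return flags;
}
```

Keeping the translation in two small functions means the rest of a poll
backend only ever deals with IO_* values, the same as the select one.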
// select.c
// Scott Bronson
// 4 October 2003
//
// Uses select to satisfy gatekeeper's network I/O
// Because of select's internal limitations, only descriptors below
// FD_SETSIZE (typically 1024) can be monitored.
//
// This code is licensed under the same terms as Parrot itself.

#include <stdio.h>
#include <errno.h>
#include <values.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/select.h>
#include <unistd.h>

#include "io.h"


static io_atom* connections[FD_SETSIZE];
static fd_set fd_read, fd_write, fd_except;
static int max_fd;  // the highest-numbered filedescriptor in connections.


// Clears the descriptor sets.  Call once before any other io_ routine.
void io_init()
{
    FD_ZERO(&fd_read);
    FD_ZERO(&fd_write);
    FD_ZERO(&fd_except);
}


void io_exit()
{
    // nothing to do
}


// Sets or clears fd in each of the three sets according to flags.
static void install(int fd, int flags)
{
    if(flags & IO_READ) {
        FD_SET(fd, &fd_read);
    } else {
        FD_CLR(fd, &fd_read);
    }

    if(flags & IO_WRITE) {
        FD_SET(fd, &fd_write);
    } else {
        FD_CLR(fd, &fd_write);
    }

    if(flags & IO_EXCEPT) {
        FD_SET(fd, &fd_except);
    } else {
        FD_CLR(fd, &fd_except);
    }
}


int io_add(io_atom *atom, int flags)
{
    int fd = atom->fd;

    // valid indexes are 0 .. FD_SETSIZE-1
    if(fd < 0 || fd >= FD_SETSIZE) {
        return -ERANGE;
    }
    if(connections[fd]) {
        return -EALREADY;
    }

    connections[fd] = atom;
    install(fd, flags);
    if(fd > max_fd) max_fd = fd;

    return 0;
}


int io_set(io_atom *atom, int flags)
{
    int fd = atom->fd;

    if(fd < 0 || fd >= FD_SETSIZE) {
        return -ERANGE;
    }
    if(!connections[fd]) {
        // atom was never added (error codes are not yet standardized)
        return -EALREADY;
    }

    install(fd, flags);

    return 0;
}


int io_del(io_atom *atom)
{
    int fd = atom->fd;

    if(fd < 0 || fd >= FD_SETSIZE) {
        return -ERANGE;
    }
    if(!connections[fd]) {
        // atom was never added (error codes are not yet standardized)
        return -EALREADY;
    }

    install(fd, 0);
    connections[fd] = NULL;

    // shrink max_fd down to the highest descriptor still registered
    while((max_fd >= 0) && (connections[max_fd] == NULL)) {
        max_fd -= 1;
    }

    return 0;
}


// Wait for events, then dispatch them.
// timeout is in milliseconds.  MAXINT == forever.
int io_wait(int timeout)
{
    struct timeval tv;
    struct timeval *tvp = &tv;
    int num, i, flags;

    if(timeout == MAXINT) {
        tvp = NULL;
    } else {
        tv.tv_sec = timeout / 1000;
        tv.tv_usec = (timeout % 1000) * 1000;
    }

    // select overwrites its sets, so work on copies
    fd_set rfds = fd_read;
    fd_set wfds = fd_write;
    fd_set efds = fd_except;

    num = select(1+max_fd, &rfds, &wfds, &efds, tvp);
    if(num < 0) {
        perror("select");
        return num;
    }

    for(i=0; i <= max_fd; i++) {
        flags = 0;
        if(FD_ISSET(i, &rfds)) flags |= IO_READ;
        if(FD_ISSET(i, &wfds)) flags |= IO_WRITE;
        if(FD_ISSET(i, &efds)) flags |= IO_EXCEPT;
        if(flags) {
            if(connections[i]) {
                (*connections[i]->proc)(connections[i], flags);
            } else {
                // what do we do -- event on an unknown connection?
                printf("Got an event on an unknown connection %d!\n", i);
            }
        }
    }

    return num;
}

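The timeout handling at the top of io_wait can be pulled into a small
helper, which makes the "forever" sentinel easy to test in isolation.
This is only a sketch: io_timeout_to_timeval and IO_FOREVER are
hypothetical names, INT_MAX from <limits.h> stands in for MAXINT from
the non-standard <values.h>, and clamping negative timeouts to zero is
an assumption that the code above does not make.

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>
#include <sys/time.h>

/* Hypothetical sentinel; plays the role of MAXINT in io_wait. */
#define IO_FOREVER INT_MAX

/*
 * Convert a millisecond timeout into the pointer that select() expects.
 * Returns NULL (block indefinitely) for IO_FOREVER, otherwise fills in
 * *tv and returns tv.
 */
struct timeval *io_timeout_to_timeval(int timeout_ms, struct timeval *tv)
{
    assert(tv != NULL);

    if (timeout_ms == IO_FOREVER) {
        return NULL;
    }
    if (timeout_ms < 0) {
        timeout_ms = 0;     /* assumption: treat negative as "poll once" */
    }

    tv->tv_sec = timeout_ms / 1000;
    tv->tv_usec = (timeout_ms % 1000) * 1000;
    return tv;
}
```

A caller would pass the result straight through as the last argument of
select.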
// iotest.c
// Scott Bronson
// 11 Aug 2004
//
// Compile: "cc -Wall iotest.c io_select.c -o iotest"
// Run it, then "telnet localhost 21314" a bunch of times.
//
// This example doesn't really show off asynchronous I/O, the whole point
// of the io_atom library.  And it's 99% obtuse networking code.  So,
// really, it's a pretty darn poor demo.  But hopefully it's better than
// nothing.
//
// This code is licensed under the same terms as parrot itself.


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <values.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "io.h"


#define PORT 21314


io_atom g_accepter;     // the listening socket
char g_char = 'A';
char g_readbuf[1024];


typedef struct {
    io_atom io;
    char c;
    int chars_processed;
} connection;


int set_nonblock(int sd)
{
    int flags = 1;

    // set nonblocking IO on the socket
    if(ioctl(sd, FIONBIO, &flags)) {
        // ioctl failed -- try alternative
        flags = fcntl(sd, F_GETFL, 0);
        if(flags < 0) {
            return -1;
        }
        if(fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) {
            return -1;
        }
    }

    return 0;
}


void connection_close(connection *conn)
{
    io_del(&conn->io);
    close(conn->io.fd);
    free(conn);
}


void connection_proc(io_atom *ioa, int flags)
{
    connection *conn = (connection*)ioa;
    int fd = conn->io.fd;
    int len;

    if(flags & IO_READ) {
        // retry only when the read itself failed with EINTR
        do {
            len = read(fd, g_readbuf, sizeof(g_readbuf));
        } while (len < 0 && errno == EINTR);   // stupid posix

        if(len > 0) {
            // echo the data back (short writes are not handled here)
            write(fd, g_readbuf, len);
            conn->chars_processed += len;
        } else if(len == 0) {
            // A 0-length read means remote has closed normally
            connection_close(conn);
            return;
        } else {
            // handle an error on the socket
            if(errno == EAGAIN) {
                // nothing to read?  weird.
            } else if(errno == EWOULDBLOCK) {
                // with glibc EAGAIN==EWOULDBLOCK so this is probably dead code
            } else {
                // there's some sort of read error on this stream.
                connection_close(conn);
                return;
            }
        }
    }

    if(flags & IO_WRITE) {
        // there's more space in the write buffer
        // so continue writing.
    }

    if(flags & IO_EXCEPT) {
        // I think this is also used for OOB.
        // recv (fd1, &c, 1, MSG_OOB);
        connection_close(conn);
        return;
    }
}


void accept_proc(io_atom *ioa, int flags)
{
    connection *conn;
    struct sockaddr_in pin;
    socklen_t plen;
    int sd, err;

    // since the accepter only has IO_READ anyway, there's no need to
    // check the flags param.

    plen = sizeof(pin);
    while((sd = accept(ioa->fd, (struct sockaddr*)&pin, &plen)) < 0) {
        if(errno == EINTR) {
            // This call was interrupted by a signal.  Try again and
            // see if we receive a connection.
            continue;
        }
        if(errno == EAGAIN || errno == EWOULDBLOCK) {
            // socket is marked nonblocking but no pending connections
            // are present.  Weird.  I guess we should succeed but do nothing.
            return;
        }

        // Probably there is a network error pending for this
        // connection (already!).  Should probably just ignore it...?
        return;
    }

    if(set_nonblock(sd) < 0) {
        printf("Could not set nonblocking: %s.\n", strerror(errno));
        close(sd);
        return;
    }

    conn = malloc(sizeof(connection));
    if(!conn) {
        close(sd);
        return;
    }

    conn->io.fd = sd;
    conn->io.proc = connection_proc;
    conn->chars_processed = 0;  // malloc does not zero the struct

    // io_add returns a negative errno value and does not set errno
    err = io_add(&conn->io, IO_READ);
    if(err < 0) {
        fprintf(stderr, "io_add: %s\n", strerror(-err));
        close(sd);
        exit(1);
    }

    printf("connection opened from %s port %d given fd %d\n",
        inet_ntoa(pin.sin_addr), ntohs(pin.sin_port), conn->io.fd);
}


int main(int argc, char **argv)
{
    int sd, err;
    struct sockaddr_in sin;

    io_init();

    printf("Opening listening socket...\n");

    if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket");
        exit(1);
    }

    if(set_nonblock(sd) < 0) {
        perror("setnonblocking");
        close(sd);
        exit(1);
    }

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(PORT);
    sin.sin_addr.s_addr = htonl(INADDR_ANY);

    if(bind(sd, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        close(sd);
        exit(1);
    }

    if (listen(sd, STD_LISTEN_SIZE) == -1) {
        perror("listen");
        close(sd);
        exit(1);
    }

    g_accepter.fd = sd;
    g_accepter.proc = accept_proc;

    // io_add returns a negative errno value and does not set errno
    err = io_add(&g_accepter, IO_READ);
    if(err < 0) {
        fprintf(stderr, "io_add: %s\n", strerror(-err));
        close(sd);
        exit(1);
    }

    printf("Listening on port %d, fd %d.\n", PORT, g_accepter.fd);

    for(;;) {
        io_wait(MAXINT);
    }

    io_exit();

    return 0;
}
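
The IO_WRITE branch of connection_proc is left empty in the demo.  One
way it might be filled in is with a small per-connection output queue
that is drained whenever the descriptor becomes writable.  This is a
sketch only: outbuf, outbuf_put and outbuf_flush are hypothetical names
that do not appear in the code above, and the fixed 1024-byte queue is
an arbitrary choice.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical per-connection output queue; not part of the posted code. */
typedef struct {
    char data[1024];
    size_t len;         /* bytes still waiting to be written */
} outbuf;

/* Append bytes to the queue.  Returns 0 on success, -1 if they do not fit. */
int outbuf_put(outbuf *b, const char *src, size_t n)
{
    assert(b != NULL && src != NULL);
    if (n > sizeof(b->data) - b->len) {
        return -1;
    }
    memcpy(b->data + b->len, src, n);
    b->len += n;
    return 0;
}

/*
 * Write as much queued data as the descriptor accepts right now.
 * Returns the number of bytes still queued (0 means everything was
 * written), or -1 on a hard write error.
 */
long outbuf_flush(int fd, outbuf *b)
{
    assert(b != NULL);
    while (b->len > 0) {
        ssize_t n = write(fd, b->data, b->len);
        if (n < 0) {
            if (errno == EINTR) continue;                       /* retry */
            if (errno == EAGAIN || errno == EWOULDBLOCK) break; /* full */
            return -1;
        }
        if (n == 0) break;      /* no progress; try again later */
        /* slide the unwritten tail to the front of the queue */
        memmove(b->data, b->data + n, b->len - (size_t)n);
        b->len -= (size_t)n;
    }
    return (long)b->len;
}
```

Under this scheme the read handler would queue its echo with outbuf_put
and call outbuf_flush.  If bytes remain, it would ask for write events
with io_set(&conn->io, IO_READ | IO_WRITE); once a later flush returns
0, it would drop back to io_set(&conn->io, IO_READ).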