Hi, I have implemented a first version of syslogd that can send messages via TCP streams. Some error handling is still missing, such as reconnecting after the remote loghost has closed the connection.
I would like to get this in and develop the missing parts in tree. ok? bluhm Index: usr.sbin/syslogd/privsep.c =================================================================== RCS file: /data/mirror/openbsd/cvs/src/usr.sbin/syslogd/privsep.c,v retrieving revision 1.48 diff -u -p -u -p -r1.48 privsep.c --- usr.sbin/syslogd/privsep.c 5 Oct 2014 18:14:01 -0000 1.48 +++ usr.sbin/syslogd/privsep.c 29 Oct 2014 16:53:57 -0000 @@ -317,17 +317,34 @@ priv_init(char *conf, int numeric, int l servname[servname_len - 1] = '\0'; memset(&hints, 0, sizeof(hints)); - if (strcmp(protoname, "udp") == 0) { + switch (strlen(protoname)) { + case 3: hints.ai_family = AF_UNSPEC; - } else if (strcmp(protoname, "udp4") == 0) { - hints.ai_family = AF_INET; - } else if (strcmp(protoname, "udp6") == 0) { - hints.ai_family = AF_INET6; + break; + case 4: + switch (protoname[3]) { + case '4': + hints.ai_family = AF_INET; + break; + case '6': + hints.ai_family = AF_INET6; + break; + default: + errx(1, "bad ip version %s", protoname); + } + break; + default: + errx(1, "bad protocol length %s", protoname); + } + if (strncmp(protoname, "udp", 3) == 0) { + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = IPPROTO_UDP; + } else if (strncmp(protoname, "tcp", 3) == 0) { + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; } else { errx(1, "unknown protocol %s", protoname); } - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol = IPPROTO_UDP; i = getaddrinfo(hostname, servname, &hints, &res0); if (i != 0 || res0 == NULL) { addr_len = 0; Index: usr.sbin/syslogd/syslogd.c =================================================================== RCS file: /data/mirror/openbsd/cvs/src/usr.sbin/syslogd/syslogd.c,v retrieving revision 1.129 diff -u -p -u -p -r1.129 syslogd.c --- usr.sbin/syslogd/syslogd.c 6 Oct 2014 19:36:34 -0000 1.129 +++ usr.sbin/syslogd/syslogd.c 29 Oct 2014 17:44:51 -0000 @@ -50,13 +50,14 @@ * extensive changes by Ralph Campbell * more extensive changes by Eric Allman 
(again) * memory buffer logging by Damien Miller - * IPv6, libevent by Alexander Bluhm + * IPv6, libevent, sending via TCP by Alexander Bluhm */ #define MAXLINE 1024 /* maximum line length */ #define MIN_MEMBUF (MAXLINE * 4) /* Minimum memory buffer size */ #define MAX_MEMBUF (256 * 1024) /* Maximum memory buffer size */ #define MAX_MEMBUF_NAME 64 /* Max length of membuf log name */ +#define MAX_TCPBUF (256 * 1024) /* Maximum tcp event buffer size */ #define MAXSVLINE 120 /* maximum saved line length */ #define DEFUPRI (LOG_USER|LOG_NOTICE) #define DEFSPRI (LOG_KERN|LOG_CRIT) @@ -132,6 +133,8 @@ struct filed { char f_loghost[1+4+3+1+MAXHOSTNAMELEN+1+NI_MAXSERV]; /* @proto46://[hostname]:servname\0 */ struct sockaddr_storage f_addr; + struct bufferevent *f_bufev; + int f_fd; } f_forw; /* forwarding address */ char f_fname[MAXPATHLEN]; struct { @@ -170,16 +173,17 @@ int repeatinterval[] = { 30, 120, 600 }; #define F_FILE 1 /* regular file */ #define F_TTY 2 /* terminal */ #define F_CONSOLE 3 /* console terminal */ -#define F_FORW 4 /* remote machine */ +#define F_FORWUDP 4 /* remote machine via UDP */ #define F_USERS 5 /* list of users */ #define F_WALL 6 /* everyone logged on */ #define F_MEMBUF 7 /* memory buffer */ #define F_PIPE 8 /* pipe to external program */ +#define F_FORWTCP 9 /* remote machine via TCP */ -char *TypeNames[9] = { +char *TypeNames[] = { "UNUSED", "FILE", "TTY", "CONSOLE", - "FORW", "USERS", "WALL", "MEMBUF", - "PIPE" + "FORWUDP", "USERS", "WALL", "MEMBUF", + "PIPE", "FORWTCP", }; struct filed *Files; @@ -259,6 +263,9 @@ struct event ev_ctlaccept, ev_ctlread, void klog_readcb(int, short, void *); void udp_readcb(int, short, void *); void unix_readcb(int, short, void *); +int tcp_socket(struct filed *); +void tcp_readcb(struct bufferevent *, void *); +void tcp_errorcb(struct bufferevent *, short, void *); void die_signalcb(int, short, void *); void mark_timercb(int, short, void *); void init_signalcb(int, short, void *); @@ -661,6 +668,82 @@ 
unix_readcb(int fd, short event, void *a logerror("recvfrom unix"); } +int +tcp_socket(struct filed *f) +{ + int s, flags; + char ebuf[100]; + + if ((s = socket(f->f_un.f_forw.f_addr.ss_family, SOCK_STREAM, + IPPROTO_TCP)) == -1) { + snprintf(ebuf, sizeof(ebuf), "socket \"%s\"", + f->f_un.f_forw.f_loghost); + logerror(ebuf); + return (-1); + } + /* Connect must not block the process. */ + if ((flags = fcntl(s, F_GETFL)) == -1 || + fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) { + snprintf(ebuf, sizeof(ebuf), "fcntl \"%s\" O_NONBLOCK", + f->f_un.f_forw.f_loghost); + logerror(ebuf); + close(s); + return (-1); + } + if (connect(s, (struct sockaddr *)&f->f_un.f_forw.f_addr, + f->f_un.f_forw.f_addr.ss_len) == -1) { + snprintf(ebuf, sizeof(ebuf), "connect \"%s\"", + f->f_un.f_forw.f_loghost); + logerror(ebuf); + close(s); + return (-1); + } + return (s); +} + +void +tcp_readcb(struct bufferevent *bufev, void *arg) +{ + struct filed *f = arg; + + /* + * Drop data received from the forward log server. + */ + dprintf("loghost \"%s\" did send %zu bytes back\n", + f->f_un.f_forw.f_loghost, + EVBUFFER_LENGTH(f->f_un.f_forw.f_bufev->input)); + evbuffer_drain(bufev->input, -1); +} + +void +tcp_errorcb(struct bufferevent *bufev, short event, void *arg) +{ + struct filed *f = arg; + char ebuf[100]; + + if (event & EVBUFFER_EOF) + snprintf(ebuf, sizeof(ebuf), + "syslogd: loghost \"%s\" connection close", + f->f_un.f_forw.f_loghost); + else + snprintf(ebuf, sizeof(ebuf), + "syslogd: loghost \"%s\" connection error: %s", + f->f_un.f_forw.f_loghost, strerror(errno)); + dprintf("%s\n", ebuf); + + close(f->f_un.f_forw.f_fd); + if ((f->f_un.f_forw.f_fd = tcp_socket(f)) == -1) { + /* XXX reconnect later */ + bufferevent_free(bufev); + f->f_type = F_UNUSED; + } else { + /* XXX The messages in the output buffer may be out of sync. 
*/ + bufferevent_setfd(bufev, f->f_un.f_forw.f_fd); + bufferevent_enable(f->f_un.f_forw.f_bufev, EV_READ); + } + logmsg(LOG_SYSLOG|LOG_WARNING, ebuf, LocalHostName, ADDDATE); +} + void usage(void) { @@ -883,7 +966,7 @@ fprintlog(struct filed *f, int flags, ch { struct iovec iov[6]; struct iovec *v; - int fd, l, retryonce; + int l, retryonce; char line[MAXLINE + 1], repbuf[80], greetings[500]; v = iov; @@ -938,19 +1021,8 @@ fprintlog(struct filed *f, int flags, ch dprintf("\n"); break; - case F_FORW: + case F_FORWUDP: dprintf(" %s\n", f->f_un.f_forw.f_loghost); - switch (f->f_un.f_forw.f_addr.ss_family) { - case AF_INET: - fd = fd_udp; - break; - case AF_INET6: - fd = fd_udp6; - break; - default: - fd = -1; - break; - } l = snprintf(line, sizeof(line), "<%d>%.15s %s%s%s", f->f_prevpri, (char *)iov[0].iov_base, IncludeHostname ? LocalHostName : "", @@ -958,7 +1030,7 @@ fprintlog(struct filed *f, int flags, ch (char *)iov[4].iov_base); if (l < 0 || (size_t)l >= sizeof(line)) l = strlen(line); - if (sendto(fd, line, l, 0, + if (sendto(f->f_un.f_forw.f_fd, line, l, 0, (struct sockaddr *)&f->f_un.f_forw.f_addr, f->f_un.f_forw.f_addr.ss_len) != l) { switch (errno) { @@ -977,6 +1049,25 @@ fprintlog(struct filed *f, int flags, ch } break; + case F_FORWTCP: + dprintf(" %s\n", f->f_un.f_forw.f_loghost); + if (EVBUFFER_LENGTH(f->f_un.f_forw.f_bufev->output) >= + MAX_TCPBUF) + break; /* XXX log error message */ + /* + * RFC 6587 3.4.2. Non-Transparent-Framing + * Use \n to split messages for now, will change later. + */ + l = evbuffer_add_printf(f->f_un.f_forw.f_bufev->output, + "<%d>%.15s %s%s%s\n", f->f_prevpri, (char *)iov[0].iov_base, + IncludeHostname ? LocalHostName : "", + IncludeHostname ? 
" " : "", + (char *)iov[4].iov_base); + if (l < 0) + break; /* XXX log error message */ + bufferevent_enable(f->f_un.f_forw.f_bufev, EV_WRITE); + break; + case F_CONSOLE: if (flags & IGN_CONS) { dprintf(" (ignored)\n"); @@ -1249,7 +1340,8 @@ init(void) case F_PIPE: (void)close(f->f_file); break; - case F_FORW: + case F_FORWUDP: + case F_FORWTCP: /* XXX close and reconnect? */ break; } next = f->f_next; @@ -1383,7 +1475,8 @@ init(void) printf("%s", f->f_un.f_fname); break; - case F_FORW: + case F_FORWUDP: + case F_FORWTCP: printf("%s", f->f_un.f_forw.f_loghost); break; @@ -1578,6 +1671,9 @@ cfline(char *line, char *prog) logerror(ebuf); break; } + } else if (strcmp(proto, "tcp") == 0 || + strcmp(proto, "tcp4") == 0 || strcmp(proto, "tcp6") == 0) { + ; } else { snprintf(ebuf, sizeof(ebuf), "bad protocol \"%s\"", f->f_un.f_forw.f_loghost); @@ -1606,7 +1702,35 @@ cfline(char *line, char *prog) logerror(ebuf); break; } - f->f_type = F_FORW; + f->f_un.f_forw.f_fd = -1; + if (strncmp(proto, "udp", 3) == 0) { + switch (f->f_un.f_forw.f_addr.ss_family) { + case AF_INET: + f->f_un.f_forw.f_fd = fd_udp; + break; + case AF_INET6: + f->f_un.f_forw.f_fd = fd_udp6; + break; + } + f->f_type = F_FORWUDP; + } else if (strncmp(proto, "tcp", 3) == 0) { + int s; + + if ((s = tcp_socket(f)) == -1) + break; + if ((f->f_un.f_forw.f_bufev = bufferevent_new(s, + tcp_readcb, NULL, tcp_errorcb, f)) == NULL) { + snprintf(ebuf, sizeof(ebuf), + "bufferevent \"%s\"", + f->f_un.f_forw.f_loghost); + logerror(ebuf); + close(s); + break; + } + bufferevent_enable(f->f_un.f_forw.f_bufev, EV_READ); + f->f_un.f_forw.f_fd = s; + f->f_type = F_FORWTCP; + } break; case '/': @@ -1920,7 +2044,7 @@ ctlsock_acceptcb(int fd, short event, vo if ((flags = fcntl(fd, F_GETFL)) == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { - logerror("fcntl ctlconn"); + logerror("fcntl ctlconn O_NONBLOCK"); close(fd); return; }