On Sun, Dec 28, 2014 at 05:33:08PM +0100, Alexander Bluhm wrote: > Jasper tested and found that it only worked on loopback. I have > forgotten to check for EINPROGRESS after connect. So here is a new > diff. > > bluhm Successfully tested now with a remote logstash host. > Index: privsep.c > =================================================================== > RCS file: /data/mirror/openbsd/cvs/src/usr.sbin/syslogd/privsep.c,v > retrieving revision 1.48 > diff -u -p -r1.48 privsep.c > --- privsep.c 5 Oct 2014 18:14:01 -0000 1.48 > +++ privsep.c 28 Dec 2014 15:48:51 -0000 > @@ -317,17 +317,34 @@ priv_init(char *conf, int numeric, int l > servname[servname_len - 1] = '\0'; > > memset(&hints, 0, sizeof(hints)); > - if (strcmp(protoname, "udp") == 0) { > + switch (strlen(protoname)) { > + case 3: > hints.ai_family = AF_UNSPEC; > - } else if (strcmp(protoname, "udp4") == 0) { > - hints.ai_family = AF_INET; > - } else if (strcmp(protoname, "udp6") == 0) { > - hints.ai_family = AF_INET6; > + break; > + case 4: > + switch (protoname[3]) { > + case '4': > + hints.ai_family = AF_INET; > + break; > + case '6': > + hints.ai_family = AF_INET6; > + break; > + default: > + errx(1, "bad ip version %s", protoname); > + } > + break; > + default: > + errx(1, "bad protocol length %s", protoname); > + } > + if (strncmp(protoname, "udp", 3) == 0) { > + hints.ai_socktype = SOCK_DGRAM; > + hints.ai_protocol = IPPROTO_UDP; > + } else if (strncmp(protoname, "tcp", 3) == 0) { > + hints.ai_socktype = SOCK_STREAM; > + hints.ai_protocol = IPPROTO_TCP; > } else { > errx(1, "unknown protocol %s", protoname); > } > - hints.ai_socktype = SOCK_DGRAM; > - hints.ai_protocol = IPPROTO_UDP; > i = getaddrinfo(hostname, servname, &hints, &res0); > if (i != 0 || res0 == NULL) { > addr_len = 0; > Index: syslogd.c > =================================================================== > RCS file: /data/mirror/openbsd/cvs/src/usr.sbin/syslogd/syslogd.c,v > retrieving revision 1.136 > diff -u -p -r1.136 syslogd.c 
> --- syslogd.c 10 Dec 2014 19:42:14 -0000 1.136 > +++ syslogd.c 28 Dec 2014 16:25:55 -0000 > @@ -50,13 +50,14 @@ > * extensive changes by Ralph Campbell > * more extensive changes by Eric Allman (again) > * memory buffer logging by Damien Miller > - * IPv6, libevent by Alexander Bluhm > + * IPv6, libevent, sending via TCP by Alexander Bluhm > */ > > #define MAXLINE 1024 /* maximum line length */ > #define MIN_MEMBUF (MAXLINE * 4) /* Minimum memory buffer size */ > #define MAX_MEMBUF (256 * 1024) /* Maximum memory buffer size */ > #define MAX_MEMBUF_NAME 64 /* Max length of membuf log > name */ > +#define MAX_TCPBUF (256 * 1024) /* Maximum tcp event buffer size */ > #define MAXSVLINE 120 /* maximum saved line length */ > #define DEFUPRI (LOG_USER|LOG_NOTICE) > #define DEFSPRI (LOG_KERN|LOG_CRIT) > @@ -132,6 +133,8 @@ struct filed { > char f_loghost[1+4+3+1+MAXHOSTNAMELEN+1+NI_MAXSERV]; > /* @proto46://[hostname]:servname\0 */ > struct sockaddr_storage f_addr; > + struct bufferevent *f_bufev; > + int f_fd; > } f_forw; /* forwarding address */ > char f_fname[MAXPATHLEN]; > struct { > @@ -170,16 +173,17 @@ int repeatinterval[] = { 30, 120, 600 }; > #define F_FILE 1 /* regular file */ > #define F_TTY 2 /* terminal */ > #define F_CONSOLE 3 /* console terminal */ > -#define F_FORW 4 /* remote machine */ > +#define F_FORWUDP 4 /* remote machine via UDP */ > #define F_USERS 5 /* list of users */ > #define F_WALL 6 /* everyone logged on */ > #define F_MEMBUF 7 /* memory buffer */ > #define F_PIPE 8 /* pipe to external program */ > +#define F_FORWTCP 9 /* remote machine via TCP */ > > -char *TypeNames[9] = { > +char *TypeNames[] = { > "UNUSED", "FILE", "TTY", "CONSOLE", > - "FORW", "USERS", "WALL", "MEMBUF", > - "PIPE" > + "FORWUDP", "USERS", "WALL", "MEMBUF", > + "PIPE", "FORWTCP", > }; > > struct filed *Files; > @@ -259,6 +263,9 @@ struct event ev_ctlaccept, ev_ctlread, > void klog_readcb(int, short, void *); > void udp_readcb(int, short, void *); > void unix_readcb(int, 
short, void *); > +int tcp_socket(struct filed *); > +void tcp_readcb(struct bufferevent *, void *); > +void tcp_errorcb(struct bufferevent *, short, void *); > void die_signalcb(int, short, void *); > void mark_timercb(int, short, void *); > void init_signalcb(int, short, void *); > @@ -661,6 +668,82 @@ unix_readcb(int fd, short event, void *a > logerror("recvfrom unix"); > } > > +int > +tcp_socket(struct filed *f) > +{ > + int s, flags; > + char ebuf[100]; > + > + if ((s = socket(f->f_un.f_forw.f_addr.ss_family, SOCK_STREAM, > + IPPROTO_TCP)) == -1) { > + snprintf(ebuf, sizeof(ebuf), "socket \"%s\"", > + f->f_un.f_forw.f_loghost); > + logerror(ebuf); > + return (-1); > + } > + /* Connect must not block the process. */ > + if ((flags = fcntl(s, F_GETFL)) == -1 || > + fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) { > + snprintf(ebuf, sizeof(ebuf), "fcntl \"%s\" O_NONBLOCK", > + f->f_un.f_forw.f_loghost); > + logerror(ebuf); > + close(s); > + return (-1); > + } > + if (connect(s, (struct sockaddr *)&f->f_un.f_forw.f_addr, > + f->f_un.f_forw.f_addr.ss_len) == -1 && errno != EINPROGRESS) { > + snprintf(ebuf, sizeof(ebuf), "connect \"%s\"", > + f->f_un.f_forw.f_loghost); > + logerror(ebuf); > + close(s); > + return (-1); > + } > + return (s); > +} > + > +void > +tcp_readcb(struct bufferevent *bufev, void *arg) > +{ > + struct filed *f = arg; > + > + /* > + * Drop data received from the forward log server. 
> + */ > + dprintf("loghost \"%s\" did send %zu bytes back\n", > + f->f_un.f_forw.f_loghost, > + EVBUFFER_LENGTH(f->f_un.f_forw.f_bufev->input)); > + evbuffer_drain(bufev->input, -1); > +} > + > +void > +tcp_errorcb(struct bufferevent *bufev, short event, void *arg) > +{ > + struct filed *f = arg; > + char ebuf[100]; > + > + if (event & EVBUFFER_EOF) > + snprintf(ebuf, sizeof(ebuf), > + "syslogd: loghost \"%s\" connection close", > + f->f_un.f_forw.f_loghost); > + else > + snprintf(ebuf, sizeof(ebuf), > + "syslogd: loghost \"%s\" connection error: %s", > + f->f_un.f_forw.f_loghost, strerror(errno)); > + dprintf("%s\n", ebuf); > + > + close(f->f_un.f_forw.f_fd); > + if ((f->f_un.f_forw.f_fd = tcp_socket(f)) == -1) { > + /* XXX reconnect later */ > + bufferevent_free(bufev); > + f->f_type = F_UNUSED; > + } else { > + /* XXX The messages in the output buffer may be out of sync. */ > + bufferevent_setfd(bufev, f->f_un.f_forw.f_fd); > + bufferevent_enable(f->f_un.f_forw.f_bufev, EV_READ); > + } > + logmsg(LOG_SYSLOG|LOG_WARNING, ebuf, LocalHostName, ADDDATE); > +} > + > void > usage(void) > { > @@ -883,7 +966,7 @@ fprintlog(struct filed *f, int flags, ch > { > struct iovec iov[6]; > struct iovec *v; > - int fd, l, retryonce; > + int l, retryonce; > char line[MAXLINE + 1], repbuf[80], greetings[500]; > > v = iov; > @@ -938,19 +1021,8 @@ fprintlog(struct filed *f, int flags, ch > dprintf("\n"); > break; > > - case F_FORW: > + case F_FORWUDP: > dprintf(" %s\n", f->f_un.f_forw.f_loghost); > - switch (f->f_un.f_forw.f_addr.ss_family) { > - case AF_INET: > - fd = fd_udp; > - break; > - case AF_INET6: > - fd = fd_udp6; > - break; > - default: > - fd = -1; > - break; > - } > l = snprintf(line, sizeof(line), "<%d>%.15s %s%s%s", > f->f_prevpri, (char *)iov[0].iov_base, > IncludeHostname ? 
LocalHostName : "", > @@ -958,7 +1030,7 @@ fprintlog(struct filed *f, int flags, ch > (char *)iov[4].iov_base); > if (l < 0 || (size_t)l >= sizeof(line)) > l = strlen(line); > - if (sendto(fd, line, l, 0, > + if (sendto(f->f_un.f_forw.f_fd, line, l, 0, > (struct sockaddr *)&f->f_un.f_forw.f_addr, > f->f_un.f_forw.f_addr.ss_len) != l) { > switch (errno) { > @@ -977,6 +1049,26 @@ fprintlog(struct filed *f, int flags, ch > } > break; > > + case F_FORWTCP: > + dprintf(" %s\n", f->f_un.f_forw.f_loghost); > + if (EVBUFFER_LENGTH(f->f_un.f_forw.f_bufev->output) >= > + MAX_TCPBUF) > + break; /* XXX log error message */ > + /* > + * RFC 6587 3.4.2. Non-Transparent-Framing > + * Use \n to split messages for now. > + * 3.4.1. Octet Counting might be implemented later. > + */ > + l = evbuffer_add_printf(f->f_un.f_forw.f_bufev->output, > + "<%d>%.15s %s%s%s\n", f->f_prevpri, (char *)iov[0].iov_base, > + IncludeHostname ? LocalHostName : "", > + IncludeHostname ? " " : "", > + (char *)iov[4].iov_base); > + if (l < 0) > + break; /* XXX log error message */ > + bufferevent_enable(f->f_un.f_forw.f_bufev, EV_WRITE); > + break; > + > case F_CONSOLE: > if (flags & IGN_CONS) { > dprintf(" (ignored)\n"); > @@ -1249,7 +1341,8 @@ init(void) > case F_PIPE: > (void)close(f->f_file); > break; > - case F_FORW: > + case F_FORWUDP: > + case F_FORWTCP: /* XXX close and reconnect? 
*/ > break; > } > next = f->f_next; > @@ -1383,7 +1476,8 @@ init(void) > printf("%s", f->f_un.f_fname); > break; > > - case F_FORW: > + case F_FORWUDP: > + case F_FORWTCP: > printf("%s", f->f_un.f_forw.f_loghost); > break; > > @@ -1575,6 +1669,9 @@ cfline(char *line, char *prog) > logerror(ebuf); > break; > } > + } else if (strcmp(proto, "tcp") == 0 || > + strcmp(proto, "tcp4") == 0 || strcmp(proto, "tcp6") == 0) { > + ; > } else { > snprintf(ebuf, sizeof(ebuf), "bad protocol \"%s\"", > f->f_un.f_forw.f_loghost); > @@ -1603,7 +1700,35 @@ cfline(char *line, char *prog) > logerror(ebuf); > break; > } > - f->f_type = F_FORW; > + f->f_un.f_forw.f_fd = -1; > + if (strncmp(proto, "udp", 3) == 0) { > + switch (f->f_un.f_forw.f_addr.ss_family) { > + case AF_INET: > + f->f_un.f_forw.f_fd = fd_udp; > + break; > + case AF_INET6: > + f->f_un.f_forw.f_fd = fd_udp6; > + break; > + } > + f->f_type = F_FORWUDP; > + } else if (strncmp(proto, "tcp", 3) == 0) { > + int s; > + > + if ((s = tcp_socket(f)) == -1) > + break; > + if ((f->f_un.f_forw.f_bufev = bufferevent_new(s, > + tcp_readcb, NULL, tcp_errorcb, f)) == NULL) { > + snprintf(ebuf, sizeof(ebuf), > + "bufferevent \"%s\"", > + f->f_un.f_forw.f_loghost); > + logerror(ebuf); > + close(s); > + break; > + } > + bufferevent_enable(f->f_un.f_forw.f_bufev, EV_READ); > + f->f_un.f_forw.f_fd = s; > + f->f_type = F_FORWTCP; > + } > break; > > case '/': > @@ -1914,7 +2039,7 @@ ctlsock_acceptcb(int fd, short event, vo > > if ((flags = fcntl(fd, F_GETFL)) == -1 || > fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { > - logerror("fcntl ctlconn"); > + logerror("fcntl ctlconn O_NONBLOCK"); > close(fd); > return; > } >
-- jasper