Hi,

I have implemented a first version of syslogd that can send messages
via TCP streams.  Some error handling is still missing, such as
reconnecting after the remote loghost has closed the connection.

I would like to get this in and develop the missing parts in-tree.

ok?

bluhm

Index: usr.sbin/syslogd/privsep.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/usr.sbin/syslogd/privsep.c,v
retrieving revision 1.48
diff -u -p -u -p -r1.48 privsep.c
--- usr.sbin/syslogd/privsep.c  5 Oct 2014 18:14:01 -0000       1.48
+++ usr.sbin/syslogd/privsep.c  29 Oct 2014 16:53:57 -0000
@@ -317,17 +317,34 @@ priv_init(char *conf, int numeric, int l
                        servname[servname_len - 1] = '\0';
 
                        memset(&hints, 0, sizeof(hints));
-                       if (strcmp(protoname, "udp") == 0) {
+                       switch (strlen(protoname)) {
+                       case 3:
                                hints.ai_family = AF_UNSPEC;
-                       } else if (strcmp(protoname, "udp4") == 0) {
-                               hints.ai_family = AF_INET;
-                       } else if (strcmp(protoname, "udp6") == 0) {
-                               hints.ai_family = AF_INET6;
+                               break;
+                       case 4:
+                               switch (protoname[3]) {
+                               case '4':
+                                       hints.ai_family = AF_INET;
+                                       break;
+                               case '6':
+                                       hints.ai_family = AF_INET6;
+                                       break;
+                               default:
+                                       errx(1, "bad ip version %s", protoname);
+                               }
+                               break;
+                       default:
+                               errx(1, "bad protocol length %s", protoname);
+                       }
+                       if (strncmp(protoname, "udp", 3) == 0) {
+                               hints.ai_socktype = SOCK_DGRAM;
+                               hints.ai_protocol = IPPROTO_UDP;
+                       } else if (strncmp(protoname, "tcp", 3) == 0) {
+                               hints.ai_socktype = SOCK_STREAM;
+                               hints.ai_protocol = IPPROTO_TCP;
                        } else {
                                errx(1, "unknown protocol %s", protoname);
                        }
-                       hints.ai_socktype = SOCK_DGRAM;
-                       hints.ai_protocol = IPPROTO_UDP;
                        i = getaddrinfo(hostname, servname, &hints, &res0);
                        if (i != 0 || res0 == NULL) {
                                addr_len = 0;
Index: usr.sbin/syslogd/syslogd.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/usr.sbin/syslogd/syslogd.c,v
retrieving revision 1.129
diff -u -p -u -p -r1.129 syslogd.c
--- usr.sbin/syslogd/syslogd.c  6 Oct 2014 19:36:34 -0000       1.129
+++ usr.sbin/syslogd/syslogd.c  29 Oct 2014 17:44:51 -0000
@@ -50,13 +50,14 @@
  * extensive changes by Ralph Campbell
  * more extensive changes by Eric Allman (again)
  * memory buffer logging by Damien Miller
- * IPv6, libevent by Alexander Bluhm
+ * IPv6, libevent, sending via TCP by Alexander Bluhm
  */
 
 #define        MAXLINE         1024            /* maximum line length */
 #define MIN_MEMBUF     (MAXLINE * 4)   /* Minimum memory buffer size */
 #define MAX_MEMBUF     (256 * 1024)    /* Maximum memory buffer size */
 #define MAX_MEMBUF_NAME        64              /* Max length of membuf log name */
+#define MAX_TCPBUF     (256 * 1024)    /* Maximum tcp event buffer size */
 #define        MAXSVLINE       120             /* maximum saved line length */
 #define DEFUPRI                (LOG_USER|LOG_NOTICE)
 #define DEFSPRI                (LOG_KERN|LOG_CRIT)
@@ -132,6 +133,8 @@ struct filed {
                        char    f_loghost[1+4+3+1+MAXHOSTNAMELEN+1+NI_MAXSERV];
                                /* @proto46://[hostname]:servname\0 */
                        struct sockaddr_storage f_addr;
+                       struct bufferevent      *f_bufev;
+                       int     f_fd;
                } f_forw;               /* forwarding address */
                char    f_fname[MAXPATHLEN];
                struct {
@@ -170,16 +173,17 @@ int       repeatinterval[] = { 30, 120, 600 };
 #define F_FILE         1               /* regular file */
 #define F_TTY          2               /* terminal */
 #define F_CONSOLE      3               /* console terminal */
-#define F_FORW         4               /* remote machine */
+#define F_FORWUDP      4               /* remote machine via UDP */
 #define F_USERS                5               /* list of users */
 #define F_WALL         6               /* everyone logged on */
 #define F_MEMBUF       7               /* memory buffer */
 #define F_PIPE         8               /* pipe to external program */
+#define F_FORWTCP      9               /* remote machine via TCP */
 
-char   *TypeNames[9] = {
+char   *TypeNames[] = {
        "UNUSED",       "FILE",         "TTY",          "CONSOLE",
-       "FORW",         "USERS",        "WALL",         "MEMBUF",
-       "PIPE"
+       "FORWUDP",      "USERS",        "WALL",         "MEMBUF",
+       "PIPE",         "FORWTCP",
 };
 
 struct filed *Files;
@@ -259,6 +263,9 @@ struct event         ev_ctlaccept, ev_ctlread, 
 void    klog_readcb(int, short, void *);
 void    udp_readcb(int, short, void *);
 void    unix_readcb(int, short, void *);
+int     tcp_socket(struct filed *);
+void    tcp_readcb(struct bufferevent *, void *);
+void    tcp_errorcb(struct bufferevent *, short, void *);
 void    die_signalcb(int, short, void *);
 void    mark_timercb(int, short, void *);
 void    init_signalcb(int, short, void *);
@@ -661,6 +668,103 @@ unix_readcb(int fd, short event, void *a
                 logerror("recvfrom unix");
 }
 
+/*
+ * Create a non-blocking TCP socket and start connecting it to the
+ * address in f->f_un.f_forw.f_addr.  Returns the descriptor, or -1
+ * after the failure has been reported with logerror().  The connection
+ * may still be in progress when the descriptor is returned.
+ */
+int
+tcp_socket(struct filed *f)
+{
+       int      s, flags;
+       char     ebuf[100];
+
+       if ((s = socket(f->f_un.f_forw.f_addr.ss_family, SOCK_STREAM,
+           IPPROTO_TCP)) == -1) {
+               snprintf(ebuf, sizeof(ebuf), "socket \"%s\"",
+                   f->f_un.f_forw.f_loghost);
+               logerror(ebuf);
+               return (-1);
+       }
+       /* Connect must not block the process. */
+       if ((flags = fcntl(s, F_GETFL)) == -1 ||
+           fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) {
+               snprintf(ebuf, sizeof(ebuf), "fcntl \"%s\" O_NONBLOCK",
+                   f->f_un.f_forw.f_loghost);
+               logerror(ebuf);
+               close(s);
+               return (-1);
+       }
+       /*
+        * A non-blocking connect(2) usually returns -1 with EINPROGRESS
+        * while the handshake continues; only other errors are failures.
+        */
+       if (connect(s, (struct sockaddr *)&f->f_un.f_forw.f_addr,
+           f->f_un.f_forw.f_addr.ss_len) == -1 && errno != EINPROGRESS) {
+               snprintf(ebuf, sizeof(ebuf), "connect \"%s\"",
+                   f->f_un.f_forw.f_loghost);
+               logerror(ebuf);
+               close(s);
+               return (-1);
+       }
+       return (s);
+}
+
+/*
+ * Read callback of a TCP loghost bufferevent; arg is the struct filed.
+ */
+void
+tcp_readcb(struct bufferevent *bufev, void *arg)
+{
+       struct filed    *f = arg;
+
+       /*
+        * Drop data received from the forward log server.
+        */
+       dprintf("loghost \"%s\" did send %zu bytes back\n",
+           f->f_un.f_forw.f_loghost,
+           EVBUFFER_LENGTH(f->f_un.f_forw.f_bufev->input));
+       evbuffer_drain(bufev->input, -1);
+}
+
+/*
+ * Error callback of a TCP loghost bufferevent; arg is the struct filed.
+ * Closes the old socket and tries to connect again with tcp_socket().
+ * If that fails the bufferevent is freed and the filed becomes
+ * F_UNUSED, otherwise the bufferevent is moved to the new descriptor.
+ * The close or error message is handed to logmsg() at the end.
+ */
+void
+tcp_errorcb(struct bufferevent *bufev, short event, void *arg)
+{
+       struct filed    *f = arg;
+       char             ebuf[100];
+
+       if (event & EVBUFFER_EOF)
+               snprintf(ebuf, sizeof(ebuf),
+                   "syslogd: loghost \"%s\" connection close",
+                   f->f_un.f_forw.f_loghost);
+       else
+               snprintf(ebuf, sizeof(ebuf),
+                   "syslogd: loghost \"%s\" connection error: %s",
+                   f->f_un.f_forw.f_loghost, strerror(errno));
+       dprintf("%s\n", ebuf);
+
+       close(f->f_un.f_forw.f_fd);
+       if ((f->f_un.f_forw.f_fd = tcp_socket(f)) == -1) {
+               /* XXX reconnect later */
+               bufferevent_free(bufev);
+               f->f_un.f_forw.f_bufev = NULL;
+               f->f_type = F_UNUSED;
+       } else {
+               /* XXX The messages in the output buffer may be out of sync. */
+               bufferevent_setfd(bufev, f->f_un.f_forw.f_fd);
+               bufferevent_enable(f->f_un.f_forw.f_bufev, EV_READ);
+       }
+       logmsg(LOG_SYSLOG|LOG_WARNING, ebuf, LocalHostName, ADDDATE);
+}
+
 void
 usage(void)
 {
@@ -883,7 +966,7 @@ fprintlog(struct filed *f, int flags, ch
 {
        struct iovec iov[6];
        struct iovec *v;
-       int fd, l, retryonce;
+       int l, retryonce;
        char line[MAXLINE + 1], repbuf[80], greetings[500];
 
        v = iov;
@@ -938,19 +1021,8 @@ fprintlog(struct filed *f, int flags, ch
                dprintf("\n");
                break;
 
-       case F_FORW:
+       case F_FORWUDP:
                dprintf(" %s\n", f->f_un.f_forw.f_loghost);
-               switch (f->f_un.f_forw.f_addr.ss_family) {
-               case AF_INET:
-                       fd = fd_udp;
-                       break;
-               case AF_INET6:
-                       fd = fd_udp6;
-                       break;
-               default:
-                       fd = -1;
-                       break;
-               }
                l = snprintf(line, sizeof(line), "<%d>%.15s %s%s%s",
                    f->f_prevpri, (char *)iov[0].iov_base,
                    IncludeHostname ? LocalHostName : "",
@@ -958,7 +1030,7 @@ fprintlog(struct filed *f, int flags, ch
                    (char *)iov[4].iov_base);
                if (l < 0 || (size_t)l >= sizeof(line))
                        l = strlen(line);
-               if (sendto(fd, line, l, 0,
+               if (sendto(f->f_un.f_forw.f_fd, line, l, 0,
                    (struct sockaddr *)&f->f_un.f_forw.f_addr,
                    f->f_un.f_forw.f_addr.ss_len) != l) {
                        switch (errno) {
@@ -977,6 +1049,25 @@ fprintlog(struct filed *f, int flags, ch
                }
                break;
 
+       case F_FORWTCP:
+               dprintf(" %s\n", f->f_un.f_forw.f_loghost);
+               if (EVBUFFER_LENGTH(f->f_un.f_forw.f_bufev->output) >=
+                   MAX_TCPBUF)
+                       break;  /* XXX log error message */
+               /*
+                * RFC 6587  3.4.2.  Non-Transparent-Framing
+                * Use \n to split messages for now, will change later.
+                */
+               l = evbuffer_add_printf(f->f_un.f_forw.f_bufev->output,
+                   "<%d>%.15s %s%s%s\n", f->f_prevpri, (char *)iov[0].iov_base,
+                   IncludeHostname ? LocalHostName : "",
+                   IncludeHostname ? " " : "",
+                   (char *)iov[4].iov_base);
+               if (l < 0)
+                       break;  /* XXX log error message */
+               bufferevent_enable(f->f_un.f_forw.f_bufev, EV_WRITE);
+               break;
+
        case F_CONSOLE:
                if (flags & IGN_CONS) {
                        dprintf(" (ignored)\n");
@@ -1249,7 +1340,8 @@ init(void)
                case F_PIPE:
                        (void)close(f->f_file);
                        break;
-               case F_FORW:
+               case F_FORWUDP:
+               case F_FORWTCP:  /* XXX close and reconnect? */
                        break;
                }
                next = f->f_next;
@@ -1383,7 +1475,8 @@ init(void)
                                printf("%s", f->f_un.f_fname);
                                break;
 
-                       case F_FORW:
+                       case F_FORWUDP:
+                       case F_FORWTCP:
                                printf("%s", f->f_un.f_forw.f_loghost);
                                break;
 
@@ -1578,6 +1671,9 @@ cfline(char *line, char *prog)
                                logerror(ebuf);
                                break;
                        }
+               } else if (strcmp(proto, "tcp") == 0 ||
+                   strcmp(proto, "tcp4") == 0 || strcmp(proto, "tcp6") == 0) {
+                       ;
                } else {
                        snprintf(ebuf, sizeof(ebuf), "bad protocol \"%s\"",
                            f->f_un.f_forw.f_loghost);
@@ -1606,7 +1702,35 @@ cfline(char *line, char *prog)
                        logerror(ebuf);
                        break;
                }
-               f->f_type = F_FORW;
+               f->f_un.f_forw.f_fd = -1;
+               if (strncmp(proto, "udp", 3) == 0) {
+                       switch (f->f_un.f_forw.f_addr.ss_family) {
+                       case AF_INET:
+                               f->f_un.f_forw.f_fd = fd_udp;
+                               break;
+                       case AF_INET6:
+                               f->f_un.f_forw.f_fd = fd_udp6;
+                               break;
+                       }
+                       f->f_type = F_FORWUDP;
+               } else if (strncmp(proto, "tcp", 3) == 0) {
+                       int s;
+
+                       if ((s = tcp_socket(f)) == -1)
+                               break;
+                       if ((f->f_un.f_forw.f_bufev = bufferevent_new(s,
+                           tcp_readcb, NULL, tcp_errorcb, f)) == NULL) {
+                               snprintf(ebuf, sizeof(ebuf),
+                                   "bufferevent \"%s\"",
+                                   f->f_un.f_forw.f_loghost);
+                               logerror(ebuf);
+                               close(s);
+                               break;
+                       }
+                       bufferevent_enable(f->f_un.f_forw.f_bufev, EV_READ);
+                       f->f_un.f_forw.f_fd = s;
+                       f->f_type = F_FORWTCP;
+               }
                break;
 
        case '/':
@@ -1920,7 +2044,7 @@ ctlsock_acceptcb(int fd, short event, vo
 
        if ((flags = fcntl(fd, F_GETFL)) == -1 ||
            fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
-               logerror("fcntl ctlconn");
+               logerror("fcntl ctlconn O_NONBLOCK");
                close(fd);
                return;
        }

Reply via email to