On Sun, Dec 28, 2014 at 05:33:08PM +0100, Alexander Bluhm wrote:
> Jasper tested and found that it only worked on loopback.  I have
> forgotten to check for EINPROGRESS after connect.  So here is a new
> diff.
> 
> bluhm
Succesfully tested now with a remote logstash host.
 
> Index: privsep.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/usr.sbin/syslogd/privsep.c,v
> retrieving revision 1.48
> diff -u -p -r1.48 privsep.c
> --- privsep.c 5 Oct 2014 18:14:01 -0000       1.48
> +++ privsep.c 28 Dec 2014 15:48:51 -0000
> @@ -317,17 +317,34 @@ priv_init(char *conf, int numeric, int l
>                       servname[servname_len - 1] = '\0';
>  
>                       memset(&hints, 0, sizeof(hints));
> -                     if (strcmp(protoname, "udp") == 0) {
> +                     switch (strlen(protoname)) {
> +                     case 3:
>                               hints.ai_family = AF_UNSPEC;
> -                     } else if (strcmp(protoname, "udp4") == 0) {
> -                             hints.ai_family = AF_INET;
> -                     } else if (strcmp(protoname, "udp6") == 0) {
> -                             hints.ai_family = AF_INET6;
> +                             break;
> +                     case 4:
> +                             switch (protoname[3]) {
> +                             case '4':
> +                                     hints.ai_family = AF_INET;
> +                                     break;
> +                             case '6':
> +                                     hints.ai_family = AF_INET6;
> +                                     break;
> +                             default:
> +                                     errx(1, "bad ip version %s", protoname);
> +                             }
> +                             break;
> +                     default:
> +                             errx(1, "bad protocol length %s", protoname);
> +                     }
> +                     if (strncmp(protoname, "udp", 3) == 0) {
> +                             hints.ai_socktype = SOCK_DGRAM;
> +                             hints.ai_protocol = IPPROTO_UDP;
> +                     } else if (strncmp(protoname, "tcp", 3) == 0) {
> +                             hints.ai_socktype = SOCK_STREAM;
> +                             hints.ai_protocol = IPPROTO_TCP;
>                       } else {
>                               errx(1, "unknown protocol %s", protoname);
>                       }
> -                     hints.ai_socktype = SOCK_DGRAM;
> -                     hints.ai_protocol = IPPROTO_UDP;
>                       i = getaddrinfo(hostname, servname, &hints, &res0);
>                       if (i != 0 || res0 == NULL) {
>                               addr_len = 0;
> Index: syslogd.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/usr.sbin/syslogd/syslogd.c,v
> retrieving revision 1.136
> diff -u -p -r1.136 syslogd.c
> --- syslogd.c 10 Dec 2014 19:42:14 -0000      1.136
> +++ syslogd.c 28 Dec 2014 16:25:55 -0000
> @@ -50,13 +50,14 @@
>   * extensive changes by Ralph Campbell
>   * more extensive changes by Eric Allman (again)
>   * memory buffer logging by Damien Miller
> - * IPv6, libevent by Alexander Bluhm
> + * IPv6, libevent, sending via TCP by Alexander Bluhm
>   */
>  
>  #define      MAXLINE         1024            /* maximum line length */
>  #define MIN_MEMBUF   (MAXLINE * 4)   /* Minimum memory buffer size */
>  #define MAX_MEMBUF   (256 * 1024)    /* Maximum memory buffer size */
>  #define MAX_MEMBUF_NAME      64              /* Max length of membuf log 
> name */
> +#define MAX_TCPBUF   (256 * 1024)    /* Maximum tcp event buffer size */
>  #define      MAXSVLINE       120             /* maximum saved line length */
>  #define DEFUPRI              (LOG_USER|LOG_NOTICE)
>  #define DEFSPRI              (LOG_KERN|LOG_CRIT)
> @@ -132,6 +133,8 @@ struct filed {
>                       char    f_loghost[1+4+3+1+MAXHOSTNAMELEN+1+NI_MAXSERV];
>                               /* @proto46://[hostname]:servname\0 */
>                       struct sockaddr_storage f_addr;
> +                     struct bufferevent      *f_bufev;
> +                     int     f_fd;
>               } f_forw;               /* forwarding address */
>               char    f_fname[MAXPATHLEN];
>               struct {
> @@ -170,16 +173,17 @@ int     repeatinterval[] = { 30, 120, 600 };
>  #define F_FILE               1               /* regular file */
>  #define F_TTY                2               /* terminal */
>  #define F_CONSOLE    3               /* console terminal */
> -#define F_FORW               4               /* remote machine */
> +#define F_FORWUDP    4               /* remote machine via UDP */
>  #define F_USERS              5               /* list of users */
>  #define F_WALL               6               /* everyone logged on */
>  #define F_MEMBUF     7               /* memory buffer */
>  #define F_PIPE               8               /* pipe to external program */
> +#define F_FORWTCP    9               /* remote machine via TCP */
>  
> -char *TypeNames[9] = {
> +char *TypeNames[] = {
>       "UNUSED",       "FILE",         "TTY",          "CONSOLE",
> -     "FORW",         "USERS",        "WALL",         "MEMBUF",
> -     "PIPE"
> +     "FORWUDP",      "USERS",        "WALL",         "MEMBUF",
> +     "PIPE",         "FORWTCP",
>  };
>  
>  struct       filed *Files;
> @@ -259,6 +263,9 @@ struct event       ev_ctlaccept, ev_ctlread, 
>  void  klog_readcb(int, short, void *);
>  void  udp_readcb(int, short, void *);
>  void  unix_readcb(int, short, void *);
> +int   tcp_socket(struct filed *);
> +void  tcp_readcb(struct bufferevent *, void *);
> +void  tcp_errorcb(struct bufferevent *, short, void *);
>  void  die_signalcb(int, short, void *);
>  void  mark_timercb(int, short, void *);
>  void  init_signalcb(int, short, void *);
> @@ -661,6 +668,82 @@ unix_readcb(int fd, short event, void *a
>               logerror("recvfrom unix");
>  }
>  
> +int
> +tcp_socket(struct filed *f)
> +{
> +     int      s, flags;
> +     char     ebuf[100];
> +
> +     if ((s = socket(f->f_un.f_forw.f_addr.ss_family, SOCK_STREAM,
> +         IPPROTO_TCP)) == -1) {
> +             snprintf(ebuf, sizeof(ebuf), "socket \"%s\"",
> +                 f->f_un.f_forw.f_loghost);
> +             logerror(ebuf);
> +             return (-1);
> +     }
> +     /* Connect must not block the process. */
> +     if ((flags = fcntl(s, F_GETFL)) == -1 ||
> +         fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) {
> +             snprintf(ebuf, sizeof(ebuf), "fcntl \"%s\" O_NONBLOCK",
> +                 f->f_un.f_forw.f_loghost);
> +             logerror(ebuf);
> +             close(s);
> +             return (-1);
> +     }
> +     if (connect(s, (struct sockaddr *)&f->f_un.f_forw.f_addr,
> +         f->f_un.f_forw.f_addr.ss_len) == -1 && errno != EINPROGRESS) {
> +             snprintf(ebuf, sizeof(ebuf), "connect \"%s\"",
> +                 f->f_un.f_forw.f_loghost);
> +             logerror(ebuf);
> +             close(s);
> +             return (-1);
> +     }
> +     return (s);
> +}
> +
> +void
> +tcp_readcb(struct bufferevent *bufev, void *arg)
> +{
> +     struct filed    *f = arg;
> +
> +     /*
> +      * Drop data received from the forward log server.
> +      */
> +     dprintf("loghost \"%s\" did send %zu bytes back\n",
> +         f->f_un.f_forw.f_loghost,
> +         EVBUFFER_LENGTH(f->f_un.f_forw.f_bufev->input));
> +     evbuffer_drain(bufev->input, -1);
> +}
> +
> +void
> +tcp_errorcb(struct bufferevent *bufev, short event, void *arg)
> +{
> +     struct filed    *f = arg;
> +     char             ebuf[100];
> +
> +     if (event & EVBUFFER_EOF)
> +             snprintf(ebuf, sizeof(ebuf),
> +                 "syslogd: loghost \"%s\" connection close",
> +                 f->f_un.f_forw.f_loghost);
> +     else
> +             snprintf(ebuf, sizeof(ebuf),
> +                 "syslogd: loghost \"%s\" connection error: %s",
> +                 f->f_un.f_forw.f_loghost, strerror(errno));
> +     dprintf("%s\n", ebuf);
> +
> +     close(f->f_un.f_forw.f_fd);
> +     if ((f->f_un.f_forw.f_fd = tcp_socket(f)) == -1) {
> +             /* XXX reconnect later */
> +             bufferevent_free(bufev);
> +             f->f_type = F_UNUSED;
> +     } else {
> +             /* XXX The messages in the output buffer may be out of sync. */
> +             bufferevent_setfd(bufev, f->f_un.f_forw.f_fd);
> +             bufferevent_enable(f->f_un.f_forw.f_bufev, EV_READ);
> +     }
> +     logmsg(LOG_SYSLOG|LOG_WARNING, ebuf, LocalHostName, ADDDATE);
> +}
> +
>  void
>  usage(void)
>  {
> @@ -883,7 +966,7 @@ fprintlog(struct filed *f, int flags, ch
>  {
>       struct iovec iov[6];
>       struct iovec *v;
> -     int fd, l, retryonce;
> +     int l, retryonce;
>       char line[MAXLINE + 1], repbuf[80], greetings[500];
>  
>       v = iov;
> @@ -938,19 +1021,8 @@ fprintlog(struct filed *f, int flags, ch
>               dprintf("\n");
>               break;
>  
> -     case F_FORW:
> +     case F_FORWUDP:
>               dprintf(" %s\n", f->f_un.f_forw.f_loghost);
> -             switch (f->f_un.f_forw.f_addr.ss_family) {
> -             case AF_INET:
> -                     fd = fd_udp;
> -                     break;
> -             case AF_INET6:
> -                     fd = fd_udp6;
> -                     break;
> -             default:
> -                     fd = -1;
> -                     break;
> -             }
>               l = snprintf(line, sizeof(line), "<%d>%.15s %s%s%s",
>                   f->f_prevpri, (char *)iov[0].iov_base,
>                   IncludeHostname ? LocalHostName : "",
> @@ -958,7 +1030,7 @@ fprintlog(struct filed *f, int flags, ch
>                   (char *)iov[4].iov_base);
>               if (l < 0 || (size_t)l >= sizeof(line))
>                       l = strlen(line);
> -             if (sendto(fd, line, l, 0,
> +             if (sendto(f->f_un.f_forw.f_fd, line, l, 0,
>                   (struct sockaddr *)&f->f_un.f_forw.f_addr,
>                   f->f_un.f_forw.f_addr.ss_len) != l) {
>                       switch (errno) {
> @@ -977,6 +1049,26 @@ fprintlog(struct filed *f, int flags, ch
>               }
>               break;
>  
> +     case F_FORWTCP:
> +             dprintf(" %s\n", f->f_un.f_forw.f_loghost);
> +             if (EVBUFFER_LENGTH(f->f_un.f_forw.f_bufev->output) >=
> +                 MAX_TCPBUF)
> +                     break;  /* XXX log error message */
> +             /*
> +              * RFC 6587  3.4.2.  Non-Transparent-Framing
> +              * Use \n to split messages for now.
> +              * 3.4.1.  Octet Counting might be implemented later.
> +              */
> +             l = evbuffer_add_printf(f->f_un.f_forw.f_bufev->output,
> +                 "<%d>%.15s %s%s%s\n", f->f_prevpri, (char *)iov[0].iov_base,
> +                 IncludeHostname ? LocalHostName : "",
> +                 IncludeHostname ? " " : "",
> +                 (char *)iov[4].iov_base);
> +             if (l < 0)
> +                     break;  /* XXX log error message */
> +             bufferevent_enable(f->f_un.f_forw.f_bufev, EV_WRITE);
> +             break;
> +
>       case F_CONSOLE:
>               if (flags & IGN_CONS) {
>                       dprintf(" (ignored)\n");
> @@ -1249,7 +1341,8 @@ init(void)
>               case F_PIPE:
>                       (void)close(f->f_file);
>                       break;
> -             case F_FORW:
> +             case F_FORWUDP:
> +             case F_FORWTCP:  /* XXX close and reconnect? */
>                       break;
>               }
>               next = f->f_next;
> @@ -1383,7 +1476,8 @@ init(void)
>                               printf("%s", f->f_un.f_fname);
>                               break;
>  
> -                     case F_FORW:
> +                     case F_FORWUDP:
> +                     case F_FORWTCP:
>                               printf("%s", f->f_un.f_forw.f_loghost);
>                               break;
>  
> @@ -1575,6 +1669,9 @@ cfline(char *line, char *prog)
>                               logerror(ebuf);
>                               break;
>                       }
> +             } else if (strcmp(proto, "tcp") == 0 ||
> +                 strcmp(proto, "tcp4") == 0 || strcmp(proto, "tcp6") == 0) {
> +                     ;
>               } else {
>                       snprintf(ebuf, sizeof(ebuf), "bad protocol \"%s\"",
>                           f->f_un.f_forw.f_loghost);
> @@ -1603,7 +1700,35 @@ cfline(char *line, char *prog)
>                       logerror(ebuf);
>                       break;
>               }
> -             f->f_type = F_FORW;
> +             f->f_un.f_forw.f_fd = -1;
> +             if (strncmp(proto, "udp", 3) == 0) {
> +                     switch (f->f_un.f_forw.f_addr.ss_family) {
> +                     case AF_INET:
> +                             f->f_un.f_forw.f_fd = fd_udp;
> +                             break;
> +                     case AF_INET6:
> +                             f->f_un.f_forw.f_fd = fd_udp6;
> +                             break;
> +                     }
> +                     f->f_type = F_FORWUDP;
> +             } else if (strncmp(proto, "tcp", 3) == 0) {
> +                     int s;
> +
> +                     if ((s = tcp_socket(f)) == -1)
> +                             break;
> +                     if ((f->f_un.f_forw.f_bufev = bufferevent_new(s,
> +                         tcp_readcb, NULL, tcp_errorcb, f)) == NULL) {
> +                             snprintf(ebuf, sizeof(ebuf),
> +                                 "bufferevent \"%s\"",
> +                                 f->f_un.f_forw.f_loghost);
> +                             logerror(ebuf);
> +                             close(s);
> +                             break;
> +                     }
> +                     bufferevent_enable(f->f_un.f_forw.f_bufev, EV_READ);
> +                     f->f_un.f_forw.f_fd = s;
> +                     f->f_type = F_FORWTCP;
> +             }
>               break;
>  
>       case '/':
> @@ -1914,7 +2039,7 @@ ctlsock_acceptcb(int fd, short event, vo
>  
>       if ((flags = fcntl(fd, F_GETFL)) == -1 ||
>           fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
> -             logerror("fcntl ctlconn");
> +             logerror("fcntl ctlconn O_NONBLOCK");
>               close(fd);
>               return;
>       }
> 

-- 
jasper

Reply via email to