Well spotted indeed :-)
Here comes the next iteration.

Cheers
Andreas


Extend the JSON output plugin so that the generated JSON stream can be
sent to a remote host via TCP/UDP or to a local unix socket.

Signed-off-by: Andreas Jaggi <andreas.ja...@waterwave.ch>
---
 output/ulogd_output_JSON.c | 291 ++++++++++++++++++++++++++++++++++---
 ulogd.conf.in              |  11 ++
 2 files changed, 281 insertions(+), 21 deletions(-)

diff --git a/output/ulogd_output_JSON.c b/output/ulogd_output_JSON.c
index 4d8e3e9..6edfa90 100644
--- a/output/ulogd_output_JSON.c
+++ b/output/ulogd_output_JSON.c
@@ -20,10 +20,15 @@
 
 #include <stdio.h>
 #include <stdlib.h>
+#include <unistd.h>
 #include <string.h>
 #include <time.h>
 #include <errno.h>
 #include <inttypes.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netdb.h>
 #include <ulogd/ulogd.h>
 #include <ulogd/conffile.h>
 #include <jansson.h>
@@ -36,6 +41,10 @@
 #define ULOGD_JSON_DEFAULT_DEVICE "Netfilter"
 #endif
 
+#define host_ce(x)     (x->ces[JSON_CONF_HOST])
+#define port_ce(x)     (x->ces[JSON_CONF_PORT])
+#define mode_ce(x)     (x->ces[JSON_CONF_MODE])
+#define file_ce(x)     (x->ces[JSON_CONF_FILENAME])
 #define unlikely(x) __builtin_expect((x),0)
 
 struct json_priv {
@@ -44,6 +53,15 @@ struct json_priv {
        int usec_idx;
        long cached_gmtoff;
        char cached_tz[6];      /* eg +0200 */
+       int mode;
+       int sock;
+};
+
+enum json_mode {
+       JSON_MODE_FILE = 0,
+       JSON_MODE_TCP,
+       JSON_MODE_UDP,
+       JSON_MODE_UNIX
 };
 
 enum json_conf {
@@ -53,6 +71,9 @@ enum json_conf {
        JSON_CONF_EVENTV1,
        JSON_CONF_DEVICE,
        JSON_CONF_BOOLEAN_LABEL,
+       JSON_CONF_MODE,
+       JSON_CONF_HOST,
+       JSON_CONF_PORT,
        JSON_CONF_MAX
 };
 
@@ -95,15 +116,167 @@ static struct config_keyset json_kset = {
                        .options = CONFIG_OPT_NONE,
                        .u = { .value = 0 },
                },
+               [JSON_CONF_MODE] = {
+                       .key = "mode",
+                       .type = CONFIG_TYPE_STRING,
+                       .options = CONFIG_OPT_NONE,
+                       .u = { .string = "file" },
+               },
+               [JSON_CONF_HOST] = {
+                       .key = "host",
+                       .type = CONFIG_TYPE_STRING,
+                       .options = CONFIG_OPT_NONE,
+                       .u = { .string = "127.0.0.1" },
+               },
+               [JSON_CONF_PORT] = {
+                       .key = "port",
+                       .type = CONFIG_TYPE_STRING,
+                       .options = CONFIG_OPT_NONE,
+                       .u = { .string = "12345" },
+               },
        },
 };
 
+static void close_socket(struct json_priv *op) {
+       if (op->sock != -1) {
+               close(op->sock);
+               op->sock = -1;
+       }
+}
+
+static int _connect_socket_unix(struct ulogd_pluginstance *pi)
+{
+       struct json_priv *op = (struct json_priv *) &pi->private;
+       struct sockaddr_un u_addr;
+       int sfd;
+
+       close_socket(op);
+
+       ulogd_log(ULOGD_DEBUG, "connecting to unix:%s\n",
+                 file_ce(pi->config_kset).u.string);
+
+       sfd = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (sfd == -1) {
+               return -1;
+       }
+       u_addr.sun_family = AF_UNIX;
+       strncpy(u_addr.sun_path, file_ce(pi->config_kset).u.string,
+               sizeof(u_addr.sun_path) - 1);
+       if (connect(sfd, (struct sockaddr *) &u_addr, sizeof(struct sockaddr_un)) == -1) {
+               close(sfd);
+               return -1;
+       }
+
+       op->sock = sfd;
+
+       return 0;
+}
+
+static int _connect_socket_net(struct ulogd_pluginstance *pi)
+{
+       struct json_priv *op = (struct json_priv *) &pi->private;
+       struct addrinfo hints;
+       struct addrinfo *result, *rp;
+       int sfd, s;
+
+       close_socket(op);
+
+       ulogd_log(ULOGD_DEBUG, "connecting to %s:%s\n",
+                 host_ce(pi->config_kset).u.string,
+                 port_ce(pi->config_kset).u.string);
+
+       memset(&hints, 0, sizeof(struct addrinfo));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = op->mode == JSON_MODE_UDP ? SOCK_DGRAM : SOCK_STREAM;
+       hints.ai_protocol = 0;
+       hints.ai_flags = 0;
+
+       s = getaddrinfo(host_ce(pi->config_kset).u.string,
+                       port_ce(pi->config_kset).u.string, &hints, &result);
+       if (s != 0) {
+               ulogd_log(ULOGD_ERROR, "getaddrinfo: %s\n", gai_strerror(s));
+               return -1;
+       }
+
+       for (rp = result; rp != NULL; rp = rp->ai_next) {
+               int on = 1;
+
+               sfd = socket(rp->ai_family, rp->ai_socktype,
+                               rp->ai_protocol);
+               if (sfd == -1)
+                       continue;
+
+               setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR,
+                          (char *) &on, sizeof(on));
+
+               if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1)
+                       break;
+
+               close(sfd);
+       }
+
+       freeaddrinfo(result);
+
+       if (rp == NULL) {
+               return -1;
+       }
+
+       op->sock = sfd;
+
+       return 0;
+}
+
+static int _connect_socket(struct ulogd_pluginstance *pi)
+{
+       struct json_priv *op = (struct json_priv *) &pi->private;
+
+       if (op->mode == JSON_MODE_UNIX)
+               return _connect_socket_unix(pi);
+       else
+               return _connect_socket_net(pi);
+}
+
+static int json_interp_socket(struct ulogd_pluginstance *upi, char *buf, int buflen)
+{
+       struct json_priv *opi = (struct json_priv *) &upi->private;
+       int ret = 0;
+
+       if (opi->sock != -1)
+               ret = send(opi->sock, buf, buflen, MSG_NOSIGNAL);
+       free(buf);
+       if (ret != buflen) {
+               ulogd_log(ULOGD_ERROR, "Failure sending message: %s\n",
+                         strerror(errno));
+               if (ret == -1 || opi->sock == -1)
+                       return _connect_socket(upi);
+               else
+                       return ULOGD_IRET_ERR;
+       }
+
+       return ULOGD_IRET_OK;
+}
+
+static int json_interp_file(struct ulogd_pluginstance *upi, char *buf)
+{
+       struct json_priv *opi = (struct json_priv *) &upi->private;
+
+       fprintf(opi->of, "%s", buf);
+       free(buf);
+
+       if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
+               fflush(opi->of);
+
+       return ULOGD_IRET_OK;
+}
+
 #define MAX_LOCAL_TIME_STRING 38
 
 static int json_interp(struct ulogd_pluginstance *upi)
 {
        struct json_priv *opi = (struct json_priv *) &upi->private;
        unsigned int i;
+       char *buf;
+       int buflen;
        json_t *msg;
 
        msg = json_object();
@@ -218,34 +391,65 @@ static int json_interp(struct ulogd_pluginstance *upi)
                }
        }
 
-       json_dumpf(msg, opi->of, 0);
-       fprintf(opi->of, "\n");
 
+       buf = json_dumps(msg, 0);
        json_decref(msg);
+       if (buf == NULL) {
+               ulogd_log(ULOGD_ERROR, "Could not create message\n");
+               return ULOGD_IRET_ERR;
+       }
+       buflen = strlen(buf);
+       buf = realloc(buf, sizeof(char)*(buflen+2));
+       if (buf == NULL) {
+               ulogd_log(ULOGD_ERROR, "Could not create message\n");
+               return ULOGD_IRET_ERR;
+       }
+       strncat(buf, "\n", 1);
+       buflen++;
 
-       if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
-               fflush(opi->of);
+       if (opi->mode == JSON_MODE_FILE)
+               return json_interp_file(upi, buf);
+       else
+               return json_interp_socket(upi, buf, buflen);
+}
 
-       return ULOGD_IRET_OK;
+static void reopen_file(struct ulogd_pluginstance *upi)
+{
+       struct json_priv *oi = (struct json_priv *) &upi->private;
+       FILE *old = oi->of;
+
+       ulogd_log(ULOGD_NOTICE, "JSON: reopening logfile\n");
+       oi->of = fopen(upi->config_kset->ces[0].u.string, "a");
+       if (!oi->of) {
+               ulogd_log(ULOGD_ERROR, "can't open JSON "
+                                      "log file: %s\n",
+                         strerror(errno));
+               oi->of = old;
+       } else {
+               fclose(old);
+       }
+}
+
+static void reopen_socket(struct ulogd_pluginstance *upi)
+{
+       ulogd_log(ULOGD_NOTICE, "JSON: reopening socket\n");
+       if (_connect_socket(upi) < 0) {
+               ulogd_log(ULOGD_ERROR, "can't open JSON "
+                                      "socket: %s\n",
+                         strerror(errno));
+       }
 }
 
 static void sighup_handler_print(struct ulogd_pluginstance *upi, int signal)
 {
        struct json_priv *oi = (struct json_priv *) &upi->private;
-       FILE *old = oi->of;
 
        switch (signal) {
        case SIGHUP:
-               ulogd_log(ULOGD_NOTICE, "JSON: reopening logfile\n");
-               oi->of = fopen(upi->config_kset->ces[0].u.string, "a");
-               if (!oi->of) {
-                       ulogd_log(ULOGD_ERROR, "can't open JSON "
-                                              "log file: %s\n",
-                                 strerror(errno));
-                       oi->of = old;
-               } else {
-                       fclose(old);
-               }
+               if (oi->mode == JSON_MODE_FILE)
+                       reopen_file(upi);
+               else
+                       reopen_socket(upi);
                break;
        default:
                break;
@@ -255,6 +459,8 @@ static void sighup_handler_print(struct ulogd_pluginstance *upi, int signal)
 static int json_configure(struct ulogd_pluginstance *upi,
                            struct ulogd_pluginstance_stack *stack)
 {
+       struct json_priv *op = (struct json_priv *) &upi->private;
+       char *mode_str = mode_ce(upi->config_kset).u.string;
        int ret;
 
        ret = ulogd_wildcard_inputkeys(upi);
@@ -265,13 +471,25 @@ static int json_configure(struct ulogd_pluginstance *upi,
        if (ret < 0)
                return ret;
 
+       if (!strcasecmp(mode_str, "udp")) {
+               op->mode = JSON_MODE_UDP;
+       } else if (!strcasecmp(mode_str, "tcp")) {
+               op->mode = JSON_MODE_TCP;
+       } else if (!strcasecmp(mode_str, "unix")) {
+               op->mode = JSON_MODE_UNIX;
+       } else if (!strcasecmp(mode_str, "file")) {
+               op->mode = JSON_MODE_FILE;
+       } else {
+               ulogd_log(ULOGD_ERROR, "unknown mode '%s'\n", mode_str);
+               return -EINVAL;
+       }
+
        return 0;
 }
 
-static int json_init(struct ulogd_pluginstance *upi)
+static int json_init_file(struct ulogd_pluginstance *upi)
 {
        struct json_priv *op = (struct json_priv *) &upi->private;
-       unsigned int i;
 
        op->of = fopen(upi->config_kset->ces[0].u.string, "a");
        if (!op->of) {
@@ -280,6 +498,27 @@ static int json_init(struct ulogd_pluginstance *upi)
                return -1;
        }
 
+       return 0;
+}
+
+static int json_init_socket(struct ulogd_pluginstance *upi)
+{
+       struct json_priv *op = (struct json_priv *) &upi->private;
+
+       if (host_ce(upi->config_kset).u.string == NULL)
+               return -1;
+       if (port_ce(upi->config_kset).u.string == NULL)
+               return -1;
+
+       op->sock = -1;
+       return _connect_socket(upi);
+}
+
+static int json_init(struct ulogd_pluginstance *upi)
+{
+       struct json_priv *op = (struct json_priv *) &upi->private;
+       unsigned int i;
+
        /* search for time */
        op->sec_idx = -1;
        op->usec_idx = -1;
@@ -293,15 +532,25 @@ static int json_init(struct ulogd_pluginstance *upi)
 
        *op->cached_tz = '\0';
 
-       return 0;
+       if (op->mode == JSON_MODE_FILE)
+               return json_init_file(upi);
+       else
+               return json_init_socket(upi);
+}
+
+static void close_file(FILE *of) {
+       if (of != stdout)
+               fclose(of);
 }
 
 static int json_fini(struct ulogd_pluginstance *pi)
 {
        struct json_priv *op = (struct json_priv *) &pi->private;
 
-       if (op->of != stdout)
-               fclose(op->of);
+       if (op->mode == JSON_MODE_FILE)
+               close_file(op->of);
+       else
+               close_socket(op);
 
        return 0;
 }
diff --git a/ulogd.conf.in b/ulogd.conf.in
index 2fcf39a..e94a3a2 100644
--- a/ulogd.conf.in
+++ b/ulogd.conf.in
@@ -212,6 +212,17 @@ sync=1
 # Uncomment the following line to use JSON v1 event format that
 # can provide better compatility with some JSON file reader.
 #eventv1=1
+# Uncomment the following lines to send the JSON logs to a remote host via UDP
+#mode="udp"
+#host="192.0.2.10"
+#port="10210"
+# Uncomment the following lines to send the JSON logs to a remote host via TCP
+#mode="tcp"
+#host="192.0.2.10"
+#port="10210"
+# Uncomment the following lines to send the JSON logs to a local unix socket
+#mode="unix"
+#file="/var/run/ulogd.socket"
 
 [pcap1]
 #default file is /var/log/ulogd.pcap
-- 
2.17.0


--
To unsubscribe from this list: send the line "unsubscribe netfilter-devel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to