Hi, this is a stab at the "remote journal logging" functionality... Attached is the body of the program, but the full patch set is available under http://in.waw.pl/git/systemd/ journal-remoted
The program (called systemd-journal-remoted now, but I'd be happy to hear suggestions for a better name) listens on sockets (either from socket activation, or specified on the command line with --listen=), or reads stdin (if given --stdin), or uses curl to receive events from a systemd-journal-gatewayd instance (with --url=). So it can be used as a server, or as a standalone binary. Messages must be in the export format. They are parsed and stored into a journal file. The journal file is /var/log/journal/<machine-id>/remote-*.journal by default, but this can be overridden with a command-line option (--output). Authentication and rate-limiting are not implemented... Debugging messages are a bit excessive... Push mode is not implemented... (but it would be a separate program anyway).

Examples:

  journalctl -o export | systemd-journal-remoted --stdin -o /tmp/dir/
will create a copy of events, which can be browsed with
  journalctl -D /tmp/dir/

Copy messages from another host:
  systemd-journal-remoted --url 'http://some.host:19531/entries?boot' -o /tmp/dir/

Copy messages from another host, live:
  systemd-journal-remoted --url 'http://some.host:19531/entries?boot&follow' -o /tmp/dir/

Listen on socket:
  systemd-journal-remoted --listen 19532 -o /tmp/dir/

I think that the implementation is fairly sound, but some details certainly can be improved. E.g. currently, file names look like this (underneath some directory):
  remote-127.0.0.1~2000.journal
  remote-multiple.journal
  remote-stdin.journal
  remote-http~~~some~host~19531~entries.journal
The goal was to have names containing the port number, so that it is possible to run multiple instances without conflict. Also, the memory allocation/deallocation patterns in get_line() are fairly ugly. I'm not sure if this is significant at all.

Zbyszek
/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ /*** This file is part of systemd. Copyright 2012 Zbigniew Jędrzejewski-Szmek systemd is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. systemd is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with systemd; If not, see <http://www.gnu.org/licenses/>. ***/ #include <errno.h> #include <fcntl.h> #include <inttypes.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/epoll.h> #include <sys/prctl.h> #include <sys/signalfd.h> #include <sys/socket.h> #include <sys/stat.h> #include <sys/types.h> #include <unistd.h> #include <getopt.h> #include <systemd/sd-daemon.h> #include "journal-file.h" #include "journald-native.h" #include "journald-server.h" #include "socket-util.h" #include "mkdir.h" #include "build.h" #include "macro.h" #define REMOTE_JOURNAL_PATH "/var/log/journal/" SD_ID128_FORMAT_STR "/remote-%s.journal" static char* arg_output = NULL; static char* arg_url = NULL; static bool arg_stdin = false; static char* arg_listen = NULL; static int arg_compress = 1; static int arg_seal = 0; /********************************************************************** ********************************************************************** **********************************************************************/ static int spawn_child(const char* child, char** argv) { int fd[2]; pid_t parent_pid, child_pid; int r; if (pipe(fd) < 0) { log_error("Failed to create pager pipe: %m"); return -errno; } parent_pid = getpid(); child_pid = fork(); if (child_pid < 0) { r = 
-errno; log_error("Failed to fork: %m"); close_pipe(fd); return r; } /* In the child */ if (child_pid == 0) { r = dup2(fd[1], STDOUT_FILENO); if (r < 0) { log_error("Failed to dup pipe to stdout: %m"); _exit(EXIT_FAILURE); } r = close_pipe(fd); if (r < 0) log_warning("Failed to close pipe fds: %m"); /* Make sure the child goes away when the parent dies */ if (prctl(PR_SET_PDEATHSIG, SIGTERM) < 0) _exit(EXIT_FAILURE); /* Check whether our parent died before we were able * to set the death signal */ if (getppid() != parent_pid) _exit(EXIT_SUCCESS); execvp(child, argv); log_error("Failed to exec child %s: %m", child); _exit(EXIT_FAILURE); } r = close(fd[1]); if (r < 0) log_warning("Failed to close write end of pipe: %m"); return fd[0]; } static int spawn_curl(char* url) { int r; char argv0[] = "curl"; char argv1[] = "-HAccept: application/vnd.fdo.journal"; char argv2[] = "--silent"; char argv3[] = "--show-error"; char* argv[] = {argv0, argv1, argv2, argv3, url, NULL}; r = spawn_child("curl", argv); if (r < 0) log_error("Failed to spawn curl: %m"); return r; } /********************************************************************** ********************************************************************** **********************************************************************/ static int make_socket_fd(const char* address) { SocketAddress a; int fd, r; char _cleanup_free_ *p = NULL; r = socket_address_parse(&a, address); if (r < 0) { log_error("failed to parse socket: %s", strerror(-r)); return r; } fd = socket(socket_address_family(&a), SOCK_STREAM|SOCK_CLOEXEC, 0); if (fd < 0) { log_error("socket(): %m"); return -errno; } r = socket_address_print(&a, &p); if (r < 0) { log_error("socket_address_print(): %s", strerror(-r)); return r; } log_info("Listening on %s", p); r = bind(fd, &a.sockaddr.sa, a.size); if (r < 0) { log_error("bind to %s: %m", address); return -errno; } r = listen(fd, SOMAXCONN); if (r < 0) { log_error("listen on %s: %m", address); return -errno; } return 
fd; } /********************************************************************** ********************************************************************** **********************************************************************/ struct iovec_wrapper { struct iovec *iovec; size_t size; size_t count; }; static int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) { if (iovw->count == iovw->size) { size_t newsize = MAX(iovw->size * 2, 1); struct iovec *new = realloc(iovw->iovec, sizeof(struct iovec) * newsize); if (!new) return log_oom(); iovw->size = newsize; iovw->iovec = new; } iovw->iovec[iovw->count].iov_base = data; iovw->iovec[iovw->count].iov_len = len; iovw->count++; return 0; } static void iovw_free(struct iovec_wrapper *iovw) { for (size_t j = 0; j < iovw->count; j++) free(iovw->iovec[j].iov_base); free(iovw->iovec); iovw->iovec = NULL; iovw->size = iovw->count = 0; } static size_t iovw_size(struct iovec_wrapper *iovw) { size_t n = 0, i; for(i = 0; i < iovw->count; i++) n += iovw->iovec[i].iov_len; return n; } /********************************************************************** ********************************************************************** **********************************************************************/ static int do_rotate(JournalFile **f) { int r = journal_file_rotate(f, arg_compress, arg_seal); if (r < 0) { if (*f) log_error("Failed to rotate %s: %s", (*f)->path, strerror(-r)); else log_error("Failed to create rotated journal: %s", strerror(-r)); } return r; } typedef struct Writer { JournalFile *journal; JournalMetrics metrics; MMapCache *mmap; uint64_t seqnum; } Writer; static int writer_init(Writer *s) { assert(s); s->journal = NULL; memset(&s->metrics, 0xFF, sizeof(s->metrics)); s->mmap = mmap_cache_new(); if (!s->mmap) return log_oom(); s->seqnum = 0; return 0; } static int writer_close(Writer *s) { if (s->journal) journal_file_close(s->journal); if (s->mmap) mmap_cache_unref(s->mmap); return 0; } static int open_output(Writer 
*s, const char* url) { char _cleanup_free_ *name, *output = NULL; char *c; int r; assert(url); name = strdup(url); if (!name) return log_oom(); for(c = name; *c; c++) { if (*c == '/' || *c == ':') *c = '~'; else if (*c == '?') { *c = '\0'; break; } } if (!arg_output) { sd_id128_t machine; r = sd_id128_get_machine(&machine); if (r < 0) { log_error("failed to determine machine ID128: %s", strerror(-r)); return r; } r = asprintf(&output, REMOTE_JOURNAL_PATH, SD_ID128_FORMAT_VAL(machine), name); if (r < 0) return log_oom(); } else { r = is_dir(arg_output); if (r > 0) { r = asprintf(&output, "%s/remote-%s.journal", arg_output, name); if (r < 0) return log_oom(); } else { output = strdup(arg_output); if (!output) return log_oom(); } } r = journal_file_open_reliably(output, O_RDWR|O_CREAT, 0640, arg_compress, arg_seal, &s->metrics, s->mmap, NULL, &s->journal); if (r < 0) log_error("Failed to open output journal %s: %s", arg_output, strerror(-r)); else log_info("Opened output file %s", s->journal->path); return r; } static int write_to_journal(Writer *s, struct iovec_wrapper *iovw, dual_timestamp *ts) { int r; assert(s); assert(iovw); assert(iovw->count > 0); if (journal_file_rotate_suggested(s->journal, 0)) { log_info("%s: Journal header limits reached or header out-of-date, rotating", s->journal->path); r = do_rotate(&s->journal); if (r < 0) return r; } r = journal_file_append_entry(s->journal, ts, iovw->iovec, iovw->count, &s->seqnum, NULL, NULL); if (r >= 0) return 1; log_info("%s: Write failed, rotating", s->journal->path); r = do_rotate(&s->journal); if (r < 0) return r; log_debug("Retrying write."); r = journal_file_append_entry(s->journal, ts, iovw->iovec, iovw->count, &s->seqnum, NULL, NULL); return r < 0 ? 
r : 1; } /********************************************************************** ********************************************************************** **********************************************************************/ typedef enum { STATE_LINE, /* waiting to read, or reading line */ STATE_DATA_START, /* reading binary data header */ STATE_DATA, /* reading binary data */ STATE_DATA_FINISH, /* expecting newline */ STATE_EOF, /* done */ } source_state; typedef struct Source { char* name; int fd; char *buf; size_t size; size_t filled; size_t data_size; struct iovec_wrapper iovw; source_state state; dual_timestamp ts; } Source; typedef struct RServer { Source **sources; ssize_t sources_size; ssize_t active; struct Writer writer; int epoll_fd; int signal_fd; } RServer; static int do_epoll(int epoll_fd, int fd) { int r; struct epoll_event ev = {}; ev.events = EPOLLIN; ev.data.fd = fd; log_debug("Polling fd=%d (epoll_fd=%d)", fd, epoll_fd); r = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev); if (r < 0) log_error("Failed to add epoll (fd=%d) event for fd=%d", epoll_fd, fd); return r; } static int source_non_empty(Source *source) { assert(source); return source->filled > 0; } static int get_source_for_fd(RServer *s, int fd, Source **source) { assert(fd >= 0); assert(source); if (fd >= s->sources_size) { size_t newsize = MAX(fd+1, s->sources_size*2); Source **tmp = realloc(s->sources, sizeof(Source*) * newsize); if (!tmp) return log_oom(); memzero(tmp + s->sources_size, sizeof(Source*) * (newsize - s->sources_size)); s->sources = tmp; s->sources_size = newsize; } if (s->sources[fd] == NULL) { s->sources[fd] = calloc(1, sizeof(Source)); if (!s->sources[fd]) return log_oom(); s->sources[fd]->fd = -1; } *source = s->sources[fd]; return 0; } static int remove_source(RServer *s, int fd) { Source *source; int r = 0; assert(s); assert(fd >= 0); assert(fd < s->sources_size); source = s->sources[fd]; if (source) { if (source->fd >= 0) { log_debug("Closing fd=%d (%s)", source->fd, 
source->name); r = close(source->fd); } free(source->name); free(source->buf); iovw_free(&source->iovw); free(source); s->sources[fd] = NULL; s->active--; } return r; } static int add_source(RServer *s, int fd, const char* name) { Source *source = NULL; int r; assert(s); assert(fd >= 0); log_debug("Creating source for fd=%d (%s)", fd, name); r = get_source_for_fd(s, fd, &source); if (r < 0) { log_error("Failed to create source for fd=%d (%s)", fd, name); return r; } assert(source); s->active++; if (source->fd != -1) { log_error("Trying to add duplicate source for fd=%d (%s), existing is (%s)", fd, name, source->name); return -EEXIST; } source->fd = fd; if (name) { source->name = strdup(name); if (!source->name) r = log_oom(); } else { r = asprintf(&source->name, "fd=%d", fd); if (r < 0) r = log_oom(); } if (r < 0) goto error; r = do_epoll(s->epoll_fd, fd); if (r < 0) goto error; return 1; /* work to do */ error: remove_source(s, fd); return r; } static int rserver_init(RServer *s) { int r, n, fd; const char *output_name = NULL; assert(s); memzero(s, sizeof(*s)); s->epoll_fd = s->signal_fd = -1; s->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (s->epoll_fd < 0) { log_error("Failed to create epoll object: %m"); return -errno; } n = sd_listen_fds(true); if (n < 0) { log_error("Failed to read listening file descriptors from environment: %s", strerror(-n)); return n; } else log_info("Received %d descriptors", n); for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) { if (sd_is_socket(fd, AF_UNSPEC, 0, true)) { log_info("Received a socket (fd=%d)", fd); r = do_epoll(s->epoll_fd, fd); if (r < 0) return r; output_name = "socket"; s->active ++; } else { log_error("Unknown socket passed on fd=%d", fd); return -EINVAL; } } if (arg_url) { log_info("Spawning curl..."); fd = spawn_curl(arg_url); if (fd < 0) return fd; r = add_source(s, fd, arg_url); if (r < 0) return r; output_name = arg_url; } if (arg_listen) { log_info("Opening socket to listen..."); fd = 
make_socket_fd(arg_listen); if (fd < 0) return fd; r = do_epoll(s->epoll_fd, fd); if (r < 0) return r; output_name = arg_listen; s->active ++; } if (arg_stdin) { log_info("Reading standard input..."); r = add_source(s, STDIN_FILENO, "stdin"); if (r < 0) return r; output_name = "stdin"; } if (s->active == 0) { log_error("Zarro sources specified"); return -EINVAL; } r = open_signalfd(s->epoll_fd, &s->signal_fd); if (r < 0) return r; if (!!n + !!arg_url + !!arg_listen + !!arg_stdin > 1) output_name = "multiple"; r = writer_init(&s->writer); if (r < 0) return r; r = open_output(&s->writer, output_name); return r; } static int server_destroy(RServer *s) { int r; ssize_t i; r = writer_close(&s->writer); if (s->epoll_fd >= 0) close(s->epoll_fd); if (s->signal_fd >= 0) close(s->signal_fd); assert(s->sources_size == 0 || s->sources); for(i = 0; i < s->sources_size; i++) remove_source(s, i); free(s->sources); /* fds that we're listening on remain open... */ return r; } #define LINE_CHUNK 1024 static int get_line(Source *source, char **line, size_t *size) { ssize_t n, remain; char *c; char *newbuf = NULL; size_t newsize = 0; assert(source); assert(source->state == STATE_LINE); assert(source->filled <= source->size); assert(source->buf == NULL || source->size > 0); c = memchr(source->buf, '\n', source->filled); if (c != NULL) goto docopy; resize: if (source->size - source->filled < LINE_CHUNK) { char *buf; // XXX: add check for maximum line length buf = realloc(source->buf, source->size + LINE_CHUNK); if (!buf) return log_oom(); source->buf = buf; source->size += LINE_CHUNK; } assert(source->size - source->filled >= LINE_CHUNK); n = read(source->fd, source->buf + source->filled, source->size - source->filled); if (n < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) log_error("read(%d, ..., %zd): %m", source->fd, source->size - source->filled); return -errno; } else if (n == 0) return 0; c = memchr(source->buf + source->filled, '\n', n); source->filled += n; if (c == NULL) 
goto resize; docopy: *line = source->buf; *size = c + 1 - source->buf; /* Check if something remains */ remain = source->buf + source->filled - c - 1; assert(remain >= 0); if (remain) { newsize = MAX(remain, LINE_CHUNK); newbuf = malloc(newsize); if (!newbuf) return log_oom(); memcpy(newbuf, c + 1, remain); } source->buf = newbuf; source->size = newsize; source->filled = remain; return 1; } static int fill_fixed_size(Source *source, void **data, size_t size) { int n; char *newbuf = NULL; size_t newsize = 0, remain; assert(source); assert(source->state == STATE_DATA_START || source->state == STATE_DATA || source->state == STATE_DATA_FINISH); assert(size <= DATA_SIZE_MAX); assert(source->filled <= source->size); assert(source->buf != NULL || source->size == 0); assert(source->buf == NULL || source->size > 0); assert(data); while(source->filled < size) { if (source->size < size) { char *buf; buf = realloc(source->buf, size); if (!buf) return log_oom(); source->buf = buf; source->size = size; } n = read(source->fd, source->buf + source->filled, source->size - source->filled); if (n < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) log_error("read(%d, ..., %zd): %m", source->fd, source->size - source->filled); return -errno; } else if (n == 0) return 0; source->filled += n; } *data = source->buf; /* Check if something remains */ assert(size <= source->filled); remain = source->filled - size; if (remain) { newsize = MAX(remain, LINE_CHUNK); newbuf = malloc(newsize); if (!newbuf) return log_oom(); memcpy(newbuf, source->buf + size, remain); } source->buf = newbuf; source->size = newsize; source->filled = remain; return 1; return 1; } static int get_data_size(Source *source) { int r; void _cleanup_free_ *data = NULL; assert(source); assert(source->state == STATE_DATA_START); assert(source->data_size == 0); r = fill_fixed_size(source, &data, sizeof(uint64_t)); if (r <= 0) return r; source->data_size = le64toh( *(uint64_t *) data ); if (source->data_size == 0) 
log_warning("Binary field with zero length"); return 1; } static int get_data_data(Source *source, void **data) { int r; assert(source); assert(data); assert(source->state == STATE_DATA); r = fill_fixed_size(source, data, source->data_size); if (r <= 0) return r; return 1; } static int get_data_newline(Source *source) { int r; char _cleanup_free_ *data = NULL; assert(source); assert(source->state == STATE_DATA_FINISH); r = fill_fixed_size(source, (void**) &data, 1); if (r <= 0) return r; assert(data); if (*data != '\n') { log_error("expected newline, got '%c'", *data); return -EINVAL; } return 1; } static int process_dunder(Source *source, char *line, size_t n) { char *timestamp; int r; assert(line); assert(n > 0); assert(line[n-1] == '\n'); /* XXX: is it worth to support timestamps in extended format? * We don't produce them, but who knows... */ timestamp = startswith(line, "__CURSOR="); if (timestamp) /* ignore __CURSOR */ return 1; timestamp = startswith(line, "__REALTIME_TIMESTAMP="); if (timestamp) { long long unsigned x; line[n-1] = '\0'; r = safe_atollu(timestamp, &x); if (r < 0) log_error("failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp); else source->ts.realtime = x; return r < 0 ? r : 1; } timestamp = startswith(line, "__MONOTONIC_TIMESTAMP="); if (timestamp) { long long unsigned x; line[n-1] = '\0'; r = safe_atollu(timestamp, &x); if (r < 0) log_error("failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp); else source->ts.monotonic = x; return r < 0 ? 
r : 1; } timestamp = startswith(line, "__"); if (timestamp) { log_warning("unknown dunder line %s", line); return 1; } /* no dunder */ return 0; } static int process_source(Source *source) { int r; switch(source->state) { case STATE_LINE: { char *line, *sep; size_t n; assert(source->data_size == 0); r = get_line(source, &line, &n); if (r < 0) return r; if (r == 0) { source->state = STATE_EOF; return r; } assert(n > 0); assert(line[n-1] == '\n'); if (n == 1) { log_debug("Received empty line, event is ready"); free(line); return 1; } r = process_dunder(source, line, n); if (r != 0) { free(line); return r < 0 ? r : 0; } /* MESSAGE=xxx\n or COREDUMP\n LLLLLLLL0011223344...\n */ sep = strchr(line, '='); if (sep) /* chomp newline */ n--; else /* replace \n with = */ line[n-1] = '='; log_debug("Received: %.*s", (int) n, line); r = iovw_put(&source->iovw, line, n); if (r < 0) { log_error("Failed to put line in iovect"); free(line); return r; } if (!sep) source->state = STATE_DATA_START; return 0; /* continue */ } case STATE_DATA_START: assert(source->data_size == 0); r = get_data_size(source); log_debug("get_data_size() -> %d", r); if (r < 0) return r; if (r == 0) { source->state = STATE_EOF; return 0; } source->state = source->data_size > 0 ? 
STATE_DATA : STATE_DATA_FINISH; return 0; /* continue */ case STATE_DATA: { void *data; assert(source->data_size > 0); r = get_data_data(source, &data); log_debug("get_data_data() -> %d", r); if (r < 0) return r; if (r == 0) { source->state = STATE_EOF; return 0; } assert(data); r = iovw_put(&source->iovw, data, source->data_size); if (r < 0) { log_error("failed to put binary buffer in iovect"); return r; } source->state = STATE_DATA_FINISH; return 0; /* continue */ } case STATE_DATA_FINISH: r = get_data_newline(source); log_debug("get_data_newline() -> %d", r); if (r < 0) return r; if (r == 0) { source->state = STATE_EOF; return 0; } source->data_size = 0; source->state = STATE_LINE; return 0; /* continue */ default: assert_not_reached("wtf?"); } } /********************************************************************** ********************************************************************** **********************************************************************/ static int process_source_event(Source *source, Writer *writer) { int r; assert(source); assert(writer); r = process_source(source); if (r <= 0) return r < 0 ? 
r : 1; /* We have a full event */ log_info("Received a full event from source@%p fd=%d (%s)", source, source->fd, source->name); if (!source->iovw.count) { log_warning("Entry with no payload, skipping"); goto freeing; } assert(source->iovw.iovec); assert(source->iovw.count); r = write_to_journal(writer, &source->iovw, &source->ts); if (r < 0) { log_error("Failed to write entry: %s", strerror(-r)); if (r == -E2BIG) log_info("Entry was %zu bytes", iovw_size(&source->iovw)); } else r = 1; freeing: iovw_free(&source->iovw); return r; } static int process_epoll_event(RServer *s, struct epoll_event *event) { Source *source = NULL; int r; log_debug("Processing event (fd=%d, sources_size=%zd)", event->data.fd, s->sources_size); if (event->data.fd < s->sources_size) source = s->sources[event->data.fd]; if (source) { assert(source->fd == event->data.fd); r = process_source_event(source, &s->writer); if (source->state == STATE_EOF) { log_info("EOF reached with source fd=%d (%s)", source->fd, source->name); if (source_non_empty(source)) log_warning("EOF reached with incomplete data"); remove_source(s, source->fd); log_info("%zd active source remaining", s->active); } else if (r == -E2BIG) { log_error("Entry too big, skipped"); r = 1; } } else { SocketAddress addr = { .size = sizeof(union sockaddr_union), .type = SOCK_STREAM, }; int fd2; log_debug("Accepting new connection on fd=%d", event->data.fd); fd2 = accept4(event->data.fd, &addr.sockaddr.sa, &addr.size, SOCK_NONBLOCK|SOCK_CLOEXEC); if (fd2 < 0) { log_error("accept() on fd=%d failed: %m", event->data.fd); return -errno; } switch(socket_address_family(&addr)) { case AF_INET: case AF_INET6: { char* _cleanup_free_ a = NULL; r = socket_address_print(&addr, &a); if (r < 0) { log_error("socket_address_print(): %s", strerror(-r)); close(fd2); return r; } log_info("Accepted %s connection from %s", socket_address_family(&addr) == AF_INET ? 
"IP" : "IPv6", a); break; } default: log_error("Connection with unsupported family %d", socket_address_family(&addr)); close(fd2); return -EINVAL; } r = add_source(s, fd2, NULL); if (r < 0) log_error("failed to create source from fd=%d", fd2); } return r; } /********************************************************************** ********************************************************************** **********************************************************************/ static int help(void) { printf("%s [OPTIONS...]\n\n" "Write external journal events to a journal file.\n\n" "Options:\n" " --url=URL Read events from systemd-journal-gatewayd at URL\n" " --listen=ADDR Listen for connections at ADDR\n" " --stdin Read events from standard input\n" " -o --output=FILE|DIR Write output to FILE or DIR/external-*.journal\n" " --[no-]compress Use XZ-compression in the output journal (default: %s)\n" " --[no-]seal Use Event sealing in the output journal (default: %s)\n" " -h --help Show this help and exit\n" " --version Print version string and exit\n" "\n" "Note: file descriptors from sd_listen_fds() will be consumed, too.\n" , program_invocation_short_name , arg_compress ? "yes" : "no" , arg_seal ? 
"yes" : "no" ); return 0; } static int parse_argv(int argc, char *argv[]) { enum { ARG_VERSION = 0x100, ARG_URL, ARG_LISTEN, ARG_STDIN, ARG_COMPRESS, ARG_NO_COMPRESS, ARG_SEAL, ARG_NO_SEAL, }; static const struct option options[] = { { "help", no_argument, NULL, 'h' }, { "version", no_argument, NULL, ARG_VERSION }, { "url", required_argument, NULL, ARG_URL }, { "listen", required_argument, NULL, ARG_LISTEN }, { "stdin", no_argument, NULL, ARG_STDIN }, { "output", required_argument, NULL, 'o' }, { "compress", no_argument, NULL, ARG_COMPRESS }, { "no-compress", no_argument, NULL, ARG_NO_COMPRESS }, { "seal", no_argument, NULL, ARG_SEAL }, { "no-seal", no_argument, NULL, ARG_NO_SEAL }, { NULL, 0, NULL, 0 } }; int c; assert(argc >= 0); assert(argv); while ((c = getopt_long(argc, argv, "ho:", options, NULL)) >= 0) switch(c) { case 'h': help(); return 0 /* done */; case ARG_VERSION: puts(PACKAGE_STRING); puts(DISTRIBUTION); puts(SYSTEMD_FEATURES); return 0 /* done */; case ARG_URL: if (arg_url) { log_error("cannot currently set more than one --url"); return -EINVAL; } arg_url = optarg; break; case ARG_LISTEN: if (arg_listen) { log_error("cannot currently use --listen more than once"); return -EINVAL; } arg_listen = optarg; break; case ARG_STDIN: arg_stdin = true; break; case 'o': if (arg_output) { log_error("cannot use --output/-o more than once"); return -EINVAL; } arg_output = optarg; break; case ARG_COMPRESS: arg_compress = true; break; case ARG_NO_COMPRESS: arg_compress = false; break; case ARG_SEAL: arg_seal = true; break; case ARG_NO_SEAL: arg_seal = false; break; case '?': return -EINVAL; default: log_error("Unknown option code %c", c); return -EINVAL; } if (optind < argc) { log_error("This program takes no positional arguments"); return -EINVAL; } return 1 /* work to do */; } static int process_signal(RServer *s, struct epoll_event *event) { struct signalfd_siginfo sfsi; ssize_t n; assert(event->data.fd == s->signal_fd); n = read(s->signal_fd, &sfsi, 
sizeof(sfsi)); if (n != sizeof(sfsi)) { if (n >= 0) return -EIO; if (errno == EINTR || errno == EAGAIN) return 1; return -errno; } log_info("Received SIG%s", signal_to_string(sfsi.ssi_signo)); return 0; } int main(int argc, char **argv) { RServer s; int r, r2; log_set_max_level(LOG_DEBUG); log_show_color(true); // log_parse_environment(); r = parse_argv(argc, argv); if (r <= 0) return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE; if (rserver_init(&s) < 0) return EXIT_FAILURE; log_debug("%s running as pid %lu", program_invocation_short_name, (unsigned long) getpid()); sd_notify(false, "READY=1\n" "STATUS=Processing requests..."); while (s.active) { struct epoll_event event; r = epoll_wait(s.epoll_fd, &event, 1, -1); if (r < 0) { if (errno == EINTR) continue; log_error("epoll_wait() failed: %m"); r = -errno; break; } if (r > 0) { if (!(event.events & (EPOLLIN|EPOLLHUP))) { log_error("Got unexpected event from epoll for fd=%d: %d", event.data.fd, event.events); r = -EIO; break; } if (event.data.fd == s.signal_fd) r = process_signal(&s, &event); else r = process_epoll_event(&s, &event); if (r <= 0) { if (r == -EAGAIN || r == -EWOULDBLOCK) continue; break; } } } sd_notify(false, "STATUS=Shutting down..."); log_info("Finishing after writing %" PRIu64 " entries", s.writer.seqnum); r2 = server_destroy(&s); if (r == 0) r = r2; return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE; }
_______________________________________________ systemd-devel mailing list systemd-devel@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/systemd-devel