v3: make each worker its own service v4: be less intrusive v5: misc fixups --- TODO | 3 - man/systemd.socket.xml | 11 ++++ src/core/dbus-socket.c | 2 + src/core/load-fragment-gperf.gperf.m4 | 1 + src/core/service.c | 7 +- src/core/service.h | 1 - src/core/socket.c | 119 +++++++++++++++++++--------------- src/core/socket.h | 4 ++ 8 files changed, 89 insertions(+), 59 deletions(-)
diff --git a/TODO b/TODO index efc7e2a..6067efb 100644 --- a/TODO +++ b/TODO @@ -80,8 +80,6 @@ Features: * rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it -* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances - * tmpfiles: when applying ownership to /run/log/journal, also do this for the journal fails contained in it * we probably should replace the left-over uses of strv_append() and replace them by strv_push() or strv_extend() @@ -259,7 +257,6 @@ Features: * teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off}) * Support SO_REUSEPORT with socket activation: - - Let systemd maintain a pool of servers. - Use for seamless upgrades, by running the new server before stopping the old. diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml index 7c10c58..4a2189b 100644 --- a/man/systemd.socket.xml +++ b/man/systemd.socket.xml @@ -519,6 +519,17 @@ </varlistentry> <varlistentry> + <term><varname>Distribute=</varname></term> + <listitem><para>Takes an integer + value. Systemd will spawn + the given number of instances of the service, each + listening to the same socket. Default is 0. + Setting this requires the corresponding service to + be an instantiated service (name ends with <literal>@.service</literal>). 
+ This option implies <varname>ReusePort=</varname> above.</para></listitem> + </varlistentry> + + <varlistentry> <term><varname>SmackLabel=</varname></term> <term><varname>SmackLabelIPIn=</varname></term> <term><varname>SmackLabelIPOut=</varname></term> diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c index 60a8d05..4644007 100644 --- a/src/core/dbus-socket.c +++ b/src/core/dbus-socket.c @@ -68,6 +68,7 @@ " <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n" \ " <property name=\"Result\" type=\"s\" access=\"read\"/>\n" \ " <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \ + " <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \ " <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \ " <property name=\"SmackLabelIPIn\" type=\"s\" access=\"read\"/>\n" \ " <property name=\"SmackLabelIPOut\" type=\"s\" access=\"read\"/>\n" \ @@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = { { "MessageQueueMessageSize", bus_property_append_long, "x", offsetof(Socket, mq_msgsize) }, { "Result", bus_socket_append_socket_result, "s", offsetof(Socket, result) }, { "ReusePort", bus_property_append_bool, "b", offsetof(Socket, reuseport) }, + { "Distribute", bus_property_append_unsigned, "u", offsetof(Socket, distribute) }, { "SmackLabel", bus_property_append_string, "s", offsetof(Socket, smack), true }, { "SmackLabelIPIn", bus_property_append_string, "s", offsetof(Socket, smack_ip_in), true }, { "SmackLabelIPOut",bus_property_append_string, "s", offsetof(Socket, smack_ip_out), true }, diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4 index b64fdc9..4058a1f 100644 --- a/src/core/load-fragment-gperf.gperf.m4 +++ b/src/core/load-fragment-gperf.gperf.m4 @@ -211,6 +211,7 @@ Socket.PassCredentials, config_parse_bool, 0, Socket.PassSecurity, config_parse_bool, 0, offsetof(Socket, pass_sec) Socket.TCPCongestion, config_parse_string, 0, offsetof(Socket, tcp_congestion) 
Socket.ReusePort, config_parse_bool, 0, offsetof(Socket, reuseport) +Socket.Distribute, config_parse_unsigned, 0, offsetof(Socket, distribute) Socket.MessageQueueMaxMessages, config_parse_long, 0, offsetof(Socket, mq_maxmsg) Socket.MessageQueueMessageSize, config_parse_long, 0, offsetof(Socket, mq_msgsize) Socket.Service, config_parse_socket_service, 0, 0 diff --git a/src/core/service.c b/src/core/service.c index c0ee114..9c8cf9c 100644 --- a/src/core/service.c +++ b/src/core/service.c @@ -3679,7 +3679,6 @@ static void service_bus_query_pid_done( int service_set_socket_fd(Service *s, int fd, Socket *sock) { assert(s); - assert(fd >= 0); /* This is called by the socket code when instantiating a new * service for a stream socket and the socket needs to be @@ -3694,8 +3693,10 @@ int service_set_socket_fd(Service *s, int fd, Socket *sock) { if (s->state != SERVICE_DEAD) return -EAGAIN; - s->socket_fd = fd; - s->got_socket_fd = true; + if (fd >= 0) { + s->socket_fd = fd; + s->got_socket_fd = true; + } unit_ref_set(&s->accept_socket, UNIT(sock)); diff --git a/src/core/service.h b/src/core/service.h index 37fa6ff..2ffe7d1 100644 --- a/src/core/service.h +++ b/src/core/service.h @@ -26,7 +26,6 @@ typedef struct Service Service; #include "unit.h" #include "path.h" #include "ratelimit.h" -#include "service.h" #include "kill.h" #include "exit-status.h" diff --git a/src/core/socket.c b/src/core/socket.c index 751f20b..4d4627a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -153,34 +153,30 @@ static void socket_done(Unit *u) { } static int socket_instantiate_service(Socket *s) { - char *prefix, *name; + _cleanup_free_ char *prefix = NULL, *name = NULL; int r; Unit *u; assert(s); /* This fills in s->service if it isn't filled in yet. For - * Accept=yes sockets we create the next connection service - * here. For Accept=no this is mostly a NOP since the service + * Accept=yes and Distribute=n sockets we create the next connection + * service here. 
Otherwise this is mostly a NOP since the service * is figured out at load time anyway. */ - if (UNIT_DEREF(s->service)) + if (UNIT_DEREF(s->service) && !(s->distribute)) return 0; - assert(s->accept); + assert(s->accept || s->distribute); if (!(prefix = unit_name_to_prefix(UNIT(s)->id))) return -ENOMEM; r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted); - free(prefix); - if (r < 0) return -ENOMEM; r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u); - free(name); - if (r < 0) return r; @@ -513,6 +509,11 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) { "%sReusePort: %s\n", prefix, yes_no(s->reuseport)); + if (s->distribute) + fprintf(f, + "%sDistribute: %d\n", + prefix, s->distribute); + if (s->smack) fprintf(f, "%sSmackLabel: %s\n", @@ -577,9 +578,13 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) { struct sockaddr_storage storage; } local, remote; - assert(fd >= 0); assert(instance); + if (fd < 0) { + asprintf(&r, "%u", nr); + goto shortcut; + } + l = sizeof(local); if (getsockname(fd, &local.sa, &l) < 0) return -errno; @@ -663,6 +668,7 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) { assert_not_reached("Unhandled socket type."); } +shortcut: *instance = r; return 0; } @@ -769,8 +775,8 @@ static void socket_apply_socket_options(Socket *s, int fd) { if (setsockopt(fd, SOL_TCP, TCP_CONGESTION, s->tcp_congestion, strlen(s->tcp_congestion)+1) < 0) log_warning_unit(UNIT(s)->id, "TCP_CONGESTION failed: %m"); - if (s->reuseport) { - int b = s->reuseport; + if (s->reuseport || s->distribute) { + int b = true; if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &b, sizeof(b)) < 0) log_warning_unit(UNIT(s)->id, "SO_REUSEPORT failed: %m"); } @@ -1454,7 +1460,7 @@ static void socket_enter_running(Socket *s, int cfd) { return; } - if (cfd < 0) { + if (cfd < 0 && !(s->distribute)) { Iterator i; Unit *u; bool pending = false; @@ -1478,7 +1484,7 @@ static void socket_enter_running(Socket *s, int cfd) { 
_cleanup_free_ char *prefix = NULL, *instance = NULL, *name = NULL; Service *service; - if (s->n_connections >= s->max_connections) { + if (s->n_connections >= s->max_connections && !(s->distribute)) { log_warning_unit(UNIT(s)->id, "%s: Too many incoming connections (%u)", UNIT(s)->id, s->n_connections); @@ -1486,56 +1492,60 @@ static void socket_enter_running(Socket *s, int cfd) { return; } - r = socket_instantiate_service(s); - if (r < 0) - goto fail; - - r = instance_from_socket(cfd, s->n_accepted, &instance); - if (r < 0) { - if (r != -ENOTCONN) - goto fail; - - /* ENOTCONN is legitimate if TCP RST was received. - * This connection is over, but the socket unit lives on. */ - close_nointr_nofail(cfd); - return; - } - prefix = unit_name_to_prefix(UNIT(s)->id); if (!prefix) { r = -ENOMEM; goto fail; } - name = unit_name_build(prefix, instance, ".service"); + do { + r = socket_instantiate_service(s); + if (r < 0) + goto fail; - if (!name) { - r = -ENOMEM; - goto fail; - } + r = instance_from_socket(cfd, s->n_accepted, &instance); + if (r < 0) { + if (r != -ENOTCONN) + goto fail; - r = unit_add_name(UNIT_DEREF(s->service), name); - if (r < 0) - goto fail; + /* ENOTCONN is legitimate if TCP RST was received. + * This connection is over, but the socket unit lives on. 
*/ + close_nointr_nofail(cfd); + return; + } - service = SERVICE(UNIT_DEREF(s->service)); - unit_ref_unset(&s->service); - s->n_accepted ++; + name = unit_name_build(prefix, instance, ".service"); + if (!name) { + r = -ENOMEM; + goto fail; + } - UNIT(service)->no_gc = false; + r = unit_add_name(UNIT_DEREF(s->service), name); + if (r < 0) + goto fail; - unit_choose_id(UNIT(service), name); + service = SERVICE(UNIT_DEREF(s->service)); + unit_ref_unset(&s->service); + s->n_accepted ++; - r = service_set_socket_fd(service, cfd, s); - if (r < 0) - goto fail; + UNIT(service)->no_gc = false; - cfd = -1; - s->n_connections ++; + unit_choose_id(UNIT(service), name); - r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL); - if (r < 0) - goto fail; + r = service_set_socket_fd(service, cfd, s); + if (r < 0) + goto fail; + + cfd = -1; + s->n_connections ++; + + r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL); + if (r < 0) + goto fail; + + if (s->n_connections < s->distribute) + socket_enter_listening(s); + } while (s->n_connections < s->distribute); /* Notify clients about changed counters */ unit_add_to_dbus_queue(UNIT(s)); @@ -2263,14 +2273,19 @@ void socket_connection_unref(Socket *s) { /* The service is dead. Yay! * - * This is strictly for one-instance-per-connection - * services. 
*/ + This is for one-instance-per-connection + * and Distribute= services */ assert(s->n_connections > 0); s->n_connections--; log_debug_unit(UNIT(s)->id, "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections); + + if(s->n_connections < s->distribute && s->state == SOCKET_RUNNING) + /* (re)enter systemd into SO_REUSEPORT pool, when it gets a + * connection it will reestablish distribute target */ + socket_enter_listening(s); } static void socket_reset_failed(Unit *u) { diff --git a/src/core/socket.h b/src/core/socket.h index 3d7eadc..5928356 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -93,6 +93,8 @@ struct Socket { LIST_HEAD(SocketPort, ports); unsigned n_accepted; + /* when Accept=true this is the number of active connections + * when Distribute=n this is the number of active workers */ unsigned n_connections; unsigned max_connections; @@ -145,6 +147,8 @@ struct Socket { char *bind_to_device; char *tcp_congestion; bool reuseport; + /* implies reuseport */ + unsigned distribute; long mq_maxmsg; long mq_msgsize; -- 1.8.4.3 _______________________________________________ systemd-devel mailing list systemd-devel@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/systemd-devel