Because it takes a while for the service to start up, and until then we spin in a fast epoll loop, this tends to start up all the instances at once. There are a number of ways we could slow this instantiation down: 1) call accept() and pass an additional fd to the service; 2) use EPOLLET, which requires the event to be prioritized and always dispatched; 3) disable and then re-enable the event source every time we enqueue an instance.
With Type=notify, we wait until a service tells us it is ready before we listen again and thereby start up more instances. What if someone wants to use the templating namespace ('@') with Distribute=? --- TODO | 3 +- man/systemd.socket.xml | 15 +++++++- src/core/dbus-socket.c | 2 +- src/core/load-fragment-gperf.gperf.m4 | 3 +- src/core/service.c | 4 ++ src/core/socket.c | 72 ++++++++++++++++++++++++----------- src/core/socket.h | 8 +++- 7 files changed, 78 insertions(+), 29 deletions(-) diff --git a/TODO b/TODO index 2fb9cd3..697d568 100644 --- a/TODO +++ b/TODO @@ -69,7 +69,7 @@ Features: * rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it -* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances +* tmpfiles: when applying ownership to /run/log/journal, also do this for the journal fails contained in it * we probably should replace the left-over uses of strv_append() and replace them by strv_push() or strv_extend() @@ -187,7 +187,6 @@ Features: * teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off}) * Support SO_REUSEPORT with socket activation: - - Let systemd maintain a pool of servers. - Use for seamless upgrades, by running the new server before stopping the old. diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml index 7c10c58..6799020 100644 --- a/man/systemd.socket.xml +++ b/man/systemd.socket.xml @@ -404,7 +404,8 @@ designed for usage with <citerefentry><refentrytitle>inetd</refentrytitle><manvolnum>8</manvolnum></citerefentry> to work unmodified with systemd socket - activation.</para></listitem> + activation. 
Incompatible with + <varname>Distribute=</varname>.</para></listitem> </varlistentry> <varlistentry> @@ -519,6 +520,18 @@ </varlistentry> <varlistentry> + <term><varname>Distribute=</varname></term> + <listitem><para>Takes an integer + value. systemd will spawn up to the + given number of instances of the service, each + listening on the same socket. Defaults to 0. + Setting this requires the corresponding service to + be an instantiated (template) service (name ends with <literal>@.service</literal>). + Useful with <varname>ReusePort=</varname> above. + Incompatible with <varname>Accept=true</varname>.</para></listitem> </varlistentry> + + <varlistentry> <term><varname>SmackLabel=</varname></term> <term><varname>SmackLabelIPIn=</varname></term> <term><varname>SmackLabelIPOut=</varname></term> diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c index 74217df..68c95a0 100644 --- a/src/core/dbus-socket.c +++ b/src/core/dbus-socket.c @@ -40,7 +40,6 @@ static int property_get_listen( void *userdata, sd_bus_error *error) { - Socket *s = SOCKET(userdata); SocketPort *p; int r; @@ -116,6 +115,7 @@ const sd_bus_vtable bus_socket_vtable[] = { SD_BUS_PROPERTY("MessageQueueMessageSize", "x", bus_property_get_long, offsetof(Socket, mq_msgsize), 0), SD_BUS_PROPERTY("Result", "s", property_get_result, offsetof(Socket, result), SD_BUS_VTABLE_PROPERTY_EMITS_CHANGE), SD_BUS_PROPERTY("ReusePort", "b", bus_property_get_bool, offsetof(Socket, reuse_port), 0), + SD_BUS_PROPERTY("Distribute", "u", bus_property_get_unsigned, offsetof(Socket, distribute), 0), SD_BUS_PROPERTY("SmackLabel", "s", NULL, offsetof(Socket, smack), 0), SD_BUS_PROPERTY("SmackLabelIPIn", "s", NULL, offsetof(Socket, smack_ip_in), 0), SD_BUS_PROPERTY("SmackLabelIPOut", "s", NULL, offsetof(Socket, smack_ip_out), 0), diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4 index a5033b2..98f1eb7 100644 --- a/src/core/load-fragment-gperf.gperf.m4 +++ b/src/core/load-fragment-gperf.gperf.m4 @@ -212,7
+212,8 @@ Socket.Broadcast, config_parse_bool, 0, Socket.PassCredentials, config_parse_bool, 0, offsetof(Socket, pass_cred) Socket.PassSecurity, config_parse_bool, 0, offsetof(Socket, pass_sec) Socket.TCPCongestion, config_parse_string, 0, offsetof(Socket, tcp_congestion) -Socket.ReusePort, config_parse_bool, 0, offsetof(Socket, reuse_port) +Socket.ReusePort, config_parse_bool, -1, offsetof(Socket, reuse_port) +Socket.Distribute, config_parse_unsigned, 0, offsetof(Socket, distribute) Socket.MessageQueueMaxMessages, config_parse_long, 0, offsetof(Socket, mq_maxmsg) Socket.MessageQueueMessageSize, config_parse_long, 0, offsetof(Socket, mq_msgsize) Socket.Service, config_parse_socket_service, 0, 0 diff --git a/src/core/service.c b/src/core/service.c index 702443d..1d5972b 100644 --- a/src/core/service.c +++ b/src/core/service.c @@ -3409,9 +3409,13 @@ static void service_notify_message(Unit *u, pid_t pid, char **tags) { if (s->type == SERVICE_NOTIFY && s->state == SERVICE_START && strv_find(tags, "READY=1")) { + Socket *socket = SOCKET(UNIT_DEREF(s->accept_socket)); log_debug_unit(u->id, "%s: got READY=1", u->id); + if (socket && socket->distribute > socket->n_connections) + socket_enter_listening(socket); + service_enter_start_post(s); } diff --git a/src/core/socket.c b/src/core/socket.c index aaaa8d6..41a72a4 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -179,34 +179,30 @@ static int socket_arm_timer(Socket *s) { } static int socket_instantiate_service(Socket *s) { - char *prefix, *name; + _cleanup_free_ char *prefix = NULL, *name = NULL; int r; Unit *u; assert(s); /* This fills in s->service if it isn't filled in yet. For - * Accept=yes sockets we create the next connection service - * here. For Accept=no this is mostly a NOP since the service + * Accept=yes and Distribute=n sockets we create the next connection + * service here. Otherwise is mostly a NOP since the service * is figured out at load time anyway. 
*/ - if (UNIT_DEREF(s->service)) + if (UNIT_DEREF(s->service) && s->distribute == 0) return 0; - assert(s->accept); + assert(s->accept || s->distribute); if (!(prefix = unit_name_to_prefix(UNIT(s)->id))) return -ENOMEM; r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted); - free(prefix); - if (r < 0) return -ENOMEM; r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u); - free(name); - if (r < 0) return r; @@ -228,7 +224,7 @@ static bool have_non_accept_socket(Socket *s) { assert(s); - if (!s->accept) + if (!(s->accept || s->distribute)) return true; LIST_FOREACH(port, p, s->ports) { @@ -359,6 +355,13 @@ static int socket_add_extras(Socket *s) { return r; } + if (s->reuse_port == -1) { + if (s->distribute) + s->reuse_port = true; + else + s->reuse_port = false; + } + return 0; } @@ -483,15 +486,23 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) { "%sBindToDevice: %s\n", prefix, s->bind_to_device); - if (s->accept) + if (s->accept || s->distribute) fprintf(f, "%sAccepted: %u\n" - "%sNConnections: %u\n" - "%sMaxConnections: %u\n", + "%sNConnections: %u\n", prefix, s->n_accepted, - prefix, s->n_connections, + prefix, s->n_connections); + + if (s->accept) + fprintf(f, + "%sMaxConnections: %u\n", prefix, s->max_connections); + if (s->distribute) + fprintf(f, + "%sDistribute: %u\n", + prefix, s->distribute); + if (s->priority >= 0) fprintf(f, "%sPriority: %i\n", @@ -604,9 +615,14 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) { struct sockaddr_storage storage; } local, remote; - assert(fd >= 0); assert(instance); + if (fd < 0) { + if (asprintf(instance, "%u", nr) < 0) + return -ENOMEM; + return 0; + } + l = sizeof(local); if (getsockname(fd, &local.sa, &l) < 0) return -errno; @@ -798,8 +814,8 @@ static void socket_apply_socket_options(Socket *s, int fd) { if (setsockopt(fd, SOL_TCP, TCP_CONGESTION, s->tcp_congestion, strlen(s->tcp_congestion)+1) < 0) log_warning_unit(UNIT(s)->id, "TCP_CONGESTION failed: %m"); - 
if (s->reuse_port) { - int b = s->reuse_port; + if (s->reuse_port || s->distribute) { + int b = true; if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &b, sizeof(b)) < 0) log_warning_unit(UNIT(s)->id, "SO_REUSEPORT failed: %m"); } @@ -1385,7 +1401,7 @@ fail: socket_enter_stop_post(s, SOCKET_FAILURE_RESOURCES); } -static void socket_enter_listening(Socket *s) { +void socket_enter_listening(Socket *s) { int r; assert(s); @@ -1485,7 +1501,7 @@ static void socket_enter_running(Socket *s, int cfd) { return; } - if (cfd < 0) { + if (cfd < 0 && !(s->distribute)) { Iterator i; Unit *other; bool pending = false; @@ -1509,7 +1525,7 @@ static void socket_enter_running(Socket *s, int cfd) { _cleanup_free_ char *prefix = NULL, *instance = NULL, *name = NULL; Service *service; - if (s->n_connections >= s->max_connections) { + if (s->n_connections >= s->max_connections && !(s->distribute)) { log_warning_unit(UNIT(s)->id, "%s: Too many incoming connections (%u)", UNIT(s)->id, s->n_connections); close_nointr_nofail(cfd); return; @@ -1554,7 +1570,10 @@ static void socket_enter_running(Socket *s, int cfd) { unit_choose_id(UNIT(service), name); - r = service_set_socket_fd(service, cfd, s); + if (cfd >= 0) + r = service_set_socket_fd(service, cfd, s); + else + r = unit_add_two_dependencies(UNIT(s), UNIT_BEFORE, UNIT_TRIGGERS, UNIT(service), false); if (r < 0) goto fail; @@ -1565,6 +1584,9 @@ static void socket_enter_running(Socket *s, int cfd) { if (r < 0) goto fail; + if (s->distribute > 0 && (s->n_connections >= s->distribute || SERVICE(UNIT_DEREF(s->service))->type == SERVICE_NOTIFY)) + socket_set_state(s, SOCKET_RUNNING); + /* Notify clients about changed counters */ unit_add_to_dbus_queue(UNIT(s)); } @@ -2286,13 +2308,17 @@ void socket_connection_unref(Socket *s) { /* The service is dead. Yay! * - * This is strictly for one-instance-per-connection - * services. 
*/ + * This is for one-instance-per-connection + * and Distribute= services */ assert(s->n_connections > 0); s->n_connections--; + log_debug_unit(UNIT(s)->id, "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections); + + if(s->n_connections < s->distribute && s->state == SOCKET_RUNNING) + socket_enter_listening(s); } static void socket_trigger_notify(Unit *u, Unit *other) { diff --git a/src/core/socket.h b/src/core/socket.h index 076a183..86cd353 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -95,6 +95,8 @@ struct Socket { LIST_HEAD(SocketPort, ports); unsigned n_accepted; + /* when Accept=true this is the number of active connectoins + * when Distribute=n this is the number of active workers */ unsigned n_connections; unsigned max_connections; @@ -147,7 +149,8 @@ struct Socket { size_t pipe_size; char *bind_to_device; char *tcp_congestion; - bool reuse_port; + int reuse_port; + unsigned distribute; long mq_maxmsg; long mq_msgsize; @@ -162,6 +165,9 @@ int socket_collect_fds(Socket *s, int **fds, unsigned *n_fds); /* Called from the service code when a per-connection service ended */ void socket_connection_unref(Socket *s); +/* Called from the service code when we recieve a READY=1 notification */ +void socket_enter_listening(Socket *s); + void socket_free_ports(Socket *s); extern const UnitVTable socket_vtable; -- 1.8.5.1 _______________________________________________ systemd-devel mailing list systemd-devel@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/systemd-devel