Should the SERVICE_SIMPLE test be a load-time test? v2: fix assert order --- TODO | 3 +- src/core/dbus-socket.c | 2 ++ src/core/load-fragment-gperf.gperf.m4 | 1 + src/core/service.c | 2 +- src/core/service.h | 13 ++++++- src/core/socket.c | 64 +++++++++++++++++++++++++++++++++++ src/core/socket.h | 2 ++ 7 files changed, 83 insertions(+), 4 deletions(-)
diff --git a/TODO b/TODO index efc7e2a..0db4dc6 100644 --- a/TODO +++ b/TODO @@ -80,7 +80,7 @@ Features: * rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it -* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances +* respawn Distribute= worker threads when they die unexpectedly * tmpfiles: when applying ownership to /run/log/journal, also do this for the journal fails contained in it @@ -259,7 +259,6 @@ Features: * teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off}) * Support SO_REUSEPORT with socket activation: - - Let systemd maintain a pool of servers. - Use for seamless upgrades, by running the new server before stopping the old. diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c index 60a8d05..4644007 100644 --- a/src/core/dbus-socket.c +++ b/src/core/dbus-socket.c @@ -68,6 +68,7 @@ " <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n" \ " <property name=\"Result\" type=\"s\" access=\"read\"/>\n" \ " <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \ + " <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \ " <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \ " <property name=\"SmackLabelIPIn\" type=\"s\" access=\"read\"/>\n" \ " <property name=\"SmackLabelIPOut\" type=\"s\" access=\"read\"/>\n" \ @@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = { { "MessageQueueMessageSize", bus_property_append_long, "x", offsetof(Socket, mq_msgsize) }, { "Result", bus_socket_append_socket_result, "s", offsetof(Socket, result) }, { "ReusePort", bus_property_append_bool, "b", offsetof(Socket, reuseport) }, + { "Distribute", bus_property_append_unsigned, "u", offsetof(Socket, distribute) }, { "SmackLabel", bus_property_append_string, "s", offsetof(Socket, smack), true }, { 
"SmackLabelIPIn", bus_property_append_string, "s", offsetof(Socket, smack_ip_in), true }, { "SmackLabelIPOut",bus_property_append_string, "s", offsetof(Socket, smack_ip_out), true }, diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4 index b64fdc9..4058a1f 100644 --- a/src/core/load-fragment-gperf.gperf.m4 +++ b/src/core/load-fragment-gperf.gperf.m4 @@ -211,6 +211,7 @@ Socket.PassCredentials, config_parse_bool, 0, Socket.PassSecurity, config_parse_bool, 0, offsetof(Socket, pass_sec) Socket.TCPCongestion, config_parse_string, 0, offsetof(Socket, tcp_congestion) Socket.ReusePort, config_parse_bool, 0, offsetof(Socket, reuseport) +Socket.Distribute, config_parse_unsigned, 0, offsetof(Socket, distribute) Socket.MessageQueueMaxMessages, config_parse_long, 0, offsetof(Socket, mq_maxmsg) Socket.MessageQueueMessageSize, config_parse_long, 0, offsetof(Socket, mq_msgsize) Socket.Service, config_parse_socket_service, 0, 0 diff --git a/src/core/service.c b/src/core/service.c index 3da32a1..cc337cf 100644 --- a/src/core/service.c +++ b/src/core/service.c @@ -1668,7 +1668,7 @@ fail: return r; } -static int service_spawn( +int service_spawn( Service *s, ExecCommand *c, bool timeout, diff --git a/src/core/service.h b/src/core/service.h index 37fa6ff..95aa707 100644 --- a/src/core/service.h +++ b/src/core/service.h @@ -26,7 +26,6 @@ typedef struct Service Service; #include "unit.h" #include "path.h" #include "ratelimit.h" -#include "service.h" #include "kill.h" #include "exit-status.h" @@ -201,6 +200,18 @@ extern const UnitVTable service_vtable; struct Socket; +int service_spawn( + Service *s, + ExecCommand *c, + bool timeout, + bool pass_fds, + bool apply_permissions, + bool apply_chroot, + bool apply_tty_stdin, + bool set_notify_socket, + bool is_control, + pid_t *_pid); + int service_set_socket_fd(Service *s, int fd, struct Socket *socket); const char* service_state_to_string(ServiceState i) _const_; diff --git a/src/core/socket.c 
b/src/core/socket.c index 751f20b..d0b5375 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -1422,6 +1422,66 @@ fail: socket_enter_dead(s, SOCKET_FAILURE_RESOURCES); } +static void socket_distribute(Socket *socket) { /* spawns distribute-1 extra copies of the service's ExecStart command for this socket; does nothing when distribute is 0 or 1 */ + int r; + Service *s; + + assert(socket); + + if (socket->distribute <= 1) + return; + + s = SERVICE(UNIT_DEREF(socket->service)); /* NOTE(review): s is dereferenced below without a NULL check -- confirm socket->service is always set on every path that reaches here, including Accept=true */ + + if (s->type != SERVICE_SIMPLE) { + /* do not spam error message when Accept=true */ + if (!socket->accept) + log_warning_unit(UNIT(s)->id, + "%s is not Type=simple required for Distribute. Not distributing.", + UNIT(s)->id); + return; + } + + assert(s->exec_command[SERVICE_EXEC_START]); + assert(!s->exec_command[SERVICE_EXEC_START]->command_next); + + /* distribute implies reuseport */ + socket->reuseport = true; + + /* the first worker has already been started; spawn the remaining distribute-1 */ + for (unsigned i=1;i < socket->distribute;i++) { + ExecCommand *c; + pid_t pid; + + r = socket_open_fds(socket); + if (r < 0) { + log_warning_unit(UNIT(socket)->id, + "%s failed to open socket(s) to distribute. Ignoring: %s", + UNIT(socket)->id, strerror(-r)); + return; + } + + c = s->main_command = s->exec_command[SERVICE_EXEC_START]; + + r = service_spawn(s, + c, + false, + true, + true, + true, + true, + s->notify_access != NOTIFY_NONE, + false, + &pid); + if (r < 0) { + log_warning_unit(UNIT(socket)->id, + "%s failed to distribute. 
Ignoring: %s", + UNIT(socket)->id, strerror(-r)); + return; + } + } +} + static void socket_enter_running(Socket *s, int cfd) { int r; DBusError error; @@ -1471,6 +1531,8 @@ static void socket_enter_running(Socket *s, int cfd) { r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT_DEREF(s->service), JOB_REPLACE, true, &error, NULL); if (r < 0) goto fail; + + socket_distribute(s); } socket_set_state(s, SOCKET_RUNNING); @@ -1537,6 +1599,8 @@ static void socket_enter_running(Socket *s, int cfd) { if (r < 0) goto fail; + socket_distribute(s); + /* Notify clients about changed counters */ unit_add_to_dbus_queue(UNIT(s)); } diff --git a/src/core/socket.h b/src/core/socket.h index 3d7eadc..b307822 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -145,6 +145,8 @@ struct Socket { char *bind_to_device; char *tcp_congestion; bool reuseport; + /* implies reuseport */ + unsigned distribute; long mq_maxmsg; long mq_msgsize; -- 1.8.4.3 _______________________________________________ systemd-devel mailing list systemd-devel@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/systemd-devel