ping?
On Fri, Dec 13, 2013 at 8:23 PM, Shawn Landden <sh...@churchofgit.com> wrote: > If Distribute=n, turns SO_REUSEPORT on, and spawns > n workers to handle incoming requests. > > SO_REUSEPORT sockets on the same port must all be created > by the same uid, therefore using the option allows > other root programs (or programs of the same user > if running in --user mode) to "hijack" this port, even > after systemd reserves it. > > We spawn workers at a rate approximately inversely > exponentially proportional to the number of incoming connections. > Faster based on the time for new workers to start accept()ing > and their load, or slower if systemd is under load. > --- > TODO | 3 +- > man/systemd.socket.xml | 15 +++++++- > src/core/dbus-socket.c | 4 +-- > src/core/load-fragment-gperf.gperf.m4 | 3 +- > src/core/service.c | 4 +-- > src/core/socket.c | 68 > ++++++++++++++++++++++------------- > src/core/socket.h | 5 ++- > src/shared/conf-parser.c | 32 +++++++++++++++++ > src/shared/conf-parser.h | 1 + > 9 files changed, 101 insertions(+), 34 deletions(-) > > diff --git a/TODO b/TODO > index 0b43888..2abe1b4 100644 > --- a/TODO > +++ b/TODO > @@ -73,7 +73,7 @@ Features: > > * rfkill,backlight: we probably should run the load tools inside of the udev > rules so that the state is properly initialized by the time other software > sees it > > -* Add a new Distribute=$NUMBER key to socket units that makes use of > SO_REUSEPORT to distribute network traffic on $NUMBER instances > +* tmpfiles: when applying ownership to /run/log/journal, also do this for > the journal files contained in it > > * move config_parse_path_strv() out of conf-parser.c > > @@ -181,7 +181,6 @@ Features: > * teach ConditionKernelCommandLine= globs or regexes (in order to match > foobar={no,0,off}) > > * Support SO_REUSEPORT with socket activation: > - - Let systemd maintain a pool of servers. > - Use for seamless upgrades, by running the new server before stopping the > old.
> > diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml > index 7c10c58..6799020 100644 > --- a/man/systemd.socket.xml > +++ b/man/systemd.socket.xml > @@ -404,7 +404,8 @@ > designed for usage with > > <citerefentry><refentrytitle>inetd</refentrytitle><manvolnum>8</manvolnum></citerefentry> > to work unmodified with systemd socket > - activation.</para></listitem> > + activation. Incompatible with > + > <varname>Distribute=</varname></para></listitem> > </varlistentry> > > <varlistentry> > @@ -519,6 +520,18 @@ > </varlistentry> > > <varlistentry> > + <term><varname>Distribute=</varname></term> > + <listitem><para>Takes an integer > + value. Systemd will spawn up to > + given number of instances of service each > + listening to the same socket. Default is 0. > + Setting this requires corresponding service > to > + be an instantiated service (name ends with > <literal>@.service</literal>). > + Useful with <varname>ReusePort=</varname> > above. > + Incompatible with > <varname>Accept=true</varname>.</para></listitem> > + </varlistentry> > + > + <varlistentry> > <term><varname>SmackLabel=</varname></term> > > <term><varname>SmackLabelIPIn=</varname></term> > > <term><varname>SmackLabelIPOut=</varname></term> > diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c > index 74217df..036c9af 100644 > --- a/src/core/dbus-socket.c > +++ b/src/core/dbus-socket.c > @@ -40,7 +40,6 @@ static int property_get_listen( > void *userdata, > sd_bus_error *error) { > > - > Socket *s = SOCKET(userdata); > SocketPort *p; > int r; > @@ -115,7 +114,8 @@ const sd_bus_vtable bus_socket_vtable[] = { > SD_BUS_PROPERTY("MessageQueueMaxMessages", "x", > bus_property_get_long, offsetof(Socket, mq_maxmsg), 0), > SD_BUS_PROPERTY("MessageQueueMessageSize", "x", > bus_property_get_long, offsetof(Socket, mq_msgsize), 0), > SD_BUS_PROPERTY("Result", "s", property_get_result, offsetof(Socket, > result), SD_BUS_VTABLE_PROPERTY_EMITS_CHANGE), > - SD_BUS_PROPERTY("ReusePort", "b",
bus_property_get_bool, > offsetof(Socket, reuse_port), 0), > + SD_BUS_PROPERTY("ReusePort", "b", bus_property_get_tristate, > offsetof(Socket, reuse_port), 0), > + SD_BUS_PROPERTY("Distribute", "u", bus_property_get_unsigned, > offsetof(Socket, distribute), 0), > SD_BUS_PROPERTY("SmackLabel", "s", NULL, offsetof(Socket, smack), 0), > SD_BUS_PROPERTY("SmackLabelIPIn", "s", NULL, offsetof(Socket, > smack_ip_in), 0), > SD_BUS_PROPERTY("SmackLabelIPOut", "s", NULL, offsetof(Socket, > smack_ip_out), 0), > diff --git a/src/core/load-fragment-gperf.gperf.m4 > b/src/core/load-fragment-gperf.gperf.m4 > index a5033b2..70f15bd 100644 > --- a/src/core/load-fragment-gperf.gperf.m4 > +++ b/src/core/load-fragment-gperf.gperf.m4 > @@ -212,7 +212,8 @@ Socket.Broadcast, config_parse_bool, > 0, > Socket.PassCredentials, config_parse_bool, 0, > offsetof(Socket, pass_cred) > Socket.PassSecurity, config_parse_bool, 0, > offsetof(Socket, pass_sec) > Socket.TCPCongestion, config_parse_string, 0, > offsetof(Socket, tcp_congestion) > -Socket.ReusePort, config_parse_bool, 0, > offsetof(Socket, reuse_port) > +Socket.ReusePort, config_parse_tristate, 0, > offsetof(Socket, reuse_port) > +Socket.Distribute, config_parse_unsigned, 0, > offsetof(Socket, distribute) > Socket.MessageQueueMaxMessages, config_parse_long, 0, > offsetof(Socket, mq_maxmsg) > Socket.MessageQueueMessageSize, config_parse_long, 0, > offsetof(Socket, mq_msgsize) > Socket.Service, config_parse_socket_service, 0, > 0 > diff --git a/src/core/service.c b/src/core/service.c > index 3b3f956..3047e10 100644 > --- a/src/core/service.c > +++ b/src/core/service.c > @@ -3668,7 +3668,6 @@ static void service_bus_name_owner_change( > int service_set_socket_fd(Service *s, int fd, Socket *sock) { > > assert(s); > - assert(fd >= 0); > > /* This is called by the socket code when instantiating a new > * service for a stream socket and the socket needs to be > @@ -3683,7 +3682,8 @@ int service_set_socket_fd(Service *s, int fd, Socket > *sock) 
{ > if (s->state != SERVICE_DEAD) > return -EAGAIN; > > - s->socket_fd = fd; > + if (fd > 0) > + s->socket_fd = fd; > > unit_ref_set(&s->accept_socket, UNIT(sock)); > > diff --git a/src/core/socket.c b/src/core/socket.c > index aaaa8d6..be2b681 100644 > --- a/src/core/socket.c > +++ b/src/core/socket.c > @@ -179,34 +179,30 @@ static int socket_arm_timer(Socket *s) { > } > > static int socket_instantiate_service(Socket *s) { > - char *prefix, *name; > + _cleanup_free_ char *prefix = NULL, *name = NULL; > int r; > Unit *u; > > assert(s); > > /* This fills in s->service if it isn't filled in yet. For > - * Accept=yes sockets we create the next connection service > - * here. For Accept=no this is mostly a NOP since the service > + * Accept=yes and Distribute=n sockets we create the next connection > + * service here. Otherwise is mostly a NOP since the service > * is figured out at load time anyway. */ > > - if (UNIT_DEREF(s->service)) > + if (UNIT_DEREF(s->service) && s->distribute == 0) > return 0; > > - assert(s->accept); > + assert(s->accept || s->distribute > 0); > > if (!(prefix = unit_name_to_prefix(UNIT(s)->id))) > return -ENOMEM; > > r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted); > - free(prefix); > - > if (r < 0) > return -ENOMEM; > > r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u); > - free(name); > - > if (r < 0) > return r; > > @@ -228,7 +224,7 @@ static bool have_non_accept_socket(Socket *s) { > > assert(s); > > - if (!s->accept) > + if (!(s->accept || s->distribute > 0)) > return true; > > LIST_FOREACH(port, p, s->ports) { > @@ -408,6 +404,9 @@ static int socket_load(Unit *u) { > if (r < 0) > return r; > > + if (s->reuse_port < 0) > + s->reuse_port = s->distribute > 0; > + > if (u->load_state == UNIT_LOADED) { > /* This is a new unit? 
Then let's add in some extras */ > r = socket_add_extras(s); > @@ -483,15 +482,23 @@ static void socket_dump(Unit *u, FILE *f, const char > *prefix) { > "%sBindToDevice: %s\n", > prefix, s->bind_to_device); > > - if (s->accept) > + if (s->accept || s->distribute > 0) > fprintf(f, > "%sAccepted: %u\n" > - "%sNConnections: %u\n" > - "%sMaxConnections: %u\n", > + "%sNConnections: %u\n", > prefix, s->n_accepted, > - prefix, s->n_connections, > + prefix, s->n_connections); > + > + if (s->accept) > + fprintf(f, > + "%sMaxConnections: %u\n", > prefix, s->max_connections); > > + if (s->distribute > 0) > + fprintf(f, > + "%sDistribute: %u\n", > + prefix, s->distribute); > + > if (s->priority >= 0) > fprintf(f, > "%sPriority: %i\n", > @@ -604,9 +611,14 @@ static int instance_from_socket(int fd, unsigned nr, > char **instance) { > struct sockaddr_storage storage; > } local, remote; > > - assert(fd >= 0); > assert(instance); > > + if (fd < 0) { > + if (asprintf(instance, "%u", nr) < 0) > + return -ENOMEM; > + return 0; > + } > + > l = sizeof(local); > if (getsockname(fd, &local.sa, &l) < 0) > return -errno; > @@ -798,11 +810,9 @@ static void socket_apply_socket_options(Socket *s, int > fd) { > if (setsockopt(fd, SOL_TCP, TCP_CONGESTION, > s->tcp_congestion, strlen(s->tcp_congestion)+1) < 0) > log_warning_unit(UNIT(s)->id, "TCP_CONGESTION > failed: %m"); > > - if (s->reuse_port) { > - int b = s->reuse_port; > - if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &b, sizeof(b)) > < 0) > + if (s->reuse_port) > + if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &s->reuse_port, > sizeof(s->reuse_port)) < 0) > log_warning_unit(UNIT(s)->id, "SO_REUSEPORT failed: > %m"); > - } > > if (s->smack_ip_in) > if (smack_label_ip_in_fd(fd, s->smack_ip_in) < 0) > @@ -1112,7 +1122,7 @@ static int socket_watch_fds(Socket *s) { > if (p->event_source) > r = sd_event_source_set_enabled(p->event_source, > SD_EVENT_ON); > else > - r = sd_event_add_io(UNIT(s)->manager->event, p->fd, > EPOLLIN, socket_dispatch_io, 
p, &p->event_source); > + r = sd_event_add_io(UNIT(s)->manager->event, p->fd, > EPOLLIN | (s->distribute > 0 ? EPOLLET : 0), socket_dispatch_io, p, > &p->event_source); > > if (r < 0) { > log_warning_unit(UNIT(s)->id, "Failed to watch > listening fds: %s", strerror(-r)); > @@ -1485,7 +1495,7 @@ static void socket_enter_running(Socket *s, int cfd) { > return; > } > > - if (cfd < 0) { > + if (cfd < 0 && s->distribute == 0) { > Iterator i; > Unit *other; > bool pending = false; > @@ -1509,7 +1519,7 @@ static void socket_enter_running(Socket *s, int cfd) { > _cleanup_free_ char *prefix = NULL, *instance = NULL, *name > = NULL; > Service *service; > > - if (s->n_connections >= s->max_connections) { > + if (s->n_connections >= s->max_connections && s->distribute > == 0) { > log_warning_unit(UNIT(s)->id, "%s: Too many incoming > connections (%u)", UNIT(s)->id, s->n_connections); > close_nointr_nofail(cfd); > return; > @@ -1565,6 +1575,9 @@ static void socket_enter_running(Socket *s, int cfd) { > if (r < 0) > goto fail; > > + if (s->distribute > 0 && s->n_connections >= s->distribute) > + socket_set_state(s, SOCKET_RUNNING); > + > /* Notify clients about changed counters */ > unit_add_to_dbus_queue(UNIT(s)); > } > @@ -2286,13 +2299,17 @@ void socket_connection_unref(Socket *s) { > > /* The service is dead. Yay! > * > - * This is strictly for one-instance-per-connection > - * services. 
*/ > + * This is for one-instance-per-connection > + * and Distribute= services */ > > assert(s->n_connections > 0); > s->n_connections--; > > + > log_debug_unit(UNIT(s)->id, "%s: One connection closed, %u left.", > UNIT(s)->id, s->n_connections); > + > + if (s->n_connections < s->distribute && s->state == SOCKET_RUNNING) > + socket_enter_listening(s); > } > > static void socket_trigger_notify(Unit *u, Unit *other) { > @@ -2306,7 +2323,8 @@ static void socket_trigger_notify(Unit *u, Unit *other) > { > already down or accepting connections */ > if ((s->state != SOCKET_RUNNING && > s->state != SOCKET_LISTENING) || > - s->accept) > + s->accept || > + s->distribute > 0) > return; > > if (other->load_state != UNIT_LOADED || > diff --git a/src/core/socket.h b/src/core/socket.h > index 076a183..138ac34 100644 > --- a/src/core/socket.h > +++ b/src/core/socket.h > @@ -95,6 +95,8 @@ struct Socket { > LIST_HEAD(SocketPort, ports); > > unsigned n_accepted; > + /* when Accept=true this is the number of active connectoins > + * when Distribute=n this is the number of active workers */ > unsigned n_connections; > unsigned max_connections; > > @@ -147,7 +149,8 @@ struct Socket { > size_t pipe_size; > char *bind_to_device; > char *tcp_congestion; > - bool reuse_port; > + int reuse_port; > + unsigned distribute; > long mq_maxmsg; > long mq_msgsize; > > diff --git a/src/shared/conf-parser.c b/src/shared/conf-parser.c > index 1e3cee5..e9654b3 100644 > --- a/src/shared/conf-parser.c > +++ b/src/shared/conf-parser.c > @@ -498,6 +498,38 @@ int config_parse_bytes_off(const char* unit, > return 0; > } > > +int config_parse_tristate(const char* unit, > + const char *filename, > + unsigned line, > + const char *section, > + unsigned section_line, > + const char *lvalue, > + int ltype, > + const char *rvalue, > + void *data, > + void *userdata) { > + > + int k; > + int *b = data; > + > + assert(filename); > + assert(lvalue); > + assert(rvalue); > + assert(data); > + > + /* Tristates are like 
booleans, but can also take the 'default' > value, i.e. "-1" */ > + > + k = parse_boolean(rvalue); > + if (k < 0) { > + log_syntax(unit, LOG_ERR, filename, line, -k, > + "Failed to parse boolean value, ignoring: %s", > rvalue); > + return 0; > + } > + > + *b = !!k; > + return 0; > +} > + > int config_parse_bool(const char* unit, > const char *filename, > unsigned line, > diff --git a/src/shared/conf-parser.h b/src/shared/conf-parser.h > index 2d5aa31..f1f9b27 100644 > --- a/src/shared/conf-parser.h > +++ b/src/shared/conf-parser.h > @@ -100,6 +100,7 @@ int config_parse_double(const char *unit, const char > *filename, unsigned line, c > int config_parse_bytes_size(const char *unit, const char *filename, unsigned > line, const char *section, unsigned section_line, const char *lvalue, int > ltype, const char *rvalue, void *data, void *userdata); > int config_parse_bytes_off(const char *unit, const char *filename, unsigned > line, const char *section, unsigned section_line, const char *lvalue, int > ltype, const char *rvalue, void *data, void *userdata); > int config_parse_bool(const char *unit, const char *filename, unsigned line, > const char *section, unsigned section_line, const char *lvalue, int ltype, > const char *rvalue, void *data, void *userdata); > +int config_parse_tristate(const char *unit, const char *filename, unsigned > line, const char *section, unsigned section_line, const char *lvalue, int > ltype, const char *rvalue, void *data, void *userdata); > int config_parse_string(const char *unit, const char *filename, unsigned > line, const char *section, unsigned section_line, const char *lvalue, int > ltype, const char *rvalue, void *data, void *userdata); > int config_parse_path(const char *unit, const char *filename, unsigned line, > const char *section, unsigned section_line, const char *lvalue, int ltype, > const char *rvalue, void *data, void *userdata); > int config_parse_strv(const char *unit, const char *filename, unsigned line, > const char 
*section, unsigned section_line, const char *lvalue, int ltype, > const char *rvalue, void *data, void *userdata); > -- > 1.8.5.1 > > _______________________________________________ > systemd-devel mailing list > systemd-devel@lists.freedesktop.org > http://lists.freedesktop.org/mailman/listinfo/systemd-devel _______________________________________________ systemd-devel mailing list systemd-devel@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/systemd-devel