On Sat, Nov 16, 2013 at 7:38 AM, Zbigniew Jędrzejewski-Szmek <zbys...@in.waw.pl> wrote: > On Fri, Nov 15, 2013 at 08:22:14PM -0800, Shawn Landden wrote: >> v3: make each worker its own service >> v4: be less intrusive > Hi Shawn, > unfortunately this doesn't apply cleanly. Can you rebase? thats because it is 3rd in a series, I will send the whole series right after this email > >> diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml >> index 7c10c58..92a9275 100644 >> --- a/man/systemd.socket.xml >> +++ b/man/systemd.socket.xml >> @@ -519,6 +519,15 @@ >> </varlistentry> >> >> <varlistentry> >> + <term><varname>Distribute=</varname></term> >> + <listitem><para>Takes an integer >> + value. If greater than one, systemd will >> spawn >> + given number of instances of service each >> + listening to the same socket. This option >> implies >> + <varname>Reuseport=</varname> >> above.</para></listitem> >> + </varlistentry> >> + >> + <varlistentry> >> <term><varname>SmackLabel=</varname></term> >> >> <term><varname>SmackLabelIPIn=</varname></term> >> >> <term><varname>SmackLabelIPOut=</varname></term> >> diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c >> index 60a8d05..4644007 100644 >> --- a/src/core/dbus-socket.c >> +++ b/src/core/dbus-socket.c >> @@ -68,6 +68,7 @@ >> " <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n" >> \ >> " <property name=\"Result\" type=\"s\" access=\"read\"/>\n" \ >> " <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \ >> + " <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \ >> " <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \ >> " <property name=\"SmackLabelIPIn\" type=\"s\" >> access=\"read\"/>\n" \ >> " <property name=\"SmackLabelIPOut\" type=\"s\" >> access=\"read\"/>\n" \ >> @@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = { >> { "MessageQueueMessageSize", bus_property_append_long, "x", >> offsetof(Socket, mq_msgsize) }, >> { "Result", 
bus_socket_append_socket_result, "s", >> offsetof(Socket, result) }, >> { "ReusePort", bus_property_append_bool, "b", >> offsetof(Socket, reuseport) }, >> + { "Distribute", bus_property_append_unsigned, "u", >> offsetof(Socket, distribute) }, >> { "SmackLabel", bus_property_append_string, "s", >> offsetof(Socket, smack), true }, >> { "SmackLabelIPIn", bus_property_append_string, "s", >> offsetof(Socket, smack_ip_in), true }, >> { "SmackLabelIPOut",bus_property_append_string, "s", >> offsetof(Socket, smack_ip_out), true }, >> diff --git a/src/core/load-fragment-gperf.gperf.m4 >> b/src/core/load-fragment-gperf.gperf.m4 >> index b64fdc9..4058a1f 100644 >> --- a/src/core/load-fragment-gperf.gperf.m4 >> +++ b/src/core/load-fragment-gperf.gperf.m4 >> @@ -211,6 +211,7 @@ Socket.PassCredentials, config_parse_bool, >> 0, >> Socket.PassSecurity, config_parse_bool, 0, >> offsetof(Socket, pass_sec) >> Socket.TCPCongestion, config_parse_string, 0, >> offsetof(Socket, tcp_congestion) >> Socket.ReusePort, config_parse_bool, 0, >> offsetof(Socket, reuseport) >> +Socket.Distribute, config_parse_unsigned, 0, >> offsetof(Socket, distribute) >> Socket.MessageQueueMaxMessages, config_parse_long, 0, >> offsetof(Socket, mq_maxmsg) >> Socket.MessageQueueMessageSize, config_parse_long, 0, >> offsetof(Socket, mq_msgsize) >> Socket.Service, config_parse_socket_service, 0, >> 0 >> diff --git a/src/core/service.c b/src/core/service.c >> index 3da32a1..8fc55a0 100644 >> --- a/src/core/service.c >> +++ b/src/core/service.c >> @@ -3663,7 +3663,6 @@ static void service_bus_query_pid_done( >> int service_set_socket_fd(Service *s, int fd, Socket *sock) { >> >> assert(s); >> - assert(fd >= 0); >> >> /* This is called by the socket code when instantiating a new >> * service for a stream socket and the socket needs to be >> @@ -3678,8 +3677,10 @@ int service_set_socket_fd(Service *s, int fd, Socket >> *sock) { >> if (s->state != SERVICE_DEAD) >> return -EAGAIN; >> >> - s->socket_fd = fd; >> - 
s->got_socket_fd = true; >> + if (fd >= 0) { >> + s->socket_fd = fd; >> + s->got_socket_fd = true; >> + } >> >> unit_ref_set(&s->accept_socket, UNIT(sock)); >> >> diff --git a/src/core/service.h b/src/core/service.h >> index 37fa6ff..2ffe7d1 100644 >> --- a/src/core/service.h >> +++ b/src/core/service.h >> @@ -26,7 +26,6 @@ typedef struct Service Service; >> #include "unit.h" >> #include "path.h" >> #include "ratelimit.h" >> -#include "service.h" >> #include "kill.h" >> #include "exit-status.h" >> >> diff --git a/src/core/socket.c b/src/core/socket.c >> index 751f20b..11b649b 100644 >> --- a/src/core/socket.c >> +++ b/src/core/socket.c >> @@ -153,34 +153,30 @@ static void socket_done(Unit *u) { >> } >> >> static int socket_instantiate_service(Socket *s) { >> - char *prefix, *name; >> + _cleanup_free_ char *prefix = NULL, *name = NULL; >> int r; >> Unit *u; >> >> assert(s); >> >> /* This fills in s->service if it isn't filled in yet. For >> - * Accept=yes sockets we create the next connection service >> - * here. For Accept=no this is mostly a NOP since the service >> + * Accept=yes and Distribute=n sockets we create the next connection >> + * service here. Otherwise is mostly a NOP since the service >> * is figured out at load time anyway. */ >> >> - if (UNIT_DEREF(s->service)) >> + if (UNIT_DEREF(s->service) && !(s->distribute)) >> return 0; >> >> - assert(s->accept); >> + assert(s->accept || s->distribute); >> >> if (!(prefix = unit_name_to_prefix(UNIT(s)->id))) >> return -ENOMEM; >> >> r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted); > Here we could use something like "%.*u", MAX(s->distribute-1), s->n_accepted > to have nicely sorted instances... (E.g. systemctl sorts by type ane name). > I'm not sure if that's better or not. I don't think this is a good idea. It needlessly special-cases s->distribute, and as exited workers are respawned the number can go above s->distribute, so it isn't really right. 
This issue is purely cosmetic, as all the workers are the same; even then, it is really only a systemctl problem, not a PID 1 problem, and should be fixed there, with more sophisticated ordering. > >> - free(prefix); >> - >> if (r < 0) >> return -ENOMEM; >> >> r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u); >> - free(name); >> - >> if (r < 0) >> return r; >> >> @@ -513,6 +509,11 @@ static void socket_dump(Unit *u, FILE *f, const char >> *prefix) { >> "%sReusePort: %s\n", >> prefix, yes_no(s->reuseport)); >> >> + if (s->distribute) >> + fprintf(f, >> + "%sDistribute: %d\n", >> + prefix, s->distribute); >> + >> if (s->smack) >> fprintf(f, >> "%sSmackLabel: %s\n", >> @@ -1454,7 +1455,7 @@ static void socket_enter_running(Socket *s, int cfd) { >> return; >> } >> >> - if (cfd < 0) { >> + if (cfd < 0 && !(s->distribute)) { >> Iterator i; >> Unit *u; >> bool pending = false; >> @@ -1486,56 +1487,66 @@ static void socket_enter_running(Socket *s, int cfd) >> { >> return; >> } >> >> - r = socket_instantiate_service(s); >> - if (r < 0) >> - goto fail; >> - >> - r = instance_from_socket(cfd, s->n_accepted, &instance); >> - if (r < 0) { >> - if (r != -ENOTCONN) >> - goto fail; >> - >> - /* ENOTCONN is legitimate if TCP RST was received. >> - * This connection is over, but the socket unit >> lives on. */ >> - close_nointr_nofail(cfd); >> - return; >> - } >> - >> prefix = unit_name_to_prefix(UNIT(s)->id); >> if (!prefix) { >> r = -ENOMEM; >> goto fail; >> } >> >> - name = unit_name_build(prefix, instance, ".service"); >> + do { >> + r = socket_instantiate_service(s); >> + if (r < 0) >> + goto fail; >> >> - if (!name) { >> - r = -ENOMEM; >> - goto fail; >> - } >> + if (!(s->distribute)) { > What does Distribute=1 mean? Is it treated as a special case of Distribute=n, > and just one service@1.service is started? Or is treated as equivalent to > Distribute=0, and service.service is started? 
I kind of like the first > version, > but then the in the manpage it should be clarified a bit that Distribute=0 is > the default, and different from Distribute=1. the latter, clarified in man page. > >> + r = instance_from_socket(cfd, >> s->n_accepted, &instance); >> + if (r < 0) { >> + if (r != -ENOTCONN) >> + goto fail; >> >> - r = unit_add_name(UNIT_DEREF(s->service), name); >> - if (r < 0) >> - goto fail; >> + /* ENOTCONN is legitimate if TCP >> RST was received. >> + * This connection is over, but the >> socket unit lives on. */ >> + close_nointr_nofail(cfd); >> + return; >> + } >> >> - service = SERVICE(UNIT_DEREF(s->service)); >> - unit_ref_unset(&s->service); >> - s->n_accepted ++; >> + name = unit_name_build(prefix, instance, >> ".service"); >> + if (!name) { >> + r = -ENOMEM; >> + goto fail; >> + } >> >> - UNIT(service)->no_gc = false; >> + r = unit_add_name(UNIT_DEREF(s->service), >> name); >> + if (r < 0) >> + goto fail; >> + } >> >> - unit_choose_id(UNIT(service), name); >> + service = SERVICE(UNIT_DEREF(s->service)); >> + unit_ref_unset(&s->service); >> + s->n_accepted ++; >> >> - r = service_set_socket_fd(service, cfd, s); >> - if (r < 0) >> - goto fail; >> + UNIT(service)->no_gc = false; >> >> - cfd = -1; >> - s->n_connections ++; >> + unit_choose_id(UNIT(service), name); >> >> - r = manager_add_job(UNIT(s)->manager, JOB_START, >> UNIT(service), JOB_REPLACE, true, &error, NULL); >> - if (r < 0) >> - goto fail; >> + r = service_set_socket_fd(service, cfd, s); >> + if (r < 0) >> + goto fail; >> + >> + cfd = -1; >> + s->n_connections ++; >> + >> + r = manager_add_job(UNIT(s)->manager, JOB_START, >> UNIT(service), JOB_REPLACE, true, &error, NULL); >> + if (r < 0) >> + goto fail; >> + >> + if(s->distribute > s->n_connections) { >> + /* distribute implies reuseport */ >> + s->reuseport = true; >> + >> + socket_enter_listening(s); >> + } >> + } while(s->distribute > s->n_connections); >> >> /* Notify clients about changed counters */ >> 
unit_add_to_dbus_queue(UNIT(s)); >> @@ -2263,14 +2274,21 @@ void socket_connection_unref(Socket *s) { >> >> /* The service is dead. Yay! >> * >> - * This is strictly for one-instance-per-connection >> - * services. */ >> + * This is for one-instance-per-connection >> + * and Distribute= services */ >> >> assert(s->n_connections > 0); >> s->n_connections--; >> >> log_debug_unit(UNIT(s)->id, >> "%s: One connection closed, %u left.", UNIT(s)->id, >> s->n_connections); >> + >> + if(s->distribute > s->n_connections && s->state == SOCKET_RUNNING){ > Could this be 's->n_connections < s->distribute'? It just feels backwards. changed everywhere > >> + s->reuseport = true; > Could this have changed? It seems it should be set in just one place. moved to socket_apply_socket_options() > >> + /* (re)enter systemd into SO_REUSEPORT pool, when it gets a >> + * connection it will reestablish distribute target */ >> + socket_enter_listening(s); >> + } >> } >> >> static void socket_reset_failed(Unit *u) { >> diff --git a/src/core/socket.h b/src/core/socket.h >> index 3d7eadc..5928356 100644 >> --- a/src/core/socket.h >> +++ b/src/core/socket.h >> @@ -93,6 +93,8 @@ struct Socket { >> LIST_HEAD(SocketPort, ports); >> >> unsigned n_accepted; >> + /* when Accept=true this is the number of active connectoins >> + * when Distribute=n this is the number of active workers */ >> unsigned n_connections; > Could this become n_instances then? I did this, but it was too ugly as NConnections is an exported API. > >> unsigned max_connections; >> >> @@ -145,6 +147,8 @@ struct Socket { >> char *bind_to_device; >> char *tcp_congestion; >> bool reuseport; >> + /* implies reuseport */ >> + unsigned distribute; >> long mq_maxmsg; >> long mq_msgsize; >> > > Zbyszek > _______________________________________________ > systemd-devel mailing list > systemd-devel@lists.freedesktop.org > http://lists.freedesktop.org/mailman/listinfo/systemd-devel
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <systemd/sd-daemon.h>
/*
 * Minimal socket-activated worker used to exercise the proposed Distribute=
 * option: every instance accepts connections on the single listening socket
 * handed over by systemd, reads one chunk of data, logs its first line
 * together with the worker's PID, and replies "PID <pid> got your message".
 */

/* Write all len bytes of buf to fd, retrying on short writes and EINTR.
 * Returns 0 on success, -1 on error with errno set by write(). */
static int write_all(int fd, const char *buf, size_t len)
{
        while (len > 0) {
                ssize_t w = write(fd, buf, len);
                if (w < 0) {
                        if (errno == EINTR)
                                continue;
                        return -1;
                }
                buf += w;
                len -= (size_t) w;
        }
        return 0;
}

/* Serve one accepted connection and close it. Errors only drop this client;
 * they no longer terminate the whole worker (the original exit(1)'d). */
static void handle_client(int fd)
{
        char buffer[256] = {0};
        char resp[1024];
        ssize_t n;

        do {
                n = read(fd, buffer, sizeof buffer - 1);
        } while (n < 0 && errno == EINTR);

        if (n < 0) {
                perror("ERROR reading from socket");
                close(fd);
                return;
        }

        /* Cut at the first newline. strcspn() returns the string length when
         * there is none, whereas the former strchr() arithmetic dereferenced
         * NULL in that case. The buffer is always NUL-terminated because at
         * most sizeof buffer - 1 bytes are read into a zeroed array. */
        buffer[strcspn(buffer, "\n")] = '\0';

        printf("PID %d got: %s\n", (int) getpid(), buffer);
        /* Under systemd stdout is typically a pipe (fully buffered), so flush
         * to make each line show up immediately. */
        fflush(stdout);

        snprintf(resp, sizeof resp, "PID %d got your message\n", (int) getpid());
        if (write_all(fd, resp, strlen(resp)) < 0)
                perror("ERROR writing to socket");

        close(fd);
}

int main(int argc, char *argv[])
{
        int sockfd;

        (void) argc;
        (void) argv;

        if (sd_listen_fds(0) != 1) {
                fprintf(stderr, "No or too many file descriptors received.\n");
                exit(1);
        }
        sockfd = SD_LISTEN_FDS_START + 0;

        /* No listen() here: the passed socket is already listening, and
         * calling listen(sockfd, 5) again would override the Backlog= the
         * .socket unit configured. */

        for (;;) {
                /* The peer address is never used, so don't ask for it; this
                 * also avoids assuming an IPv4 (sockaddr_in) socket. */
                int newsockfd = accept(sockfd, NULL, NULL);
                if (newsockfd < 0) {
                        /* Transient conditions: a signal, or a client that
                         * reset before we accepted it. */
                        if (errno == EINTR || errno == ECONNABORTED)
                                continue;
                        perror("ERROR on accept");
                        exit(1);
                }
                handle_client(newsockfd);
        }
}
reuseport.socket
Description: Binary data
reuseport@.service
Description: Binary data
_______________________________________________ systemd-devel mailing list systemd-devel@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/systemd-devel