2011/12/2 Chen Jie <ch...@lemote.com>:
> Hey,
> After doing many experiments, I found that one iteration of
> manager_loop took ~25s:
> * dbus.service: service_enter_start_post()
>  -> service_enter_running()
>  --> service_set_state(s, SERVICE_RUNNING)
>  ---> unit_notify()
>  ----> bus_init() -> bus_init_api()
> bus_init_api() failed to add_match for 'NameOwnerChanged' (reason:
> timeout; the default timeout value in the dbus lib is 25s)
>
> On the other hand, dbus-daemon failed to reply in time because it tried
> to log a message while rsyslog.service was starting:
> 1. In the ExecStartPre stage (/bin/systemctl stop
> systemd-kmsg-syslogd.service), systemctl failed to receive replies in
> time because the manager_loop of systemd was blocked!   or
> 2. After the ExecStartPre stage, manager_loop was blocked for 25s, and
> only then ran ExecStart.
>
> Then dbus-daemon prompted "Failed to activate service
> 'org.freedesktop.systemd1': timed out", and then entered a busy-loop
> in pending_activation_failed()
> http://cgit.freedesktop.org/dbus/dbus/tree/bus/activation.c?id=7dbfa45153167ee53487630f8109d32860cb6263#n1280
> (A _dbus_return_val_if_fail(reply_serial != 0, FALSE) failure was
> observed in gdb; I still don't know how this happened)

Let me explain it further.
First, syslogv() in dbus may block if rsyslog.service is still starting
while the kernel socket buffer is already full.
The attachment syslog-test.c is a program that simulates this situation.

So while rsyslog.service is starting, on the systemd side:
1. [ExecStartPre] stops systemd-kmsg-syslogd.service.
2. systemd is now running bus_init() to publish its API to dbus.
3. Step 2 times out, because dbus-daemon doesn't reply in time.

Why didn't dbus-daemon respond quickly? Because it was blocked in
syslogv(), which was waiting for someone to consume the message (the
kernel socket buffer was full), but sadly the consumer -- rsyslog --
had not been started, because systemd was blocked.

So systemd waits for dbus, and dbus waits for startup of rsyslog,
rsyslog waits for systemd to start it.

IMHO, we need to put functions that may block into separate threads, for
example bus_init(), shutdown_connection(), log_meta().

The attachments 0001-Add-new-API-manager_add_work_thread.patch and
0002-dbus.c-hooks-to-the-manager_add_work_thread-API.patch are two
patches that make bus_init run in a thread and hence not block the
manager_loop.



Regards,
-- Chen Jie
#include <stdio.h>
#include <unistd.h>
#include <syslog.h>
#include <sys/time.h>
#include <stdint.h>

/*
 * syslog-test: measure how long a single syslog() call can block.
 *
 * Prints instructions for delaying the start of rsyslog.service, waits for
 * the operator to press enter, then sends up to 20 messages via syslog().
 * The wall-clock time of every call is measured with gettimeofday(); the
 * first call that takes more than 2 seconds is reported (with the
 * before/after timestamps as "(sec,usec)" pairs) and ends the loop.
 *
 * Always returns 0.
 */
int main(void)
{
	int i;
	struct timeval tv1, tv2;

	openlog("slog", LOG_PID | LOG_PERROR, LOG_DAEMON);

	printf("Now edit /lib/systemd/system/rsyslog.service.\n"
	       "Add a line \"ExecStartPre=/bin/sleep 30\" just after the original ExecStartPre\n"
	       "Run \"systemctl daemon-reload && systemctl restart rsyslog.service\"\n"
	       "Then press enter to continue? See how many time it may spend on sending a log\n");
	/* Only used as a "press enter" barrier; the character read is irrelevant. */
	(void) getc(stdin);

	printf("Starting the test...\n");
	for (i = 0; i < 20; i++) {
		long long elapsed_usec;

		gettimeofday(&tv1, NULL);
		syslog(LOG_INFO, "iteration %d", i);
		gettimeofday(&tv2, NULL);

		/*
		 * Compare the full elapsed time, not just the tv_sec fields:
		 * a seconds-only difference misses e.g. a 2.9s stall and can
		 * over-report one that merely straddles second boundaries.
		 */
		elapsed_usec = (long long) (tv2.tv_sec - tv1.tv_sec) * 1000000LL
			     + (long long) (tv2.tv_usec - tv1.tv_usec);

		if (elapsed_usec > 2 * 1000000LL) {
			/* time_t/suseconds_t have no portable format; print as long. */
			printf("In %d iteration: syslog() spend more than 2 secs (%ld,%ld) -> (%ld,%ld)\n",
			       i, (long) tv1.tv_sec, (long) tv1.tv_usec,
			       (long) tv2.tv_sec, (long) tv2.tv_usec);
			break;
		}
	}
	printf("Test finished.\n");

	closelog();

	return 0;
}
From 78a56b263844b1db7fb29a7f6c0c4a4b0d07ad80 Mon Sep 17 00:00:00 2001
From: cee1 <fykc...@gmail.com>
Date: Wed, 14 Dec 2011 14:25:39 +0800
Subject: [PATCH 1/2] Add new API: manager_add_work_thread()

A work thread is used to finish work in a separate thread.
A notify function can be specified for it, which will be called in manager_loop
when a watch of type WATCH_WORK_THREAD is processed.
---
 src/manager.c |  197 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 src/manager.h |   22 ++++++-
 2 files changed, 217 insertions(+), 2 deletions(-)

diff --git a/src/manager.c b/src/manager.c
index 111167a..387e130 100644
--- a/src/manager.c
+++ b/src/manager.c
@@ -36,6 +36,7 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <dirent.h>
+#include <pthread.h>
 
 #ifdef HAVE_AUDIT
 #include <libaudit.h>
@@ -71,6 +72,8 @@
 #define NOTIFY_SOCKET_SYSTEM "/run/systemd/notify"
 #define NOTIFY_SOCKET_USER "@/org/freedesktop/systemd1/notify"
 
+static void wait_all_work_threads_done(Manager *m);
+
 static int manager_setup_notify(Manager *m) {
         union {
                 struct sockaddr sa;
@@ -217,6 +220,27 @@ static int manager_setup_signals(Manager *m) {
         return 0;
 }
 
+static int manager_setup_work_threads(Manager *m) {
+        int p[2] = { -1, -1 };
+        struct epoll_event ev;
+
+        if (pipe2(p, O_NONBLOCK | O_CLOEXEC) < 0)
+                return -errno;
+
+        m->work_thread_watch.type = WATCH_WORK_THREAD;
+        m->work_thread_watch.fd = p[0];
+        m->work_thread_notify_fd = p[1];
+
+        zero(ev);
+        ev.events = EPOLLIN;
+        ev.data.ptr = &m->work_thread_watch;
+
+        if (epoll_ctl(m->epoll_fd, EPOLL_CTL_ADD, m->work_thread_watch.fd, &ev) < 0)
+                return -errno;
+
+        return 0;
+}
+
 int manager_new(ManagerRunningAs running_as, Manager **_m) {
         Manager *m;
         int r = -ENOMEM;
@@ -239,7 +263,8 @@ int manager_new(ManagerRunningAs running_as, Manager **_m) {
         m->audit_fd = -1;
 #endif
 
-        m->signal_watch.fd = m->mount_watch.fd = m->udev_watch.fd = m->epoll_fd = m->dev_autofs_fd = m->swap_watch.fd = -1;
+        m->signal_watch.fd = m->mount_watch.fd = m->udev_watch.fd = m->epoll_fd = m->dev_autofs_fd = m->swap_watch.fd = m->work_thread_watch.fd = -1;
+        m->work_thread_notify_fd = -1;
         m->current_job_id = 1; /* start as id #1, so that we can leave #0 around as "null-like" value */
 
         if (!(m->environment = strv_copy(environ)))
@@ -260,6 +285,9 @@ int manager_new(ManagerRunningAs running_as, Manager **_m) {
         if (!(m->watch_pids = hashmap_new(trivial_hash_func, trivial_compare_func)))
                 goto fail;
 
+        if (!(m->watch_work_threads = hashmap_new(trivial_hash_func, trivial_compare_func)))
+                goto fail;
+
         if (!(m->cgroup_bondings = hashmap_new(string_hash_func, string_compare_func)))
                 goto fail;
 
@@ -281,6 +309,9 @@ int manager_new(ManagerRunningAs running_as, Manager **_m) {
         if ((r = manager_setup_notify(m)) < 0)
                 goto fail;
 
+        if ((r = manager_setup_work_threads(m)) < 0)
+                goto fail;
+
         /* Try to connect to the busses, if possible. */
         if ((r = bus_init(m, running_as != MANAGER_SYSTEM)) < 0)
                 goto fail;
@@ -468,10 +499,19 @@ void manager_free(Manager *m) {
 
         bus_done(m);
 
+	/* Close work thread notify fds and then invoke wait_all_work_threads_done()
+         * to prevent possible work threads blocking if the pipe2() buffer is full */
+        if (m->work_thread_watch.fd >= 0)
+                close_nointr_nofail(m->work_thread_watch.fd);
+        if (m->work_thread_notify_fd >= 0)
+                close_nointr_nofail(m->work_thread_notify_fd);
+        wait_all_work_threads_done(m);
+
         hashmap_free(m->units);
         hashmap_free(m->jobs);
         hashmap_free(m->transaction_jobs);
         hashmap_free(m->watch_pids);
+        hashmap_free(m->watch_work_threads);
         hashmap_free(m->watch_bus);
 
         if (m->epoll_fd >= 0)
@@ -2304,6 +2344,155 @@ static int manager_process_signal_fd(Manager *m) {
         return 0;
 }
 
+struct WorkThreadClosure {
+        Manager *manager;
+        WorkThreadFunc work;
+        WorkThreadNotify notify;
+        ArgArray args;
+        ArgArray return_values;
+        int return_code;
+        pthread_mutex_t start_wait;
+        bool start;
+};
+
+static void *work_thread_wrap(void *data) {
+        int r;
+        struct WorkThreadClosure *wtc = (struct WorkThreadClosure *)data;
+        pthread_t t = pthread_self();
+
+        assert(wtc->work);
+
+        pthread_mutex_lock(&wtc->start_wait);
+        if (!wtc->start) {
+                pthread_mutex_unlock(&wtc->start_wait);
+                return NULL;
+        }
+        pthread_mutex_unlock(&wtc->start_wait);
+
+        r = wtc->work(&wtc->args, &wtc->return_values);
+        wtc->return_code = r;
+        loop_write(wtc->manager->work_thread_notify_fd, &t, sizeof(pthread_t), true);
+
+        return NULL;
+}
+
+int manager_add_work_thread(Manager *m,
+                            WorkThreadFunc work, ArgArray *args,
+                            WorkThreadNotify notify) {
+        int r;
+        struct WorkThreadClosure *wtc = NULL;
+        pthread_t t;
+
+        assert(m);
+
+        if (!(wtc = new0(struct WorkThreadClosure, 1)))
+                return -ENOMEM;
+
+        pthread_mutex_init(&wtc->start_wait, NULL);
+        wtc->args = *args;
+        wtc->manager = m;
+        wtc->work = work;
+        wtc->notify = notify;
+
+        pthread_mutex_lock(&wtc->start_wait);
+
+        if ((r = pthread_create(&t, NULL, work_thread_wrap, wtc)) != 0) {
+                pthread_mutex_unlock(&wtc->start_wait);
+                pthread_mutex_destroy(&wtc->start_wait);
+                free(wtc);
+                return -r;
+        }
+
+        if ((r = hashmap_put(m->watch_work_threads, (void *)t, (void *)wtc)) < 0) {
+                pthread_mutex_unlock(&wtc->start_wait);
+                pthread_join(t, NULL);
+
+                pthread_mutex_destroy(&wtc->start_wait);
+                free(wtc);
+
+                return r;
+        }
+
+        wtc->start = true;
+        pthread_mutex_unlock(&wtc->start_wait);
+
+        return 0;
+}
+
+static int work_thread_done_notify(pthread_t t, struct WorkThreadClosure **wtc) {
+        int r;
+        struct WorkThreadClosure *c = *wtc;
+
+        if ((r = pthread_join(t, NULL))) {
+                return -r;
+        }
+
+        if (c->notify)
+                c->notify(c->manager, &c->args, c->return_code, &c->return_values);
+
+        pthread_mutex_destroy(&c->start_wait);
+        free(c);
+        *wtc = NULL;
+
+        return 0;
+}
+
+static int work_thread_done_event(Manager *m, Watch *w, int events) {
+        int r = 0;
+        ssize_t n;
+        pthread_t t;
+        struct WorkThreadClosure *wtc = NULL;
+
+        assert(m);
+
+        for (;;) {
+                n = loop_read(m->work_thread_watch.fd, &t, sizeof(t), false);
+                if (n != sizeof(t)) {
+                        if (n >= 0) {
+                                r = -EIO;
+                                break;
+                        }
+
+                        if (n == -EAGAIN)
+                                break;
+
+                        r = n;
+                        break;
+                }
+
+                wtc = (struct WorkThreadClosure *) hashmap_remove(m->watch_work_threads, ULONG_TO_PTR(t));
+                if (!wtc) {
+                        r = -ENOKEY;
+                        break;
+                }
+
+                work_thread_done_notify(t, &wtc);
+        }
+
+        return r;
+}
+
+static void wait_all_work_threads_done(Manager *m) {
+        dual_timestamp ts1, ts2;
+        char timestamp[FORMAT_TIMESTAMP_MAX], timespan[FORMAT_TIMESPAN_MAX];
+
+        pthread_t t;
+        struct WorkThreadClosure *wtc = NULL;
+        Iterator i;
+
+        dual_timestamp_get(&ts1);
+        log_debug("%s: Begin to wait for all work threads done...",
+                  format_timestamp(timestamp, sizeof(timestamp), ts1.realtime));
+
+        HASHMAP_FOREACH_KEY(wtc, t, m->watch_work_threads, i)
+                work_thread_done_notify(t, &wtc);
+
+        dual_timestamp_get(&ts2);
+        log_debug("%s: All work threads done. Total spend %s.",
+                  format_timestamp(timestamp, sizeof(timestamp), ts2.realtime),
+                  format_timespan(timespan, sizeof(timespan), ts2.monotonic - ts1.monotonic));
+}
+
 static int process_event(Manager *m, struct epoll_event *ev) {
         int r;
         Watch *w;
@@ -2329,6 +2518,12 @@ static int process_event(Manager *m, struct epoll_event *ev) {
 
                 break;
 
+        case WATCH_WORK_THREAD:
+                if ((r = work_thread_done_event(m, w, ev->events)) < 0)
+                        return r;
+
+                break;
+
         case WATCH_NOTIFY:
 
                 /* An incoming daemon notification event? */
diff --git a/src/manager.h b/src/manager.h
index 5deb569..956e49a 100644
--- a/src/manager.h
+++ b/src/manager.h
@@ -35,6 +35,7 @@
 typedef struct Manager Manager;
 typedef enum WatchType WatchType;
 typedef struct Watch Watch;
+typedef struct ArgArray ArgArray;
 
 typedef enum ManagerExitCode {
         MANAGER_RUNNING,
@@ -67,7 +68,8 @@ enum WatchType {
         WATCH_SWAP,
         WATCH_UDEV,
         WATCH_DBUS_WATCH,
-        WATCH_DBUS_TIMEOUT
+        WATCH_DBUS_TIMEOUT,
+        WATCH_WORK_THREAD
 };
 
 struct Watch {
@@ -128,11 +130,14 @@ struct Manager {
         JobDependency *transaction_anchor;
 
         Hashmap *watch_pids;  /* pid => Unit object n:1 */
+        Hashmap *watch_work_threads; /* pthread_t => WorkThreadClosure */
 
         char *notify_socket;
 
         Watch notify_watch;
         Watch signal_watch;
+        Watch work_thread_watch;
+        int work_thread_notify_fd;
 
         int epoll_fd;
 
@@ -230,6 +235,18 @@ struct Manager {
         unsigned n_failed_jobs;
 };
 
+#define ARG_ARRAY_MAX_ARGS 4
+struct ArgArray {
+        int n_args;
+        void *args[ARG_ARRAY_MAX_ARGS];
+};
+
+typedef int (*WorkThreadFunc) (ArgArray *args, ArgArray *return_values);
+typedef void (*WorkThreadNotify) (Manager *m,
+                                 ArgArray *work_thread_args,
+                                 int return_code,
+                                 ArgArray *work_thread_return_values);
+
 int manager_new(ManagerRunningAs running_as, Manager **m);
 void manager_free(Manager *m);
 
@@ -295,5 +312,8 @@ bool manager_get_show_status(Manager *m);
 
 const char *manager_running_as_to_string(ManagerRunningAs i);
 ManagerRunningAs manager_running_as_from_string(const char *s);
+int manager_add_work_thread(Manager *m,
+                            WorkThreadFunc work, ArgArray *args,
+                            WorkThreadNotify notify);
 
 #endif
-- 
1.7.7.3

From 9ae47319528eea1f92865706e2ff0f656d7a8580 Mon Sep 17 00:00:00 2001
From: cee1 <fykc...@gmail.com>
Date: Wed, 14 Dec 2011 16:17:01 +0800
Subject: [PATCH 2/2] dbus.c: hooks to the manager_add_work_thread API

Systemd will publish its API to dbus if dbus.service starts up
or SIGUSR1 is received. This operation may block manager_loop if
the dbus daemon blocks. So hook into manager_add_work_thread and make these
operations run in another thread.

Note: this currently only applies to bus_init_*. shutdown_connection() may block
too, so it should also run in a separate thread.

Changes summary:
* bus_init_system() and bus_init_session() removed. The related init logic
   now is: bus_init_public_async() -> bus_init_public_initiate_thread()
   <In thread> do system_bus_get() and/or session_bus_get()
   <Back from thread> system_bus_start() and/or session_bus_start()
* shutdown_connection is now split into stop_connection and drop_connection.
  The latter is expected to be run in a thread.
---
 src/dbus.c |  436 +++++++++++++++++++++++++++++++++++++++++++++---------------
 1 files changed, 327 insertions(+), 109 deletions(-)

diff --git a/src/dbus.c b/src/dbus.c
index daa2c84..f1c38d7 100644
--- a/src/dbus.c
+++ b/src/dbus.c
@@ -74,6 +74,8 @@ static void bus_done_api(Manager *m);
 static void bus_done_system(Manager *m);
 static void bus_done_private(Manager *m);
 static void shutdown_connection(Manager *m, DBusConnection *c);
+static void stop_connection(Manager *m, DBusConnection *c);
+static void drop_connection(DBusConnection *c);
 
 static void bus_dispatch_status(DBusConnection *bus, DBusDispatchStatus status, void *data)  {
         Manager *m = data;
@@ -570,7 +572,7 @@ static void request_name_pending_cb(DBusPendingCall *pending, void *userdata) {
         dbus_error_free(&error);
 }
 
-static int request_name(Manager *m) {
+static int request_name(DBusConnection *api_bus, Manager *m) {
         const char *name = "org.freedesktop.systemd1";
         /* Allow replacing of our name, to ease implementation of
          * reexecution, where we keep the old connection open until
@@ -595,7 +597,7 @@ static int request_name(Manager *m) {
                             DBUS_TYPE_INVALID))
                 goto oom;
 
-        if (!dbus_connection_send_with_reply(m->api_bus, message, &pending, -1))
+        if (!dbus_connection_send_with_reply(api_bus, message, &pending, -1))
                 goto oom;
 
         if (!dbus_pending_call_set_notify(pending, request_name_pending_cb, m, NULL))
@@ -671,7 +673,7 @@ static void query_name_list_pending_cb(DBusPendingCall *pending, void *userdata)
         dbus_error_free(&error);
 }
 
-static int query_name_list(Manager *m) {
+static int query_name_list(DBusConnection *api_bus, Manager *m) {
         DBusMessage *message = NULL;
         DBusPendingCall *pending = NULL;
 
@@ -684,7 +686,7 @@ static int query_name_list(Manager *m) {
                               "ListNames")))
                 goto oom;
 
-        if (!dbus_connection_send_with_reply(m->api_bus, message, &pending, -1))
+        if (!dbus_connection_send_with_reply(api_bus, message, &pending, -1))
                 goto oom;
 
         if (!dbus_pending_call_set_notify(pending, query_name_list_pending_cb, m, NULL))
@@ -767,150 +769,360 @@ static void bus_new_connection(
         dbus_connection_ref(new_connection);
 }
 
-static int bus_init_system(Manager *m) {
-        DBusError error;
+static int bus_register_match_for_api(DBusConnection *api_bus, DBusError *error) {
+        /* Get NameOwnerChange messages */
+        dbus_bus_add_match(api_bus,
+                           "type='signal',"
+                           "sender='"DBUS_SERVICE_DBUS"',"
+                           "interface='"DBUS_INTERFACE_DBUS"',"
+                           "member='NameOwnerChanged',"
+                           "path='"DBUS_PATH_DBUS"'",
+                           error);
+
+        if (dbus_error_is_set(error))
+                return -EIO;
+
+        /* Get activation requests */
+        dbus_bus_add_match(api_bus,
+                           "type='signal',"
+                           "sender='"DBUS_SERVICE_DBUS"',"
+                           "interface='org.freedesktop.systemd1.Activator',"
+                           "member='ActivationRequest',"
+                           "path='"DBUS_PATH_DBUS"'",
+                           error);
+
+        if (dbus_error_is_set(error)) {
+                return -EIO;
+        }
+
+        return 0;
+}
+
+static int bus_publish_api(DBusConnection *api_bus, Manager *m) {
         int r;
 
-        assert(m);
+        if (!dbus_connection_register_object_path(api_bus, "/org/freedesktop/systemd1",
+                                                  &bus_manager_vtable, m) ||
+            !dbus_connection_register_fallback(api_bus, "/org/freedesktop/systemd1/unit",
+                                                  &bus_unit_vtable, m) ||
+            !dbus_connection_register_fallback(api_bus, "/org/freedesktop/systemd1/job",
+                                                  &bus_job_vtable, m) ||
+            !dbus_connection_add_filter(api_bus, api_bus_message_filter, m, NULL)) {
+                return -ENOMEM;
+        }
 
-        dbus_error_init(&error);
+        if ((r = request_name(api_bus, m)) < 0)
+                return r;
 
-        if (m->system_bus)
-                return 0;
+        if ((r = query_name_list(api_bus, m)) < 0)
+                return r;
 
-        if (m->running_as == MANAGER_SYSTEM && m->api_bus)
-                m->system_bus = m->api_bus;
-        else {
-                if (!(m->system_bus = dbus_bus_get_private(DBUS_BUS_SYSTEM, &error))) {
-                        log_debug("Failed to get system D-Bus connection, retrying later: %s", bus_error_message(&error));
-                        r = 0;
-                        goto fail;
-                }
+        return 0;
+}
 
-                if ((r = bus_setup_loop(m, m->system_bus)) < 0)
-                        goto fail;
+static DBusConnection *session_bus_get(ManagerRunningAs running_as,
+                                       DBusError *error,
+                                       const char **detail) {
+        const char *op = NULL;
+        DBusConnection *bus;
+
+        assert(running_as != MANAGER_SYSTEM);
+        if (!(bus = dbus_bus_get_private(DBUS_BUS_SESSION, error))) {
+                op = "get session D-Bus connection";
+                goto out;
         }
 
-        if (!dbus_connection_add_filter(m->system_bus, system_bus_message_filter, m, NULL)) {
-                log_error("Not enough memory");
-                r = -ENOMEM;
-                goto fail;
+
+        if (bus_register_match_for_api(bus, error)) {
+                op = "register match";
+                goto out;
         }
 
-        if (m->running_as != MANAGER_SYSTEM) {
-                dbus_bus_add_match(m->system_bus,
+out:
+        if (dbus_error_is_set(error)) {
+                if (bus) {
+                        drop_connection(bus);
+                        bus = NULL;
+                }
+        }
+
+        if (detail)
+                *detail = op;
+
+        return bus;
+}
+
+static DBusConnection *system_bus_get(ManagerRunningAs running_as,
+                                      DBusError *error,
+                                      const char **detail) {
+        const char *op = NULL;
+        DBusConnection *bus;
+
+        if (!(bus = dbus_bus_get_private(DBUS_BUS_SYSTEM, error))) {
+                op = "get system D-Bus connection";
+                goto out;
+        }
+
+        if (running_as == MANAGER_SYSTEM) {
+                if (bus_register_match_for_api(bus, error)) {
+                        op = "register match";
+                        goto out;
+                }
+        } else {
+                dbus_bus_add_match(bus,
                                    "type='signal',"
                                    "interface='org.freedesktop.systemd1.Agent',"
                                    "member='Released',"
                                    "path='/org/freedesktop/systemd1/agent'",
-                                   &error);
+                                   error);
 
-                if (dbus_error_is_set(&error)) {
-                        log_error("Failed to register match: %s", bus_error_message(&error));
-                        r = -EIO;
-                        goto fail;
+                if (dbus_error_is_set(error)) {
+                        op = "register match";
+                        goto out;
                 }
         }
 
-        if (m->api_bus != m->system_bus) {
-                char *id;
-                log_debug("Successfully connected to system D-Bus bus %s as %s",
-                         strnull((id = dbus_connection_get_server_id(m->system_bus))),
-                         strnull(dbus_bus_get_unique_name(m->system_bus)));
-                dbus_free(id);
+out:
+        if (dbus_error_is_set(error)) {
+                if (bus) {
+                        drop_connection(bus);
+                        bus = NULL;
+                }
         }
 
-        return 0;
+        if (detail)
+                *detail = op;
+
+        return bus;
+}
+
+static int system_bus_start(DBusConnection *system_bus, Manager *m) {
+        int r = 0;
+
+        if ((r = (bus_setup_loop(m, system_bus))) < 0) {
+                goto fail;
+        }
+
+        if (!dbus_connection_add_filter(system_bus, system_bus_message_filter, m, NULL)) {
+                r = -ENOMEM;
+                goto fail;
+        }
+
+        if (m->running_as == MANAGER_SYSTEM) {
+                if ((r = bus_publish_api(system_bus, m)) < 0)
+                        goto fail;
+
+                m->api_bus = m->system_bus = system_bus;
+        } else {
+                m->system_bus = system_bus;
+        }
+
+        return r;
 
 fail:
-        bus_done_system(m);
-        dbus_error_free(&error);
+        drop_connection(system_bus);
+        return r;
+}
+
+static int session_bus_start(DBusConnection *session_bus, Manager *m) {
+        int r = 0;
+
+        assert(m->running_as != MANAGER_SYSTEM);
+
+        if ((r = (bus_setup_loop(m, session_bus))) < 0) {
+                goto fail;
+        }
+
+        if ((r = bus_publish_api(session_bus, m)) < 0)
+                goto fail;
+
+        m->api_bus = session_bus;
 
         return r;
+
+fail:
+        drop_connection(session_bus);
+        return r;
 }
 
-static int bus_init_api(Manager *m) {
-        DBusError error;
-        int r;
+#define BUS_SETUP_SYSTEM 0x1
+#define BUS_SETUP_SESSION 0x2
+static int bus_public_setup_work_thread(ArgArray *args, ArgArray *return_values) {
+        ManagerRunningAs running_as;
+        int cmd;
+        DBusError *errors;
+        const char **details;
+        DBusConnection *system_bus, *session_bus;
 
-        assert(m);
+        assert(args->n_args <= ARG_ARRAY_MAX_ARGS && args->n_args >= 4);
 
-        dbus_error_init(&error);
+        running_as = PTR_TO_INT(args->args[0]);
+        cmd = PTR_TO_INT(args->args[1]);
+        errors = (DBusError *)args->args[2];
+        details = (const char **)args->args[3];
 
-        if (m->api_bus)
-                return 0;
+        system_bus = session_bus = NULL;
 
-        if (m->running_as == MANAGER_SYSTEM && m->system_bus)
-                m->api_bus = m->system_bus;
-        else {
-                if (!(m->api_bus = dbus_bus_get_private(m->running_as == MANAGER_USER ? DBUS_BUS_SESSION : DBUS_BUS_SYSTEM, &error))) {
-                        log_debug("Failed to get API D-Bus connection, retrying later: %s", bus_error_message(&error));
-                        r = 0;
-                        goto fail;
+        dbus_error_init(&errors[0]);
+        dbus_error_init(&errors[1]);
+
+        if (cmd & BUS_SETUP_SYSTEM)
+                system_bus = system_bus_get(running_as, &errors[0], &details[0]);
+
+        if (cmd & BUS_SETUP_SESSION)
+                if (!(session_bus = session_bus_get(running_as, &errors[1], &details[1]))) {
+                        if (system_bus) {
+                                drop_connection(system_bus);
+                                system_bus = NULL;
+                        }
                 }
 
-                if ((r = bus_setup_loop(m, m->api_bus)) < 0)
-                        goto fail;
-        }
+        return_values->n_args = 4;
+        assert(return_values->n_args <= ARG_ARRAY_MAX_ARGS);
 
-        if (!dbus_connection_register_object_path(m->api_bus, "/org/freedesktop/systemd1", &bus_manager_vtable, m) ||
-            !dbus_connection_register_fallback(m->api_bus, "/org/freedesktop/systemd1/unit", &bus_unit_vtable, m) ||
-            !dbus_connection_register_fallback(m->api_bus, "/org/freedesktop/systemd1/job", &bus_job_vtable, m) ||
-            !dbus_connection_add_filter(m->api_bus, api_bus_message_filter, m, NULL)) {
-                log_error("Not enough memory");
-                r = -ENOMEM;
-                goto fail;
-        }
+        return_values->args[0] = (void *)system_bus;
+        return_values->args[1] = (void *)session_bus;
+        return_values->args[2] = (void *)errors;
+        return_values->args[3] = (void *)details;
 
-        /* Get NameOwnerChange messages */
-        dbus_bus_add_match(m->api_bus,
-                           "type='signal',"
-                           "sender='"DBUS_SERVICE_DBUS"',"
-                           "interface='"DBUS_INTERFACE_DBUS"',"
-                           "member='NameOwnerChanged',"
-                           "path='"DBUS_PATH_DBUS"'",
-                           &error);
+        return 0;
+}
 
-        if (dbus_error_is_set(&error)) {
-                log_error("Failed to register match: %s", bus_error_message(&error));
-                r = -EIO;
-                goto fail;
+static bool system_bus_setup_pending;
+static bool session_bus_setup_pending;
+
+static void bus_public_start(Manager *m,
+                             ArgArray *args,
+                             int return_code,
+                             ArgArray *return_values) {
+        DBusConnection *system_bus, *session_bus;
+        const char **details;
+        DBusError *errors;
+        int cmd;
+
+        assert(args->n_args <= ARG_ARRAY_MAX_ARGS && args->n_args >= 4);
+        cmd = PTR_TO_INT(args->args[1]);
+
+        assert(return_values->n_args <= ARG_ARRAY_MAX_ARGS && return_values->n_args >= 4);
+        system_bus = (DBusConnection *)return_values->args[0];
+        session_bus = (DBusConnection *)return_values->args[1];
+        errors = (DBusError *)return_values->args[2];
+        details = (const char **)return_values->args[3];
+
+        if (cmd & BUS_SETUP_SYSTEM) {
+                DBusError *error = &errors[0];
+                const char *detail = details[0];
+                int r;
+                char *id;
+
+                system_bus_setup_pending = false;
+
+                if (system_bus &&
+                    !(r = system_bus_start(system_bus, m))) {
+                        log_debug("Successfully connected to %s D-Bus bus %s as %s",
+                                  m->running_as == MANAGER_SYSTEM ? "System/API" : "System",
+                                  strnull((id = dbus_connection_get_server_id(m->system_bus))),
+                                  strnull(dbus_bus_get_unique_name(m->system_bus)));
+                        dbus_free(id);
+                } else {
+                        log_error("Failed to connect to %s D-Bus bus: %s: %s",
+                                  m->running_as == MANAGER_SYSTEM ? "System/API" : "System",
+                                  detail ? : "",
+                                  dbus_error_is_set(error) ? bus_error_message(error) :
+                                                             r ? strerror(-r) : "UNKNOWN");
+                }
         }
 
-        /* Get activation requests */
-        dbus_bus_add_match(m->api_bus,
-                           "type='signal',"
-                           "sender='"DBUS_SERVICE_DBUS"',"
-                           "interface='org.freedesktop.systemd1.Activator',"
-                           "member='ActivationRequest',"
-                           "path='"DBUS_PATH_DBUS"'",
-                           &error);
+        if (cmd & BUS_SETUP_SESSION) {
+                DBusError *error = &errors[1];
+                const char *detail = details[1];
+                int r;
+                char *id;
 
-        if (dbus_error_is_set(&error)) {
-                log_error("Failed to register match: %s", bus_error_message(&error));
-                r = -EIO;
-                goto fail;
+                session_bus_setup_pending = false;
+
+                if (session_bus &&
+                    !(r = session_bus_start(session_bus, m))) {
+                        log_debug("Successfully connected to API D-Bus bus %s as %s",
+                                  strnull((id = dbus_connection_get_server_id(m->system_bus))),
+                                  strnull(dbus_bus_get_unique_name(m->system_bus)));
+                        dbus_free(id);
+                } else {
+                        log_error("Failed to connect to API D-Bus bus: %s: %s",
+                                  detail ? : "",
+                                  dbus_error_is_set(error) ? bus_error_message(error) :
+                                                             r ? strerror(-r) : "UNKNOWN");
+                }
         }
 
-        if ((r = request_name(m)) < 0)
-                goto fail;
+        if (dbus_error_is_set(&errors[0]))
+                dbus_error_free(&errors[0]);
 
-        if ((r = query_name_list(m)) < 0)
-                goto fail;
+        if (dbus_error_is_set(&errors[1]))
+                dbus_error_free(&errors[1]);
+}
 
-        if (m->api_bus != m->system_bus) {
-                char *id;
-                log_debug("Successfully connected to API D-Bus bus %s as %s",
-                         strnull((id = dbus_connection_get_server_id(m->api_bus))),
-                         strnull(dbus_bus_get_unique_name(m->api_bus)));
-                dbus_free(id);
+inline static int bus_init_public_initiate_thread(Manager *m, int cmd) {
+        static DBusError errors[2];
+        static const char *details[2];
+        int r = 0;
+        ArgArray args;
+
+        args.n_args = 4;
+        assert(args.n_args <= ARG_ARRAY_MAX_ARGS);
+
+        args.args[0] = INT_TO_PTR(m->running_as);
+        args.args[1] = INT_TO_PTR(cmd);
+        args.args[2] = (void *)errors;
+        args.args[3] = (void *)details;
+
+        r = manager_add_work_thread(m, bus_public_setup_work_thread,
+                                    &args, bus_public_start);
+        return r;
+}
+
+static int bus_init_public_async(Manager *m) {
+        int cmd = 0;
+        const char *cmd_desc;
+        int r = 0;
+
+        if (m->running_as == MANAGER_SYSTEM) {
+                assert(m->system_bus == m->api_bus);
+
+                if (!m->system_bus && !system_bus_setup_pending) {
+                        cmd = BUS_SETUP_SYSTEM;
+                        system_bus_setup_pending = true;
+                }
+        } else {
+                if (!m->system_bus && !system_bus_setup_pending) {
+                        cmd |= BUS_SETUP_SYSTEM;
+                        system_bus_setup_pending = true;
+                }
+                if (!m->api_bus && !session_bus_setup_pending) {
+                        cmd |= BUS_SETUP_SESSION;
+                        session_bus_setup_pending = true;
+                }
         }
 
-        return 0;
+        if (!cmd)
+                return 0;
 
-fail:
-        bus_done_api(m);
-        dbus_error_free(&error);
+        cmd_desc = cmd == (BUS_SETUP_SYSTEM | BUS_SETUP_SESSION) ? "System and API" :
+                   cmd & BUS_SETUP_SYSTEM ?
+                        m->running_as == MANAGER_SYSTEM ? "System/API" : "System" :
+                   cmd & BUS_SETUP_SESSION ? "API" : "UNKNOWN";
+
+        log_debug("Initiate a thread of initializing %s D-bus bus", cmd_desc);
+        r = bus_init_public_initiate_thread(m, cmd);
+
+        if (r < 0) {
+                if (cmd & BUS_SETUP_SYSTEM)
+                        system_bus_setup_pending = false;
+                if (cmd & BUS_SETUP_SESSION)
+                        session_bus_setup_pending = false;
+                log_error("Failed to initiate a thread to init %s D-bus bus: %s",
+                          cmd_desc, strerror(-r));
+        }
 
         return r;
 }
@@ -1006,11 +1218,9 @@ int bus_init(Manager *m, bool try_bus_connect) {
                         return -ENOMEM;
                 }
 
-        if (try_bus_connect) {
-                if ((r = bus_init_system(m)) < 0 ||
-                    (r = bus_init_api(m)) < 0)
-                        return r;
-        }
+        if (try_bus_connect &&
+            (r = bus_init_public_async(m)) < 0)
+                return r;
 
         if ((r = bus_init_private(m)) < 0)
                 return r;
@@ -1018,7 +1228,7 @@ int bus_init(Manager *m, bool try_bus_connect) {
         return 0;
 }
 
-static void shutdown_connection(Manager *m, DBusConnection *c) {
+static void stop_connection(Manager *m, DBusConnection *c) {
         Set *s;
         Job *j;
         Iterator i;
@@ -1053,11 +1263,19 @@ static void shutdown_connection(Manager *m, DBusConnection *c) {
         }
 
         dbus_connection_set_dispatch_status_function(c, NULL, NULL, NULL);
+}
+
+static void drop_connection(DBusConnection *c) {
         dbus_connection_flush(c);
         dbus_connection_close(c);
         dbus_connection_unref(c);
 }
 
+static void shutdown_connection(Manager *m, DBusConnection *c) {
+        stop_connection(m, c);
+        drop_connection(c);
+}
+
 static void bus_done_api(Manager *m) {
         assert(m);
 
-- 
1.7.7.3

_______________________________________________
systemd-devel mailing list
systemd-devel@lists.freedesktop.org
http://lists.freedesktop.org/mailman/listinfo/systemd-devel

Reply via email to