Previously, compactions used snapshots in the background. While the
compaction data was prepared asynchronously the actual writing of the
data needed to happen within the main thread in order to ensure
consistency.
We can solve this differently by using multiple files within the ovsdb
log. By splitting files when the compaction thread starts we can remove
most of that dependency.

The log will create a new "curr" file and move the previous "curr" to "old".
The files "base" and "old" are read-only from then on and the compaction
thread is responsible for writing all their content (including e.g. raft
states) to a new log file.
In the meantime "curr" can be used as normal and be written to.
At the end of the compaction we just replace "base" with the newly
written compacted file and drop "old".

This means the part of the main thread that is blocked for compaction is
now limited to writing 2 redirects, opening and renaming files. We no
longer need to write significant amounts of data in there.

The only exception to this is if compaction fails for some reason and we
need to abort to the previous state (e.g. if a disk write fails). Then
the revert will happen in the main thread to ensure consistency.

Signed-off-by: Felix Huettner <felix.huettner@stackit.cloud>
---
 lib/ovs-thread.c      |   1 +
 lib/ovs-thread.h      |   1 +
 ovsdb/log.c           | 394 +++++++++++++++++++++++++++++++++++++++++-
 ovsdb/log.h           |  10 ++
 ovsdb/ovsdb.c         | 125 ++++++++++----
 ovsdb/ovsdb.h         |  19 +-
 ovsdb/raft-private.h  |   7 +
 ovsdb/raft.c          | 112 +++++++++++-
 ovsdb/raft.h          |  19 ++
 ovsdb/storage.c       |  94 ++++++++--
 ovsdb/storage.h       |  27 ++-
 ovsdb/transaction.c   |   4 +-
 tests/ovsdb-log.at    | 159 +++++++++++++++++
 tests/ovsdb-server.at |   4 +-
 tests/test-ovsdb.c    |  11 ++
 15 files changed, 924 insertions(+), 63 deletions(-)

diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
index f80008061..0db84fbc3 100644
--- a/lib/ovs-thread.c
+++ b/lib/ovs-thread.c
@@ -188,6 +188,7 @@ XPTHREAD_FUNC1(pthread_cond_signal, pthread_cond_t *);
 XPTHREAD_FUNC1(pthread_cond_broadcast, pthread_cond_t *);
 
 XPTHREAD_FUNC2(pthread_join, pthread_t, void **);
+XPTHREAD_FUNC2(pthread_kill, pthread_t, int);
 
 typedef void destructor_func(void *);
 XPTHREAD_FUNC2(pthread_key_create, pthread_key_t *, destructor_func *);
diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h
index aac5e19c9..31d1eb0f8 100644
--- a/lib/ovs-thread.h
+++ b/lib/ovs-thread.h
@@ -123,6 +123,7 @@ void xpthread_sigmask(int, const sigset_t *, sigset_t *);
 
 pthread_t ovs_thread_create(const char *name, void *(*)(void *), void *);
 void xpthread_join(pthread_t, void **);
+void xpthread_kill(pthread_t, int);
 
 /* Per-thread data.
  *
diff --git a/ovsdb/log.c b/ovsdb/log.c
index 339ce2739..7bfa51665 100644
--- a/ovsdb/log.c
+++ b/ovsdb/log.c
@@ -659,10 +659,46 @@ ovsdb_log_read(struct ovsdb_log *log, struct json **jsonp)
                     /* We just finished reading the base file */
                     log->base = log->curr;
                     log->curr = next;
-                } else {
+                } else if (!log->old) {
                     /* We already have a base file, so we must be reading from
                      * a db that is in a transient state of a compaction.
-                     * For now this needs to be fixed manually. */
+                     * We need to clean this up. */
+                    VLOG_WARN("%s: db was in a compaction-in-progress state. "
+                              "Recovering...", log->display_name);
+                    log->old = log->curr;
+                    log->curr = next;
+
+                    char *deref_name = follow_symlinks(log->base->name);
+                    char *tmp_name = xasprintf("%s.tmp", deref_name);
+                    free(deref_name);
+                    if (unlink(tmp_name) < 0 && errno != ENOENT) {
+                        error = ovsdb_io_error(errno, "failed to remove %s",
+                                               tmp_name);
+                        json_destroy(*jsonp);
+                        *jsonp = NULL;
+                        free(tmp_name);
+                        return error;
+                    }
+
+                    struct ovsdb_log *temporary;
+                    error = ovsdb_log_open(tmp_name, log->magic,
+                                           OVSDB_LOG_CREATE_EXCL_SINGLE,
+                                           false, &temporary);
+                    free(tmp_name);
+                    if (error) {
+                        json_destroy(*jsonp);
+                        *jsonp = NULL;
+                        return error;
+                    }
+                    error = ovsdb_log_compact_abort(log, temporary);
+                    if (error) {
+                        json_destroy(*jsonp);
+                        *jsonp = NULL;
+                        return error;
+                    }
+                } else {
+                    /* Not sure what causes us to have 4 files, but something
+                     * is definitely broken. Return an error. */
                     log->state = OVSDB_LOG_BROKEN;
                     json_destroy(*jsonp);
                     *jsonp = NULL;
@@ -1190,6 +1226,360 @@ ovsdb_log_replace_abort(struct ovsdb_log *new)
         free(name);
     }
 }
+
+static struct ovsdb_error *
+ovsdb_log_copy_file(struct ovsdb_log_file *source, struct ovsdb_log_file *dst,
+                    off_t source_start_offset, off_t source_end_offset)
+{
+    struct ovsdb_error *error;
+    char buf[BUFSIZ];
+    source->offset = source_start_offset;
+    if (fseeko(source->stream, source->offset, SEEK_SET)) {
+        error = ovsdb_io_error(errno, "%s: cannot seek to start of file",
+                               source->name);
+        return error;
+    }
+    while (source->offset < source_end_offset) {
+        int chunk = MIN(source_end_offset - source->offset, sizeof buf);
+        if (fread(buf, 1, chunk, source->stream) != chunk) {
+            return ovsdb_io_error(ferror(source->stream) ? errno : EOF,
+                                  "%s: error reading %u bytes "
+                                  "starting at offset %lld",
+                                  source->name, chunk,
+                                  (long long int) source->offset);
+        }
+        source->offset += chunk;
+        if (fwrite(buf, 1, chunk, dst->stream) != chunk) {
+            return ovsdb_io_error(ferror(dst->stream) ? errno : EOF,
+                                  "%s: error writing %u bytes "
+                                  "starting at offset %lld",
+                                  dst->name, chunk,
+                                  (long long int) dst->offset);
+        }
+        dst->offset += chunk;
+    }
+    return NULL;
+}
+
+/* Replaces the file behind 'original' with the file behind 'replacer'.
+ * On success 'original' has a valid stream for the new file and
+ * 'replacer->stream' is NULL.  On error 'original->stream' may be NULL. */
+static struct ovsdb_error *
+ovsdb_log_rename_file(struct ovsdb_log_file *original,
+                      struct ovsdb_log_file *replacer)
+{
+    /* Replace the file behind 'original' by the file behind 'replacer'.
+     *
+     * We support two strategies:
+     *
+     *     - The preferred strategy is to rename the replacer file over the
+     *       original one in-place, then close the original one.  This works on
+     *       Unix-like systems.  It does not work on Windows, which does not
+     *       allow open files to be renamed.  The approach has the advantage
+     *       that, at any point, we can drop back to something that already
+     *       works.
+     *
+     *     - Alternatively, we can close both files, rename, then open the new
+     *       file (which now has the original name).  This works on all
+     *       systems, but if reopening the file fails then 'original' is broken.
+     *
+     * We make the strategy a variable instead of an #ifdef to make it easier
+     * to test both strategies on Unix-like systems, and to make the code
+     * easier to read. */
+    if (!rename_open_files) {
+        fclose(original->stream);
+        original->stream = NULL;
+
+        fclose(replacer->stream);
+        replacer->stream = NULL;
+    }
+
+    /* Rename 'replacer' over 'original'.  We dereference the original name
+     * because, if it is a symlink, we want to replace the referent of the
+     * symlink instead of the symlink itself. */
+    char *deref_name = follow_symlinks(original->name);
+    struct ovsdb_error *error = ovsdb_rename(replacer->name, deref_name);
+    free(deref_name);
+
+    if (error) {
+        return error;
+    }
+    if (rename_open_files) {
+        fsync_parent_dir(original->name);
+        fclose(original->stream);
+        original->stream = replacer->stream;
+        replacer->stream = NULL;
+    } else {
+        original->stream = fopen(original->name, "r+b");
+        if (!original->stream) {
+            return  ovsdb_io_error(errno, "%s: could not reopen log",
+                                   original->name);
+        }
+
+        if (fseek(original->stream, replacer->offset, SEEK_SET)) {
+            return ovsdb_io_error(errno, "%s: seek failed", original->name);
+        }
+    }
+    return error;
+}
+
+/* Prepare to compact the content of the 'log' on disk. In order to do that
+ * we need to:
+ * 1. Open a new file to be used for writing from now on. Store as new curr
+ * 2. Write metadata at the end of old to point to the new file
+ * 3. Transition the file in curr to old
+ * 4. Open an additional temporary file. This will be the new base in the
+ *    future and will contain all data of base and old in a compacted way.
+ *
+ * The caller is responsible to write all data that has been accumulated until
+ * now into newp and afterwards call ovsdb_log_compact_commit or abort.
+ */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_log_compact_start(struct ovsdb_log *old,
+                        struct ovsdb_log **newp)
+{
+    struct ovsdb_error *error;
+
+    ovs_assert(old->open_mode != OVSDB_LOG_READ_ONLY);
+
+    /* If we have a single file and not yet two files we first need to split
+     * them. That makes the replacement way easier. */
+    if (!old->base) {
+        ovsdb_log_write_(old, NULL);
+    }
+
+    ovs_assert(old->curr->lockfile);
+    ovs_assert(!old->old);
+    ovs_assert(old->base);
+
+    struct ovsdb_log_file *new_curr;
+
+    char *new_curr_name = ovsdb_log_get_new_filename(old);
+    error = ovsdb_log_file_open(new_curr_name, old->magic, OVSDB_LOG_CREATE,
+                                old->may_lock, &new_curr, NULL);
+    if (error) {
+        /* No file was opened, but the generated name is still ours to free. */
+        free(new_curr_name);
+        return error;
+    }
+
+    error = ovsdb_log_write_next_file(old, old->curr, new_curr_name);
+    if (error) {
+        /* Close and remove the newly created file. */
+        ovsdb_log_file_close(new_curr);
+        unlink(new_curr_name);
+        free(new_curr_name);
+        return error;
+    }
+    free(new_curr_name);
+
+    old->old = old->curr;
+    old->curr = new_curr;
+
+    /* If old->base->name is a symlink, then we want the temporary file to be
+     * in the same directory as the symlink's referent. */
+    char *deref_name = follow_symlinks(old->base->name);
+    char *tmp_name = xasprintf("%s.tmp", deref_name);
+    free(deref_name);
+
+    /* Remove temporary file.  (It might not exist.) */
+    if (unlink(tmp_name) < 0 && errno != ENOENT) {
+        error = ovsdb_io_error(errno, "failed to remove %s", tmp_name);
+        free(tmp_name);
+        *newp = NULL;
+        return error;
+    }
+
+    /* Create temporary file. */
+    error = ovsdb_log_open(tmp_name, old->magic, OVSDB_LOG_CREATE_EXCL_SINGLE,
+                           false, newp);
+    free(tmp_name);
+    return error;
+}
+
+/* Finish compacting the content of the 'log' on disk. In order to do that
+ * we need to:
+ * 1. Write to the end of new->curr a reference to old->curr
+ * 2. Atomically replace old->base with new->curr
+ * 3. Remove old->old as it is now unreferenced
+ */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_log_compact_commit(struct ovsdb_log *old, struct ovsdb_log *new)
+{
+    if (!old->old) {
+        VLOG_WARN("The compaction of %s has been aborted previously.",
+                  old->display_name);
+        ovsdb_log_close(new);
+        return NULL;
+    }
+    ovs_assert(old->base && old->old && old->curr);
+    ovs_assert(!old->only_single_file);
+    ovs_assert(!new->base && !new->old);
+    ovs_assert(new->curr);
+    ovs_assert(new->only_single_file);
+    ovs_assert(!strcmp(old->magic, new->magic));
+    struct ovsdb_error *error, *abort_error;
+
+    error = ovsdb_log_commit_block(new);
+    if (error) {
+        goto abort;
+    }
+
+    error = ovsdb_log_write_next_file(new, new->curr, old->curr->name);
+    if (error) {
+        goto abort;
+    }
+
+    error = ovsdb_log_rename_file(old->base, new->curr);
+    if (error) {
+        old->error = error;
+        old->state = OVSDB_LOG_BROKEN;
+        return ovsdb_error_clone(error);
+    }
+
+    /* Remove old->old since it is now no longer necessary.
+     * We ignore errors during removal as actually removing it is not
+     * critical. */
+    char *oldname = old->old->name;
+    old->old->name = NULL;
+    ovsdb_log_file_close(old->old);
+    if (unlink(oldname) < 0) {
+        VLOG_WARN("failed to remove old file %s, the file is unnecessary "
+                  " and can be removed now. Errno %d", oldname, errno);
+    }
+    free(oldname);
+    old->old = NULL;
+
+    /* Replace 'old' by 'new' in memory.
+     *
+     * 'old' transitions to OVSDB_LOG_WRITE (it was probably in that mode
+     * anyway). */
+    old->state = OVSDB_LOG_WRITE;
+    ovsdb_error_destroy(old->error);
+    old->error = NULL;
+    /* prev_offset only matters for OVSDB_LOG_READ. */
+    if (old->base->afsync) {
+        uint64_t ticket = afsync_destroy(old->base->afsync);
+        old->base->afsync = afsync_create(fileno(old->base->stream),
+                                          ticket + 1);
+    }
+    old->base->offset = new->curr->offset;
+
+    /* Free 'new'. */
+    ovsdb_log_close(new);
+
+    return NULL;
+
+abort:
+    abort_error = ovsdb_log_compact_abort(old, new);
+    if (abort_error) {
+        char *e = ovsdb_error_to_string_free(error);
+        VLOG_ERR("Error when commiting compaction and additional "
+                 "error while aborting the compaction. Initial error %s",
+                 e);
+        free(e);
+        return abort_error;
+    }
+    return error;
+}
+
+
+/* Abort compacting the content of the 'log' on disk. In order to do that
+ * we need to:
+ * 1. Remove the new log completely
+ * 2. Sync old->old to disk
+ * 3. Write the whole old->old to a temporary file without the redirect
+ * 4. Append all data of old->curr to the temporary file
+ * 5. Replace the underlying file of old->old with the temporary file
+ * 6. Remove old->curr
+ * 7. Move old->old to old->curr
+ *
+ * In comparison to the other compact operations this will cause potentially
+ * significant amount of writes.
+ * This process is designed in a way that if we crash at any step a new process
+ * can clean up. In the worst case we would leave a temporary file laying
+ * around.
+ */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_log_compact_abort(struct ovsdb_log *old, struct ovsdb_log *new)
+{
+    struct ovsdb_error *error;
+
+    /* We need to keep a consistent state in regards to the offset we are
+     * reading next, since we might not have read old->curr yet. */
+    off_t new_offset = old->old->prev_offset + old->curr->offset;
+
+    /* Step 1: Remove the new log completely
+     * We will truncate the new file and abuse it as temporary file. */
+    struct ovsdb_log_file *tmpfile = new->curr;
+    new->curr = NULL;
+    ovsdb_log_close(new);
+    tmpfile->offset = tmpfile->prev_offset = 0;
+    ovsdb_log_file_truncate(tmpfile);
+
+    /* Step 2: Sync old->old to disk.
+     * If this fails we can only hope for no data loss. */
+    error = ovsdb_log_file_commit_block(old->old);
+    if (error) {
+        old->error = error;
+        old->state = OVSDB_LOG_BROKEN;
+        return ovsdb_error_clone(error);
+    }
+
+    /* 3. Write the whole old->old to a temporary file without the redirect.
+     * old->old->prev_offset will tell us until when we want to read. */
+    error = ovsdb_log_copy_file(old->old, tmpfile, 0, old->old->prev_offset);
+    if (error) {
+        old->error = error;
+        old->state = OVSDB_LOG_BROKEN;
+        return ovsdb_error_clone(error);
+    }
+
+    /* 4. Append all data of old->curr to the temporary file.
+     * Since we might not be at the end of the file yet we need to seek to the
+     * end. */
+    if (fseek(old->curr->stream, 0, SEEK_END)) {
+        old->error = ovsdb_io_error(errno, "%s: seek failed", old->curr->name);
+        old->state = OVSDB_LOG_BROKEN;
+        return ovsdb_error_clone(old->error);
+    }
+
+    error = ovsdb_log_copy_file(old->curr, tmpfile, 0,
+                                ftell(old->curr->stream));
+    if (error) {
+        old->error = error;
+        old->state = OVSDB_LOG_BROKEN;
+        return ovsdb_error_clone(error);
+    }
+
+    /* 5. Replace the underlying file of old->old with the temporary file. */
+    error = ovsdb_log_rename_file(old->old, tmpfile);
+    if (error) {
+        old->error = error;
+        old->state = OVSDB_LOG_BROKEN;
+        return ovsdb_error_clone(error);
+    }
+    ovsdb_log_file_close(tmpfile);
+
+    /* 6. Remove old->curr. */
+    char *filename = old->curr->name;
+    old->curr->name = NULL;
+    ovsdb_log_file_close(old->curr);
+    unlink(filename);
+    free(filename);
+
+    /* 7. Move old->old to old->curr. */
+    old->curr = old->old;
+    old->old = NULL;
+    old->curr->offset = old->curr->prev_offset = new_offset;
+    if (fseek(old->curr->stream, new_offset, SEEK_SET)) {
+        old->error = ovsdb_io_error(errno, "%s: seek failed", old->curr->name);
+        old->state = OVSDB_LOG_BROKEN;
+        return ovsdb_error_clone(old->error);
+    }
+
+    return NULL;
+}
 
 void
 ovsdb_log_disable_renaming_open_files(void)
diff --git a/ovsdb/log.h b/ovsdb/log.h
index 6161c0d23..69567d6c4 100644
--- a/ovsdb/log.h
+++ b/ovsdb/log.h
@@ -100,6 +100,16 @@ struct ovsdb_error *ovsdb_log_replace_commit(struct ovsdb_log *old,
     OVS_WARN_UNUSED_RESULT;
 void ovsdb_log_replace_abort(struct ovsdb_log *new);
 
+struct ovsdb_error *ovsdb_log_compact_start(struct ovsdb_log *old,
+                                            struct ovsdb_log **newp)
+    OVS_WARN_UNUSED_RESULT;
+struct ovsdb_error *ovsdb_log_compact_commit(struct ovsdb_log *old,
+                                             struct ovsdb_log *new)
+    OVS_WARN_UNUSED_RESULT;
+struct ovsdb_error *ovsdb_log_compact_abort(struct ovsdb_log *old,
+                                            struct ovsdb_log *new)
+    OVS_WARN_UNUSED_RESULT;
+
 /* For testing. */
 void ovsdb_log_disable_renaming_open_files(void);
 
diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
index 7336f500c..6fe81fc1a 100644
--- a/ovsdb/ovsdb.c
+++ b/ovsdb/ovsdb.c
@@ -14,6 +14,7 @@
  */
 
 #include <config.h>
+#include <signal.h>
 
 #include "ovsdb.h"
 
@@ -616,21 +617,54 @@ ovsdb_clone_data(const struct ovsdb *db)
 
     return new;
 }
+struct ovsdb_write * OVS_WARN_UNUSED_RESULT
+ovsdb_write_schema_change(struct ovsdb *db,
+                          const struct ovsdb_schema *schema,
+                          const struct json *data,
+                          const struct uuid *prereq,
+                          struct uuid *resultp)
+{
+    /* If a compaction is running, wait for its thread and then abort the
+     * compaction.  The thread must not be sent SIGKILL: that signal cannot be
+     * caught and terminates the whole process, not one thread.  The abort
+     * might be expensive in disk writes, but there is no real alternative. */
+    if (ovsdb_compcation_in_progress(db)) {
+        xpthread_join(db->compact_state->thread, NULL);
+        struct ovsdb_error *err = ovsdb_storage_compact_abort(
+                db->storage, db->compact_state->log,
+                db->compact_state->aux);
+        if (err) {
+            char *s = ovsdb_error_to_string_free(err);
+            VLOG_FATAL("Error when aborting compaction during a "
+                       "schema change: %s", s);
+            free(s);
+        }
+    }
+
+    return ovsdb_storage_write_schema_change(db->storage, schema, data,
+                                             prereq, resultp);
+}
 
 static void *
 compaction_thread(void *aux)
 {
     struct ovsdb_compaction_state *state = aux;
     uint64_t start_time = time_msec();
-    struct json *data;
+    struct json *data, *serialized_data;
 
     VLOG_DBG("%s: Compaction thread started.", state->db->name);
     data = ovsdb_to_txn_json(state->db, "compacting database online",
                              /* Do not allow shallow copies to avoid races. */
                              false);
-    state->data = json_serialized_object_create(data);
+    serialized_data = json_serialized_object_create(data);
     json_destroy(data);
 
+    state->error = ovsdb_storage_compact_write(state->is_raft,
+                                               state->log,
+                                               state->schema,
+                                               serialized_data,
+                                               state->aux);
+
     state->thread_time = time_msec() - start_time;
 
     VLOG_DBG("%s: Compaction thread finished in %"PRIu64" ms.",
@@ -671,14 +705,9 @@ ovsdb_compact(struct ovsdb *db, bool trim_memory OVS_UNUSED)
     uint64_t applied_index = ovsdb_storage_get_applied_index(db->storage);
     uint64_t elapsed, start_time = time_msec();
     struct ovsdb_compaction_state *state;
+    struct ovsdb_error *error;
 
-    if (!applied_index) {
-        /* Parallel compaction is not supported for standalone databases. */
-        state = xzalloc(sizeof *state);
-        state->data = ovsdb_to_txn_json(db,
-                                        "compacting database online", true);
-        state->schema = ovsdb_schema_to_json(db->schema);
-    } else if (ovsdb_compaction_ready(db)) {
+    if (ovsdb_compaction_ready(db)) {
         xpthread_join(db->compact_state->thread, NULL);
 
         state = db->compact_state;
@@ -686,14 +715,67 @@ ovsdb_compact(struct ovsdb *db, bool trim_memory OVS_UNUSED)
 
         ovsdb_destroy(state->db);
         seq_destroy(state->done);
+        json_destroy(state->schema);
+
+        if (state->error) {
+            VLOG_WARN("Compaction failed in thread with %s",
+                      ovsdb_error_to_string(state->error));
+            error = ovsdb_storage_compact_abort(db->storage, state->log,
+                                                state->aux);
+            if (error) {
+                VLOG_ABORT("Failure to abort compaction: %s.",
+                           ovsdb_error_to_string(error));
+            }
+            free(state);
+            return error;
+        }
+
+        error = ovsdb_storage_compact_commit(db->storage, state->log,
+                                             state->applied_index,
+                                             state->aux);
+        if (error) {
+            VLOG_WARN("Compaction failed with %s",
+                      ovsdb_error_to_string(error));
+            free(state);
+            return error;
+        }
+
+#if HAVE_DECL_MALLOC_TRIM
+        if (!error && trim_memory) {
+            malloc_trim(0);
+        }
+#endif
+
+        elapsed = time_msec() - start_time;
+        VLOG(elapsed > 1000 ? VLL_INFO : VLL_DBG,
+             "%s: Database compaction took %"PRIu64"ms "
+             "(init: %"PRIu64"ms, write: %"PRIu64"ms, thread: %"PRIu64"ms)",
+             db->name, elapsed + state->init_time,
+             state->init_time, elapsed, state->thread_time);
+
+        free(state);
+        return error;
     } else {
         /* Creating a thread. */
         ovs_assert(!db->compact_state);
-        state = xzalloc(sizeof *state);
 
+        struct ovsdb_log *target;
+        void *aux;
+        error = ovsdb_storage_compact_start(db->storage, applied_index,
+                                            &target, &aux);
+        if (error) {
+            VLOG_WARN("Compaction start failed with %s",
+                      ovsdb_error_to_string(error));
+            return error;
+        }
+
+        state = xzalloc(sizeof *state);
         state->db = ovsdb_clone_data(db);
         state->schema = ovsdb_schema_to_json(db->schema);
+        state->log = target;
         state->applied_index = applied_index;
+        state->is_raft = applied_index != 0;
+        state->aux = aux;
         state->done = seq_create();
         state->seqno = seq_read(state->done);
         state->thread = ovs_thread_create("compaction",
@@ -703,29 +785,6 @@ ovsdb_compact(struct ovsdb *db, bool trim_memory OVS_UNUSED)
         db->compact_state = state;
         return NULL;
     }
-
-    struct ovsdb_error *error;
-
-    error = ovsdb_storage_store_snapshot(db->storage, state->schema,
-                                         state->data, state->applied_index);
-    json_destroy(state->schema);
-    json_destroy(state->data);
-
-#if HAVE_DECL_MALLOC_TRIM
-    if (!error && trim_memory) {
-        malloc_trim(0);
-    }
-#endif
-
-    elapsed = time_msec() - start_time;
-    VLOG(elapsed > 1000 ? VLL_INFO : VLL_DBG,
-         "%s: Database compaction took %"PRIu64"ms "
-         "(init: %"PRIu64"ms, write: %"PRIu64"ms, thread: %"PRIu64"ms)",
-         db->name, elapsed + state->init_time,
-         state->init_time, elapsed, state->thread_time);
-
-    free(state);
-    return error;
 }
 
 void
diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
index 6e08a679b..b86688aa1 100644
--- a/ovsdb/ovsdb.h
+++ b/ovsdb/ovsdb.h
@@ -78,11 +78,20 @@ struct ovsdb_compaction_state {
 
     struct ovsdb *db;          /* Copy of a database data to compact. */
 
-    struct json *data;         /* 'db' as a serialized json. */
     struct json *schema;       /* 'db' schema json. */
     uint64_t applied_index;    /* Last applied index reported by the storage
                                 * at the moment of a database copy. */
 
+    struct ovsdb_log *log;     /* The log to write the compacted data to. */
+
+    bool is_raft;
+
+    void *aux;                 /* Data that might be used to support the
+                                * compaction on the underlying storage. */
+
+    struct ovsdb_error *error; /* NULL on success, otherwise contains the
+                                * error. */
+
     /* Completion signaling. */
     struct seq *done;
     uint64_t seqno;
@@ -154,6 +163,14 @@ struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
                            long long int elapsed_msec,
                            long long int *timeout_msec);
 
+struct ovsdb_write *
+ovsdb_write_schema_change(struct ovsdb *db,
+                          const struct ovsdb_schema *schema,
+                          const struct json *data,
+                          const struct uuid *prereq,
+                          struct uuid *resultp)
+    OVS_WARN_UNUSED_RESULT;
+
 struct ovsdb_error *ovsdb_compact(struct ovsdb *, bool trim_memory)
     OVS_WARN_UNUSED_RESULT;
 void ovsdb_compaction_wait(struct ovsdb *);
diff --git a/ovsdb/raft-private.h b/ovsdb/raft-private.h
index 8bfe85008..2a70c6380 100644
--- a/ovsdb/raft-private.h
+++ b/ovsdb/raft-private.h
@@ -227,4 +227,11 @@ struct uuid raft_parse_required_uuid(struct ovsdb_parser *, const char *name);
 bool raft_parse_optional_uuid(struct ovsdb_parser *, const char *name,
                          struct uuid *);
 
+/* Type used to share data with the compaction thread. */
+struct raft_compaction_state {
+    struct raft_header header;
+    uint64_t term;
+    struct uuid vote;
+};
+
 #endif /* raft-private.h */
diff --git a/ovsdb/raft.c b/ovsdb/raft.c
index cf9285b7e..8d4cde695 100644
--- a/ovsdb/raft.c
+++ b/ovsdb/raft.c
@@ -4546,8 +4546,8 @@ raft_get_log_length(const struct raft *raft)
             : raft->last_applied - raft->log_start + 1);
 }
 
-/* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
- * possible. */
+/* Returns true if taking a snapshot of 'raft', with raft_store_snapshot() or
+ * compaction, is possible. */
 bool
 raft_may_snapshot(const struct raft *raft)
 {
@@ -5326,3 +5326,111 @@ raft_init(void)
                              raft_unixctl_failure_test, NULL);
     ovsthread_once_done(&once);
 }
+
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+raft_compact_start(struct raft *raft, uint64_t applied_index,
+                   struct ovsdb_log **dst, void **aux)
+{
+    if (raft->joining) {
+        return ovsdb_error(NULL,
+                           "cannot store a snapshot while joining cluster");
+    } else if (raft->leaving) {
+        return ovsdb_error(NULL,
+                           "cannot store a snapshot while leaving cluster");
+    } else if (raft->left) {
+        return ovsdb_error(NULL,
+                           "cannot store a snapshot after leaving cluster");
+    } else if (raft->failed) {
+        return ovsdb_error(NULL,
+                           "cannot store a snapshot following failure");
+    }
+
+    struct ovsdb_error *error = ovsdb_log_compact_start(raft->log, dst);
+    if (!error) {
+        struct raft_compaction_state *state = xzalloc(sizeof *state);
+        *aux = state;
+        struct raft_header *h = &state->header;
+        h->sid = raft->sid;
+        h->cid = raft->cid;
+        h->name = xstrdup(raft->name);
+        h->local_address = xstrdup(raft->local_address);
+        /* The snapshot describes the state as of 'applied_index'. */
+        h->snap_index = applied_index;
+        h->snap.term = raft_get_term(raft, applied_index);
+        h->snap.eid = *raft_get_eid(raft, applied_index);
+        h->snap.servers = json_clone(raft_servers_for_index(
+            raft, applied_index));
+        h->snap.election_timer = raft->election_timer;
+        state->term = raft->term;
+        state->vote = raft->vote;
+    }
+
+    return error;
+}
+
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+raft_compact_commit(struct raft *raft, struct ovsdb_log *dst,
+                    uint64_t applied_index, void *aux)
+{
+    struct ovsdb_error *error = ovsdb_log_compact_commit(raft->log, dst);
+    if (error) {
+        return error;
+    }
+
+    struct raft_compaction_state *state = aux;
+    raft_entry_uninit(&raft->snap);
+    raft->snap = state->header.snap;
+    free(state->header.name);
+    free(state->header.local_address);
+    free(state);
+
+    uint64_t new_log_start = applied_index + 1;
+    for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
+        raft_entry_uninit(&raft->entries[i]);
+    }
+    memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
+            (raft->log_end - new_log_start) * sizeof *raft->entries);
+
+    raft->log_start = new_log_start;
+
+    return NULL;
+}
+
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+raft_compact_abort(struct raft *raft, struct ovsdb_log *dst, void *aux)
+{
+    struct raft_compaction_state *state = aux;
+    raft_entry_uninit(&state->header.snap); /* Each member is freed once. */
+    free(state->header.name);
+    free(state->header.local_address);
+    free(state);
+    return ovsdb_log_compact_abort(raft->log, dst);
+}
+
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+raft_store_compact(struct ovsdb_log *target,
+                   const struct json *new_snapshot_data,
+                   void *aux)
+{
+    struct raft_compaction_state *state = aux;
+    struct raft_header *header = &state->header;
+    raft_entry_set_parsed_data(&header->snap, new_snapshot_data);
+    struct ovsdb_error *error = ovsdb_log_write_and_free(
+        target, raft_header_to_json(header));
+    if (error) {
+        return error;
+    }
+
+    /* Write term and vote (if any).
+     *
+     * The term is redundant if we wrote a log record for that term above.  The
+     * vote, if any, is never redundant.
+     */
+    error = raft_write_state(target, state->term, &state->vote);
+    if (error) {
+        return error;
+    }
+
+    return ovsdb_log_commit_block(target);
+
+}
diff --git a/ovsdb/raft.h b/ovsdb/raft.h
index 5833aaf23..a1251510e 100644
--- a/ovsdb/raft.h
+++ b/ovsdb/raft.h
@@ -190,4 +190,23 @@ void raft_take_leadership(struct raft *);
 void raft_transfer_leadership(struct raft *, const char *reason);
 
 bool raft_precheck_prereq(const struct raft *, const struct uuid *prereq);
+
+/* Compaction interface.
+ *
+ * 'aux' carries the compaction state between these calls.
+ * raft_compact_abort() frees it, and raft_compact_commit() frees it when it
+ * succeeds.  raft_store_compact() writes the header, term and vote held in
+ * that state to 'target', using 'new_snapshot' as the snapshot data.
+ *
+ * NOTE(review): presumably raft_compact_start() is what allocates '*aux' and
+ * provides '*dst' -- confirm against its definition in raft.c. */
+struct ovsdb_error *raft_compact_start(struct raft *raft,
+                                       uint64_t applied_index,
+                                       struct ovsdb_log **dst,
+                                       void **aux)
+    OVS_WARN_UNUSED_RESULT;
+struct ovsdb_error *raft_compact_commit(struct raft *raft,
+                                        struct ovsdb_log *dst,
+                                        uint64_t applied_index,
+                                        void *aux)
+    OVS_WARN_UNUSED_RESULT;
+struct ovsdb_error *raft_compact_abort(struct raft *raft,
+                                       struct ovsdb_log *dst,
+                                       void *aux)
+    OVS_WARN_UNUSED_RESULT;
+struct ovsdb_error *raft_store_compact(struct ovsdb_log *target,
+                                       const struct json *new_snapshot,
+                                       void *aux)
+    OVS_WARN_UNUSED_RESULT;
 #endif /* lib/raft.h */
diff --git a/ovsdb/storage.c b/ovsdb/storage.c
index c2f28ca5a..fc3d3fb67 100644
--- a/ovsdb/storage.c
+++ b/ovsdb/storage.c
@@ -608,22 +608,6 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage 
*storage,
     }
 }
 
-/* 'schema' and 'data' should faithfully represent the current schema and data,
- * otherwise the two storing backing formats will yield divergent results.  Use
- * ovsdb_storage_write_schema_change() to change the schema. */
-struct ovsdb_error * OVS_WARN_UNUSED_RESULT
-ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
-                             const struct json *schema,
-                             const struct json *data, uint64_t index)
-{
-    struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage,
-                                                               schema, data,
-                                                               index);
-    bool retry_quickly = error != NULL;
-    schedule_next_snapshot(storage, retry_quickly);
-    return error;
-}
-
 struct ovsdb_write * OVS_WARN_UNUSED_RESULT
 ovsdb_storage_write_schema_change(struct ovsdb_storage *storage,
                                   const struct ovsdb_schema *schema,
@@ -673,3 +657,81 @@ ovsdb_storage_precheck_prereq(const struct ovsdb_storage 
*storage,
     }
     return raft_precheck_prereq(storage->raft, prereq);
 }
+
+/* Begins a compaction of 'storage'.
+ *
+ * When 'storage' has a raft instance this forwards 'index', 'dst' and 'aux'
+ * to raft_compact_start().  Otherwise it calls ovsdb_log_compact_start() on
+ * the storage's log, and 'index' and 'aux' are not used. */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_storage_compact_start(struct ovsdb_storage *storage,
+                            uint64_t index,
+                            struct ovsdb_log **dst,
+                            void **aux)
+{
+    if (!storage->raft) {
+        return ovsdb_log_compact_start(storage->log, dst);
+    }
+    return raft_compact_start(storage->raft, index, dst, aux);
+}
+
+/* Commits a compaction of 'storage' whose output was written to 'dst'.
+ *
+ * Dispatches to raft_compact_commit() when 'storage' has a raft instance and
+ * to ovsdb_log_compact_commit() otherwise ('applied_index' and 'aux' are only
+ * used in the raft case).  The next snapshot is then scheduled, asking for a
+ * quick retry if the commit returned an error.  Returns the commit's
+ * result. */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_storage_compact_commit(struct ovsdb_storage *storage,
+                             struct ovsdb_log *dst,
+                             uint64_t applied_index,
+                             void *aux)
+{
+    struct ovsdb_error *error
+        = (storage->raft
+           ? raft_compact_commit(storage->raft, dst, applied_index, aux)
+           : ovsdb_log_compact_commit(storage->log, dst));
+    bool retry_quickly = error != NULL;
+
+    schedule_next_snapshot(storage, retry_quickly);
+    return error;
+}
+
+/* Aborts a compaction of 'storage'.
+ *
+ * Schedules the next snapshot with a quick retry, then returns the result of
+ * raft_compact_abort() when 'storage' has a raft instance, or of
+ * ovsdb_log_compact_abort() on the storage's log otherwise ('aux' is only
+ * used in the raft case). */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_storage_compact_abort(struct ovsdb_storage *storage,
+                            struct ovsdb_log *dst,
+                            void *aux)
+{
+    /* Rescheduling happens first; the abort result is returned as is. */
+    schedule_next_snapshot(storage, true);
+    return (storage->raft
+            ? raft_compact_abort(storage->raft, dst, aux)
+            : ovsdb_log_compact_abort(storage->log, dst));
+}
+
+/* Writes a snapshot made of 'schema' and 'data' to 'target_log'.
+ *
+ * 'schema' and 'data' should faithfully represent the current schema and data,
+ * otherwise the two storing backing formats will yield divergent results.  Use
+ * ovsdb_storage_write_schema_change() to change the schema.
+ *
+ * If 'is_raft' is true, 'schema' and 'data' (each only if nonnull) are put
+ * into a JSON array that is handed to raft_store_compact() together with
+ * 'aux'.  Otherwise 'schema' and 'data' are written to 'target_log' as two
+ * records followed by ovsdb_log_commit_block(); 'aux' is not used.
+ *
+ * Ownership: 'data' is consumed on every path, including failures.  'schema'
+ * remains owned by the caller.
+ *
+ * Returns the first error encountered, otherwise the result of the final
+ * write or commit step. */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_storage_compact_write(bool is_raft,
+                            struct ovsdb_log *target_log,
+                            const struct json *schema,
+                            struct json *data,
+                            void *aux)
+{
+    struct ovsdb_error *error;
+
+    if (is_raft) {
+        struct json *entries = json_array_create_empty();
+        if (schema) {
+            json_array_add(entries, json_clone(schema));
+        }
+        if (data) {
+            /* 'entries' takes over 'data'; it is released with the array. */
+            json_array_add(entries, data);
+        }
+        error = raft_store_compact(target_log, entries, aux);
+        json_destroy(entries);
+        return error;
+    }
+
+    error = ovsdb_log_write(target_log, schema);
+    if (error) {
+        /* All other paths consume 'data'.  Release it here as well so that a
+         * failed schema write does not leak it. */
+        json_destroy(data);
+        return error;
+    }
+
+    error = ovsdb_log_write_and_free(target_log, data);
+    if (error) {
+        return error;
+    }
+
+    return ovsdb_log_commit_block(target_log);
+}
diff --git a/ovsdb/storage.h b/ovsdb/storage.h
index 6e5ef9185..b315343c5 100644
--- a/ovsdb/storage.h
+++ b/ovsdb/storage.h
@@ -23,6 +23,7 @@
 struct json;
 struct ovsdb_schema;
 struct ovsdb_storage;
+struct ovsdb_log;
 struct simap;
 struct uuid;
 
@@ -77,11 +78,6 @@ void ovsdb_write_wait(const struct ovsdb_write *);
 void ovsdb_write_destroy(struct ovsdb_write *);
 
 bool ovsdb_storage_should_compact(struct ovsdb_storage *);
-struct ovsdb_error *ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
-                                                 const struct json *schema,
-                                                 const struct json *snapshot,
-                                                 uint64_t applied_index)
-    OVS_WARN_UNUSED_RESULT;
 
 struct ovsdb_write *ovsdb_storage_write_schema_change(
     struct ovsdb_storage *,
@@ -101,4 +97,25 @@ struct ovsdb_schema *ovsdb_storage_read_schema(struct 
ovsdb_storage *);
 bool ovsdb_storage_precheck_prereq(const struct ovsdb_storage *,
                                    const struct uuid *prereq);
 
+/* Compaction interface.
+ *
+ * ovsdb_storage_compact_start(), ovsdb_storage_compact_commit() and
+ * ovsdb_storage_compact_abort() dispatch to the raft_compact_*() functions
+ * when the storage has a raft instance, and to the ovsdb_log_compact_*()
+ * functions on the storage's log otherwise.  ovsdb_storage_compact_write()
+ * takes no storage; 'is_raft' selects how 'schema' and 'snapshot' are
+ * written to 'target_log'. */
+struct ovsdb_error *ovsdb_storage_compact_start(struct ovsdb_storage *storage,
+                                                uint64_t index,
+                                                struct ovsdb_log **dst,
+                                                void **aux)
+    OVS_WARN_UNUSED_RESULT;
+struct ovsdb_error *ovsdb_storage_compact_commit(struct ovsdb_storage *storage,
+                                                 struct ovsdb_log *dst,
+                                                 uint64_t applied_index,
+                                                 void *aux)
+    OVS_WARN_UNUSED_RESULT;
+struct ovsdb_error *ovsdb_storage_compact_abort(struct ovsdb_storage *storage,
+                                                struct ovsdb_log *dst,
+                                                void *aux)
+    OVS_WARN_UNUSED_RESULT;
+struct ovsdb_error *ovsdb_storage_compact_write(bool is_raft,
+                                                struct ovsdb_log *target_log,
+                                                const struct json *schema,
+                                                struct json *snapshot,
+                                                void *aux)
+    OVS_WARN_UNUSED_RESULT;
+
 #endif /* ovsdb/storage.h */
diff --git a/ovsdb/transaction.c b/ovsdb/transaction.c
index 39650d3b3..d419f32cc 100644
--- a/ovsdb/transaction.c
+++ b/ovsdb/transaction.c
@@ -1261,8 +1261,8 @@ ovsdb_txn_propose_schema_change(struct ovsdb *db,
     struct ovsdb_txn_progress *progress = xzalloc(sizeof *progress);
     progress->storage = db->storage;
 
-    struct ovsdb_write *write = ovsdb_storage_write_schema_change(
-        db->storage, schema, data, &db->prereq, txnid);
+    struct ovsdb_write *write = ovsdb_write_schema_change(
+        db, schema, data, &db->prereq, txnid);
     if (!ovsdb_write_is_complete(write)) {
         progress->write = write;
     } else {
diff --git a/tests/ovsdb-log.at b/tests/ovsdb-log.at
index d50fbbf4d..29304af6a 100644
--- a/tests/ovsdb-log.at
+++ b/tests/ovsdb-log.at
@@ -386,3 +386,162 @@ file: read: end of log
 ]], [ignore])
 AT_CHECK([test -f .file.~lock~])
 AT_CLEANUP
+
+dnl Writes two records, starts a compaction, writes the compacted record to
+dnl the "(temp)" log and one more record to "file", then commits.  Reading
+dnl the file back must yield the compacted record followed by the records
+dnl written after compact_start.  The sequence is run both with and without
+dnl --no-rename-open-files.
+AT_SETUP([ovsdb-log - write one, compact, commit])
+AT_KEYWORDS([ovsdb log])
+AT_CAPTURE_FILE([file])
+for option in '' --no-rename-open-files; do
+    rm -f file
+    AT_CHECK(
+      [[test-ovsdb $option log-io file create \
+          'write:{"a":0}' \
+          'write:{"b":1}' \
+          'compact_start' \
+          'new-write:{"a":0,"b":1}' \
+          'old-write:{"c":2}' \
+          'compact_commit' \
+          'read' \
+          'write:{"a":3}']], [0],
+      [[file: open successful
+file: write:{"a":0} successful
+file: write:{"b":1} successful
+file: compact_start successful
+(temp): write:{"a":0,"b":1} successful
+file: write:{"c":2} successful
+file: compact_commit successful
+file: read: end of log
+file: write:{"a":3} successful
+]])
+    AT_CHECK(
+      [test-ovsdb log-io file read-only read read read read], [0],
+      [[file: open successful
+file: read: {"a":0,"b":1}
+file: read: {"c":2}
+file: read: {"a":3}
+file: read: end of log
+]], [ignore])
+done
+AT_CHECK([test -f .file.~lock~])
+AT_CLEANUP
+
+dnl Same sequence as the commit test, but the compaction is aborted.  Reading
+dnl the file back must yield the original uncompacted records followed by the
+dnl records written after compact_start; the record written to "(temp)" must
+dnl not appear.
+AT_SETUP([ovsdb-log - write one, compact, abort])
+AT_KEYWORDS([ovsdb log])
+AT_CAPTURE_FILE([file])
+for option in '' --no-rename-open-files; do
+    rm -f file
+    AT_CHECK(
+      [[test-ovsdb $option log-io file create \
+          'write:{"a":0}' \
+          'write:{"b":1}' \
+          'compact_start' \
+          'new-write:{"a":0,"b":1}' \
+          'old-write:{"c":2}' \
+          'compact_abort' \
+          'read' \
+          'write:{"a":3}']], [0],
+      [[file: open successful
+file: write:{"a":0} successful
+file: write:{"b":1} successful
+file: compact_start successful
+(temp): write:{"a":0,"b":1} successful
+file: write:{"c":2} successful
+file: compact_abort successful
+file: read: end of log
+file: write:{"a":3} successful
+]])
+    AT_CHECK(
+      [test-ovsdb log-io file read-only read read read read read], [0],
+      [[file: open successful
+file: read: {"a":0}
+file: read: {"b":1}
+file: read: {"c":2}
+file: read: {"a":3}
+file: read: end of log
+]], [ignore])
+done
+AT_CHECK([test -f .file.~lock~])
+AT_CLEANUP
+
+dnl Starts a compaction but exits without committing or aborting it.
+dnl Reopening the file must yield the original uncompacted records followed
+dnl by the record written after compact_start.
+AT_SETUP([ovsdb-log - write one, non-completed compact])
+AT_KEYWORDS([ovsdb log])
+AT_CAPTURE_FILE([file])
+for option in '' --no-rename-open-files; do
+    rm -f file
+    AT_CHECK(
+      [[test-ovsdb $option log-io file create \
+          'write:{"a":0}' \
+          'write:{"b":1}' \
+          'compact_start' \
+          'new-write:{"a":0,"b":1}' \
+          'old-write:{"c":2}' \
+          'read']], [0],
+      [[file: open successful
+file: write:{"a":0} successful
+file: write:{"b":1} successful
+file: compact_start successful
+(temp): write:{"a":0,"b":1} successful
+file: write:{"c":2} successful
+file: read: end of log
+]])
+    AT_CHECK(
+      [test-ovsdb log-io file read-only read read read read], [0],
+      [[file: open successful
+file: read: {"a":0}
+file: read: {"b":1}
+file: read: {"c":2}
+file: read: end of log
+]], [ignore])
+done
+AT_CHECK([test -f .file.~lock~])
+AT_CLEANUP
+
+dnl Leaves a compaction unfinished, then reopens the file read/write, writes
+dnl another record and runs a second compaction through to commit.  Reading
+dnl the file back must yield the second compacted record followed by the
+dnl records written after the second compact_start.
+AT_SETUP([ovsdb-log - write one, non-completed compact, write, compact])
+AT_KEYWORDS([ovsdb log])
+AT_CAPTURE_FILE([file])
+for option in '' --no-rename-open-files; do
+    rm -f file
+    AT_CHECK(
+      [[test-ovsdb $option log-io file create \
+          'write:{"a":0}' \
+          'write:{"b":1}' \
+          'compact_start' \
+          'new-write:{"a":0,"b":1}' \
+          'old-write:{"c":2}' \
+          'read']], [0],
+      [[file: open successful
+file: write:{"a":0} successful
+file: write:{"b":1} successful
+file: compact_start successful
+(temp): write:{"a":0,"b":1} successful
+file: write:{"c":2} successful
+file: read: end of log
+]])
+    AT_CHECK(
+      [[test-ovsdb $option log-io file read/write \
+          'write:{"d":3}' \
+          'compact_start' \
+          'new-write:{"a":0,"b":1,"c":2,"d":3}' \
+          'old-write:{"e":4}' \
+          'compact_commit' \
+          'write:{"f":5}' \
+          'read']], [0],
+      [[file: open successful
+file: write:{"d":3} successful
+file: compact_start successful
+(temp): write:{"a":0,"b":1,"c":2,"d":3} successful
+file: write:{"e":4} successful
+file: compact_commit successful
+file: write:{"f":5} successful
+file: read: end of log
+]])
+    AT_CHECK(
+      [test-ovsdb log-io file read-only read read read read], [0],
+      [[file: open successful
+file: read: {"a":0,"b":1,"c":2,"d":3}
+file: read: {"e":4}
+file: read: {"f":5}
+file: read: end of log
+]], [ignore])
+done
+AT_CHECK([test -f .file.~lock~])
+AT_CLEANUP
diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
index 57322ccb0..a5eac6116 100644
--- a/tests/ovsdb-server.at
+++ b/tests/ovsdb-server.at
@@ -1159,13 +1159,13 @@ ovs-appctl: ovsdb-server: server returned an error
     fi
 
     # We can't fully re-check the contents of the database log, because the
-    # order of the records is not predictable, but there should only be 4 lines
+    # order of the records is not predictable, but there should only be 6 lines
     # in it now in the standalone case
     AT_CAPTURE_FILE([db])
     compacted_lines=`cat dir/db* | wc -l`
     echo compacted_lines=$compacted_lines
     if test $model = standalone; then
-        AT_CHECK([test $compacted_lines -eq 4])
+        AT_CHECK([test $compacted_lines -eq 6])
     fi
 
     dnl And check that the dumped data is the same too:
diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
index a2b037063..2019fa349 100644
--- a/tests/test-ovsdb.c
+++ b/tests/test-ovsdb.c
@@ -412,6 +412,17 @@ do_log_io(struct ovs_cmdl_context *ctx)
             ovsdb_log_replace_abort(replacement);
             replacement = NULL;
             error = NULL;
+        } else if (!strcmp(command, "compact_start")) {
+            ovs_assert(!replacement);
+            error = ovsdb_log_compact_start(log, &replacement);
+        } else if (!strcmp(command, "compact_commit")) {
+            ovs_assert(replacement);
+            error = ovsdb_log_compact_commit(log, replacement);
+            replacement = NULL;
+        } else if (!strcmp(command, "compact_abort")) {
+            ovs_assert(replacement);
+            error = ovsdb_log_compact_abort(log, replacement);
+            replacement = NULL;
         } else {
             ovs_fatal(0, "unknown log-io command \"%s\"", command);
         }
-- 
2.43.0


_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to