Changeset: 216dfc9aa75e for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=216dfc9aa75e
Modified Files:
        sql/storage/bat/bat_logger.c
        sql/storage/store.c
Branch: hot-snapshot
Log Message:

Parse BBP.dir to find the bats that need copying


diffs (265 lines):

diff --git a/sql/storage/bat/bat_logger.c b/sql/storage/bat/bat_logger.c
--- a/sql/storage/bat/bat_logger.c
+++ b/sql/storage/bat/bat_logger.c
@@ -884,7 +884,7 @@ bl_find_table_value(const char *tabnam, 
  * That part of the file must remain unchanged until the plan is executed.
  */
 static void
-snapshot_lazy_copy_file(stream *plan, char *name, long extent)
+snapshot_lazy_copy_file(stream *plan, const char *name, long extent)
 {
        mnstr_printf(plan, "c %ld %s\n", extent, name);
 }
@@ -910,6 +910,8 @@ snapshot_immediate_copy_file(stream *pla
        }
        size = (long)statbuf.st_size;
 
+       // TODO we should really do this in pieces, 
+       // not allocate memory for the whole file
        buf = GDKmalloc(size);
        if (!buf) {
                GDKerror("malloc failed");
@@ -971,57 +973,166 @@ snapshot_wal(stream *plan, const char *d
        snapshot_immediate_copy_file(plan, log_file, log_file + strlen(db_dir) 
+ 1);
 
        snprintf(log_file, sizeof(log_file), "%s%s." LLFMT, bat_logger->dir, 
LOGFILE, bat_logger->id);
-       FILE *f = getFile(log);
-       long pos = ftell(f);
-       if (pos < 0) {
-               GDKerror("ftell failed on %s: %s", log_file, strerror(errno));
+       long extent = getFileSize(log);
+
+       snapshot_lazy_copy_file(plan, log_file, extent);
+
+       return GDK_SUCCEED;
+}
+
+/* If `path` exists, lazy copy it with name `name`.
+ * Otherwise, if `alt_path` exists, lazy copy it with name `alt_name`.
+ * 
+ * If `mandatory` is set, it is an error if neither file exists.
+ * 
+ * This interface is slightly messy, but of all the variants I tried it gives
+ * the most readable code.
+ */
+static gdk_return
+snapshot_one_heap(stream *plan, bool mandatory, const char *path, const char 
*name, const char *alt_path, const char *alt_name)
+{
+       struct stat statbuf;
+       fprintf(stderr, "#cp %s=%s\n", name, path);
+
+       if (stat(path, &statbuf) == 0) {
+               snapshot_lazy_copy_file(plan, name, statbuf.st_size);
+               return GDK_SUCCEED;
+       }
+       if (errno != ENOENT) {
+               GDKerror("Error stat'ing %s: %s", path, strerror(errno));
                return GDK_FAIL;
        }
-       
-       // there is some unflushed data in that stream so not all bytes 
reported by ftell are
-       // actually on disk. Is it safe to flush it? I don't know.
-       mnstr_flush(log);
 
-       snapshot_lazy_copy_file(plan, log_file, pos);
+       if (stat(alt_path, &statbuf) == 0) {
+               snapshot_lazy_copy_file(plan, alt_name, statbuf.st_size);
+               return GDK_SUCCEED;
+       }
+       if (errno != ENOENT) {
+               GDKerror("Error stat'ing %s: %s", alt_path, strerror(errno));
+               return GDK_FAIL;
+       }
+
+       if (mandatory) {
+               GDKerror("One of %s and %s must exist", path, alt_path);
+               return GDK_FAIL;
+       }
 
        return GDK_SUCCEED;
 }
 
-/* Add plan entries for all persistent BATs */
+/* Add plan entry for the heaps of one BAT.
+ * path_buffer points at a buffer containing the initial part of 
+ * the absolute path of the heap file, for example /tmp/mydatabase/bat/07/726.
+ * This function attempts to add suffixes such as .tail, .theap etc
+ * and copies those files if they exist.
+ * local_part_index is the number of bytes to skip to get to the local
+ * part of the filename, in this example skipping the "/tmp/mydatabase/"
+ * 
+ * Note: path_buffer and alt_path_buffer must both have room for this function
+ * to append the suffixes in-place!
+ */
+static gdk_return
+snapshot_one_bat(stream *plan, char *path_buffer, char *alt_path_buffer, 
size_t local_part_index)
+{
+       static const char *suffixes[] = { "M.tail", "O.theap", "O.tvheap", 
"O.torderidx", "O.timprint", NULL };
+       //TODO check the above
+       gdk_return ret = GDK_FAIL;
+       char *tail = path_buffer + strlen(path_buffer);
+       char *alt_tail = alt_path_buffer + strlen(alt_path_buffer);
+
+       for (const char **suf = &suffixes[0]; *suf; suf++) {
+               bool mandatory;
+               switch (**suf) {
+                       case 'M':
+                               mandatory = true;
+                               break;
+                       case 'O':
+                               mandatory = false;
+                               break;
+                       default:
+                               GDKfatal("%s does not start with either M or 
O", *suf);
+               }
+               strcpy(tail, *suf + 1);
+               strcpy(alt_tail, *suf + 1);
+               ret = snapshot_one_heap(
+                       plan, mandatory, 
+                       path_buffer, path_buffer + local_part_index,
+                       alt_path_buffer, alt_path_buffer + local_part_index);
+               if (ret != GDK_SUCCEED)
+                       goto end;
+       }
+
+       ret = GDK_SUCCEED;
+end:
+       *tail = '\0';
+       *alt_tail = '\0';
+       return ret;
+}
+/* Add plan entries for all persistent BATs by looping over the BBP.dir.
+ * Also include the BBP.dir itself.
+ */
 static gdk_return
 snapshot_bats(stream *plan, const char *db_dir)
 {
        char bbpdir[FILENAME_MAX];
-       bat active_bats;
-       gdk_return ret;
+       stream *cat = NULL;
+       char line[1024];
+       int gdk_version;
+       char heapfile[FILENAME_MAX];
+       char alt_heapfile[FILENAME_MAX];
+       size_t heapfile_pos, alt_heapfile_pos;
+       int id;
+       gdk_return ret = GDK_FAIL;
 
-       active_bats = getBBPsize();
-
-       snprintf(bbpdir, sizeof(bbpdir), "%s%c%s%c%s", db_dir, DIR_SEP, BAKDIR, 
DIR_SEP, "BBP.dir");
+       snprintf(bbpdir, FILENAME_MAX, "%s/%s/%s", db_dir, BAKDIR, "BBP.dir");
        ret = snapshot_immediate_copy_file(plan, bbpdir, bbpdir + 
strlen(db_dir) + 1);
        if (ret == GDK_FAIL)
                goto end;
 
-       for (bat id = 1; id < active_bats; id++) {
-               if (BBP_status(id) & BBPPERSISTENT) {
-                       BAT *b = BBP_desc(id);
-                       snapshot_lazy_copy_heap(plan, &b->theap);
-                       if (b->tvheap)
-                               snapshot_lazy_copy_heap(plan, b->tvheap);
-                       if (b->torderidx)
-                               snapshot_lazy_copy_heap(plan, b->torderidx);
-
-                       // Not sure why I commented this out
-                       // if (b->thash)
-                       //      snapshot_lazy_copy_heap(plan, &b->thash->heap);
-
-                       // Hmm.. the definition of b->timprints not available 
here so
-                       // b->timprints->heap is unreachable. Hopefully the 
system can 
-                       //reconstruct the imprints on demand.
-               }
+       // Open the catalog and parse the header
+       cat = open_rastream(bbpdir);
+       if (cat == NULL) {
+               GDKerror("Could not open %s for reading", bbpdir);
+               goto end;
+       }
+       if (mnstr_readline(cat, line, sizeof(line)) < 0) {
+               GDKerror("Could not read first line of %s", bbpdir);
+               goto end;
+       }
+       if (sscanf(line, "BBP.dir, GDKversion %d", &gdk_version) != 1) {
+               GDKerror("Invalid first line of %s", bbpdir);
+               goto end;
+       }
+       if (gdk_version != 25122) { // do not hardcode this
+               GDKerror("Cannot snapshot this gdk version");
+               goto end;
+       }
+       if (mnstr_readline(cat, line, sizeof(line)) < 0) {
+               GDKerror("Couldn't skip the second line of %s", bbpdir);
+               goto end;
+       }
+       if (mnstr_readline(cat, line, sizeof(line)) < 0) {
+               GDKerror("Couldn't skip the third line of %s", bbpdir);
+               goto end;
        }
 
+       heapfile_pos = snprintf(heapfile, FILENAME_MAX, "%s/%s/%s/", db_dir, 
BAKDIR, BATDIR);
+       alt_heapfile_pos = snprintf(alt_heapfile, FILENAME_MAX, "%s/%s/", 
db_dir, BATDIR);
+       while (mnstr_readline(cat, line, sizeof(line)) > 0) {
+               if (sscanf(line, "%d %*s %*s %s", &id, heapfile + heapfile_pos) 
!= 2) {
+                       GDKerror("Couldn't parse %s line: %s", bbpdir, line);
+                       goto end;
+               }
+               strcpy(alt_heapfile + alt_heapfile_pos, heapfile + 
heapfile_pos);
+               snapshot_one_bat(plan, heapfile, alt_heapfile, strlen(db_dir) + 
1);
+       }
+
+
 end:
+       if (cat) {
+               close_stream(cat);
+       }
+       (void)snapshot_lazy_copy_heap;
        return ret;
 }
 
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -2466,10 +2466,17 @@ static gdk_return
 tar_copy_data(stream *tarfile, const char *path, time_t mtime, stream 
*contents, ssize_t size)
 {
        gdk_return ret = GDK_FAIL;
+       ssize_t file_size = getFileSize(contents);
        const ssize_t bufsize = 64 * 1024;
        char *buf = malloc(bufsize);
        ssize_t nbytes;
 
+       fprintf(stderr, "#need to copy %ld/%ld bytes of component %s\n", size, 
file_size, path);
+       if (file_size < size) {
+               fprintf(stderr, "#adjusting amount to copy to %ld\n", 
file_size);
+               size = file_size;
+       }
+
        if (!buf) {
                GDKerror("could not allocate buffer");
                goto end;
@@ -2488,7 +2495,6 @@ tar_copy_data(stream *tarfile, const cha
                chunk -= chunk % TAR_BLOCK_SIZE;
                assert(chunk > 0);
                assert(chunk % TAR_BLOCK_SIZE == 0);
-               fprintf(stderr, "#copying %ld/%ld bytes of component %s\n", 
chunk, size, path);
                nbytes = mnstr_read(contents, buf, 1, chunk);
                if (nbytes != chunk) {
                        GDKerror("Read only %ld/%ld bytes of component %s: %s", 
nbytes, chunk, path, mnstr_error(contents));
@@ -2500,7 +2506,6 @@ tar_copy_data(stream *tarfile, const cha
                        goto end;
                }
                size -= chunk;
-               fprintf(stderr, "# %ld left\n", size);
        }
 
        if (size > 0) {
@@ -2515,7 +2520,6 @@ tar_copy_data(stream *tarfile, const cha
                        GDKerror("Only wrote %ld/%ld bytes of block of 
component %s to tar file: %s", nbytes, size, path, mnstr_error(tarfile));
                        goto end;
                }
-               fprintf(stderr, "#tail written\n");
        }
 
        ret = GDK_SUCCEED;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to