Changeset: 8f5ebc50168a for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/8f5ebc50168a
Modified Files:
        gdk/gdk_logger.c
        gdk/gdk_logger_internals.h
        gdk/gdk_logger_old.c
Branch: logger-fix
Log Message:

merged with sep2022


diffs (truncated from 433 to 300 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -300,7 +300,7 @@ log_read_id(logger *lg, log_id *id)
 #endif
 
 static log_return
-string_reader(logger *lg, char** rbuf, size_t* rbufsize, BAT *b, lng nr)
+string_reader(logger *lg, BAT *b, lng nr)
 {
        size_t sz = 0;
        lng SZ = 0;
@@ -310,22 +310,24 @@ string_reader(logger *lg, char** rbuf, s
                if (mnstr_readLng(lg->input_log, &SZ) != 1)
                        return LOG_EOF;
                sz = (size_t)SZ;
-               if (*rbufsize < sz) {
-                       if (!(*rbuf = GDKrealloc(*rbuf, sz)))
+               char *buf = lg->rbuf;
+               if (lg->rbufsize < sz) {
+                       if (!(buf = GDKrealloc(lg->rbuf, sz)))
                                return LOG_ERR;
-                       *rbufsize = sz;
+                       lg->rbuf = buf;
+                       lg->rbufsize = sz;
                }
 
-               if (mnstr_read(lg->input_log, *rbuf, sz, 1) != 1)
+               if (mnstr_read(lg->input_log, buf, sz, 1) != 1)
                        return LOG_EOF;
                /* handle strings */
-               char *t = *rbuf;
+               char *t = buf;
                /* chunked */
 #define CHUNK_SIZE 1024
                char *strings[CHUNK_SIZE];
                int cur = 0;
 
-               for(; nr>0 && res == LOG_OK && t < (*rbuf+sz); nr--) {
+               for(; nr>0 && res == LOG_OK && t < (buf+sz); nr--) {
                        strings[cur++] = t;
                        if (cur == CHUNK_SIZE && b && BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED)
                                res = LOG_ERR;
@@ -368,8 +370,6 @@ log_read_updates(logger *lg, trans *tr, 
        pnr = nr;
        tpe = find_type_nr(lg, type_id);
        if (tpe >= 0) {
-               size_t rbufsize = 64*1024;
-               void * rbuf             = GDKmalloc(rbufsize);
                BAT *uid = NULL;
                BAT *r = NULL;
                void *(*rt) (ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead;
@@ -379,17 +379,13 @@ log_read_updates(logger *lg, trans *tr, 
                if (!lg->flushing && l->flag == LOG_UPDATE) {
                        uid = COLnew(0, TYPE_oid, (BUN)nr, PERSISTENT);
                        if (uid == NULL) {
-                               GDKfree(rbuf);
                                return LOG_ERR;
                        }
                }
 
                if (l->flag == LOG_UPDATE_CONST) {
-                       if (mnstr_readLng(lg->input_log, &offset) != 1) {
-                               GDKfree(rbuf);
+                       if (mnstr_readLng(lg->input_log, &offset) != 1)
                                return LOG_ERR;
-
-                       }
                        if (cands) {
                                // This const range actually represents a segment of candidates corresponding to updated bat entries
 
@@ -413,8 +409,7 @@ log_read_updates(logger *lg, trans *tr, 
                                                }
                                                else
                                                        res = LOG_ERR;
-                                       }
-                                       else {
+                                       } else {
                                                assert((*cands)->ttype == TYPE_oid);
                                                assert(BATcount(*cands) > 0);
                                                if (BATappend(*cands, dense, NULL, true) != GDK_SUCCEED)
@@ -424,12 +419,11 @@ log_read_updates(logger *lg, trans *tr, 
                                }
 
                                // We have to read the value to update the read cursor
-                               size_t tlen = rbufsize;
-                               void *t = rt(rbuf, &tlen, lg->input_log, 1);
+                               size_t tlen = lg->rbufsize;
+                               void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
                                if (t == NULL) {
                                        res = LOG_ERR;
                                }
-                               GDKfree(rbuf);
                                return res;
                        }
                }
@@ -439,19 +433,18 @@ log_read_updates(logger *lg, trans *tr, 
                        if (r == NULL) {
                                if (uid)
                                        BBPreclaim(uid);
-                               GDKfree(rbuf);
                                return LOG_ERR;
                        }
                }
 
                if (l->flag == LOG_UPDATE_CONST) {
-                       size_t tlen = rbufsize;
-                       void *t = rt(rbuf, &tlen, lg->input_log, 1);
+                       size_t tlen = lg->rbufsize;
+                       void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
                        if (t == NULL) {
                                res = LOG_ERR;
                        } else {
-                               rbuf = t;
-                               rbufsize = tlen;
+                               lg->rbuf = t;
+                               lg->rbufsize = tlen;
                                for(BUN p = 0; p<(BUN) nr; p++) {
                                        if (r && BUNappend(r, t, true) != GDK_SUCCEED)
                                                res = LOG_ERR;
@@ -461,7 +454,6 @@ log_read_updates(logger *lg, trans *tr, 
                        if (mnstr_readLng(lg->input_log, &offset) != 1) {
                                if (r)
                                        BBPreclaim(r);
-                               GDKfree(rbuf);
                                return LOG_ERR;
                        }
                        if (tpe == TYPE_msk) {
@@ -471,44 +463,42 @@ log_read_updates(logger *lg, trans *tr, 
                                        else
                                                res = LOG_ERR;
                                } else {
-                                       size_t tlen = rbufsize/sizeof(int);
+                                       size_t tlen = lg->rbufsize/sizeof(int);
                                        size_t cnt = 0, snr = (size_t)nr;
                                        snr = (snr+31)/32;
                                        assert(tlen);
                                        for (; res == LOG_OK && snr > 0; snr-=cnt) {
                                                cnt = snr>tlen?tlen:snr;
-                                               if (!mnstr_readIntArray(lg->input_log, rbuf, cnt))
+                                               if (!mnstr_readIntArray(lg->input_log, lg->rbuf, cnt))
                                                        res = LOG_ERR;
                                        }
                                }
                        } else {
                                if (!ATOMvarsized(tpe)) {
                                        size_t cnt = 0, snr = (size_t)nr;
-                                       size_t tlen = rbufsize/ATOMsize(tpe), ntlen = rbufsize;
+                                       size_t tlen = lg->rbufsize/ATOMsize(tpe), ntlen = lg->rbufsize;
                                        assert(tlen);
                                        /* read in chunks of max
                                         * BUFSIZE/width rows */
                                        for (; res == LOG_OK && snr > 0; snr-=cnt) {
                                                cnt = snr>tlen?tlen:snr;
-                                               void *t = rt(rbuf, &ntlen, lg->input_log, cnt);
+                                               void *t = rt(lg->rbuf, &ntlen, lg->input_log, cnt);
 
                                                if (t == NULL) {
                                                        res = LOG_EOF;
                                                        break;
                                                }
-                                               assert(t == rbuf);
+                                               assert(t == lg->rbuf);
                                                if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED)
                                                        res = LOG_ERR;
                                        }
                                } else if (tpe == TYPE_str) {
                                        /* efficient string */
-                                       char* cbuf = rbuf;
-                                       res = string_reader(lg, &cbuf, &rbufsize, r, nr);
-                                       rbuf = cbuf;
+                                       res = string_reader(lg, r, nr);
                                } else {
                                        for (; res == LOG_OK && nr > 0; nr--) {
-                                               size_t tlen = rbufsize;
-                                               void *t = rt(rbuf, &tlen, lg->input_log, 1);
+                                               size_t tlen = lg->rbufsize;
+                                               void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
 
                                                if (t == NULL) {
                                                        /* see if failure was due to
@@ -520,8 +510,8 @@ log_read_updates(logger *lg, trans *tr, 
                                                        else
                                                                res = LOG_ERR;
                                                } else {
-                                                       rbuf = t;
-                                                       rbufsize = tlen;
+                                                       lg->rbuf = t;
+                                                       lg->rbufsize = tlen;
                                                        if (r && BUNappend(r, t, true) != GDK_SUCCEED)
                                                                res = LOG_ERR;
                                                }
@@ -568,13 +558,11 @@ log_read_updates(logger *lg, trans *tr, 
                                }
                        } else if (tpe == TYPE_str) {
                                /* efficient string */
-                               char* cbuf = rbuf;
-                               res = string_reader(lg, &cbuf, &rbufsize, r, nr);
-                               rbuf = cbuf;
+                               res = string_reader(lg, r, nr);
                        } else {
                                for (; res == LOG_OK && nr > 0; nr--) {
-                                       size_t tlen = rbufsize;
-                                       void *t = rt(rbuf, &tlen, lg->input_log, 1);
+                                       size_t tlen = lg->rbufsize;
+                                       void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
 
                                        if (t == NULL) {
                                                if (strstr(GDKerrbuf, "malloc") == NULL)
@@ -582,8 +570,8 @@ log_read_updates(logger *lg, trans *tr, 
                                                else
                                                        res = LOG_ERR;
                                        } else {
-                                               rbuf = t;
-                                               rbufsize = tlen;
+                                               lg->rbuf = t;
+                                               lg->rbufsize = tlen;
                                                if ((r && BUNappend(r, t, true) != GDK_SUCCEED))
                                                        res = LOG_ERR;
                                        }
@@ -633,7 +621,6 @@ log_read_updates(logger *lg, trans *tr, 
                        else if (uid)
                                BBPreclaim(uid);
                }
-               GDKfree(rbuf);
        } else {
                /* bat missing ERROR or ignore ? currently error. */
                res = LOG_ERR;
@@ -1066,7 +1053,6 @@ log_open_output(logger *lg)
 {
        logged_range *new_range = (logged_range*)GDKmalloc(sizeof(logged_range));
 
-
        if (!new_range) {
                TRC_CRITICAL(GDK, "allocation failure\n");
                return GDK_FAIL;
@@ -2139,7 +2125,8 @@ log_load(int debug, const char *fn, cons
        MT_lock_destroy(&lg->rotation_lock);
        GDKfree(lg->fn);
        GDKfree(lg->dir);
-       GDKfree(lg->buf);
+       GDKfree(lg->rbuf);
+       GDKfree(lg->wbuf);
        GDKfree(lg);
        GDKdebug = dbg;
        return GDK_FAIL;
@@ -2185,13 +2172,17 @@ log_new(int debug, const char *fn, const
        }
        lg->fn = GDKstrdup(fn);
        lg->dir = GDKstrdup(filename);
-       lg->bufsize = 64*1024;
-       lg->buf = GDKmalloc(lg->bufsize);
-       if (lg->fn == NULL || lg->dir == NULL || lg->buf == NULL) {
+       lg->rbufsize = 64*1024;
+       lg->rbuf = GDKmalloc(lg->rbufsize);
+       lg->wbufsize = 64*1024;
+       lg->wbuf = GDKmalloc(lg->wbufsize);
+       if (lg->fn == NULL || lg->dir == NULL ||
+           lg->rbuf == NULL || lg->wbuf == NULL) {
                TRC_CRITICAL(GDK, "strdup failed\n");
                GDKfree(lg->fn);
                GDKfree(lg->dir);
-               GDKfree(lg->buf);
+               GDKfree(lg->rbuf);
+               GDKfree(lg->wbuf);
                GDKfree(lg);
                return NULL;
        }
@@ -2216,7 +2207,7 @@ do_flush_range_cleanup(logger* lg) {
        rotation_lock(lg);
        logged_range* frange = lg->flush_ranges;
        logged_range* first = frange;
-       
+
        while ( frange->next) {
                if (ATOMIC_GET(&frange->refcount) > 1)
                        break;
@@ -2289,7 +2280,8 @@ log_destroy(logger *lg)
        ATOMIC_DESTROY(&lg->nr_flushers);
        GDKfree(lg->fn);
        GDKfree(lg->dir);
-       GDKfree(lg->buf);
+       GDKfree(lg->rbuf);
+       GDKfree(lg->wbuf);
        GDKfree(lg);
 }
 
@@ -2529,9 +2521,9 @@ log_constant(logger *lg, int type, ptr v
 static gdk_return
 string_writer(logger *lg, BAT *b, lng offset, lng nr)
 {
-       size_t bufsz = lg->bufsize, resize = 0;
+       size_t bufsz = lg->wbufsize, resize = 0;
        BUN end = (BUN)(offset + nr);
-       char *buf = lg->buf;
+       char *buf = lg->wbuf;
        gdk_return res = GDK_SUCCEED;
 
        if (!buf)
@@ -2541,12 +2533,12 @@ string_writer(logger *lg, BAT *b, lng of
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-leave@monetdb.org

Reply via email to