Changeset: 8f5ebc50168a for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/8f5ebc50168a
Modified Files:
	gdk/gdk_logger.c
	gdk/gdk_logger_internals.h
	gdk/gdk_logger_old.c
Branch: logger-fix
Log Message:
merged with sep2022 diffs (truncated from 433 to 300 lines): diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -300,7 +300,7 @@ log_read_id(logger *lg, log_id *id) #endif static log_return -string_reader(logger *lg, char** rbuf, size_t* rbufsize, BAT *b, lng nr) +string_reader(logger *lg, BAT *b, lng nr) { size_t sz = 0; lng SZ = 0; @@ -310,22 +310,24 @@ string_reader(logger *lg, char** rbuf, s if (mnstr_readLng(lg->input_log, &SZ) != 1) return LOG_EOF; sz = (size_t)SZ; - if (*rbufsize < sz) { - if (!(*rbuf = GDKrealloc(*rbuf, sz))) + char *buf = lg->rbuf; + if (lg->rbufsize < sz) { + if (!(buf = GDKrealloc(lg->rbuf, sz))) return LOG_ERR; - *rbufsize = sz; + lg->rbuf = buf; + lg->rbufsize = sz; } - if (mnstr_read(lg->input_log, *rbuf, sz, 1) != 1) + if (mnstr_read(lg->input_log, buf, sz, 1) != 1) return LOG_EOF; /* handle strings */ - char *t = *rbuf; + char *t = buf; /* chunked */ #define CHUNK_SIZE 1024 char *strings[CHUNK_SIZE]; int cur = 0; - for(; nr>0 && res == LOG_OK && t < (*rbuf+sz); nr--) { + for(; nr>0 && res == LOG_OK && t < (buf+sz); nr--) { strings[cur++] = t; if (cur == CHUNK_SIZE && b && BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) res = LOG_ERR; @@ -368,8 +370,6 @@ log_read_updates(logger *lg, trans *tr, pnr = nr; tpe = find_type_nr(lg, type_id); if (tpe >= 0) { - size_t rbufsize = 64*1024; - void * rbuf = GDKmalloc(rbufsize); BAT *uid = NULL; BAT *r = NULL; void *(*rt) (ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead; @@ -379,17 +379,13 @@ log_read_updates(logger *lg, trans *tr, if (!lg->flushing && l->flag == LOG_UPDATE) { uid = COLnew(0, TYPE_oid, (BUN)nr, PERSISTENT); if (uid == NULL) { - GDKfree(rbuf); return LOG_ERR; } } if (l->flag == LOG_UPDATE_CONST) { - if (mnstr_readLng(lg->input_log, &offset) != 1) { - GDKfree(rbuf); + if (mnstr_readLng(lg->input_log, &offset) != 1) return LOG_ERR; - - } if (cands) { // This const range actually represents a segment of candidates 
corresponding to updated bat entries @@ -413,8 +409,7 @@ log_read_updates(logger *lg, trans *tr, } else res = LOG_ERR; - } - else { + } else { assert((*cands)->ttype == TYPE_oid); assert(BATcount(*cands) > 0); if (BATappend(*cands, dense, NULL, true) != GDK_SUCCEED) @@ -424,12 +419,11 @@ log_read_updates(logger *lg, trans *tr, } // We have to read the value to update the read cursor - size_t tlen = rbufsize; - void *t = rt(rbuf, &tlen, lg->input_log, 1); + size_t tlen = lg->rbufsize; + void *t = rt(lg->rbuf, &tlen, lg->input_log, 1); if (t == NULL) { res = LOG_ERR; } - GDKfree(rbuf); return res; } } @@ -439,19 +433,18 @@ log_read_updates(logger *lg, trans *tr, if (r == NULL) { if (uid) BBPreclaim(uid); - GDKfree(rbuf); return LOG_ERR; } } if (l->flag == LOG_UPDATE_CONST) { - size_t tlen = rbufsize; - void *t = rt(rbuf, &tlen, lg->input_log, 1); + size_t tlen = lg->rbufsize; + void *t = rt(lg->rbuf, &tlen, lg->input_log, 1); if (t == NULL) { res = LOG_ERR; } else { - rbuf = t; - rbufsize = tlen; + lg->rbuf = t; + lg->rbufsize = tlen; for(BUN p = 0; p<(BUN) nr; p++) { if (r && BUNappend(r, t, true) != GDK_SUCCEED) res = LOG_ERR; @@ -461,7 +454,6 @@ log_read_updates(logger *lg, trans *tr, if (mnstr_readLng(lg->input_log, &offset) != 1) { if (r) BBPreclaim(r); - GDKfree(rbuf); return LOG_ERR; } if (tpe == TYPE_msk) { @@ -471,44 +463,42 @@ log_read_updates(logger *lg, trans *tr, else res = LOG_ERR; } else { - size_t tlen = rbufsize/sizeof(int); + size_t tlen = lg->rbufsize/sizeof(int); size_t cnt = 0, snr = (size_t)nr; snr = (snr+31)/32; assert(tlen); for (; res == LOG_OK && snr > 0; snr-=cnt) { cnt = snr>tlen?tlen:snr; - if (!mnstr_readIntArray(lg->input_log, rbuf, cnt)) + if (!mnstr_readIntArray(lg->input_log, lg->rbuf, cnt)) res = LOG_ERR; } } } else { if (!ATOMvarsized(tpe)) { size_t cnt = 0, snr = (size_t)nr; - size_t tlen = rbufsize/ATOMsize(tpe), ntlen = rbufsize; + size_t tlen = lg->rbufsize/ATOMsize(tpe), ntlen = lg->rbufsize; assert(tlen); /* read in chunks of 
max * BUFSIZE/width rows */ for (; res == LOG_OK && snr > 0; snr-=cnt) { cnt = snr>tlen?tlen:snr; - void *t = rt(rbuf, &ntlen, lg->input_log, cnt); + void *t = rt(lg->rbuf, &ntlen, lg->input_log, cnt); if (t == NULL) { res = LOG_EOF; break; } - assert(t == rbuf); + assert(t == lg->rbuf); if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED) res = LOG_ERR; } } else if (tpe == TYPE_str) { /* efficient string */ - char* cbuf = rbuf; - res = string_reader(lg, &cbuf, &rbufsize, r, nr); - rbuf = cbuf; + res = string_reader(lg, r, nr); } else { for (; res == LOG_OK && nr > 0; nr--) { - size_t tlen = rbufsize; - void *t = rt(rbuf, &tlen, lg->input_log, 1); + size_t tlen = lg->rbufsize; + void *t = rt(lg->rbuf, &tlen, lg->input_log, 1); if (t == NULL) { /* see if failure was due to @@ -520,8 +510,8 @@ log_read_updates(logger *lg, trans *tr, else res = LOG_ERR; } else { - rbuf = t; - rbufsize = tlen; + lg->rbuf = t; + lg->rbufsize = tlen; if (r && BUNappend(r, t, true) != GDK_SUCCEED) res = LOG_ERR; } @@ -568,13 +558,11 @@ log_read_updates(logger *lg, trans *tr, } } else if (tpe == TYPE_str) { /* efficient string */ - char* cbuf = rbuf; - res = string_reader(lg, &cbuf, &rbufsize, r, nr); - rbuf = cbuf; + res = string_reader(lg, r, nr); } else { for (; res == LOG_OK && nr > 0; nr--) { - size_t tlen = rbufsize; - void *t = rt(rbuf, &tlen, lg->input_log, 1); + size_t tlen = lg->rbufsize; + void *t = rt(lg->rbuf, &tlen, lg->input_log, 1); if (t == NULL) { if (strstr(GDKerrbuf, "malloc") == NULL) @@ -582,8 +570,8 @@ log_read_updates(logger *lg, trans *tr, else res = LOG_ERR; } else { - rbuf = t; - rbufsize = tlen; + lg->rbuf = t; + lg->rbufsize = tlen; if ((r && BUNappend(r, t, true) != GDK_SUCCEED)) res = LOG_ERR; } @@ -633,7 +621,6 @@ log_read_updates(logger *lg, trans *tr, else if (uid) BBPreclaim(uid); } - GDKfree(rbuf); } else { /* bat missing ERROR or ignore ? currently error. 
*/ res = LOG_ERR; @@ -1066,7 +1053,6 @@ log_open_output(logger *lg) { logged_range *new_range = (logged_range*)GDKmalloc(sizeof(logged_range)); - if (!new_range) { TRC_CRITICAL(GDK, "allocation failure\n"); return GDK_FAIL; @@ -2139,7 +2125,8 @@ log_load(int debug, const char *fn, cons MT_lock_destroy(&lg->rotation_lock); GDKfree(lg->fn); GDKfree(lg->dir); - GDKfree(lg->buf); + GDKfree(lg->rbuf); + GDKfree(lg->wbuf); GDKfree(lg); GDKdebug = dbg; return GDK_FAIL; @@ -2185,13 +2172,17 @@ log_new(int debug, const char *fn, const } lg->fn = GDKstrdup(fn); lg->dir = GDKstrdup(filename); - lg->bufsize = 64*1024; - lg->buf = GDKmalloc(lg->bufsize); - if (lg->fn == NULL || lg->dir == NULL || lg->buf == NULL) { + lg->rbufsize = 64*1024; + lg->rbuf = GDKmalloc(lg->rbufsize); + lg->wbufsize = 64*1024; + lg->wbuf = GDKmalloc(lg->wbufsize); + if (lg->fn == NULL || lg->dir == NULL || + lg->rbuf == NULL || lg->wbuf == NULL) { TRC_CRITICAL(GDK, "strdup failed\n"); GDKfree(lg->fn); GDKfree(lg->dir); - GDKfree(lg->buf); + GDKfree(lg->rbuf); + GDKfree(lg->wbuf); GDKfree(lg); return NULL; } @@ -2216,7 +2207,7 @@ do_flush_range_cleanup(logger* lg) { rotation_lock(lg); logged_range* frange = lg->flush_ranges; logged_range* first = frange; - + while ( frange->next) { if (ATOMIC_GET(&frange->refcount) > 1) break; @@ -2289,7 +2280,8 @@ log_destroy(logger *lg) ATOMIC_DESTROY(&lg->nr_flushers); GDKfree(lg->fn); GDKfree(lg->dir); - GDKfree(lg->buf); + GDKfree(lg->rbuf); + GDKfree(lg->wbuf); GDKfree(lg); } @@ -2529,9 +2521,9 @@ log_constant(logger *lg, int type, ptr v static gdk_return string_writer(logger *lg, BAT *b, lng offset, lng nr) { - size_t bufsz = lg->bufsize, resize = 0; + size_t bufsz = lg->wbufsize, resize = 0; BUN end = (BUN)(offset + nr); - char *buf = lg->buf; + char *buf = lg->wbuf; gdk_return res = GDK_SUCCEED; if (!buf) @@ -2541,12 +2533,12 @@ string_writer(logger *lg, BAT *b, lng of _______________________________________________ checkin-list mailing list -- 
checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org