Changeset: e2da898a6a26 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/e2da898a6a26
Modified Files:
        sql/storage/store.c
Branch: nilmask
Log Message:

merged with default


diffs (truncated from 690 to 300 lines):

diff --git a/documentation/source/manual_pages/mclient.rst b/documentation/source/manual_pages/mclient.rst
--- a/documentation/source/manual_pages/mclient.rst
+++ b/documentation/source/manual_pages/mclient.rst
@@ -196,7 +196,8 @@ SQL Options
 
 **--rows=**\ *nr* (**-r** *nr*)
    If specified, query results will be paged by an internal pager at the
-   specified number of lines.
+   specified number of lines. If set to **0** (zero), use the height of
+   the terminal. The default is **-1** which means no pager is used.
 
 **--width=**\ *nr* (**-w** *nr*)
    Specify the width of the screen. The default is the (initial) width
@@ -260,7 +261,8 @@ General Commands
 
 **\\r** *rows*
    Use an internal pager using *rows* per page. If *rows* is **-1**,
-   stop using the internal pager.
+   stop using the internal pager, if *rows* is **0**, use the height of
+   the terminal.
 
 SQL Commands
 ------------
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -35,7 +35,7 @@ static gdk_return log_del_bat(logger *lg
 #define LOG_CREATE     5
 #define LOG_DESTROY    6
 #define LOG_SEQ                7
-#define LOG_CLEAR      8 // DEPRECATED
+#define LOG_CLEAR      8 /* DEPRECATED */
 #define LOG_BAT_GROUP  9
 
 #ifdef NATIVE_WIN32
@@ -61,7 +61,7 @@ static const char *log_commands[] = {
        "LOG_CREATE",
        "LOG_DESTROY",
        "LOG_SEQ",
-       "", // LOG_CLEAR IS DEPRECATED
+       "", /* LOG_CLEAR IS DEPRECATED */
        "LOG_BAT_GROUP",
 };
 
@@ -376,10 +376,10 @@ log_read_updates(logger *lg, trans *tr, 
                        if (mnstr_readLng(lg->input_log, &offset) != 1)
                                return LOG_ERR;
                        if (cands) {
-                               // This const range actually represents a segment of candidates corresponding to updated bat entries
+                               /* This const range actually represents a segment of candidates corresponding to updated bat entries */
 
                                if (BATcount(*cands) == 0 || lg->flushing) {
-                                       // when flushing, we only need the offset and count of the last segment of inserts.
+                                       /* when flushing, we only need the offset and count of the last segment of inserts. */
                                        assert((*cands)->ttype == TYPE_void);
                                        BATtseqbase(*cands, (oid) offset);
                                        BATsetcount(*cands, (BUN) nr);
@@ -407,7 +407,7 @@ log_read_updates(logger *lg, trans *tr, 
                                        BBPreclaim(dense);
                                }
 
-                               // We have to read the value to update the read cursor
+                               /* We have to read the value to update the read cursor */
                                size_t tlen = lg->rbufsize;
                                void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
                                if (t == NULL) {
@@ -573,7 +573,7 @@ log_read_updates(logger *lg, trans *tr, 
                        if (tr_grow(tr) == GDK_SUCCEED) {
                                tr->changes[tr->nr].type = l->flag;
                                if (l->flag==LOG_UPDATE_BULK && offset == -1) {
-                                       assert(cands); // bat r is part of a group of bats logged together.
+                                       assert(cands); /* bat r is part of a group of bats logged together. */
                                        struct canditer ci;
                                        canditer_init(&ci, NULL, *cands);
                                        const oid first = canditer_peek(&ci);
@@ -1077,15 +1077,14 @@ log_open_output(logger *lg)
        }
        ATOMIC_INIT(&new_range->refcount, 1);
        ATOMIC_INIT(&new_range->last_ts, 0);
-       ATOMIC_INIT(&new_range->end, 0);
-       ATOMIC_INIT(&new_range->pend, 0);
-       ATOMIC_INIT(&new_range->flushed_end, 0);
-       ATOMIC_INIT(&new_range->drops, 0);
+       ATOMIC_INIT(&new_range->flushed_ts, 0);
+       new_range->drops = 0;
        new_range->id = lg->id;
        new_range->next = NULL;
        logged_range* current = lg->current;
        assert(current && current->next == NULL);
        current->next = new_range;
+       ATOMIC_INC(&lg->nr_open_files);
        return GDK_SUCCEED;
 }
 
@@ -1107,6 +1106,7 @@ log_close_output(logger *lg)
                close_stream(lg->current->output_log);
        }
        lg->current->output_log = NULL;
+       ATOMIC_INC(&lg->nr_open_files);
 }
 
 static gdk_return
@@ -1154,7 +1154,7 @@ log_read_transaction(logger *lg)
        if (!lg->flushing)
                ATOMIC_AND(&GDKdebug, ~CHECKMASK);
 
-       BAT* cands = NULL; // used in case of LOG_BAT_GROUP
+       BAT* cands = NULL; /* used in case of LOG_BAT_GROUP */
 
        while (err == LOG_OK && (ok=log_read_format(lg, &l))) {
                if (l.flag == 0 && l.id == 0) {
@@ -1225,7 +1225,7 @@ log_read_transaction(logger *lg)
                                err = LOG_EOF;
                        else {
                                if (l.id > 0) {
-                                       // START OF LOG_BAT_GROUP
+                                       /* START OF LOG_BAT_GROUP */
                                        cands = COLnew(0, TYPE_void, 0, SYSTRANS);
                                        if (!cands)
                                                err = LOG_ERR;
@@ -1234,7 +1234,7 @@ log_read_transaction(logger *lg)
                                         * above option earlier */
                                        err = LOG_ERR;
                                } else {
-                                       // END OF LOG_BAT_GROUP
+                                       /* END OF LOG_BAT_GROUP */
                                        BBPunfix(cands->batCacheid);
                                        cands = NULL;
                                }
@@ -1315,7 +1315,6 @@ log_readlog(logger *lg, const char *file
         * (even if we would log aborts in the logs). So we simply
         * abort and move to the next log file */
        return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
-       //return GDK_SUCCEED;
 }
 
 /*
@@ -1380,8 +1379,8 @@ check_version(logger *lg, FILE *fp, cons
                }
                /* old_logger_load always closes fp */
                if (old_logger_load(lg, fn, logdir, fp, version, filename) != GDK_SUCCEED) {
-                       //loads drop no longer needed catalog, snapshots bats
-                       //convert catalog_oid -> catalog_id (lng->int)
+                       /*loads drop no longer needed catalog, snapshots bats */
+                       /*convert catalog_oid -> catalog_id (lng->int) */
                        GDKerror("Incompatible database version %06d, "
                                 "this server supports version %06d.\n%s",
                                 version, lg->version,
@@ -2122,6 +2121,7 @@ log_load(const char *fn, const char *log
        logbat_destroy(lg->dseqs);
        ATOMIC_DESTROY(&lg->current->refcount);
        ATOMIC_DESTROY(&lg->nr_flushers);
+       ATOMIC_DESTROY(&lg->nr_open_files);
        MT_lock_destroy(&lg->lock);
        MT_lock_destroy(&lg->rotation_lock);
        GDKfree(lg->fn);
@@ -2194,6 +2194,7 @@ log_new(int debug, const char *fn, const
        MT_lock_init(&lg->flush_lock, "flush_lock");
        MT_cond_init(&lg->excl_flush_cv);
        ATOMIC_INIT(&lg->nr_flushers, 0);
+       ATOMIC_INIT(&lg->nr_open_files, 0);
 
        if (log_load(fn, logdir, lg, filename) == GDK_SUCCEED) {
                return lg;
@@ -2208,7 +2209,7 @@ do_flush_range_cleanup(logger* lg)
        logged_range* frange = lg->flush_ranges;
        logged_range* first = frange;
 
-       while ( frange->next) {
+       while (frange->next) {
                if (ATOMIC_GET(&frange->refcount) > 1)
                        break;
                frange = frange->next;
@@ -2229,6 +2230,7 @@ do_flush_range_cleanup(logger* lg)
                        TRC_INFO(WAL, "closing output log %s", mnstr_name(frange->output_log));
                        close_stream(frange->output_log);
                        frange->output_log = NULL;
+                       ATOMIC_INC(&lg->nr_open_files);
                }
        }
        return flast;
@@ -2246,10 +2248,7 @@ log_destroy(logger *lg)
                logged_range *n = p->next;
                ATOMIC_DESTROY(&p->refcount);
                ATOMIC_DESTROY(&p->last_ts);
-               ATOMIC_DESTROY(&p->end);
-               ATOMIC_DESTROY(&p->pend);
-               ATOMIC_DESTROY(&p->flushed_end);
-               ATOMIC_DESTROY(&p->drops);
+               ATOMIC_DESTROY(&p->flushed_ts);
                GDKfree(p);
                p = n;
        }
@@ -2286,6 +2285,7 @@ log_destroy(logger *lg)
        MT_lock_destroy(&lg->rotation_lock);
        MT_lock_destroy(&lg->flush_lock);
        ATOMIC_DESTROY(&lg->nr_flushers);
+       ATOMIC_DESTROY(&lg->nr_open_files);
        GDKfree(lg->fn);
        GDKfree(lg->dir);
        GDKfree(lg->rbuf);
@@ -2333,9 +2333,13 @@ log_next_logfile(logger *lg, ulng ts)
        int m = (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK)?1000:100;
        if (!lg->pending || !lg->pending->next)
                return 0;
-       if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && (ulng) ATOMIC_GET(&lg->pending->last_ts) <= ts) {
+       if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges &&
+                       (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) &&
+                       (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) {
                logged_range *p = lg->pending;
-               for(int i = 1; i<m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current && (ulng) ATOMIC_GET(&p->last_ts) <= ts; i++)
+               for(int i = 1; i<m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current && p->next != lg->flush_ranges &&
+                               (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts)  &&
+                               (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++)
                        p = p->next;
                return p->id;
        }
@@ -2359,10 +2363,6 @@ do_rotate(logger *lg) {
        logged_range* next = lg->current->next;
        if (next) {
                assert(ATOMIC_GET(&next->refcount) == 1);
-               ulng end = ATOMIC_GET(&lg->current->end);
-               ATOMIC_SET(&next->pend, end);
-               ATOMIC_SET(&next->end, end);
-               assert(ATOMIC_GET(&lg->current->refcount) > 0);
                lg->current = lg->current->next;
        }
 }
@@ -2370,20 +2370,18 @@ do_rotate(logger *lg) {
 gdk_return
 log_activate(logger *lg)
 {
-       bool flush = false;
+       bool flush_cleanup = false;
        gdk_return res = GDK_SUCCEED;
        rotation_lock(lg);
-       if (!lg->flushnow && !lg->current->next && lg->current->drops > 100000 && ((ulng) ATOMIC_GET(&lg->current->end) - (ulng) ATOMIC_GET(&lg->current->pend)) > 0 && lg->saved_id+1 == lg->id) {
+       if (!lg->flushnow && !lg->current->next && lg->current->drops > 100000 && (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 && lg->saved_id+1 == lg->id && ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this file */) {
                lg->id++;
                /* start new file */
                res = log_open_output(lg);
-               if(ATOMIC_GET(&lg->current->refcount) == 1) {
-                       flush = true;
-                       do_rotate(lg);
-               }
+               flush_cleanup = true;
+               do_rotate(lg);
        }
        rotation_unlock(lg);
-       if (flush)
+       if (flush_cleanup)
                (void) do_flush_range_cleanup(lg);
        return res;
 }
@@ -2458,10 +2456,10 @@ log_flush(logger *lg, ulng ts)
                        rotation_unlock(lg);
                }
                if (res != LOG_ERR) {
-                       while(olid <= lid) {
+                       while(olid < lid) {
                                /* Try to cleanup, remove old log file, continue on failure! */
+                               olid++;
                                (void)log_cleanup(lg, olid);
-                               olid++;
                        }
                }
                if (res == LOG_OK)
@@ -2604,7 +2602,6 @@ internal_log_bat(logger *lg, BAT *b, log
 
        if (LOG_DISABLED(lg) || !nr) {
                /* logging is switched off */
-               ATOMIC_ADD(&lg->current->end, nr);
                if (nr)
                        return la_bat_update_count(lg, id, offset+cnt, lg->tid);
                return GDK_SUCCEED;
@@ -2612,7 +2609,7 @@ internal_log_bat(logger *lg, BAT *b, log
 
        gdk_return (*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
 
-       if (lg->total_cnt == 0) // signals single bulk message or first part of bat logged in parts
+       if (lg->total_cnt == 0) /* signals single bulk message or first part of bat logged in parts */
                if (log_write_format(lg, &l) != GDK_SUCCEED ||
                        !mnstr_writeLng(lg->current->output_log, total_cnt?total_cnt:cnt) ||
                        mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
@@ -2623,7 +2620,7 @@ internal_log_bat(logger *lg, BAT *b, log
        if (!total_cnt) total_cnt = cnt;
        lg->total_cnt += cnt;
 
-       if (lg->total_cnt == total_cnt) // This is the last to be logged part of this bat, we can already reset the total_cnt
+       if (lg->total_cnt == total_cnt) /* This is the last to be logged part of this bat, we can already reset the total_cnt */
                lg->total_cnt = 0;
 
        /* if offset is just for the log, but BAT is already sliced, reset offset */
@@ -2706,7 +2703,6 @@ log_bat_persists(logger *lg, BAT *b, log
                        return GDK_FAIL;
                }
        }
-       ATOMIC_INC(&lg->current->end);
        TRC_DEBUG(WAL, "id (%d) bat (%d)\n", id, b->batCacheid);
        gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, 0);
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to