Changeset: 3da0be8b9812 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3da0be8b9812
Modified Files:
        sql/storage/objectset.c
        sql/storage/store.c
Branch: nospare
Log Message:

In process of reparing clean up.


diffs (246 lines):

diff --git a/sql/storage/objectset.c b/sql/storage/objectset.c
--- a/sql/storage/objectset.c
+++ b/sql/storage/objectset.c
@@ -26,6 +26,7 @@ struct versionhead ;// TODO: rename to o
 #define under_destruction              (id_based_rollbacked | 
name_based_rollbacked)
 #define under_resurrection             (1<<3)
 #define deleted                                        (1<<4)
+#define rollbacked                             (1<<5)
 
 typedef struct objectversion {
        ulng ts;
@@ -334,14 +335,14 @@ static void
        assert(ov->ts >= TRANSACTION_ID_BASE);
 
        bte state = os_atmc_get_state(ov);
-       if (state & under_destruction) {
+       if (state & rollbacked) {
                return;
        }
 
-       state |= under_destruction;
+       state |= rollbacked;
        os_atmc_set_state(ov, state);
 
-       if (ov->name_based_older && !(os_atmc_get_state(ov->name_based_older) & 
under_destruction)) {
+       if (ov->name_based_older && !(os_atmc_get_state(ov->name_based_older) & 
rollbacked)) {
                if (ov->ts != ov->name_based_older->ts) {
                        // older is last committed state or belongs to parent 
transaction.
                        // In any case, we restore versionhead pointer to that.
@@ -358,7 +359,7 @@ static void
                        os_remove_name_based_chain(ov->os, store, 
ov->name_based_head);
        }
 
-       if (ov->id_based_older && !(os_atmc_get_state(ov->id_based_older) & 
under_destruction)) {
+       if (ov->id_based_older && !(os_atmc_get_state(ov->id_based_older) & 
rollbacked)) {
                if (ov->ts != ov->id_based_older->ts) {
                        // older is last committed state or belongs to parent 
transaction.
                        // In any case, we restore versionhead pointer to that.
@@ -373,11 +374,11 @@ static void
                os_remove_id_based_chain(ov->os, store, ov->id_based_head);
        }
 
-       if (ov->name_based_newer && !(os_atmc_get_state(ov->name_based_newer) & 
under_destruction)) {
+       if (ov->name_based_newer && !(os_atmc_get_state(ov->name_based_newer) & 
rollbacked)) {
                _os_rollback(ov->id_based_older, store);
        }
 
-       if (ov->id_based_newer && ov->id_based_newer != ov->name_based_newer && 
!(os_atmc_get_state(ov->id_based_newer) & under_destruction)) {
+       if (ov->id_based_newer && ov->id_based_newer != ov->name_based_newer && 
!(os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
                _os_rollback(ov->id_based_older, store);
        }
 }
@@ -390,11 +391,11 @@ os_rollback(objectversion *ov, sqlstore 
        return LOG_OK;
 }
 
-static void
-put_under_destruction(sqlstore* store, objectversion *ov, ulng oldest)
+static inline void
+try_to_mark_deleted_for_destruction(sqlstore* store, objectversion *ov)
 {
        //TODO ATOMIC CAS
-       if (ov->state == 0) {
+       if (ov->state == deleted) {
                ov->state = under_destruction;
 
                if (!ov->name_based_newer) {
@@ -412,32 +413,53 @@ put_under_destruction(sqlstore* store, o
                }
 
                ov->ts = store_get_timestamp(store)+1;
+       }
+       //END ATOMIC CAS
+}
 
-               if (ov->id_based_older) {
-                       put_under_destruction(store, ov->id_based_older, 
oldest);
+static void
+objectversion_destroy_recursive(sqlstore* store, objectversion *ov)
+{
+       if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
+                       objectversion_destroy_recursive(store, 
ov->id_based_older);
                }
 
-               if (ov->name_based_older) {
-                       put_under_destruction(store, ov->name_based_older, 
oldest);
-               }
-       }
-       //END ATOMIC CAS
+               if (ov->name_based_newer)
+                       ov->name_based_newer->name_based_older=NULL;
+
+               if (ov->id_based_newer)
+                       ov->id_based_newer->id_based_older=NULL;
+
+               objectversion_destroy(store, ov->os, ov);
 }
 
 static int
 os_cleanup(sqlstore* store, objectversion *ov, ulng oldest)
 {
-       if (os_atmc_get_state(ov) == under_destruction) {
+       if (os_atmc_get_state(ov) & under_destruction) {
                if (ov->ts < oldest) {
                        // This one is ready to be freed
+                       objectversion_destroy_recursive(store, ov);
+                       return LOG_ERR;
+               }
+
+               // not yet old enough to be safely removed. Try later.
+               return LOG_OK;
+       }
+
+       if (os_atmc_get_state(ov) & rollbacked) {
+               if (ov->ts < oldest) {
+                       // This one is ready to be freed
+                       if (ov->name_based_older)
+                               ov->name_based_older->name_based_newer=NULL;
+                       if (ov->id_based_older)
+                               ov->id_based_older->id_based_newer=NULL;
                        objectversion_destroy(store, ov->os, ov);
                        return LOG_ERR;
                }
 
                if (ov->ts > TRANSACTION_ID_BASE) {
-                       /* An ov which is under_destruction and does not hold a 
valid timestamp
-                        * must be a rollbacked ov ready to be eventually 
destroyed.
-                        * We mark it with the latest possible starttime and 
reinsert it into the cleanup queue.
+                       /* We mark it with the latest possible starttime and 
reinsert it into the cleanup list.
                         * This will cause a safe eventual destruction of this 
rollbacked ov.
                         */
                        ov->ts = store_get_timestamp(store)+2;
@@ -448,42 +470,32 @@ os_cleanup(sqlstore* store, objectversio
        }
 
        if (os_atmc_get_state(ov) == deleted) {
-               if (ov->ts < oldest) {
+               if (ov->ts <= oldest) {
                        // the oldest relevant state is deleted so lets try to 
mark it as destroyed
-                       put_under_destruction(store, ov, oldest);
+                       try_to_mark_deleted_for_destruction(store, ov);
                }
 
-               // reinsert it into the queue, either because it is now marked 
for destruction or
+               // Keep it inplace on the cleanup list, either because it is 
now marked for destruction or
                // we want to retry marking it for destruction later.
                return LOG_OK;
        }
 
-       // TODO ATOMIC GET
-       objectversion* newer = ov->name_based_newer;
-
-       if (ov->ts < oldest && newer && newer->ts < oldest && 
os_atmc_get_state(newer) == active) {
-               // if ov is active and one of its parents is also active then 
both parents must be the same.
-               assert(newer == ov->id_based_newer);
-
-               put_under_destruction(store, ov, oldest);
-
-               // Since this objectversion has two committed oldest parents it 
is unreachable.
-               // So we can directly destroy it.
-               objectversion_destroy(store, ov->os, ov);
-               return LOG_ERR;
+       while (ov->id_based_older && ov->id_based_older == ov->name_based_older 
&& ov->ts >= oldest) {
+               ov = ov->id_based_older;
        }
 
-       return LOG_OK;
+       if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
+               // Destroy everything older then the oldest possibly relevant 
objectversion.
+               objectversion_destroy_recursive(store, ov->id_based_older);
+       }
+
+       return LOG_ERR;
 }
 
 static int
 tc_gc_objectversion(sql_store store, sql_change *change, ulng commit_ts, ulng 
oldest)
 {
-       (void) store;
-       if (commit_ts != oldest) {
-               // TODO: for now only oldest is allowed to do clean up
-               return LOG_ERR;
-       }
+       (void) commit_ts;
 
        objectversion *ov = (objectversion*)change->data;
 
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -2146,11 +2146,8 @@ store_exit(sqlstore *store)
 
        if (store->cat) {
                MT_lock_unset(&store->lock);
-               os_destroy(store->cat->objects, store);
-               os_destroy(store->cat->schemas, store);
                if (store->changes) {
-                       /*
-                       ulng oldest = store_timestamp(store);
+                       ulng oldest = store_timestamp(store)+1;
                        if (!list_empty(store->changes))
                                printf("pending changes %d\n", 
list_length(store->changes));
                        for(node *n=store->changes->h; n; n = n->next) {
@@ -2161,9 +2158,10 @@ store_exit(sqlstore *store)
                                else
                                        _DELETE(c);
                        }
-                       */
                        list_destroy(store->changes);
                }
+               os_destroy(store->cat->objects, store);
+               os_destroy(store->cat->schemas, store);
                _DELETE(store->cat);
                sequences_exit();
                MT_lock_set(&store->lock);
@@ -3598,11 +3596,7 @@ sql_trans_commit(sql_trans *tr)
                for(node *n=tr->changes->h; n && ok == LOG_OK; ) {
                        node *next = n->next;
                        sql_change *c = n->data;
-
-                       if (c->cleanup && c->cleanup(store, c, commit_ts, 
oldest)) {
-                               list_remove_node(tr->changes, store, n);
-                               _DELETE(c);
-                       } else if (tr->parent) {
+                       if (tr->parent) {
                                tr->parent->changes = sa_list_append(tr->sa, 
tr->parent->changes, c);
                        } else {
                                store->changes = sa_list_append(tr->sa, 
store->changes, c);
@@ -3611,6 +3605,17 @@ sql_trans_commit(sql_trans *tr)
                }
                list_destroy(tr->changes);
                tr->changes = NULL;
+
+               for(node *n=store->changes->h; n && ok == LOG_OK; ) {
+                       node *next = n->next;
+
+                       sql_change *c = n->data;
+                       if (c->cleanup && c->cleanup(store, c, commit_ts, 
oldest)) {
+                               list_remove_node(store->changes, store, n);
+                               _DELETE(c);
+                       }
+                       n = next;
+               }
        }
        tr->ts = commit_ts;
        return (ok==LOG_OK)?SQL_OK:SQL_ERR;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to