Changeset: 477a07f222a1 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/477a07f222a1
Modified Files:
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: efficient-pending-changes
Log Message:

Improve the pending changes function
With a combination of short and long-running transactions, the previous
store_pending_changes function quickly became the bottleneck of the
system, as it had to iterate over the entire store->changes list, which
kept growing because of the long-running transaction. Now, changes are
aggregated into blocks with the property that if one change in a block can be
removed, then all changes in the block can be removed, and vice versa.
This significantly reduces the number of iterations required by the
store_pending_changes function, which is especially important for HTAP workloads.


diffs (221 lines):

diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -342,6 +342,7 @@ extern lng store_hot_snapshot_to_stream(
 extern ulng store_function_counter(struct sqlstore *store);
 
 extern ulng store_oldest(struct sqlstore *store);
+extern ulng *store_get_active(struct sqlstore *store);
 extern ulng store_get_timestamp(struct sqlstore *store);
 extern void store_manager(struct sqlstore *store);
 
@@ -496,6 +497,8 @@ typedef struct sqlstore {
        int debug;                              /* debug mask */
        store_type active_type;
        list *changes;                  /* pending changes to cleanup */
+       list *changesBlocks;    /* aggregates changes in blocks to speed up 
iteration while cleaning up */
+       node *lastProcessedChange; /* pointer to the last processed change for 
changesBlocks */
        sql_hash *dependencies; /* pending dependencies created to cleanup */
        sql_hash *depchanges;   /* pending dependencies changes to cleanup */
        list *seqchanges;               /* pending sequence number changes to 
be add to the first commiting transaction */
@@ -532,6 +535,12 @@ typedef struct sql_change {
        tc_cleanup_fptr cleanup;/* callback to cleanup changes */
 } sql_change;
 
+/*
+ * A contiguous run of nodes of store->changes, from start to end (inclusive),
+ * that is built, merged and considered for cleanup as a unit.
+ */
+typedef struct sql_change_block {
+       node *start; /* first node of the block in store->changes */
+       node *end; /* last node (inclusive) of the block in store->changes */
+       ulng ts; /* oldest (minimum) sql_change timestamp in the block; for block
+                 * building any change's ts would do, but the oldest one is kept
+                 * to help compute store->oldest_pending */
+} sql_change_block;
+
 extern void trans_add(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr 
cleanup, tc_commit_fptr commit, tc_log_fptr log);
 extern int tr_version_of_parent(sql_trans *tr, ulng ts);
 
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -64,6 +64,18 @@ store_oldest_pending(sqlstore *store)
        return store->oldest_pending;
 }
 
+/*
+ * Return a snapshot of the timestamps of the transactions in store->active as
+ * a newly malloc'ed array terminated by a 0 entry. The caller owns the array
+ * and must free() it. Returns NULL if the allocation fails.
+ * NOTE(review): the 0 terminator assumes no active transaction has ts == 0.
+ */
+ulng *
+store_get_active(sqlstore *store)
+{
+       int cnt = store->active->cnt;
+       ulng *active = malloc(sizeof(*active) * ((size_t) cnt + 1));
+
+       if (active == NULL)
+               return NULL;
+
+       int i = 0;
+       /* stop on either bound so a cnt/node mismatch cannot overrun */
+       for (node *n = store->active->h; n && i < cnt; n = n->next)
+               active[i++] = ((sql_trans *) n->data)->ts;
+       active[i] = 0; /* terminator: consumers iterate until they see 0 */
+       return active;
+}
+
 static inline bool
 instore(sqlid id)
 {
@@ -1817,6 +1829,7 @@ store_load(sqlstore *store, sql_allocato
        store->depchanges = hash_new(NULL, 32, (fkeyvalue)&dep_hash);
        store->sequences = hash_new(NULL, 32, (fkeyvalue)&seq_hash);
        store->seqchanges = list_create(NULL);
+       store->changesBlocks = list_create(NULL);
        if (!store->active || !store->dependencies || !store->depchanges || 
!store->sequences || !store->seqchanges) {
                TRC_CRITICAL(SQL_STORE, "Allocation failure while initializing 
store\n");
                sql_trans_destroy(tr);
@@ -2260,24 +2273,144 @@ id_hash_clear_older(sql_hash *h, ulng ol
        }
 }
 
+/*
+ * Return true if some timestamp in the 0-terminated array `active` lies in the
+ * closed interval spanned by `a` and `b` (in either order), i.e. an active
+ * transaction sits in between the two timestamps. A NULL `active` (e.g. when
+ * store_get_active could not allocate) is treated conservatively as true, so
+ * nothing gets grouped together.
+ */
+static bool
+active_in_between(const ulng *active, ulng a, ulng b)
+{
+       if (active == NULL)
+               return true;
+
+       ulng lo = a < b ? a : b;
+       ulng hi = a < b ? b : a;
+
+       for (int i = 0; active[i] != 0; i++) {
+               if (active[i] >= lo && active[i] <= hi)
+                       return true;
+       }
+       return false;
+}
+
+/*
+ * Add the changes appended to store->changes since the previous call (the
+ * nodes after store->lastProcessedChange, or the whole list when it is NULL)
+ * into store->changesBlocks. A change extends the last block when no active
+ * transaction lies between the block's ts and the change's ts; otherwise a
+ * new block is started for it.
+ *
+ * On return store->lastProcessedChange points at the last change that was put
+ * into a block. If a new block cannot be allocated, processing stops early and
+ * store->lastProcessedChange is left at the last change actually placed.
+ */
+static void
+process_new_changes_into_blocks(sqlstore *store, ulng *active) {
+       node *lastBlockNode = store->changesBlocks->t;
+       sql_change_block *currentBlock = lastBlockNode ? lastBlockNode->data : NULL;
+       node *processed = store->lastProcessedChange;
+       node *n = processed ? processed->next : store->changes->h;
+
+       for (; n; n = n->next) {
+               sql_change *change = n->data;
+
+               if (currentBlock && !active_in_between(active, currentBlock->ts, change->ts)) {
+                       /* fits, extend the current block */
+                       currentBlock->end = n;
+                       currentBlock->ts = MIN(currentBlock->ts, change->ts);
+               } else {
+                       /* does not fit (or there is no block yet), start a new one */
+                       sql_change_block *block = malloc(sizeof(*block));
+
+                       if (block == NULL)
+                               break;
+                       block->start = n;
+                       block->end = n;
+                       block->ts = change->ts;
+                       if (list_append(store->changesBlocks, block) == NULL) {
+                               free(block);
+                               break;
+                       }
+                       currentBlock = block;
+               }
+               processed = n;
+       }
+       store->lastProcessedChange = processed;
+}
+
+/*
+ * Merge adjacent blocks of store->changesBlocks whenever no currently active
+ * transaction lies between their timestamps; the surviving block spans both
+ * ranges and keeps the minimum ts. Does nothing when there are no blocks.
+ */
+static void
+merge_changes_blocks(sqlstore *store, ulng *active) {
+       node *prevBlockNode = store->changesBlocks->h;
+
+       if (prevBlockNode == NULL)
+               return;
+
+       for (node *currBlockNode = prevBlockNode->next; currBlockNode;) {
+               sql_change_block *prevBlock = prevBlockNode->data;
+               sql_change_block *currBlock = currBlockNode->data;
+               /* read before a possible removal of currBlockNode */
+               node *nextBlockNode = currBlockNode->next;
+
+               if (!active_in_between(active, prevBlock->ts, currBlock->ts)) {
+                       /* absorb the current block into the previous one */
+                       prevBlock->end = currBlock->end;
+                       prevBlock->ts = MIN(prevBlock->ts, currBlock->ts);
+                       list_remove_node(store->changesBlocks, store, currBlockNode);
+                       free(currBlock);
+               } else {
+                       /* an active transaction separates them, move on */
+                       prevBlockNode = currBlockNode;
+               }
+               currBlockNode = nextBlockNode;
+       }
+}
+
+/*
+ * Cleanup the sql_changes list based on the oldest active transaction, by
+ * iterating over the changesBlocks list. For every block whose (minimum) ts is
+ * not newer than `oldest`, each change's cleanup callback is invoked; only the
+ * changes whose callback reports success are removed from store->changes and
+ * freed. Changes whose cleanup declines stay in the (shrunk) block so they are
+ * retried later, as the previous per-change loop did. Empty blocks are freed.
+ *
+ * Returns the timestamp of the oldest sql_change not cleaned (the current
+ * store timestamp when nothing is left).
+ */
+static ulng
+cleanup_changes(sqlstore *store, ulng oldest) {
+       ulng oldest_changes = store_get_timestamp(store);
+
+       for (node *blockNode = store->changesBlocks->h; blockNode;) {
+               sql_change_block *block = blockNode->data;
+               node *nextBlockNode = blockNode->next;
+
+               if (oldest >= block->ts) {
+                       /* block->ts is the minimum, newer changes may still refuse cleanup */
+                       node *stop = block->end->next;
+                       node *keepStart = NULL, *keepEnd = NULL;
+                       ulng keepTs = 0;
+
+                       for (node *changeNode = block->start; changeNode && changeNode != stop;) {
+                               sql_change *change = changeNode->data;
+                               node *next = changeNode->next;
+
+                               if (change->cleanup(store, change, oldest)) {
+                                       list_remove_node(store->changes, store, changeNode);
+                                       _DELETE(change);
+                               } else {
+                                       /* not cleaned yet: keep it inside this block */
+                                       if (keepStart == NULL || change->ts < keepTs)
+                                               keepTs = change->ts;
+                                       if (keepStart == NULL)
+                                               keepStart = changeNode;
+                                       keepEnd = changeNode;
+                               }
+                               changeNode = next;
+                       }
+
+                       if (keepStart == NULL) {
+                               /* every change was cleaned, free the block */
+                               list_remove_node(store->changesBlocks, store, blockNode);
+                               free(block);
+                               blockNode = nextBlockNode;
+                               continue;
+                       }
+                       /* remaining nodes of the range are still contiguous */
+                       block->start = keepStart;
+                       block->end = keepEnd;
+                       block->ts = keepTs;
+               }
+               if (block->ts < oldest_changes)
+                       oldest_changes = block->ts;
+               blockNode = nextBlockNode;
+       }
+
+       /*
+        * Blocks are built in list order from the head of store->changes, so the
+        * end of the last remaining block is the last processed change; NULL makes
+        * the next processing pass start at the head. When every change has been
+        * processed this equals store->changes->t, but it does not skip changes
+        * that were never put into a block.
+        * NOTE(review): assumes store->changes is only appended at the tail and
+        * only shrunk here — confirm.
+        */
+       node *lastBlockNode = store->changesBlocks->t;
+       store->lastProcessedChange = lastBlockNode ?
+               ((sql_change_block *) lastBlockNode->data)->end : NULL;
+       return oldest_changes;
+}
+
 static void
 store_pending_changes(sqlstore *store, ulng oldest)
 {
        ulng oldest_changes = store_get_timestamp(store);
        if (!list_empty(store->changes)) { /* lets first cleanup old stuff */
-               for(node *n=store->changes->h; n; ) {
-                       node *next = n->next;
-                       sql_change *c = n->data;
-
-                       if (c->cleanup(store, c, oldest)) {
-                               list_remove_node(store->changes, store, n);
-                               _DELETE(c);
-                       } else if (c->ts < oldest_changes) {
-                               oldest_changes = c->ts;
-                       }
-                       n = next;
-               }
-       }
+               ulng *active = store_get_active(store);
+
+               /* process new blocks */
+               process_new_changes_into_blocks(store, active);
+
+               /* try merging existing blocks */
+               merge_changes_blocks(store, active);
+
+               /* try cleanup */
+               oldest_changes = cleanup_changes(store, oldest);
+       }
+
        if (ATOMIC_GET(&store->nr_active) < 2) { /* one or no transaction 
running */
                dep_hash_clear(store->dependencies);
                dep_hash_clear(store->depchanges);
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to